This is an automated email from the ASF dual-hosted git repository.

sk0x50 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 33fa7b0f3fa IGNITE-26328 Add table metrics (#6544)
33fa7b0f3fa is described below

commit 33fa7b0f3fadae266343ef2b39a780fb5686f215
Author: Slava Koptilin <slava.kopti...@gmail.com>
AuthorDate: Wed Sep 10 13:06:18 2025 +0300

    IGNITE-26328 Add table metrics (#6544)
---
 .../ignite/client/fakes/FakeInternalTable.java     |   6 +
 .../rebalance/ItRebalanceDistributedTest.java      |   3 +-
 .../apache/ignite/internal/index/IndexManager.java |   4 +-
 .../partition/replicator/fixtures/Node.java        |   3 +-
 .../runner/app/ItIgniteNodeRestartTest.java        |   3 +-
 .../org/apache/ignite/internal/app/IgniteImpl.java |   3 +-
 .../exec/rel/TableScanNodeExecutionTest.java       |   5 +-
 modules/table/build.gradle                         |   1 +
 ...xDistributedTestSingleNodeNoCleanupMessage.java |   5 +-
 .../ignite/internal/table/ItColocationTest.java    |   5 +-
 .../internal/table/metrics/ItTableMetricsTest.java | 631 +++++++++++++++++++++
 .../ignite/internal/table/InternalTable.java       |   8 +
 .../apache/ignite/internal/table/TableImpl.java    |   6 +
 .../ignite/internal/table/TableViewInternal.java   |   8 +
 .../internal/table/distributed/TableManager.java   |  45 +-
 .../replicator/PartitionReplicaListener.java       | 161 +++++-
 .../distributed/storage/InternalTableImpl.java     |  12 +-
 .../internal/table/metrics/TableMetricSource.java  | 222 ++++++++
 .../distributed/TableManagerRecoveryTest.java      |   4 +-
 .../table/distributed/TableManagerTest.java        |   4 +-
 .../PartitionReplicaListenerIndexLockingTest.java  |   5 +-
 ...itionReplicaListenerSortedIndexLockingTest.java |   5 +-
 .../replication/PartitionReplicaListenerTest.java  |   5 +-
 .../ZonePartitionReplicaListenerTest.java          |   5 +-
 .../storage/InternalTableEstimatedSizeTest.java    |   8 +-
 .../distributed/storage/InternalTableImplTest.java |   5 +-
 .../table/metrics/TableMetricSourceTest.java       |  65 +++
 .../apache/ignite/distributed/ItTxTestCluster.java |   7 +-
 .../table/impl/DummyInternalTableImpl.java         |   8 +-
 29 files changed, 1207 insertions(+), 45 deletions(-)

diff --git 
a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeInternalTable.java
 
b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeInternalTable.java
index 705788f6979..ce5759970d4 100644
--- 
a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeInternalTable.java
+++ 
b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeInternalTable.java
@@ -57,6 +57,7 @@ import org.apache.ignite.internal.schema.ColumnsExtractor;
 import org.apache.ignite.internal.storage.engine.MvTableStorage;
 import org.apache.ignite.internal.table.InternalTable;
 import org.apache.ignite.internal.table.StreamerReceiverRunner;
+import org.apache.ignite.internal.table.metrics.TableMetricSource;
 import org.apache.ignite.internal.tx.InternalTransaction;
 import org.apache.ignite.internal.tx.storage.state.TxStateStorage;
 import org.apache.ignite.internal.util.PendingComparableValuesTracker;
@@ -582,4 +583,9 @@ public class FakeInternalTable implements InternalTable, 
StreamerReceiverRunner
                 payload)
                 .thenApply(resBytes -> new IgniteBiTuple<>(resBytes, 
FakeCompute.observableTimestamp.longValue()));
     }
+
+    @Override
+    public TableMetricSource metrics() {
+        return null;
+    }
 }
diff --git 
a/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
 
b/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
index 14c9cfd18be..d761b35bf3a 100644
--- 
a/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
+++ 
b/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
@@ -1648,7 +1648,8 @@ public class ItRebalanceDistributedTest extends 
BaseIgniteAbstractTest {
                     partitionReplicaLifecycleManager,
                     nodeProperties,
                     minTimeCollectorService,
-                    systemDistributedConfiguration
+                    systemDistributedConfiguration,
+                    metricManager
             ) {
                 @Override
                 protected TxStateStorage createTxStateTableStorage(
diff --git 
a/modules/index/src/main/java/org/apache/ignite/internal/index/IndexManager.java
 
b/modules/index/src/main/java/org/apache/ignite/internal/index/IndexManager.java
index 718717683f4..c23f00b43c5 100644
--- 
a/modules/index/src/main/java/org/apache/ignite/internal/index/IndexManager.java
+++ 
b/modules/index/src/main/java/org/apache/ignite/internal/index/IndexManager.java
@@ -211,8 +211,8 @@ public class IndexManager implements IgniteComponent {
 
             if (LOG.isInfoEnabled()) {
                 LOG.info(
-                        "Creating local index: name={}, id={}, tableId={}, 
token={}",
-                        index.name(), indexId, tableId, causalityToken
+                        "Creating local index: name={}, id={}, tableId={}, 
token={}, type={}",
+                        index.name(), indexId, tableId, causalityToken, 
index.indexType()
                 );
             }
 
diff --git 
a/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/fixtures/Node.java
 
b/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/fixtures/Node.java
index 84122d98061..1a06ed18804 100644
--- 
a/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/fixtures/Node.java
+++ 
b/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/fixtures/Node.java
@@ -803,7 +803,8 @@ public class Node {
                 partitionReplicaLifecycleManager,
                 nodeProperties,
                 minTimeCollectorService,
-                systemDistributedConfiguration
+                systemDistributedConfiguration,
+                metricManager
         ) {
 
             @Override
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
index 1b17e86b6fd..8e0bded6243 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
@@ -811,7 +811,8 @@ public class ItIgniteNodeRestartTest extends 
BaseIgniteRestartTest {
                 partitionReplicaLifecycleListener,
                 nodeProperties,
                 minTimeCollectorService,
-                systemDistributedConfiguration
+                systemDistributedConfiguration,
+                metricManager
         );
 
         var indexManager = new IndexManager(
diff --git 
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java 
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
index 06fb6074e40..4ec1efa464d 100644
--- 
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
+++ 
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
@@ -1148,7 +1148,8 @@ public class IgniteImpl implements Ignite {
                 partitionReplicaLifecycleManager,
                 nodeProperties,
                 minTimeCollectorService,
-                systemDistributedConfiguration
+                systemDistributedConfiguration,
+                metricManager
         );
 
         disasterRecoveryManager = new DisasterRecoveryManager(
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/TableScanNodeExecutionTest.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/TableScanNodeExecutionTest.java
index ff2f9fb7483..3c2b73cc3a2 100644
--- 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/TableScanNodeExecutionTest.java
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/TableScanNodeExecutionTest.java
@@ -88,6 +88,7 @@ import org.apache.ignite.internal.sql.engine.util.TypeUtils;
 import org.apache.ignite.internal.storage.engine.MvTableStorage;
 import org.apache.ignite.internal.table.StreamerReceiverRunner;
 import org.apache.ignite.internal.table.distributed.storage.InternalTableImpl;
+import org.apache.ignite.internal.table.metrics.TableMetricSource;
 import org.apache.ignite.internal.testframework.ExecutorServiceExtension;
 import org.apache.ignite.internal.testframework.InjectExecutorService;
 import org.apache.ignite.internal.tx.TxManager;
@@ -101,6 +102,7 @@ import 
org.apache.ignite.internal.tx.storage.state.TxStateStorage;
 import org.apache.ignite.internal.tx.test.TestLocalRwTxCounter;
 import org.apache.ignite.internal.type.NativeTypes;
 import org.apache.ignite.network.NetworkAddress;
+import org.apache.ignite.table.QualifiedName;
 import org.apache.ignite.table.QualifiedNameHelper;
 import org.jetbrains.annotations.Nullable;
 import org.junit.jupiter.api.AfterEach;
@@ -341,7 +343,8 @@ public class TableScanNodeExecutionTest extends 
AbstractExecutionTest<Object[]>
                     mock(StreamerReceiverRunner.class),
                     () -> 10_000L,
                     () -> 10_000L,
-                    colocationEnabled()
+                    colocationEnabled(),
+                    new TableMetricSource(QualifiedName.fromSimple("test"))
             );
             this.dataAmount = dataAmount;
 
diff --git a/modules/table/build.gradle b/modules/table/build.gradle
index 8b244242c92..86622fde1c3 100644
--- a/modules/table/build.gradle
+++ b/modules/table/build.gradle
@@ -86,6 +86,7 @@ dependencies {
     testImplementation testFixtures(project(':ignite-table'))
     testImplementation testFixtures(project(':ignite-low-watermark'))
     testImplementation testFixtures(project(':ignite-failure-handler'))
+    testImplementation testFixtures(project(':ignite-metrics'))
     testImplementation libs.jmh.core
     testImplementation libs.javax.annotations
 
diff --git 
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNodeNoCleanupMessage.java
 
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNodeNoCleanupMessage.java
index ea00e8d6358..9f49366cc50 100644
--- 
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNodeNoCleanupMessage.java
+++ 
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNodeNoCleanupMessage.java
@@ -63,6 +63,7 @@ import 
org.apache.ignite.internal.table.distributed.TableSchemaAwareIndexStorage
 import org.apache.ignite.internal.table.distributed.index.IndexMetaStorage;
 import 
org.apache.ignite.internal.table.distributed.replicator.PartitionReplicaListener;
 import 
org.apache.ignite.internal.table.distributed.replicator.TransactionStateResolver;
+import org.apache.ignite.internal.table.metrics.TableMetricSource;
 import org.apache.ignite.internal.tx.InternalTransaction;
 import org.apache.ignite.internal.tx.LockManager;
 import org.apache.ignite.internal.tx.TxManager;
@@ -77,6 +78,7 @@ import 
org.apache.ignite.internal.tx.storage.state.TxStatePartitionStorage;
 import org.apache.ignite.internal.tx.test.TestLocalRwTxCounter;
 import org.apache.ignite.internal.util.Lazy;
 import org.apache.ignite.internal.util.PendingComparableValuesTracker;
+import org.apache.ignite.table.QualifiedName;
 import org.apache.ignite.table.Tuple;
 import org.apache.ignite.tx.TransactionException;
 import org.junit.jupiter.api.BeforeEach;
@@ -215,7 +217,8 @@ public class ItTxDistributedTestSingleNodeNoCleanupMessage 
extends TxAbstractTes
                         mock(IndexMetaStorage.class),
                         lowWatermark,
                         mock(FailureProcessor.class),
-                        new SystemPropertiesNodeProperties()
+                        new SystemPropertiesNodeProperties(),
+                        new 
TableMetricSource(QualifiedName.fromSimple("test_table"))
                 ) {
                     @Override
                     public CompletableFuture<ReplicaResult> 
invoke(ReplicaRequest request, UUID senderId) {
diff --git 
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItColocationTest.java
 
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItColocationTest.java
index 8bc147b8e68..6cea81e1e53 100644
--- 
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItColocationTest.java
+++ 
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItColocationTest.java
@@ -105,6 +105,7 @@ import 
org.apache.ignite.internal.table.distributed.schema.ConstantSchemaVersion
 import org.apache.ignite.internal.table.distributed.storage.InternalTableImpl;
 import org.apache.ignite.internal.table.impl.DummyInternalTableImpl;
 import org.apache.ignite.internal.table.impl.DummySchemaManagerImpl;
+import org.apache.ignite.internal.table.metrics.TableMetricSource;
 import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
 import org.apache.ignite.internal.testframework.ExecutorServiceExtension;
 import org.apache.ignite.internal.testframework.InjectExecutorService;
@@ -125,6 +126,7 @@ import org.apache.ignite.internal.type.NativeTypes;
 import org.apache.ignite.internal.util.CollectionUtils;
 import org.apache.ignite.sql.ColumnType;
 import org.apache.ignite.sql.IgniteSql;
+import org.apache.ignite.table.QualifiedName;
 import org.apache.ignite.table.QualifiedNameHelper;
 import org.apache.ignite.table.Tuple;
 import org.jetbrains.annotations.Nullable;
@@ -369,7 +371,8 @@ public class ItColocationTest extends 
BaseIgniteAbstractTest {
                 mock(StreamerReceiverRunner.class),
                 () -> 10_000L,
                 () -> 10_000L,
-                colocationEnabled()
+                colocationEnabled(),
+                new TableMetricSource(QualifiedName.fromSimple("TEST"))
         );
     }
 
diff --git 
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/metrics/ItTableMetricsTest.java
 
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/metrics/ItTableMetricsTest.java
new file mode 100644
index 00000000000..2982523a157
--- /dev/null
+++ 
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/metrics/ItTableMetricsTest.java
@@ -0,0 +1,631 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.metrics;
+
+import static java.util.List.of;
+import static java.util.stream.Collectors.toList;
+import static 
org.apache.ignite.internal.AssignmentsTestUtils.awaitAssignmentsStabilizationOnDefaultZone;
+import static org.apache.ignite.internal.TestWrappers.unwrapIgniteImpl;
+import static 
org.apache.ignite.internal.lang.IgniteSystemProperties.colocationEnabled;
+import static 
org.apache.ignite.internal.table.metrics.TableMetricSource.RO_READS;
+import static 
org.apache.ignite.internal.table.metrics.TableMetricSource.RW_READS;
+import static 
org.apache.ignite.internal.table.metrics.TableMetricSource.WRITES;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.instanceOf;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.notNullValue;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Consumer;
+import org.apache.ignite.internal.ClusterPerClassIntegrationTest;
+import org.apache.ignite.internal.metrics.LongMetric;
+import org.apache.ignite.internal.metrics.Metric;
+import org.apache.ignite.internal.metrics.MetricSet;
+import org.apache.ignite.table.KeyValueView;
+import org.apache.ignite.table.QualifiedName;
+import org.apache.ignite.table.RecordView;
+import org.apache.ignite.table.Tuple;
+import org.apache.ignite.tx.Transaction;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Tests key-value and record view operations metrics.
+ */
+public class ItTableMetricsTest extends ClusterPerClassIntegrationTest {
+    private static final String TABLE_NAME = "test_table_name".toUpperCase();
+
+    private static final String SORTED_IDX = "SORTED_IDX";
+    private static final String HASH_IDX = "HASH_IDX";
+
+    private static final String METRIC_SOURCE_NAME = 
TableMetricSource.SOURCE_NAME + '.'
+            + QualifiedName.fromSimple(TABLE_NAME).toCanonicalForm();
+
+    @Override
+    protected int initialNodes() {
+        return 2;
+    }
+
+    @BeforeAll
+    void createTable() throws Exception {
+        sql("CREATE TABLE " + TABLE_NAME + " (id INT PRIMARY KEY, val 
VARCHAR)");
+
+        sql("CREATE INDEX IF NOT EXISTS " + SORTED_IDX + " ON PUBLIC." + 
TABLE_NAME + " USING SORTED (id)");
+        sql("CREATE INDEX IF NOT EXISTS " + HASH_IDX + " ON PUBLIC." + 
TABLE_NAME + " USING HASH (val)");
+
+        if (colocationEnabled()) {
+            awaitAssignmentsStabilizationOnDefaultZone(CLUSTER.aliveNode());
+        }
+    }
+
+    /**
+     * Returns a key value view for the table {@link #TABLE_NAME}.
+     *
+     * @param nodeIndex Node index to create a key value view.
+     * @return Key value view.
+     */
+    private static KeyValueView<Integer, String> keyValueView(int nodeIndex) {
+        return 
CLUSTER.node(nodeIndex).tables().table(TABLE_NAME).keyValueView(Integer.class, 
String.class);
+    }
+
+    /**
+     * Returns a record view for the table {@link #TABLE_NAME}.
+     *
+     * @param nodeIndex Node index to create a key value view.
+     * @return Record view.
+     */
+    private static RecordView<Tuple> recordView(int nodeIndex) {
+        return CLUSTER.node(nodeIndex).tables().table(TABLE_NAME).recordView();
+    }
+
+    @Test
+    void get() {
+        // Implicit read-only transaction.
+        testKeyValueViewOperation(RO_READS, 1, view -> view.get(null, 12));
+
+        // Explicit read-write transaction.
+        testKeyValueViewOperation(RW_READS, 1, view -> {
+            Transaction tx = node(0).transactions().begin();
+
+            view.get(tx, 12);
+
+            tx.commit();
+        });
+    }
+
+    @Test
+    void getAll() {
+        List<Integer> keys = of(12, 15, 17, 19, 23);
+
+        // Implicit getAll operation starts a read-write transaction when all 
keys are not mapped to the same partition.
+        testKeyValueViewOperation(RW_READS, keys.size(), view -> 
view.getAll(null, keys));
+
+        // Single key getAll operation starts a read-only transaction.
+        List<Integer> roKeys = of(12);
+        testKeyValueViewOperation(RO_READS, 1, view -> view.getAll(null, 
roKeys));
+
+        List<Integer> nonUniqueKeys = of(12, 15, 12);
+        testKeyValueViewOperation(RW_READS, nonUniqueKeys.size(), view -> 
view.getAll(null, nonUniqueKeys));
+    }
+
+    @Test
+    void getOrDefault() {
+        KeyValueView<Integer, String> kvView = keyValueView(0);
+
+        Integer existingKey = 12;
+        Integer nonExistingKey = -1;
+
+        kvView.put(null, existingKey, "value_12");
+        kvView.remove(null, nonExistingKey);
+
+        testKeyValueViewOperation(RO_READS, 2, view -> {
+            view.getOrDefault(null, existingKey, "default");
+            view.getOrDefault(null, nonExistingKey, "default");
+        });
+
+        testKeyValueViewOperation(RW_READS, 2, view -> {
+            Transaction tx = node(0).transactions().begin();
+
+            view.getOrDefault(tx, existingKey, "default");
+            view.getOrDefault(tx, nonExistingKey, "default");
+
+            tx.commit();
+        });
+    }
+
+    @Test
+    void contains() {
+        testKeyValueViewOperation(RO_READS, 1, view -> view.contains(null, 
12));
+
+        testKeyValueViewOperation(RW_READS, 1, view -> {
+            Transaction tx = node(0).transactions().begin();
+
+            view.contains(tx, 12);
+
+            tx.commit();
+        });
+    }
+
+    @Test
+    void containsAll() {
+        List<Integer> keys = of(12, 15, 17, 19, 23);
+
+        // Implicit containsAll operation starts a read-write transaction when 
all keys are not mapped to the same partition.
+        testKeyValueViewOperation(RW_READS, keys.size(), view -> 
view.containsAll(null, keys));
+
+        // Single key.
+        List<Integer> roKeys = of(12);
+        testKeyValueViewOperation(RO_READS, 1, view -> view.containsAll(null, 
roKeys));
+    }
+
+    @Test
+    void put() {
+        testKeyValueViewOperation(WRITES, 1, view -> view.put(null, 42, 
"value_42"));
+    }
+
+    @Test
+    void putAll() {
+        Map<Integer, String> values = Map.of(12, "12", 15, "15", 17, "17", 19, 
"19", 23, "23");
+
+        testKeyValueViewOperation(WRITES, values.size(), view -> 
view.putAll(null, values));
+    }
+
+    @Test
+    void getAndPut() {
+        testKeyValueViewOperation(of(RW_READS, WRITES), of(1L, 1L), view -> 
view.getAndPut(null, 12, "value"));
+    }
+
+    @Test
+    void remove() {
+        Integer key = 12;
+        KeyValueView<Integer, String> kvView = keyValueView(0);
+        kvView.put(null, key, "value_42");
+
+        // Remove existing key.
+        testKeyValueViewOperation(WRITES, 1, view -> view.remove(null, key));
+
+        // Remove non existing key.
+        testKeyValueViewOperation(WRITES, 0, view -> view.remove(null, key));
+    }
+
+    @Test
+    void exactRemove() {
+        Integer key = 12;
+        KeyValueView<Integer, String> kvView = keyValueView(0);
+        kvView.put(null, key, "value_42");
+
+        // Remove existing key and non-matching value.
+        testKeyValueViewOperation(of(RW_READS, WRITES), of(1L, 0L), view -> 
view.remove(null, key, "wrong-value"));
+
+        // Remove existing key and matching value.
+        testKeyValueViewOperation(of(RW_READS, WRITES), of(1L, 1L), view -> 
view.remove(null, key, "value_42"));
+
+        // Remove non existing key.
+        testKeyValueViewOperation(of(RW_READS, WRITES), of(1L, 0L), view -> 
view.remove(null, key, "value_42"));
+    }
+
+    @Test
+    void removeAll() {
+        Map<Integer, String> values = Map.of(12, "12", 15, "15", 17, "17", 19, 
"19", 23, "23");
+
+        KeyValueView<Integer, String> kvView = keyValueView(0);
+        kvView.removeAll(null);
+        kvView.putAll(null, values);
+
+        testKeyValueViewOperation(WRITES, values.size(), view -> 
view.removeAll(null));
+    }
+
+    @Test
+    void removeCollectionKeys() {
+        Map<Integer, String> values = Map.of(12, "12", 15, "15", 17, "17", 19, 
"19", 23, "23");
+
+        KeyValueView<Integer, String> kvView = keyValueView(0);
+        kvView.putAll(null, values);
+
+        // Remove existing keys.
+        testKeyValueViewOperation(WRITES, values.size(), view -> 
view.removeAll(null, values.keySet()));
+
+        // Remove non-existing keys.
+        testKeyValueViewOperation(WRITES, 0, view -> view.removeAll(null, 
values.keySet()));
+
+        kvView.putAll(null, values);
+
+        // Remove non-unique keys.
+        List<Integer> nonUniqueKeys = of(12, 15, 12, 17, 19, 23);
+        testKeyValueViewOperation(WRITES, nonUniqueKeys.size() - 1, view -> 
view.removeAll(null, nonUniqueKeys));
+    }
+
+    @Test
+    void putIfAbsent() {
+        Integer key = 12;
+
+        KeyValueView<Integer, String> kvView = keyValueView(0);
+        kvView.remove(null, key);
+
+        // Insert absent key.
+        testKeyValueViewOperation(of(RW_READS, WRITES), of(1L, 1L), view -> 
view.putIfAbsent(null, key, "value"));
+
+        // Insert existing key.
+        testKeyValueViewOperation(of(RW_READS, WRITES), of(1L, 0L), view -> 
view.putIfAbsent(null, key, "value-42"));
+    }
+
+    @Test
+    void getAndRemove() {
+        Integer key = 12;
+
+        KeyValueView<Integer, String> kvView = keyValueView(0);
+        kvView.put(null, key, "value_42");
+
+        // Remove existing key.
+        testKeyValueViewOperation(of(RW_READS, WRITES), of(1L, 1L), view -> 
view.getAndRemove(null, key));
+
+        // Remove non-existing key.
+        testKeyValueViewOperation(of(RW_READS, WRITES), of(1L, 0L), view -> 
view.getAndRemove(null, key));
+    }
+
+    @Test
+    void replace() {
+        Integer key = 12;
+
+        KeyValueView<Integer, String> kvView = keyValueView(0);
+        kvView.put(null, key, "value");
+
+        // Replace existing key.
+        testKeyValueViewOperation(of(RW_READS, WRITES), of(1L, 1L), view -> 
view.replace(null, key, "replaced"));
+
+        kvView.remove(null, key);
+
+        // Replace non-existing key.
+        testKeyValueViewOperation(of(RW_READS, WRITES), of(1L, 0L), view -> 
view.replace(null, key, "value"));
+    }
+
+    @Test
+    void conditionalReplace() {
+        Integer key = 12;
+
+        KeyValueView<Integer, String> kvView = keyValueView(0);
+        kvView.put(null, key, "value");
+
+        // Replace existing key.
+        testKeyValueViewOperation(of(RW_READS, WRITES), of(1L, 0L), view -> 
view.replace(null, key, "wrong", "replaced"));
+        testKeyValueViewOperation(of(RW_READS, WRITES), of(1L, 1L), view -> 
view.replace(null, key, "value", "replaced"));
+
+        kvView.remove(null, key);
+
+        // Replace non-existing key.
+        testKeyValueViewOperation(of(RW_READS, WRITES), of(1L, 0L), view -> 
view.replace(null, key, "replaced", "value"));
+    }
+
+    @Test
+    void getAndReplace() {
+        Integer key = 12;
+
+        KeyValueView<Integer, String> kvView = keyValueView(0);
+        kvView.put(null, key, "value");
+
+        // Replace existing key.
+        testKeyValueViewOperation(of(RW_READS, WRITES), of(1L, 1L), view -> 
view.getAndReplace(null, key, "replaced"));
+
+        kvView.remove(null, key);
+
+        // Replace non-existing key.
+        testKeyValueViewOperation(of(RW_READS, WRITES), of(1L, 0L), view -> 
view.replace(null, key, "replaced"));
+    }
+
+    @Test
+    void insertAll() {
+        List<Tuple> keys = of(Tuple.create().set("id", 12), 
Tuple.create().set("id", 42));
+        List<Tuple> recs = keys.stream().map(t -> Tuple.copy(t).set("val", 
"value_" + t.intValue("id"))).collect(toList());
+
+        recordView(0).deleteAll(null, keys);
+
+        // Insert non-existing keys.
+        testRecordViewOperation(of(WRITES, RW_READS), of((long) recs.size(), 
(long) recs.size()), view -> view.insertAll(null, recs));
+
+        // Insert existing keys.
+        testRecordViewOperation(of(WRITES, RW_READS), of(0L, (long) 
recs.size()), view -> view.insertAll(null, recs));
+
+        recordView(0).delete(null, keys.get(0));
+
+        // Insert one non-existing key.
+        testRecordViewOperation(of(WRITES, RW_READS), of(1L, (long) 
recs.size()), view -> view.insertAll(null, recs));
+
+        // Insert non-unique keys.
+        List<Tuple> nonUniqueKeys = of(
+                Tuple.create().set("id", 12),
+                Tuple.create().set("id", 42),
+                Tuple.create().set("id", 12));
+        List<Tuple> nonUniqueValues = nonUniqueKeys
+                .stream()
+                .map(t -> Tuple.copy(t).set("val", "value_" + 
t.intValue("id")))
+                .collect(toList());
+
+        recordView(0).deleteAll(null, keys);
+
+        testRecordViewOperation(
+                of(WRITES, RW_READS),
+                of((long) nonUniqueKeys.size() - 1, (long) 
nonUniqueKeys.size()),
+                view -> view.insertAll(null, nonUniqueValues));
+    }
+
+    @Test
+    void deleteAll() {
+        List<Tuple> keys = of(Tuple.create().set("id", 12), 
Tuple.create().set("id", 42));
+        List<Tuple> recs = keys.stream().map(t -> Tuple.copy(t).set("val", 
"value_" + t.intValue("id"))).collect(toList());
+
+        recordView(0).upsertAll(null, recs);
+
+        // Delete existing keys.
+        testRecordViewOperation(WRITES, recs.size(), view -> 
view.deleteAll(null, keys));
+
+        // Delete non-existing keys.
+        testRecordViewOperation(WRITES, 0L, view -> view.deleteAll(null, 
keys));
+
+        recordView(0).insert(null, recs.get(0));
+
+        // Delete one non-existing key.
+        testRecordViewOperation(WRITES, 1L, view -> view.deleteAll(null, 
keys));
+
+        // Non-unique keys.
+        List<Tuple> nonUniqueKeys = of(
+                Tuple.create().set("id", 12),
+                Tuple.create().set("id", 42),
+                Tuple.create().set("id", 12));
+        List<Tuple> nonUniqueRecs = nonUniqueKeys
+                .stream()
+                .map(t -> Tuple.copy(t).set("val", "value_" + 
t.intValue("id")))
+                .collect(toList());
+
+        recordView(0).upsertAll(null, nonUniqueRecs);
+
+        testRecordViewOperation(WRITES, 2L, view -> view.deleteAll(null, 
nonUniqueKeys));
+    }
+
+    @Test
+    void deleteAllExact() {
+        List<Tuple> keys = of(Tuple.create().set("id", 12), 
Tuple.create().set("id", 42));
+        List<Tuple> recs = keys.stream().map(t -> Tuple.copy(t).set("val", 
"value_" + t.intValue("id"))).collect(toList());
+
+        recordView(0).upsertAll(null, recs);
+
+        // Delete existing keys.
+        testRecordViewOperation(of(RW_READS, WRITES), of((long) recs.size(), 
(long) recs.size()), view -> view.deleteAllExact(null, recs));
+
+        // Delete non-existing keys.
+        testRecordViewOperation(of(RW_READS, WRITES), of((long) recs.size(), 
0L), view -> view.deleteAllExact(null, recs));
+
+        recordView(0).insert(null, recs.get(0));
+
+        // Delete one non-existing key.
+        testRecordViewOperation(of(RW_READS, WRITES), of((long) recs.size(), 
1L), view -> view.deleteAllExact(null, recs));
+
+        recordView(0).upsertAll(null, recs);
+        List<Tuple> nonExact = keys.stream().map(t -> Tuple.copy(t).set("val", 
"value_xyz_" + t.intValue("id"))).collect(toList());
+
+        testRecordViewOperation(of(RW_READS, WRITES), of((long) recs.size(), 
0L), view -> view.deleteAllExact(null, nonExact));
+
+        // Non-unique keys.
+        List<Tuple> nonUniqueKeys = of(
+                Tuple.create().set("id", 12),
+                Tuple.create().set("id", 42),
+                Tuple.create().set("id", 12));
+        List<Tuple> nonUniqueRecs = nonUniqueKeys
+                .stream()
+                .map(t -> Tuple.copy(t).set("val", "value_" + 
t.intValue("id")))
+                .collect(toList());
+
+        recordView(0).upsertAll(null, nonUniqueRecs);
+
+        testRecordViewOperation(
+                of(RW_READS, WRITES),
+                of((long) nonUniqueRecs.size(), 2L),
+                view -> view.deleteAllExact(null, nonUniqueRecs));
+    }
+
+    @Test
+    void scan() {
+        Map<Integer, String> values = Map.of(12, "12", 15, "15", 17, "17", 19, 
"19", 23, "23");
+
+        KeyValueView<Integer, String> kvView = keyValueView(0);
+        kvView.removeAll(null);
+        kvView.putAll(null, values);
+
+        testKeyValueViewOperation(of(RO_READS, RW_READS), of(0L, (long) 
values.size()), view -> {
+            Transaction tx = node(0).transactions().begin();
+
+            Object[] emptyArgs = new Object[0];
+            sql(0, tx, "select * from " + TABLE_NAME, emptyArgs);
+
+            tx.commit();
+        });
+
+        testKeyValueViewOperation(of(RO_READS, RW_READS), of((long) 
values.size(), 0L), view -> {
+            Object[] emptyArgs = new Object[0];
+            sql(0, null, "select * from " + TABLE_NAME, emptyArgs);
+        });
+    }
+
+    @Test
+    void sortedIndexScan() {
+        Map<Integer, String> values = Map.of(12, "12", 15, "15", 17, "17", 19, 
"19", 23, "23");
+
+        KeyValueView<Integer, String> kvView = keyValueView(0);
+        kvView.removeAll(null);
+        kvView.putAll(null, values);
+
+        testKeyValueViewOperation(of(RO_READS, RW_READS), of(2L, 0L), view -> {
+            Object[] emptyArgs = new Object[0];
+            sql(0, null, "select /*+ force_index (" + SORTED_IDX + ") */ * 
from " + TABLE_NAME + " where id > 15 and id < 20", emptyArgs);
+        });
+
+        testKeyValueViewOperation(of(RO_READS, RW_READS), of(0L, 2L), view -> {
+            Transaction tx = node(0).transactions().begin();
+
+            Object[] emptyArgs = new Object[0];
+            sql(0, tx, "select /*+ force_index (" + SORTED_IDX + ") */ * from 
" + TABLE_NAME + " where id > 15 and id < 20", emptyArgs);
+
+            tx.commit();
+        });
+    }
+
+    @Test
+    void hashIndexScan() {
+        Map<Integer, String> values = Map.of(12, "12", 15, "15", 17, "17", 19, 
"19", 23, "23");
+
+        KeyValueView<Integer, String> kvView = keyValueView(0);
+        kvView.removeAll(null);
+        kvView.putAll(null, values);
+
+        testKeyValueViewOperation(of(RO_READS, RW_READS), of(1L, 0L), view -> {
+            Object[] emptyArgs = new Object[0];
+            sql(0, null, "select /*+ force_index (" + HASH_IDX + ") */ * from 
" + TABLE_NAME + " where val = '19'", emptyArgs);
+        });
+
+        testKeyValueViewOperation(of(RO_READS, RW_READS), of(0L, 1L), view -> {
+            Transaction tx = node(0).transactions().begin();
+
+            Object[] emptyArgs = new Object[0];
+            sql(0, tx, "select /*+ force_index (" + HASH_IDX + ") */ * from " 
+ TABLE_NAME + " where val = '19'", emptyArgs);
+
+            tx.commit();
+        });
+    }
+
+    /**
+     * Tests that the given operation increases the specified metric by the 
expected value.
+     *
+     * @param metricName Metric name to be checked.
+     * @param expectedValue Expected value to increase the metric.
+     * @param op Operation to be executed.
+     */
+    private void testKeyValueViewOperation(String metricName, long 
expectedValue, Consumer<KeyValueView<Integer, String>> op) {
+        testKeyValueViewOperation(of(metricName), of(expectedValue), op);
+    }
+
+    /**
+     * Tests that the given operation increases the specified metrics by the 
expected values.
+     *
+     * @param metricNames Metric names to be checked.
+     * @param expectedValues Expected values to increase the metrics.
+     * @param op Operation to be executed.
+     */
+    private void testKeyValueViewOperation(
+            List<String> metricNames,
+            List<Long> expectedValues,
+            Consumer<KeyValueView<Integer, String>> op
+    ) {
+        testOperation(metricNames, expectedValues, () -> 
op.accept(keyValueView(0)));
+    }
+
+    /**
+     * Tests that the given operation increases the specified metric by the 
expected value.
+     *
+     * @param metricName Metric name to be checked.
+     * @param expectedValue Expected value to increase the metric.
+     * @param op Operation to be executed.
+     */
+    private void testRecordViewOperation(String metricName, long 
expectedValue, Consumer<RecordView<Tuple>> op) {
+        testRecordViewOperation(of(metricName), of(expectedValue), op);
+    }
+
+    /**
+     * Tests that the given operation increases the specified metrics by the 
expected values.
+     *
+     * @param metricNames Metric names to be checked.
+     * @param expectedValues Expected values to increase the metrics.
+     * @param op Operation to be executed.
+     */
+    private void testRecordViewOperation(
+            List<String> metricNames,
+            List<Long> expectedValues,
+            Consumer<RecordView<Tuple>> op
+    ) {
+        testOperation(metricNames, expectedValues, () -> 
op.accept(recordView(0)));
+    }
+
+    /**
+     * Tests that the given operation increases the specified metrics by the 
expected values.
+     *
+     * @param metricNames Metric names to be checked.
+     * @param expectedValues Expected values to increase the metrics.
+     * @param op Operation to be executed.
+     */
+    private void testOperation(
+            List<String> metricNames,
+            List<Long> expectedValues,
+            Runnable op
+    ) {
+        assertThat(metricNames.size(), is(expectedValues.size()));
+
+        Map<String, Long> initialValues = metricValues(metricNames);
+
+        op.run();
+
+        Map<String, Long> actualValues = metricValues(metricNames);
+
+        for (int i = 0; i < metricNames.size(); ++i) {
+            String metricName = metricNames.get(i);
+            long expectedValue = expectedValues.get(i);
+
+            long initialValue = initialValues.get(metricName);
+            long actualValue = actualValues.get(metricName);
+
+            assertThat(
+                    "The actual metric value does not match the expected value 
"
+                            + "[metric=" + metricName + ", initial=" + 
initialValue + ", actual=" + actualValue
+                            + ", expected=" + (initialValue + expectedValue) + 
']',
+                    actualValue,
+                    is(initialValue + expectedValue));
+        }
+    }
+
+    /**
+     * Returns the sum of the specified metrics on all nodes.
+     *
+     * @param metricNames Metric names.
+     * @return Map of metric names to their values.
+     */
+    private Map<String, Long> metricValues(List<String> metricNames) {
+        Map<String, Long> values = new HashMap<>(metricNames.size());
+
+        for (int i = 0; i < initialNodes(); ++i) {
+            MetricSet tableMetrics = unwrapIgniteImpl(node(i))
+                    .metricManager()
+                    .metricSnapshot()
+                    .metrics()
+                    .get(METRIC_SOURCE_NAME);
+
+            metricNames.forEach(metricName ->
+                    values.compute(metricName, (k, v) -> {
+                        Metric metric = tableMetrics.get(metricName);
+
+                        assertThat("Metric not found [name=" + metricName + 
']', metric, is(notNullValue()));
+                        assertThat(
+                                "Metric is not a LongMetric [name=" + 
metricName + ", class=" + metric.getClass().getSimpleName() + ']',
+                                metric,
+                                instanceOf(LongMetric.class));
+
+                        return (v == null ? 0 : v) + ((LongMetric) 
metric).value();
+                    }));
+        }
+
+        return values;
+    }
+}
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/InternalTable.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/InternalTable.java
index 5cefe7547f9..44bdbadf5c9 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/InternalTable.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/InternalTable.java
@@ -34,6 +34,7 @@ import org.apache.ignite.internal.schema.BinaryTuple;
 import org.apache.ignite.internal.schema.BinaryTuplePrefix;
 import org.apache.ignite.internal.storage.MvPartitionStorage;
 import org.apache.ignite.internal.storage.engine.MvTableStorage;
+import org.apache.ignite.internal.table.metrics.TableMetricSource;
 import org.apache.ignite.internal.tx.InternalTransaction;
 import org.apache.ignite.internal.tx.storage.state.TxStateStorage;
 import org.apache.ignite.internal.util.PendingComparableValuesTracker;
@@ -552,4 +553,11 @@ public interface InternalTable extends ManuallyCloseable {
      * @return The id.
      */
     ReplicationGroupId targetReplicationGroupId(int partId);
+
+    /**
+     * Returns a metric source for this table.
+     *
+     * @return Table metrics source.
+     */
+    TableMetricSource metrics();
 }
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/TableImpl.java 
b/modules/table/src/main/java/org/apache/ignite/internal/table/TableImpl.java
index 856142d3efd..8e9908b84cf 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/TableImpl.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/TableImpl.java
@@ -45,6 +45,7 @@ import 
org.apache.ignite.internal.table.distributed.PartitionSet;
 import org.apache.ignite.internal.table.distributed.TableIndexStoragesSupplier;
 import 
org.apache.ignite.internal.table.distributed.TableSchemaAwareIndexStorage;
 import org.apache.ignite.internal.table.distributed.schema.SchemaVersions;
+import org.apache.ignite.internal.table.metrics.TableMetricSource;
 import org.apache.ignite.internal.table.partition.HashPartitionManagerImpl;
 import org.apache.ignite.internal.tx.LockManager;
 import org.apache.ignite.sql.IgniteSql;
@@ -310,4 +311,9 @@ public class TableImpl implements TableViewInternal {
                     }
                 });
     }
+
+    @Override
+    public TableMetricSource metrics() {
+        return tbl.metrics();
+    }
 }
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/TableViewInternal.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/TableViewInternal.java
index 2a40ba63149..3dac8dd01d2 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/TableViewInternal.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/TableViewInternal.java
@@ -26,6 +26,7 @@ import 
org.apache.ignite.internal.storage.index.StorageSortedIndexDescriptor;
 import org.apache.ignite.internal.table.distributed.IndexLocker;
 import org.apache.ignite.internal.table.distributed.PartitionSet;
 import org.apache.ignite.internal.table.distributed.TableIndexStoragesSupplier;
+import org.apache.ignite.internal.table.metrics.TableMetricSource;
 import org.apache.ignite.table.Table;
 import org.apache.ignite.table.Tuple;
 import org.apache.ignite.table.mapper.Mapper;
@@ -127,4 +128,11 @@ public interface TableViewInternal extends Table {
      * @param indexId An index id to unregister.
      */
     void unregisterIndex(int indexId);
+
+    /**
+     * Returns a metric source for this table.
+     *
+     * @return Table metrics source.
+     */
+    TableMetricSource metrics();
 }
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index 23cd96c1049..3126ff450ba 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -147,6 +147,7 @@ import 
org.apache.ignite.internal.metastorage.MetaStorageManager;
 import org.apache.ignite.internal.metastorage.Revisions;
 import org.apache.ignite.internal.metastorage.WatchEvent;
 import org.apache.ignite.internal.metastorage.WatchListener;
+import org.apache.ignite.internal.metrics.MetricManager;
 import org.apache.ignite.internal.network.InternalClusterNode;
 import org.apache.ignite.internal.network.MessagingService;
 import org.apache.ignite.internal.network.TopologyService;
@@ -232,6 +233,7 @@ import 
org.apache.ignite.internal.table.distributed.storage.BrokenTxStateStorage
 import org.apache.ignite.internal.table.distributed.storage.InternalTableImpl;
 import org.apache.ignite.internal.table.distributed.storage.NullStorageEngine;
 import org.apache.ignite.internal.table.distributed.storage.PartitionStorages;
+import org.apache.ignite.internal.table.metrics.TableMetricSource;
 import org.apache.ignite.internal.thread.IgniteThreadFactory;
 import org.apache.ignite.internal.tx.LockManager;
 import org.apache.ignite.internal.tx.TxManager;
@@ -464,6 +466,8 @@ public class TableManager implements IgniteTablesInternal, 
IgniteComponent {
     private final TableAssignmentsService assignmentsService;
     private final ReliableCatalogVersions reliableCatalogVersions;
 
+    private final MetricManager metricManager;
+
     /**
      * Creates a new table manager.
      *
@@ -494,6 +498,7 @@ public class TableManager implements IgniteTablesInternal, 
IgniteComponent {
      * @param partitionReplicaLifecycleManager Partition replica lifecycle 
manager.
      * @param minTimeCollectorService Collects minimum required timestamp for 
each partition.
      * @param systemDistributedConfiguration System distributed configuration.
+     * @param metricManager Metric manager.
      */
     public TableManager(
             String nodeName,
@@ -533,7 +538,8 @@ public class TableManager implements IgniteTablesInternal, 
IgniteComponent {
             PartitionReplicaLifecycleManager partitionReplicaLifecycleManager,
             NodeProperties nodeProperties,
             MinimumRequiredTimeCollectorService minTimeCollectorService,
-            SystemDistributedConfiguration systemDistributedConfiguration
+            SystemDistributedConfiguration systemDistributedConfiguration,
+            MetricManager metricManager
     ) {
         this.topologyService = topologyService;
         this.replicaMgr = replicaMgr;
@@ -563,6 +569,7 @@ public class TableManager implements IgniteTablesInternal, 
IgniteComponent {
         this.partitionReplicaLifecycleManager = 
partitionReplicaLifecycleManager;
         this.nodeProperties = nodeProperties;
         this.minTimeCollectorService = minTimeCollectorService;
+        this.metricManager = metricManager;
 
         this.executorInclinedSchemaSyncService = new 
ExecutorInclinedSchemaSyncService(schemaSyncService, 
partitionOperationsExecutor);
         this.executorInclinedPlacementDriver = new 
ExecutorInclinedPlacementDriver(placementDriver, partitionOperationsExecutor);
@@ -1171,6 +1178,8 @@ public class TableManager implements 
IgniteTablesInternal, IgniteComponent {
 
     private void onTableDrop(DropTableEventParameters parameters) {
         inBusyLock(busyLock, () -> {
+            unregisterMetricsSource(parameters.tableId());
+
             destructionEventsQueue.enqueue(new 
DestroyTableEvent(parameters.catalogVersion(), parameters.tableId()));
         });
     }
@@ -1533,7 +1542,8 @@ public class TableManager implements 
IgniteTablesInternal, IgniteComponent {
                 indexMetaStorage,
                 lowWatermark,
                 failureProcessor,
-                nodeProperties
+                nodeProperties,
+                table.metrics()
         );
     }
 
@@ -1804,7 +1814,8 @@ public class TableManager implements 
IgniteTablesInternal, IgniteComponent {
                 Objects.requireNonNull(streamerReceiverRunner),
                 () -> txCfg.value().readWriteTimeoutMillis(),
                 () -> txCfg.value().readOnlyTimeoutMillis(),
-                nodeProperties.colocationEnabled()
+                nodeProperties.colocationEnabled(),
+                createAndRegisterMetricsSource(tableName)
         );
 
         return new TableImpl(
@@ -3218,6 +3229,8 @@ public class TableManager implements 
IgniteTablesInternal, IgniteComponent {
                 int tableId = tableDescriptor.id();
 
                 if (nextCatalog != null && nextCatalog.table(tableId) == null) 
{
+                    unregisterMetricsSource(tableId);
+
                     destructionEventsQueue.enqueue(new 
DestroyTableEvent(nextCatalog.version(), tableId));
                 }
 
@@ -3546,6 +3559,32 @@ public class TableManager implements 
IgniteTablesInternal, IgniteComponent {
         }
     }
 
+    private TableMetricSource createAndRegisterMetricsSource(QualifiedName 
tableName) {
+        TableMetricSource source = new TableMetricSource(tableName);
+
+        try {
+            metricManager.registerSource(source);
+            metricManager.enable(source);
+        } catch (Exception e) {
+            LOG.warn("Failed to register metrics source for table [name={}].", 
e, source.qualifiedTableName());
+        }
+
+        return source;
+    }
+
+    private void unregisterMetricsSource(int tableId) {
+        try {
+            TableViewInternal table = startedTables.get(tableId);
+            if (table == null) {
+                return;
+            }
+
+            metricManager.unregisterSource(table.metrics());
+        } catch (Exception e) {
+            LOG.warn("Failed to unregister metrics source for table 
[tableId={}].", e, tableId);
+        }
+    }
+
     private static class TableClosedException extends IgniteInternalException {
         private static final long serialVersionUID = 1L;
 
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
index a58ee9bb9fb..669a2a58ccc 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
@@ -196,6 +196,7 @@ import 
org.apache.ignite.internal.table.distributed.TableSchemaAwareIndexStorage
 import org.apache.ignite.internal.table.distributed.TableUtils;
 import org.apache.ignite.internal.table.distributed.index.IndexMetaStorage;
 import 
org.apache.ignite.internal.table.distributed.replicator.handlers.BuildIndexReplicaRequestHandler;
+import org.apache.ignite.internal.table.metrics.TableMetricSource;
 import org.apache.ignite.internal.tx.Lock;
 import org.apache.ignite.internal.tx.LockKey;
 import org.apache.ignite.internal.tx.LockManager;
@@ -346,6 +347,8 @@ public class PartitionReplicaListener implements 
ReplicaListener, ReplicaTablePr
 
     private static final boolean SKIP_UPDATES = 
getBoolean(IgniteSystemProperties.IGNITE_SKIP_STORAGE_UPDATE_IN_BENCHMARK);
 
+    private final TableMetricSource metrics;
+
     private final ReplicaPrimacyEngine replicaPrimacyEngine;
     private final TableAwareReplicaRequestPreProcessor 
tableAwareReplicaRequestPreProcessor;
     private final ReliableCatalogVersions reliableCatalogVersions;
@@ -384,6 +387,7 @@ public class PartitionReplicaListener implements 
ReplicaListener, ReplicaTablePr
      * @param clusterNodeResolver Node resolver.
      * @param remotelyTriggeredResourceRegistry Resource registry.
      * @param indexMetaStorage Index meta storage.
+     * @param metrics Table metric source.
      */
     public PartitionReplicaListener(
             MvPartitionStorage mvDataStorage,
@@ -412,7 +416,8 @@ public class PartitionReplicaListener implements 
ReplicaListener, ReplicaTablePr
             IndexMetaStorage indexMetaStorage,
             LowWatermark lowWatermark,
             FailureProcessor failureProcessor,
-            NodeProperties nodeProperties
+            NodeProperties nodeProperties,
+            TableMetricSource metrics
     ) {
         this.mvDataStorage = mvDataStorage;
         this.raftCommandRunner = raftCommandRunner;
@@ -435,6 +440,7 @@ public class PartitionReplicaListener implements 
ReplicaListener, ReplicaTablePr
         this.replicationGroupId = replicationGroupId;
         this.tableId = tableId;
         this.tableLockKey = new TablePartitionId(tableId, 
replicationGroupId.partitionId());
+        this.metrics = metrics;
 
         this.schemaCompatValidator = new 
SchemaCompatibilityValidator(validationSchemasSource, catalogService, 
schemaSyncService);
 
@@ -771,7 +777,11 @@ public class PartitionReplicaListener implements 
ReplicaListener, ReplicaTablePr
                             return completedFuture(rows);
                         } else {
                             return 
validateRwReadAgainstSchemaAfterTakingLocks(req.transactionId())
-                                    .thenApply(ignored -> rows);
+                                    .thenApply(ignored -> {
+                                        metrics.onRead(rows.size(), false);
+
+                                        return rows;
+                                    });
                         }
                     })
                     .whenComplete((rows, err) -> {
@@ -840,7 +850,8 @@ public class PartitionReplicaListener implements 
ReplicaListener, ReplicaTablePr
 
         FullyQualifiedResourceId cursorId = cursorId(txId, request.scanId());
 
-        CompletableFuture<Void> safeReadFuture = 
isPrimaryInTimestamp(isPrimary, readTimestamp) ? nullCompletedFuture()
+        CompletableFuture<Void> safeReadFuture = 
isPrimaryInTimestamp(isPrimary, readTimestamp)
+                ? nullCompletedFuture()
                 : safeTime.waitFor(readTimestamp);
 
         if (request.indexToUse() != null) {
@@ -853,18 +864,34 @@ public class PartitionReplicaListener implements 
ReplicaListener, ReplicaTablePr
             if (request.exactKey() != null) {
                 assert request.lowerBoundPrefix() == null && 
request.upperBoundPrefix() == null : "Index lookup doesn't allow bounds.";
 
-                return safeReadFuture.thenCompose(unused -> 
lookupIndex(request, indexStorage));
+                return safeReadFuture
+                        .thenCompose(unused -> lookupIndex(request, 
indexStorage))
+                        .thenApply(rows -> {
+                            metrics.onRead(rows.size(), true);
+
+                            return rows;
+                        });
             }
 
             assert indexStorage.storage() instanceof SortedIndexStorage;
 
-            return safeReadFuture.thenCompose(unused -> 
scanSortedIndex(request, indexStorage));
+            return safeReadFuture
+                    .thenCompose(unused -> scanSortedIndex(request, 
indexStorage))
+                    .thenApply(rows -> {
+                        metrics.onRead(rows.size(), true);
+
+                        return rows;
+                    });
         }
 
         return safeReadFuture
                 .thenCompose(
-                        unused -> retrieveExactEntriesUntilCursorEmpty(txId, 
request.coordinatorId(), readTimestamp, cursorId, batchCount)
-                );
+                        unused -> retrieveExactEntriesUntilCursorEmpty(txId, 
request.coordinatorId(), readTimestamp, cursorId, batchCount))
+                .thenApply(rows -> {
+                    metrics.onRead(rows.size(), true);
+
+                    return rows;
+                });
     }
 
     /**
@@ -1054,11 +1081,13 @@ public class PartitionReplicaListener implements 
ReplicaListener, ReplicaTablePr
         HybridTimestamp readTimestamp = request.readTimestamp();
 
         if (request.requestType() != RO_GET) {
-            throw new IgniteInternalException(Replicator.REPLICA_COMMON_ERR,
+            throw new IgniteInternalException(
+                    Replicator.REPLICA_COMMON_ERR,
                     format("Unknown single request [actionType={}]", 
request.requestType()));
         }
 
-        CompletableFuture<Void> safeReadFuture = 
isPrimaryInTimestamp(isPrimary, readTimestamp) ? nullCompletedFuture()
+        CompletableFuture<Void> safeReadFuture = 
isPrimaryInTimestamp(isPrimary, readTimestamp)
+                ? nullCompletedFuture()
                 : safeTime.waitFor(request.readTimestamp());
 
         return safeReadFuture.thenCompose(unused -> 
resolveRowByPkForReadOnly(primaryKey, readTimestamp));
@@ -1090,11 +1119,13 @@ public class PartitionReplicaListener implements 
ReplicaListener, ReplicaTablePr
         HybridTimestamp readTimestamp = request.readTimestamp();
 
         if (request.requestType() != RO_GET_ALL) {
-            throw new IgniteInternalException(Replicator.REPLICA_COMMON_ERR,
+            throw new IgniteInternalException(
+                    Replicator.REPLICA_COMMON_ERR,
                     format("Unknown single request [actionType={}]", 
request.requestType()));
         }
 
-        CompletableFuture<Void> safeReadFuture = 
isPrimaryInTimestamp(isPrimary, readTimestamp) ? nullCompletedFuture()
+        CompletableFuture<Void> safeReadFuture = 
isPrimaryInTimestamp(isPrimary, readTimestamp)
+                ? nullCompletedFuture()
                 : safeTime.waitFor(request.readTimestamp());
 
         return safeReadFuture.thenCompose(unused -> {
@@ -1931,10 +1962,14 @@ public class PartitionReplicaListener implements 
ReplicaListener, ReplicaTablePr
 
             // Nothing found in the storage, return null.
             if (writeIntents.isEmpty() && regularEntries.isEmpty()) {
+                metrics.onRead(true);
+
                 return nullCompletedFuture();
             }
 
             if (writeIntents.isEmpty()) {
+                metrics.onRead(true);
+
                 // No write intents, then return the committed value. We 
already know that regularEntries is not empty.
                 return completedFuture(regularEntries.get(0).binaryRow());
             } else {
@@ -1948,6 +1983,8 @@ public class PartitionReplicaListener implements 
ReplicaListener, ReplicaTablePr
                         resolveWriteIntentReadability(writeIntent, ts)
                                 .thenApply(writeIntentReadable ->
                                         inBusyLock(busyLock, () -> {
+                                            metrics.onRead(true);
+
                                             if (writeIntentReadable) {
                                                 return findAny(writeIntents, 
wi -> !wi.isEmpty()).map(ReadResult::binaryRow).orElse(null);
                                             } else {
@@ -2104,6 +2141,8 @@ public class PartitionReplicaListener implements 
ReplicaListener, ReplicaTablePr
                     }
 
                     if (rowIdsToDelete.isEmpty()) {
+                        metrics.onRead(searchRows.size(), false);
+
                         return completedFuture(new ReplicaResult(result, 
null));
                     }
 
@@ -2117,7 +2156,12 @@ public class PartitionReplicaListener implements 
ReplicaListener, ReplicaTablePr
                                             leaseStartTime
                                     )
                             )
-                            .thenApply(res -> new ReplicaResult(result, res));
+                            .thenApply(res -> {
+                                metrics.onRead(searchRows.size(), false);
+                                metrics.onWrite(rowIdsToDelete.size());
+
+                                return new ReplicaResult(result, res);
+                            });
                 });
             }
             case RW_INSERT_ALL: {
@@ -2153,6 +2197,8 @@ public class PartitionReplicaListener implements 
ReplicaListener, ReplicaTablePr
                     }
 
                     if (rowsToInsert.isEmpty()) {
+                        metrics.onRead(searchRows.size(), false);
+
                         return completedFuture(new ReplicaResult(result, 
null));
                     }
 
@@ -2185,6 +2231,9 @@ public class PartitionReplicaListener implements 
ReplicaListener, ReplicaTablePr
                                     )
                             )
                             .thenApply(res -> {
+                                metrics.onRead(searchRows.size(), false);
+                                metrics.onWrite(rowsToInsert.size());
+
                                 // Release short term locks.
                                 for (CompletableFuture<IgniteBiTuple<RowId, 
Collection<Lock>>> insertLockFut : insertLockFuts) {
                                     insertLockFut.join().get2()
@@ -2222,11 +2271,14 @@ public class PartitionReplicaListener implements 
ReplicaListener, ReplicaTablePr
                     }
                 }
 
+                int uniqueKeysCount = 0;
                 for (int i = 0; i < searchRows.size(); i++) {
                     if (rowIdFuts[i] != null) {
                         continue; // Skip previous row with the same key.
                     }
 
+                    uniqueKeysCount++;
+
                     BinaryRow searchRow = searchRows.get(i);
                     boolean isDelete = deleted != null && deleted.get(i);
 
@@ -2256,6 +2308,8 @@ public class PartitionReplicaListener implements 
ReplicaListener, ReplicaTablePr
                     });
                 }
 
+                int uniqueKeysCountFinal = uniqueKeysCount;
+
                 return allOf(rowIdFuts).thenCompose(ignore -> {
                     Map<UUID, TimedBinaryRowMessage> rowsToUpdate = 
IgniteUtils.newHashMap(searchRows.size());
                     List<RowId> rows = new ArrayList<>();
@@ -2282,6 +2336,8 @@ public class PartitionReplicaListener implements 
ReplicaListener, ReplicaTablePr
                     }
 
                     if (rowsToUpdate.isEmpty()) {
+                        metrics.onRead(uniqueKeysCountFinal, false);
+
                         return completedFuture(new ReplicaResult(null, null));
                     }
 
@@ -2296,6 +2352,9 @@ public class PartitionReplicaListener implements 
ReplicaListener, ReplicaTablePr
                                     )
                             )
                             .thenApply(res -> {
+                                metrics.onRead(uniqueKeysCountFinal, false);
+                                metrics.onWrite(uniqueKeysCountFinal);
+
                                 // Release short term locks.
                                 for (CompletableFuture<IgniteBiTuple<RowId, 
Collection<Lock>>> rowIdFut : rowIdFuts) {
                                     IgniteBiTuple<RowId, Collection<Lock>> 
futRes = rowIdFut.join();
@@ -2356,11 +2415,17 @@ public class PartitionReplicaListener implements 
ReplicaListener, ReplicaTablePr
                             }
 
                             if (allElementsAreNull(result)) {
+                                metrics.onRead(result.size(), false);
+
                                 return completedFuture(new 
ReplicaResult(result, null));
                             }
 
                             return 
validateRwReadAgainstSchemaAfterTakingLocks(txId)
-                                    .thenApply(unused -> new 
ReplicaResult(result, null));
+                                    .thenApply(unused -> {
+                                        metrics.onRead(result.size(), false);
+
+                                        return new ReplicaResult(result, null);
+                                    });
                         });
             }
             case RW_DELETE_ALL: {
@@ -2422,7 +2487,11 @@ public class PartitionReplicaListener implements 
ReplicaListener, ReplicaTablePr
                                             leaseStartTime
                                     )
                             )
-                            .thenApply(res -> new ReplicaResult(result, res));
+                            .thenApply(res -> {
+                                metrics.onWrite(rowIdsToDelete.size());
+
+                                return new ReplicaResult(result, res);
+                            });
                 });
             }
             default: {
@@ -2732,7 +2801,8 @@ public class PartitionReplicaListener implements 
ReplicaListener, ReplicaTablePr
         HybridTimestamp readTimestamp = opStartTimestamp;
 
         if (request.requestType() != RO_GET) {
-            throw new IgniteInternalException(Replicator.REPLICA_COMMON_ERR,
+            throw new IgniteInternalException(
+                    Replicator.REPLICA_COMMON_ERR,
                     format("Unknown single request [actionType={}]", 
request.requestType()));
         }
 
@@ -2757,12 +2827,16 @@ public class PartitionReplicaListener implements 
ReplicaListener, ReplicaTablePr
             case RW_DELETE_EXACT: {
                 return resolveRowByPk(extractPk(searchRow), txId, (rowId, row, 
lastCommitTime) -> {
                     if (rowId == null) {
+                        metrics.onRead(false);
+
                         return completedFuture(new ReplicaResult(false, null));
                     }
 
                     return takeLocksForDeleteExact(searchRow, rowId, row, txId)
                             .thenCompose(validatedRowId -> {
                                 if (validatedRowId == null) {
+                                    metrics.onRead(false);
+
                                     return completedFuture(new 
ReplicaResult(false, null));
                                 }
 
@@ -2778,13 +2852,20 @@ public class PartitionReplicaListener implements 
ReplicaListener, ReplicaTablePr
                                                         leaseStartTime
                                                 )
                                         )
-                                        .thenApply(res -> new 
ReplicaResult(true, res));
+                                        .thenApply(res -> {
+                                            metrics.onRead(false);
+                                            metrics.onWrite();
+
+                                            return new ReplicaResult(true, 
res);
+                                        });
                             });
                 });
             }
             case RW_INSERT: {
                 return resolveRowByPk(extractPk(searchRow), txId, (rowId, row, 
lastCommitTime) -> {
                     if (rowId != null) {
+                        metrics.onRead(false);
+
                         return completedFuture(new ReplicaResult(false, null));
                     }
 
@@ -2804,6 +2885,9 @@ public class PartitionReplicaListener implements 
ReplicaListener, ReplicaTablePr
                                     )
                                     .thenApply(res -> new IgniteBiTuple<>(res, 
rowIdLock)))
                             .thenApply(tuple -> {
+                                metrics.onRead(false);
+                                metrics.onWrite();
+
                                 // Release short term locks.
                                 tuple.get2().get2().forEach(lock -> 
lockManager.release(lock.txId(), lock.lockKey(), lock.lockMode()));
 
@@ -2836,6 +2920,8 @@ public class PartitionReplicaListener implements 
ReplicaListener, ReplicaTablePr
                                     )
                                     .thenApply(res -> new IgniteBiTuple<>(res, 
rowIdLock)))
                             .thenApply(tuple -> {
+                                metrics.onWrite();
+
                                 // Release short term locks.
                                 tuple.get2().get2().forEach(lock -> 
lockManager.release(lock.txId(), lock.lockKey(), lock.lockMode()));
 
@@ -2868,6 +2954,9 @@ public class PartitionReplicaListener implements 
ReplicaListener, ReplicaTablePr
                                     )
                                     .thenApply(res -> new IgniteBiTuple<>(res, 
rowIdLock)))
                             .thenApply(tuple -> {
+                                metrics.onRead(false);
+                                metrics.onWrite();
+
                                 // Release short term locks.
                                 tuple.get2().get2().forEach(lock -> 
lockManager.release(lock.txId(), lock.lockKey(), lock.lockMode()));
 
@@ -2878,6 +2967,8 @@ public class PartitionReplicaListener implements 
ReplicaListener, ReplicaTablePr
             case RW_GET_AND_REPLACE: {
                 return resolveRowByPk(extractPk(searchRow), txId, (rowId, row, 
lastCommitTime) -> {
                     if (rowId == null) {
+                        metrics.onRead(false);
+
                         return completedFuture(new ReplicaResult(null, null));
                     }
 
@@ -2896,6 +2987,9 @@ public class PartitionReplicaListener implements 
ReplicaListener, ReplicaTablePr
                                     )
                                     .thenApply(res -> new IgniteBiTuple<>(res, 
rowIdLock)))
                             .thenApply(tuple -> {
+                                metrics.onRead(false);
+                                metrics.onWrite();
+
                                 // Release short term locks.
                                 tuple.get2().get2().forEach(lock -> 
lockManager.release(lock.txId(), lock.lockKey(), lock.lockMode()));
 
@@ -2906,6 +3000,8 @@ public class PartitionReplicaListener implements 
ReplicaListener, ReplicaTablePr
             case RW_REPLACE_IF_EXIST: {
                 return resolveRowByPk(extractPk(searchRow), txId, (rowId, row, 
lastCommitTime) -> {
                     if (rowId == null) {
+                        metrics.onRead(false);
+
                         return completedFuture(new ReplicaResult(false, null));
                     }
 
@@ -2924,6 +3020,9 @@ public class PartitionReplicaListener implements 
ReplicaListener, ReplicaTablePr
                                     )
                                     .thenApply(res -> new IgniteBiTuple<>(res, 
rowIdLock)))
                             .thenApply(tuple -> {
+                                metrics.onRead(false);
+                                metrics.onWrite();
+
                                 // Release short term locks.
                                 tuple.get2().get2().forEach(lock -> 
lockManager.release(lock.txId(), lock.lockKey(), lock.lockMode()));
 
@@ -2957,12 +3056,18 @@ public class PartitionReplicaListener implements 
ReplicaListener, ReplicaTablePr
             case RW_GET: {
                 return resolveRowByPk(primaryKey, txId, (rowId, row, 
lastCommitTime) -> {
                     if (rowId == null) {
+                        metrics.onRead(false);
+
                         return nullCompletedFuture();
                     }
 
                     return takeLocksForGet(rowId, txId)
                             .thenCompose(ignored -> 
validateRwReadAgainstSchemaAfterTakingLocks(txId))
-                            .thenApply(ignored -> new ReplicaResult(row, 
null));
+                            .thenApply(ignored -> {
+                                metrics.onRead(false);
+
+                                return new ReplicaResult(row, null);
+                            });
                 });
             }
             case RW_DELETE: {
@@ -2988,12 +3093,18 @@ public class PartitionReplicaListener implements 
ReplicaListener, ReplicaTablePr
                                             leaseStartTime
                                     )
                             )
-                            .thenApply(res -> new ReplicaResult(true, res));
+                            .thenApply(res -> {
+                                metrics.onWrite();
+
+                                return new ReplicaResult(true, res);
+                            });
                 });
             }
             case RW_GET_AND_DELETE: {
                 return resolveRowByPk(primaryKey, txId, (rowId, row, 
lastCommitTime) -> {
                     if (rowId == null) {
+                        metrics.onRead(false);
+
                         return nullCompletedFuture();
                     }
 
@@ -3014,7 +3125,12 @@ public class PartitionReplicaListener implements 
ReplicaListener, ReplicaTablePr
                                             leaseStartTime
                                     )
                             )
-                            .thenApply(res -> new ReplicaResult(row, res));
+                            .thenApply(res -> {
+                                metrics.onRead(false);
+                                metrics.onWrite();
+
+                                return new ReplicaResult(row, res);
+                            });
                 });
             }
             default: {
@@ -3229,12 +3345,16 @@ public class PartitionReplicaListener implements 
ReplicaListener, ReplicaTablePr
         if (request.requestType() == RW_REPLACE) {
             return resolveRowByPk(extractPk(newRow), txId, (rowId, row, 
lastCommitTime) -> {
                 if (rowId == null) {
+                    metrics.onRead(false);
+
                     return completedFuture(new ReplicaResult(false, null));
                 }
 
                 return takeLocksForReplace(expectedRow, row, newRow, rowId, 
txId)
                         .thenCompose(rowIdLock -> {
                             if (rowIdLock == null) {
+                                metrics.onRead(false);
+
                                 return completedFuture(new 
ReplicaResult(false, null));
                             }
 
@@ -3256,6 +3376,9 @@ public class PartitionReplicaListener implements 
ReplicaListener, ReplicaTablePr
                                     )
                                     .thenApply(res -> new IgniteBiTuple<>(res, 
rowIdLock))
                                     .thenApply(tuple -> {
+                                        metrics.onRead(false);
+                                        metrics.onWrite();
+
                                         // Release short term locks.
                                         tuple.get2().get2()
                                                 .forEach(lock -> 
lockManager.release(lock.txId(), lock.lockKey(), lock.lockMode()));
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
index 9fa5cb52364..8f418d59651 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
@@ -119,6 +119,7 @@ import 
org.apache.ignite.internal.storage.engine.MvTableStorage;
 import org.apache.ignite.internal.table.InternalTable;
 import org.apache.ignite.internal.table.StreamerReceiverRunner;
 import 
org.apache.ignite.internal.table.distributed.storage.PartitionScanPublisher.InflightBatchRequestTracker;
+import org.apache.ignite.internal.table.metrics.TableMetricSource;
 import org.apache.ignite.internal.tx.InternalTransaction;
 import org.apache.ignite.internal.tx.PendingTxPartitionEnlistment;
 import org.apache.ignite.internal.tx.TransactionIds;
@@ -211,6 +212,8 @@ public class InternalTableImpl implements InternalTable {
 
     private final boolean colocationEnabled;
 
+    private final TableMetricSource metrics;
+
     /**
      * Constructor.
      *
@@ -249,7 +252,8 @@ public class InternalTableImpl implements InternalTable {
             StreamerReceiverRunner streamerReceiverRunner,
             Supplier<Long> defaultRwTxTimeout,
             Supplier<Long> defaultReadTxTimeout,
-            boolean colocationEnabled
+            boolean colocationEnabled,
+            TableMetricSource metrics
     ) {
         this.tableName = tableName;
         this.zoneId = zoneId;
@@ -269,6 +273,7 @@ public class InternalTableImpl implements InternalTable {
         this.defaultRwTxTimeout = defaultRwTxTimeout;
         this.defaultReadTxTimeout = defaultReadTxTimeout;
         this.colocationEnabled = colocationEnabled;
+        this.metrics = metrics;
     }
 
     /** {@inheritDoc} */
@@ -2404,4 +2409,9 @@ public class InternalTableImpl implements InternalTable {
                 Boolean full
         );
     }
+
+    @Override
+    public TableMetricSource metrics() {
+        return metrics;
+    }
 }
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/metrics/TableMetricSource.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/metrics/TableMetricSource.java
new file mode 100644
index 00000000000..0348a4f9b79
--- /dev/null
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/metrics/TableMetricSource.java
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.metrics;
+
+import java.util.List;
+import org.apache.ignite.internal.metrics.AbstractMetricSource;
+import org.apache.ignite.internal.metrics.LongAdderMetric;
+import org.apache.ignite.internal.metrics.Metric;
+import org.apache.ignite.internal.table.metrics.TableMetricSource.Holder;
+import org.apache.ignite.table.QualifiedName;
+
+/**
+ * Set of metrics related to a specific table.
+ *
+ * <p>
+ * <b>Metrics affected by key-value and record view operations:</b>
+ * <table border="1">
+ *   <caption>Methods and affected metrics</caption>
+ *   <tr>
+ *     <th>Method(s)</th>
+ *     <th>RoReads</th>
+ *     <th>RwReads</th>
+ *     <th>Writes</th>
+ *   </tr>
+ *   <tr>
+ *     <td>get, getOrDefault, contains</td>
+ *     <td>Yes, for read-only transactions</td>
+ *     <td>Yes, for read-write transactions</td>
+ *     <td>No</td>
+ *   </tr>
+ *   <tr>
+ *     <td>getAll, containsAll</td>
+ *     <td>Yes, it is incremented by the number of keys read for read-only 
transactions</td>
+ *     <td>Yes, it is incremented by the number of keys read for read-write 
transactions</td>
+ *     <td>No</td>
+ *   </tr>
+ *   <tr>
+ *     <td>put, upsert</td>
+ *     <td>No</td>
+ *     <td>No</td>
+ *     <td>Yes</td>
+ *   </tr>
+ *   <tr>
+ *     <td>putAll, upsertAll</td>
+ *     <td>No</td>
+ *     <td>No</td>
+ *     <td>Yes, it is incremented by the number of keys inserted</td>
+ *   </tr>
+ *   <tr>
+ *     <td>putIfAbsent, insert</td>
+ *     <td>No</td>
+ *     <td>Yes</td>
+ *     <td>Yes, if the method returns true</td>
+ *   </tr>
+ *   <tr>
+ *     <td>insertAll</td>
+ *     <td>No</td>
+ *     <td>Yes, it is incremented by the number of keys read, which is equal 
to the number of keys provided</td>
+ *     <td>Yes, it is incremented by the number of keys inserted</td>
+ *   </tr>
+ *   <tr>
+ *     <td>getAndPut, replace, getAndReplace</td>
+ *     <td>No</td>
+ *     <td>Yes</td>
+ *     <td>Yes, if the value is inserted / replaced</td>
+ *   </tr>
+ *   <tr>
+ *     <td>remove, delete</td>
+ *     <td>No</td>
+ *     <td>No</td>
+ *     <td>Yes, if the method returns true</td>
+ *   </tr>
+ *   <tr>
+ *     <td>removeAll, getAndRemove, deleteAll</td>
+ *     <td>No</td>
+ *     <td>No</td>
+ *     <td>Yes, it is incremented by the number of keys removed</td>
+ *   </tr>
+ *   <tr>
+ *     <td>conditional remove, deleteExact, deleteAllExact</td>
+ *     <td>No</td>
+ *     <td>Yes, it is incremented by the number of keys read, which is equal 
to the number of keys provided</td>
+ *     <td>Yes, it is incremented by the number of keys removed</td>
+ *   </tr>
+ *   <tr>
+ *     <td>getAndRemove</td>
+ *     <td>No</td>
+ *     <td>Yes</td>
+ *     <td>Yes, if the value is removed</td>
+ *   </tr>
+ * </table>
+ *
+ * <i>Note: Only synchronous methods are listed. Asynchronous methods affect 
the same metrics.</i>
+ */
+public class TableMetricSource extends AbstractMetricSource<Holder> {
+    /** Source name. */
+    public static final String SOURCE_NAME = "tables";
+
+    /** Metric names. */
+    public static final String RO_READS = "RoReads";
+    public static final String RW_READS = "RwReads";
+    public static final String WRITES = "Writes";
+
+    private final QualifiedName tableName;
+
+    /**
+     * Creates a new instance of {@link TableMetricSource}.
+     *
+     * @param tableName Qualified table name.
+     */
+    public TableMetricSource(QualifiedName tableName) {
+        super(SOURCE_NAME + '.' + tableName.toCanonicalForm(), "Table 
metrics.", "tables");
+        this.tableName = tableName;
+    }
+
+    /**
+     * Returns the qualified name of the table.
+     *
+     * @return Qualified name of the table.
+     */
+    public QualifiedName qualifiedTableName() {
+        return tableName;
+    }
+
+    /**
+     * Increments a counter of reads.
+     *
+     * @param readOnly {@code true} if read operation is executed within 
read-only transaction, and {@code false} otherwise.
+     */
+    public void onRead(boolean readOnly) {
+        Holder holder = holder();
+
+        if (holder != null) {
+            if (readOnly) {
+                holder.roReads.increment();
+            } else {
+                holder.rwReads.increment();
+            }
+        }
+    }
+
+    /**
+     * Adds the given {@code x} to a counter of reads.
+     *
+     * @param readOnly {@code true} if read operation is executed within 
read-only transaction, and {@code false} otherwise.
+     */
+    public void onRead(int x, boolean readOnly) {
+        Holder holder = holder();
+
+        if (holder != null) {
+            if (readOnly) {
+                holder.roReads.add(x);
+            } else {
+                holder.rwReads.add(x);
+            }
+        }
+    }
+
+    /**
+     * Increments a counter of writes.
+     */
+    public void onWrite() {
+        Holder holder = holder();
+
+        if (holder != null) {
+            holder.writes.increment();
+        }
+    }
+
+    /**
+     * Adds the given {@code x} to a counter of writes.
+     */
+    public void onWrite(int x) {
+        Holder holder = holder();
+
+        if (holder != null) {
+            holder.writes.add(x);
+        }
+    }
+
+    @Override
+    protected Holder createHolder() {
+        return new Holder();
+    }
+
+    /** Actual metrics holder. */
+    protected static class Holder implements 
AbstractMetricSource.Holder<Holder> {
+        private final LongAdderMetric roReads = new LongAdderMetric(
+                RO_READS,
+                "The total number of reads executed within read-write 
transactions.");
+
+        private final LongAdderMetric rwReads = new LongAdderMetric(
+                RW_READS,
+                "The total number of reads executed within read-only 
transactions.");
+
+        private final LongAdderMetric writes = new LongAdderMetric(
+                WRITES,
+                "The total number of writes executed within read-write 
transactions.");
+
+        private final List<Metric> metrics = List.of(roReads, rwReads, writes);
+
+        @Override
+        public Iterable<Metric> metrics() {
+            return metrics;
+        }
+    }
+}
diff --git 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerRecoveryTest.java
 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerRecoveryTest.java
index 64895bf0194..ea9a719d6ab 100644
--- 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerRecoveryTest.java
+++ 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerRecoveryTest.java
@@ -101,6 +101,7 @@ import 
org.apache.ignite.internal.metastorage.impl.StandaloneMetaStorageManager;
 import 
org.apache.ignite.internal.metastorage.server.ReadOperationForCompactionTracker;
 import 
org.apache.ignite.internal.metastorage.server.persistence.RocksDbKeyValueStorage;
 import org.apache.ignite.internal.metrics.MetricManager;
+import org.apache.ignite.internal.metrics.NoOpMetricManager;
 import org.apache.ignite.internal.network.ClusterService;
 import org.apache.ignite.internal.network.InternalClusterNode;
 import org.apache.ignite.internal.network.InternalClusterNodeImpl;
@@ -498,7 +499,8 @@ public class TableManagerRecoveryTest extends 
IgniteAbstractTest {
                 partitionReplicaLifecycleManager,
                 new SystemPropertiesNodeProperties(),
                 minTimeCollectorService,
-                systemDistributedConfiguration
+                systemDistributedConfiguration,
+                new NoOpMetricManager()
         ) {
 
             @Override
diff --git 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
index 416cf5aaa70..c02f6a47f49 100644
--- 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
+++ 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
@@ -106,6 +106,7 @@ import org.apache.ignite.internal.metastorage.Revisions;
 import 
org.apache.ignite.internal.metastorage.impl.MetaStorageRevisionListenerRegistry;
 import 
org.apache.ignite.internal.metastorage.impl.StandaloneMetaStorageManager;
 import org.apache.ignite.internal.metrics.MetricManager;
+import org.apache.ignite.internal.metrics.NoOpMetricManager;
 import org.apache.ignite.internal.network.ClusterService;
 import org.apache.ignite.internal.network.InternalClusterNode;
 import org.apache.ignite.internal.network.InternalClusterNodeImpl;
@@ -923,7 +924,8 @@ public class TableManagerTest extends IgniteAbstractTest {
                 partitionReplicaLifecycleManager,
                 new SystemPropertiesNodeProperties(),
                 new MinimumRequiredTimeCollectorServiceImpl(),
-                systemDistributedConfiguration
+                systemDistributedConfiguration,
+                new NoOpMetricManager()
         ) {
 
             @Override
diff --git 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerIndexLockingTest.java
 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerIndexLockingTest.java
index 8992f808425..56407da3710 100644
--- 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerIndexLockingTest.java
+++ 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerIndexLockingTest.java
@@ -114,6 +114,7 @@ import 
org.apache.ignite.internal.table.distributed.replicator.TransactionStateR
 import org.apache.ignite.internal.table.impl.DummyInternalTableImpl;
 import org.apache.ignite.internal.table.impl.DummySchemaManagerImpl;
 import org.apache.ignite.internal.table.impl.DummyValidationSchemasSource;
+import org.apache.ignite.internal.table.metrics.TableMetricSource;
 import org.apache.ignite.internal.testframework.IgniteAbstractTest;
 import org.apache.ignite.internal.tx.Lock;
 import org.apache.ignite.internal.tx.LockManager;
@@ -130,6 +131,7 @@ import org.apache.ignite.internal.type.NativeTypes;
 import org.apache.ignite.internal.util.Lazy;
 import org.apache.ignite.internal.util.Pair;
 import org.apache.ignite.internal.util.PendingComparableValuesTracker;
+import org.apache.ignite.table.QualifiedName;
 import org.hamcrest.CustomMatcher;
 import org.hamcrest.Matcher;
 import org.junit.jupiter.api.BeforeAll;
@@ -291,7 +293,8 @@ public class PartitionReplicaListenerIndexLockingTest 
extends IgniteAbstractTest
                 mock(IndexMetaStorage.class),
                 new TestLowWatermark(),
                 new NoOpFailureManager(),
-                new SystemPropertiesNodeProperties()
+                new SystemPropertiesNodeProperties(),
+                new TableMetricSource(QualifiedName.fromSimple("test_table"))
         );
 
         kvMarshaller = new 
ReflectionMarshallerFactory().create(schemaDescriptor, Integer.class, 
Integer.class);
diff --git 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerSortedIndexLockingTest.java
 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerSortedIndexLockingTest.java
index 1511e38c555..767cec133eb 100644
--- 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerSortedIndexLockingTest.java
+++ 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerSortedIndexLockingTest.java
@@ -109,6 +109,7 @@ import 
org.apache.ignite.internal.table.distributed.replicator.TransactionStateR
 import org.apache.ignite.internal.table.impl.DummyInternalTableImpl;
 import org.apache.ignite.internal.table.impl.DummySchemaManagerImpl;
 import org.apache.ignite.internal.table.impl.DummyValidationSchemasSource;
+import org.apache.ignite.internal.table.metrics.TableMetricSource;
 import org.apache.ignite.internal.testframework.IgniteAbstractTest;
 import org.apache.ignite.internal.tx.Lock;
 import org.apache.ignite.internal.tx.LockManager;
@@ -125,6 +126,7 @@ import org.apache.ignite.internal.type.NativeTypes;
 import org.apache.ignite.internal.util.Lazy;
 import org.apache.ignite.internal.util.Pair;
 import org.apache.ignite.internal.util.PendingComparableValuesTracker;
+import org.apache.ignite.table.QualifiedName;
 import org.hamcrest.CustomMatcher;
 import org.hamcrest.Matcher;
 import org.junit.jupiter.api.BeforeAll;
@@ -260,7 +262,8 @@ public class PartitionReplicaListenerSortedIndexLockingTest 
extends IgniteAbstra
                 mock(IndexMetaStorage.class),
                 new TestLowWatermark(),
                 new NoOpFailureManager(),
-                new SystemPropertiesNodeProperties()
+                new SystemPropertiesNodeProperties(),
+                new TableMetricSource(QualifiedName.fromSimple("test_table"))
         );
 
         kvMarshaller = new 
ReflectionMarshallerFactory().create(schemaDescriptor, Integer.class, 
Integer.class);
diff --git 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
index 1cb08db2bfe..30cddf6ba4e 100644
--- 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
+++ 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
@@ -227,6 +227,7 @@ import 
org.apache.ignite.internal.table.distributed.replicator.StaleTransactionO
 import 
org.apache.ignite.internal.table.distributed.replicator.TransactionStateResolver;
 import org.apache.ignite.internal.table.impl.DummyInternalTableImpl;
 import org.apache.ignite.internal.table.impl.DummySchemaManagerImpl;
+import org.apache.ignite.internal.table.metrics.TableMetricSource;
 import org.apache.ignite.internal.testframework.IgniteAbstractTest;
 import org.apache.ignite.internal.testframework.WithSystemProperty;
 import org.apache.ignite.internal.tostring.IgniteToStringInclude;
@@ -263,6 +264,7 @@ import 
org.apache.ignite.internal.util.PendingComparableValuesTracker;
 import org.apache.ignite.lang.ErrorGroups.Transactions;
 import org.apache.ignite.network.NetworkAddress;
 import org.apache.ignite.sql.ColumnType;
+import org.apache.ignite.table.QualifiedName;
 import org.apache.ignite.tx.TransactionException;
 import org.hamcrest.Matcher;
 import org.jetbrains.annotations.Nullable;
@@ -692,7 +694,8 @@ public class PartitionReplicaListenerTest extends 
IgniteAbstractTest {
                 indexMetaStorage,
                 lowWatermark,
                 new NoOpFailureManager(),
-                new SystemPropertiesNodeProperties()
+                new SystemPropertiesNodeProperties(),
+                new TableMetricSource(QualifiedName.fromSimple("test_table"))
         );
 
         kvMarshaller = marshallerFor(schemaDescriptor);
diff --git 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/ZonePartitionReplicaListenerTest.java
 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/ZonePartitionReplicaListenerTest.java
index 6e4596ffd7c..097fe41f2b3 100644
--- 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/ZonePartitionReplicaListenerTest.java
+++ 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/ZonePartitionReplicaListenerTest.java
@@ -192,6 +192,7 @@ import 
org.apache.ignite.internal.table.distributed.replicator.PartitionReplicaL
 import 
org.apache.ignite.internal.table.distributed.replicator.TransactionStateResolver;
 import org.apache.ignite.internal.table.impl.DummyInternalTableImpl;
 import org.apache.ignite.internal.table.impl.DummySchemaManagerImpl;
+import org.apache.ignite.internal.table.metrics.TableMetricSource;
 import org.apache.ignite.internal.testframework.IgniteAbstractTest;
 import org.apache.ignite.internal.testframework.WithSystemProperty;
 import org.apache.ignite.internal.tostring.IgniteToStringInclude;
@@ -226,6 +227,7 @@ import 
org.apache.ignite.internal.util.SafeTimeValuesTracker;
 import org.apache.ignite.lang.ErrorGroups.Transactions;
 import org.apache.ignite.network.NetworkAddress;
 import org.apache.ignite.sql.ColumnType;
+import org.apache.ignite.table.QualifiedName;
 import org.apache.ignite.tx.TransactionException;
 import org.jetbrains.annotations.Nullable;
 import org.junit.jupiter.api.AfterEach;
@@ -661,7 +663,8 @@ public class ZonePartitionReplicaListenerTest extends 
IgniteAbstractTest {
                 indexMetaStorage,
                 lowWatermark,
                 failureManager,
-                new SystemPropertiesNodeProperties()
+                new SystemPropertiesNodeProperties(),
+                new TableMetricSource(QualifiedName.fromSimple("test_table"))
         );
 
         kvMarshaller = marshallerFor(schemaDescriptor);
diff --git 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/storage/InternalTableEstimatedSizeTest.java
 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/storage/InternalTableEstimatedSizeTest.java
index a1482277953..631795540dc 100644
--- 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/storage/InternalTableEstimatedSizeTest.java
+++ 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/storage/InternalTableEstimatedSizeTest.java
@@ -100,6 +100,7 @@ import 
org.apache.ignite.internal.table.distributed.StorageUpdateHandler;
 import org.apache.ignite.internal.table.distributed.index.IndexMetaStorage;
 import 
org.apache.ignite.internal.table.distributed.replicator.PartitionReplicaListener;
 import 
org.apache.ignite.internal.table.distributed.replicator.TransactionStateResolver;
+import org.apache.ignite.internal.table.metrics.TableMetricSource;
 import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
 import org.apache.ignite.internal.testframework.ExecutorServiceExtension;
 import org.apache.ignite.internal.testframework.InjectExecutorService;
@@ -112,6 +113,7 @@ import 
org.apache.ignite.internal.tx.storage.state.TxStateStorage;
 import org.apache.ignite.internal.util.Lazy;
 import org.apache.ignite.internal.util.PendingComparableValuesTracker;
 import org.apache.ignite.network.NetworkAddress;
+import org.apache.ignite.table.QualifiedName;
 import org.apache.ignite.table.QualifiedNameHelper;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
@@ -235,7 +237,8 @@ public class InternalTableEstimatedSizeTest extends 
BaseIgniteAbstractTest {
                 mock(StreamerReceiverRunner.class),
                 () -> 10_000L,
                 () -> 10_000L,
-                colocationEnabled()
+                colocationEnabled(),
+                new TableMetricSource(QualifiedName.fromSimple(TABLE_NAME))
         );
 
         when(catalogService.catalog(anyInt())).thenReturn(mock(Catalog.class));
@@ -344,7 +347,8 @@ public class InternalTableEstimatedSizeTest extends 
BaseIgniteAbstractTest {
                 indexMetaStorage,
                 new TestLowWatermark(),
                 new NoOpFailureManager(),
-                new SystemPropertiesNodeProperties()
+                new SystemPropertiesNodeProperties(),
+                new TableMetricSource(QualifiedName.fromSimple("test_table"))
         );
     }
 
diff --git 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImplTest.java
 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImplTest.java
index 72620208e3c..082e723189f 100644
--- 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImplTest.java
+++ 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImplTest.java
@@ -87,6 +87,7 @@ import org.apache.ignite.internal.sql.SqlCommon;
 import org.apache.ignite.internal.storage.engine.MvTableStorage;
 import org.apache.ignite.internal.table.InternalTable;
 import org.apache.ignite.internal.table.StreamerReceiverRunner;
+import org.apache.ignite.internal.table.metrics.TableMetricSource;
 import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
 import org.apache.ignite.internal.testframework.SystemPropertiesExtension;
 import org.apache.ignite.internal.testframework.WithSystemProperty;
@@ -99,6 +100,7 @@ import 
org.apache.ignite.internal.tx.storage.state.TxStateStorage;
 import org.apache.ignite.internal.tx.test.TestTransactionIds;
 import org.apache.ignite.internal.util.PendingComparableValuesTracker;
 import org.apache.ignite.network.NetworkAddress;
+import org.apache.ignite.table.QualifiedName;
 import org.apache.ignite.table.QualifiedNameHelper;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
@@ -247,7 +249,8 @@ public class InternalTableImplTest extends 
BaseIgniteAbstractTest {
                 mock(StreamerReceiverRunner.class),
                 () -> 10_000L,
                 () -> 10_000L,
-                colocationEnabled()
+                colocationEnabled(),
+                new TableMetricSource(QualifiedName.fromSimple("test"))
         );
     }
 
diff --git 
a/modules/table/src/test/java/org/apache/ignite/internal/table/metrics/TableMetricSourceTest.java
 
b/modules/table/src/test/java/org/apache/ignite/internal/table/metrics/TableMetricSourceTest.java
new file mode 100644
index 00000000000..7e942f12da0
--- /dev/null
+++ 
b/modules/table/src/test/java/org/apache/ignite/internal/table/metrics/TableMetricSourceTest.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.metrics;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.CoreMatchers.notNullValue;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+import java.util.HashSet;
+import java.util.Set;
+import org.apache.ignite.internal.metrics.MetricSet;
+import org.apache.ignite.table.QualifiedName;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Tests metric source name and table metric names.
+ * If you want to change the name, or add a new metric, please don't forget to 
update the corresponding documentation.
+ */
+public class TableMetricSourceTest {
+    private static final String TABLE_NAME = "test_table";
+
+    @Test
+    void testMetricSourceName() {
+        QualifiedName qualifiedTableName = 
QualifiedName.fromSimple(TABLE_NAME);
+
+        var metricSource = new TableMetricSource(qualifiedTableName);
+
+        assertThat(TableMetricSource.SOURCE_NAME, is("tables"));
+        assertThat(metricSource.name(), is("tables." + 
qualifiedTableName.toCanonicalForm()));
+    }
+
+    @Test
+    void testMetricNames() {
+        var metricSource = new 
TableMetricSource(QualifiedName.fromSimple(TABLE_NAME));
+
+        MetricSet set = metricSource.enable();
+
+        assertThat(set, is(notNullValue()));
+
+        Set<String> expectedMetrics = Set.of(
+                "RwReads",
+                "RoReads",
+                "Writes");
+
+        var actualMetrics = new HashSet<String>();
+        set.forEach(m -> actualMetrics.add(m.name()));
+
+        assertThat(actualMetrics, is(expectedMetrics));
+    }
+}
diff --git 
a/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
 
b/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
index 6ea44e27a65..e7082817154 100644
--- 
a/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
+++ 
b/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
@@ -168,6 +168,7 @@ import 
org.apache.ignite.internal.table.distributed.storage.InternalTableImpl;
 import org.apache.ignite.internal.table.impl.DummyInternalTableImpl;
 import org.apache.ignite.internal.table.impl.DummySchemaManagerImpl;
 import org.apache.ignite.internal.table.impl.DummyValidationSchemasSource;
+import org.apache.ignite.internal.table.metrics.TableMetricSource;
 import org.apache.ignite.internal.thread.IgniteThreadFactory;
 import org.apache.ignite.internal.tx.TxManager;
 import org.apache.ignite.internal.tx.TxStateMeta;
@@ -684,7 +685,8 @@ public class ItTxTestCluster {
                 mock(StreamerReceiverRunner.class),
                 () -> 10_000L,
                 () -> 10_000L,
-                colocationEnabled()
+                colocationEnabled(),
+                new TableMetricSource(QualifiedName.fromSimple(tableName))
         );
 
         TableImpl table = new TableImpl(
@@ -1121,7 +1123,8 @@ public class ItTxTestCluster {
                 mock(IndexMetaStorage.class),
                 lowWatermark,
                 new NoOpFailureManager(),
-                new SystemPropertiesNodeProperties()
+                new SystemPropertiesNodeProperties(),
+                new TableMetricSource(QualifiedName.fromSimple("test_table"))
         );
     }
 
diff --git 
a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
 
b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
index 0cf1f02cfc1..9188f9dc976 100644
--- 
a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
+++ 
b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
@@ -128,6 +128,7 @@ import 
org.apache.ignite.internal.table.distributed.raft.PartitionListener;
 import 
org.apache.ignite.internal.table.distributed.replicator.PartitionReplicaListener;
 import 
org.apache.ignite.internal.table.distributed.replicator.TransactionStateResolver;
 import org.apache.ignite.internal.table.distributed.storage.InternalTableImpl;
+import org.apache.ignite.internal.table.metrics.TableMetricSource;
 import org.apache.ignite.internal.thread.IgniteThreadFactory;
 import org.apache.ignite.internal.tx.InternalTransaction;
 import org.apache.ignite.internal.tx.TxManager;
@@ -143,6 +144,7 @@ import org.apache.ignite.internal.util.Lazy;
 import org.apache.ignite.internal.util.PendingComparableValuesTracker;
 import org.apache.ignite.internal.util.SafeTimeValuesTracker;
 import org.apache.ignite.network.NetworkAddress;
+import org.apache.ignite.table.QualifiedName;
 import org.apache.ignite.table.QualifiedNameHelper;
 import org.apache.ignite.tx.TransactionException;
 import org.jetbrains.annotations.Nullable;
@@ -308,7 +310,8 @@ public class DummyInternalTableImpl extends 
InternalTableImpl {
                 mock(StreamerReceiverRunner.class),
                 () -> 10_000L,
                 () -> 10_000L,
-                colocationEnabled()
+                colocationEnabled(),
+                new TableMetricSource(QualifiedName.fromSimple("test"))
         );
 
         RaftGroupService svc = mock(RaftGroupService.class);
@@ -494,7 +497,8 @@ public class DummyInternalTableImpl extends 
InternalTableImpl {
                 mock(IndexMetaStorage.class),
                 new TestLowWatermark(),
                 mock(FailureProcessor.class),
-                new SystemPropertiesNodeProperties()
+                new SystemPropertiesNodeProperties(),
+                new TableMetricSource(QualifiedName.fromSimple("dummy_table"))
         );
 
         if (enabledColocation) {

Reply via email to