This is an automated email from the ASF dual-hosted git repository. sk0x50 pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push: new 33fa7b0f3fa IGNITE-26328 Add table metrics (#6544) 33fa7b0f3fa is described below commit 33fa7b0f3fadae266343ef2b39a780fb5686f215 Author: Slava Koptilin <slava.kopti...@gmail.com> AuthorDate: Wed Sep 10 13:06:18 2025 +0300 IGNITE-26328 Add table metrics (#6544) --- .../ignite/client/fakes/FakeInternalTable.java | 6 + .../rebalance/ItRebalanceDistributedTest.java | 3 +- .../apache/ignite/internal/index/IndexManager.java | 4 +- .../partition/replicator/fixtures/Node.java | 3 +- .../runner/app/ItIgniteNodeRestartTest.java | 3 +- .../org/apache/ignite/internal/app/IgniteImpl.java | 3 +- .../exec/rel/TableScanNodeExecutionTest.java | 5 +- modules/table/build.gradle | 1 + ...xDistributedTestSingleNodeNoCleanupMessage.java | 5 +- .../ignite/internal/table/ItColocationTest.java | 5 +- .../internal/table/metrics/ItTableMetricsTest.java | 631 +++++++++++++++++++++ .../ignite/internal/table/InternalTable.java | 8 + .../apache/ignite/internal/table/TableImpl.java | 6 + .../ignite/internal/table/TableViewInternal.java | 8 + .../internal/table/distributed/TableManager.java | 45 +- .../replicator/PartitionReplicaListener.java | 161 +++++- .../distributed/storage/InternalTableImpl.java | 12 +- .../internal/table/metrics/TableMetricSource.java | 222 ++++++++ .../distributed/TableManagerRecoveryTest.java | 4 +- .../table/distributed/TableManagerTest.java | 4 +- .../PartitionReplicaListenerIndexLockingTest.java | 5 +- ...itionReplicaListenerSortedIndexLockingTest.java | 5 +- .../replication/PartitionReplicaListenerTest.java | 5 +- .../ZonePartitionReplicaListenerTest.java | 5 +- .../storage/InternalTableEstimatedSizeTest.java | 8 +- .../distributed/storage/InternalTableImplTest.java | 5 +- .../table/metrics/TableMetricSourceTest.java | 65 +++ .../apache/ignite/distributed/ItTxTestCluster.java | 7 +- .../table/impl/DummyInternalTableImpl.java | 8 +- 29 files changed, 1207 insertions(+), 45 deletions(-) diff --git a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeInternalTable.java b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeInternalTable.java index 705788f6979..ce5759970d4 100644 --- a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeInternalTable.java +++ b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeInternalTable.java @@ -57,6 +57,7 @@ import org.apache.ignite.internal.schema.ColumnsExtractor; import org.apache.ignite.internal.storage.engine.MvTableStorage; import org.apache.ignite.internal.table.InternalTable; import org.apache.ignite.internal.table.StreamerReceiverRunner; +import org.apache.ignite.internal.table.metrics.TableMetricSource; import org.apache.ignite.internal.tx.InternalTransaction; import org.apache.ignite.internal.tx.storage.state.TxStateStorage; import org.apache.ignite.internal.util.PendingComparableValuesTracker; @@ -582,4 +583,9 @@ public class FakeInternalTable implements InternalTable, StreamerReceiverRunner payload) .thenApply(resBytes -> new IgniteBiTuple<>(resBytes, FakeCompute.observableTimestamp.longValue())); } + + @Override + public TableMetricSource metrics() { + return null; + } } diff --git a/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java b/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java index 14c9cfd18be..d761b35bf3a 100644 --- a/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java +++ b/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java @@ -1648,7 +1648,8 @@ public class ItRebalanceDistributedTest extends BaseIgniteAbstractTest { partitionReplicaLifecycleManager, nodeProperties, minTimeCollectorService, - systemDistributedConfiguration + systemDistributedConfiguration, + metricManager ) { @Override protected TxStateStorage createTxStateTableStorage( diff --git a/modules/index/src/main/java/org/apache/ignite/internal/index/IndexManager.java b/modules/index/src/main/java/org/apache/ignite/internal/index/IndexManager.java index 718717683f4..c23f00b43c5 100644 --- a/modules/index/src/main/java/org/apache/ignite/internal/index/IndexManager.java +++ b/modules/index/src/main/java/org/apache/ignite/internal/index/IndexManager.java @@ -211,8 +211,8 @@ public class IndexManager implements IgniteComponent { if (LOG.isInfoEnabled()) { LOG.info( - "Creating local index: name={}, id={}, tableId={}, token={}", - index.name(), indexId, tableId, causalityToken + "Creating local index: name={}, id={}, tableId={}, token={}, type={}", + index.name(), indexId, tableId, causalityToken, index.indexType() ); } diff --git a/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/fixtures/Node.java b/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/fixtures/Node.java index 84122d98061..1a06ed18804 100644 --- a/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/fixtures/Node.java +++ b/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/fixtures/Node.java @@ -803,7 +803,8 @@ public class Node { partitionReplicaLifecycleManager, nodeProperties, minTimeCollectorService, - systemDistributedConfiguration + systemDistributedConfiguration, + metricManager ) { @Override diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java index 1b17e86b6fd..8e0bded6243 100644 --- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java +++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java @@ -811,7 +811,8 @@ public class ItIgniteNodeRestartTest extends BaseIgniteRestartTest { partitionReplicaLifecycleListener, nodeProperties, minTimeCollectorService, - systemDistributedConfiguration + systemDistributedConfiguration, + metricManager ); var indexManager = new IndexManager( diff --git a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java index 06fb6074e40..4ec1efa464d 100644 --- a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java +++ b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java @@ -1148,7 +1148,8 @@ public class IgniteImpl implements Ignite { partitionReplicaLifecycleManager, nodeProperties, minTimeCollectorService, - systemDistributedConfiguration + systemDistributedConfiguration, + metricManager ); disasterRecoveryManager = new DisasterRecoveryManager( diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/TableScanNodeExecutionTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/TableScanNodeExecutionTest.java index ff2f9fb7483..3c2b73cc3a2 100644 --- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/TableScanNodeExecutionTest.java +++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/TableScanNodeExecutionTest.java @@ -88,6 +88,7 @@ import org.apache.ignite.internal.sql.engine.util.TypeUtils; import org.apache.ignite.internal.storage.engine.MvTableStorage; import org.apache.ignite.internal.table.StreamerReceiverRunner; import org.apache.ignite.internal.table.distributed.storage.InternalTableImpl; +import org.apache.ignite.internal.table.metrics.TableMetricSource; import org.apache.ignite.internal.testframework.ExecutorServiceExtension; import org.apache.ignite.internal.testframework.InjectExecutorService; import org.apache.ignite.internal.tx.TxManager; @@ -101,6 +102,7 @@ import org.apache.ignite.internal.tx.storage.state.TxStateStorage; import org.apache.ignite.internal.tx.test.TestLocalRwTxCounter; import org.apache.ignite.internal.type.NativeTypes; import org.apache.ignite.network.NetworkAddress; +import org.apache.ignite.table.QualifiedName; import org.apache.ignite.table.QualifiedNameHelper; import org.jetbrains.annotations.Nullable; import org.junit.jupiter.api.AfterEach; @@ -341,7 +343,8 @@ public class TableScanNodeExecutionTest extends AbstractExecutionTest<Object[]> mock(StreamerReceiverRunner.class), () -> 10_000L, () -> 10_000L, - colocationEnabled() + colocationEnabled(), + new TableMetricSource(QualifiedName.fromSimple("test")) ); this.dataAmount = dataAmount; diff --git a/modules/table/build.gradle b/modules/table/build.gradle index 8b244242c92..86622fde1c3 100644 --- a/modules/table/build.gradle +++ b/modules/table/build.gradle @@ -86,6 +86,7 @@ dependencies { testImplementation testFixtures(project(':ignite-table')) testImplementation testFixtures(project(':ignite-low-watermark')) testImplementation testFixtures(project(':ignite-failure-handler')) + testImplementation testFixtures(project(':ignite-metrics')) testImplementation libs.jmh.core testImplementation libs.javax.annotations diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNodeNoCleanupMessage.java b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNodeNoCleanupMessage.java index ea00e8d6358..9f49366cc50 100644 --- a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNodeNoCleanupMessage.java +++ b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNodeNoCleanupMessage.java @@ -63,6 +63,7 @@ import org.apache.ignite.internal.table.distributed.TableSchemaAwareIndexStorage import org.apache.ignite.internal.table.distributed.index.IndexMetaStorage; import org.apache.ignite.internal.table.distributed.replicator.PartitionReplicaListener; import org.apache.ignite.internal.table.distributed.replicator.TransactionStateResolver; +import org.apache.ignite.internal.table.metrics.TableMetricSource; import org.apache.ignite.internal.tx.InternalTransaction; import org.apache.ignite.internal.tx.LockManager; import org.apache.ignite.internal.tx.TxManager; @@ -77,6 +78,7 @@ import org.apache.ignite.internal.tx.storage.state.TxStatePartitionStorage; import org.apache.ignite.internal.tx.test.TestLocalRwTxCounter; import org.apache.ignite.internal.util.Lazy; import org.apache.ignite.internal.util.PendingComparableValuesTracker; +import org.apache.ignite.table.QualifiedName; import org.apache.ignite.table.Tuple; import org.apache.ignite.tx.TransactionException; import org.junit.jupiter.api.BeforeEach; @@ -215,7 +217,8 @@ public class ItTxDistributedTestSingleNodeNoCleanupMessage extends TxAbstractTes mock(IndexMetaStorage.class), lowWatermark, mock(FailureProcessor.class), - new SystemPropertiesNodeProperties() + new SystemPropertiesNodeProperties(), + new TableMetricSource(QualifiedName.fromSimple("test_table")) ) { @Override public CompletableFuture<ReplicaResult> invoke(ReplicaRequest request, UUID senderId) { diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItColocationTest.java b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItColocationTest.java index 8bc147b8e68..6cea81e1e53 100644 --- a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItColocationTest.java +++ b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItColocationTest.java @@ -105,6 +105,7 @@ import org.apache.ignite.internal.table.distributed.schema.ConstantSchemaVersion import org.apache.ignite.internal.table.distributed.storage.InternalTableImpl; import org.apache.ignite.internal.table.impl.DummyInternalTableImpl; import org.apache.ignite.internal.table.impl.DummySchemaManagerImpl; +import org.apache.ignite.internal.table.metrics.TableMetricSource; import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest; import org.apache.ignite.internal.testframework.ExecutorServiceExtension; import org.apache.ignite.internal.testframework.InjectExecutorService; @@ -125,6 +126,7 @@ import org.apache.ignite.internal.type.NativeTypes; import org.apache.ignite.internal.util.CollectionUtils; import org.apache.ignite.sql.ColumnType; import org.apache.ignite.sql.IgniteSql; +import org.apache.ignite.table.QualifiedName; import org.apache.ignite.table.QualifiedNameHelper; import org.apache.ignite.table.Tuple; import org.jetbrains.annotations.Nullable; @@ -369,7 +371,8 @@ public class ItColocationTest extends BaseIgniteAbstractTest { mock(StreamerReceiverRunner.class), () -> 10_000L, () -> 10_000L, - colocationEnabled() + colocationEnabled(), + new TableMetricSource(QualifiedName.fromSimple("TEST")) ); } diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/metrics/ItTableMetricsTest.java b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/metrics/ItTableMetricsTest.java new file mode 100644 index 00000000000..2982523a157 --- /dev/null +++ b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/metrics/ItTableMetricsTest.java @@ -0,0 +1,631 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.table.metrics; + +import static java.util.List.of; +import static java.util.stream.Collectors.toList; +import static org.apache.ignite.internal.AssignmentsTestUtils.awaitAssignmentsStabilizationOnDefaultZone; +import static org.apache.ignite.internal.TestWrappers.unwrapIgniteImpl; +import static org.apache.ignite.internal.lang.IgniteSystemProperties.colocationEnabled; +import static org.apache.ignite.internal.table.metrics.TableMetricSource.RO_READS; +import static org.apache.ignite.internal.table.metrics.TableMetricSource.RW_READS; +import static org.apache.ignite.internal.table.metrics.TableMetricSource.WRITES; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.instanceOf; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.notNullValue; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.function.Consumer; +import org.apache.ignite.internal.ClusterPerClassIntegrationTest; +import org.apache.ignite.internal.metrics.LongMetric; +import org.apache.ignite.internal.metrics.Metric; +import org.apache.ignite.internal.metrics.MetricSet; +import org.apache.ignite.table.KeyValueView; +import org.apache.ignite.table.QualifiedName; +import org.apache.ignite.table.RecordView; +import org.apache.ignite.table.Tuple; +import org.apache.ignite.tx.Transaction; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; + +/** + * Tests key-value and record view operations metrics. + */ +public class ItTableMetricsTest extends ClusterPerClassIntegrationTest { + private static final String TABLE_NAME = "test_table_name".toUpperCase(); + + private static final String SORTED_IDX = "SORTED_IDX"; + private static final String HASH_IDX = "HASH_IDX"; + + private static final String METRIC_SOURCE_NAME = TableMetricSource.SOURCE_NAME + '.' + + QualifiedName.fromSimple(TABLE_NAME).toCanonicalForm(); + + @Override + protected int initialNodes() { + return 2; + } + + @BeforeAll + void createTable() throws Exception { + sql("CREATE TABLE " + TABLE_NAME + " (id INT PRIMARY KEY, val VARCHAR)"); + + sql("CREATE INDEX IF NOT EXISTS " + SORTED_IDX + " ON PUBLIC." + TABLE_NAME + " USING SORTED (id)"); + sql("CREATE INDEX IF NOT EXISTS " + HASH_IDX + " ON PUBLIC." + TABLE_NAME + " USING HASH (val)"); + + if (colocationEnabled()) { + awaitAssignmentsStabilizationOnDefaultZone(CLUSTER.aliveNode()); + } + } + + /** + * Returns a key value view for the table {@link #TABLE_NAME}. + * + * @param nodeIndex Node index to create a key value view. + * @return Key value view. + */ + private static KeyValueView<Integer, String> keyValueView(int nodeIndex) { + return CLUSTER.node(nodeIndex).tables().table(TABLE_NAME).keyValueView(Integer.class, String.class); + } + + /** + * Returns a record view for the table {@link #TABLE_NAME}. + * + * @param nodeIndex Node index to create a key value view. + * @return Record view. + */ + private static RecordView<Tuple> recordView(int nodeIndex) { + return CLUSTER.node(nodeIndex).tables().table(TABLE_NAME).recordView(); + } + + @Test + void get() { + // Implicit read-only transaction. + testKeyValueViewOperation(RO_READS, 1, view -> view.get(null, 12)); + + // Explicit read-write transaction. + testKeyValueViewOperation(RW_READS, 1, view -> { + Transaction tx = node(0).transactions().begin(); + + view.get(tx, 12); + + tx.commit(); + }); + } + + @Test + void getAll() { + List<Integer> keys = of(12, 15, 17, 19, 23); + + // Implicit getAll operation starts a read-write transaction when all keys are not mapped to the same partition. + testKeyValueViewOperation(RW_READS, keys.size(), view -> view.getAll(null, keys)); + + // Single key getAll operation starts a read-only transaction. + List<Integer> roKeys = of(12); + testKeyValueViewOperation(RO_READS, 1, view -> view.getAll(null, roKeys)); + + List<Integer> nonUniqueKeys = of(12, 15, 12); + testKeyValueViewOperation(RW_READS, nonUniqueKeys.size(), view -> view.getAll(null, nonUniqueKeys)); + } + + @Test + void getOrDefault() { + KeyValueView<Integer, String> kvView = keyValueView(0); + + Integer existingKey = 12; + Integer nonExistingKey = -1; + + kvView.put(null, existingKey, "value_12"); + kvView.remove(null, nonExistingKey); + + testKeyValueViewOperation(RO_READS, 2, view -> { + view.getOrDefault(null, existingKey, "default"); + view.getOrDefault(null, nonExistingKey, "default"); + }); + + testKeyValueViewOperation(RW_READS, 2, view -> { + Transaction tx = node(0).transactions().begin(); + + view.getOrDefault(tx, existingKey, "default"); + view.getOrDefault(tx, nonExistingKey, "default"); + + tx.commit(); + }); + } + + @Test + void contains() { + testKeyValueViewOperation(RO_READS, 1, view -> view.contains(null, 12)); + + testKeyValueViewOperation(RW_READS, 1, view -> { + Transaction tx = node(0).transactions().begin(); + + view.contains(tx, 12); + + tx.commit(); + }); + } + + @Test + void containsAll() { + List<Integer> keys = of(12, 15, 17, 19, 23); + + // Implicit containsAll operation starts a read-write transaction when all keys are not mapped to the same partition. + testKeyValueViewOperation(RW_READS, keys.size(), view -> view.containsAll(null, keys)); + + // Single key. + List<Integer> roKeys = of(12); + testKeyValueViewOperation(RO_READS, 1, view -> view.containsAll(null, roKeys)); + } + + @Test + void put() { + testKeyValueViewOperation(WRITES, 1, view -> view.put(null, 42, "value_42")); + } + + @Test + void putAll() { + Map<Integer, String> values = Map.of(12, "12", 15, "15", 17, "17", 19, "19", 23, "23"); + + testKeyValueViewOperation(WRITES, values.size(), view -> view.putAll(null, values)); + } + + @Test + void getAndPut() { + testKeyValueViewOperation(of(RW_READS, WRITES), of(1L, 1L), view -> view.getAndPut(null, 12, "value")); + } + + @Test + void remove() { + Integer key = 12; + KeyValueView<Integer, String> kvView = keyValueView(0); + kvView.put(null, key, "value_42"); + + // Remove existing key. + testKeyValueViewOperation(WRITES, 1, view -> view.remove(null, key)); + + // Remove non existing key. + testKeyValueViewOperation(WRITES, 0, view -> view.remove(null, key)); + } + + @Test + void exactRemove() { + Integer key = 12; + KeyValueView<Integer, String> kvView = keyValueView(0); + kvView.put(null, key, "value_42"); + + // Remove existing key and non-matching value. + testKeyValueViewOperation(of(RW_READS, WRITES), of(1L, 0L), view -> view.remove(null, key, "wrong-value")); + + // Remove existing key and matching value. + testKeyValueViewOperation(of(RW_READS, WRITES), of(1L, 1L), view -> view.remove(null, key, "value_42")); + + // Remove non existing key. + testKeyValueViewOperation(of(RW_READS, WRITES), of(1L, 0L), view -> view.remove(null, key, "value_42")); + } + + @Test + void removeAll() { + Map<Integer, String> values = Map.of(12, "12", 15, "15", 17, "17", 19, "19", 23, "23"); + + KeyValueView<Integer, String> kvView = keyValueView(0); + kvView.removeAll(null); + kvView.putAll(null, values); + + testKeyValueViewOperation(WRITES, values.size(), view -> view.removeAll(null)); + } + + @Test + void removeCollectionKeys() { + Map<Integer, String> values = Map.of(12, "12", 15, "15", 17, "17", 19, "19", 23, "23"); + + KeyValueView<Integer, String> kvView = keyValueView(0); + kvView.putAll(null, values); + + // Remove existing keys. + testKeyValueViewOperation(WRITES, values.size(), view -> view.removeAll(null, values.keySet())); + + // Remove non-existing keys. + testKeyValueViewOperation(WRITES, 0, view -> view.removeAll(null, values.keySet())); + + kvView.putAll(null, values); + + // Remove non-unique keys. + List<Integer> nonUniqueKeys = of(12, 15, 12, 17, 19, 23); + testKeyValueViewOperation(WRITES, nonUniqueKeys.size() - 1, view -> view.removeAll(null, nonUniqueKeys)); + } + + @Test + void putIfAbsent() { + Integer key = 12; + + KeyValueView<Integer, String> kvView = keyValueView(0); + kvView.remove(null, key); + + // Insert absent key. + testKeyValueViewOperation(of(RW_READS, WRITES), of(1L, 1L), view -> view.putIfAbsent(null, key, "value")); + + // Insert existing key. + testKeyValueViewOperation(of(RW_READS, WRITES), of(1L, 0L), view -> view.putIfAbsent(null, key, "value-42")); + } + + @Test + void getAndRemove() { + Integer key = 12; + + KeyValueView<Integer, String> kvView = keyValueView(0); + kvView.put(null, key, "value_42"); + + // Remove existing key. + testKeyValueViewOperation(of(RW_READS, WRITES), of(1L, 1L), view -> view.getAndRemove(null, key)); + + // Remove non-existing key. + testKeyValueViewOperation(of(RW_READS, WRITES), of(1L, 0L), view -> view.getAndRemove(null, key)); + } + + @Test + void replace() { + Integer key = 12; + + KeyValueView<Integer, String> kvView = keyValueView(0); + kvView.put(null, key, "value"); + + // Replace existing key. + testKeyValueViewOperation(of(RW_READS, WRITES), of(1L, 1L), view -> view.replace(null, key, "replaced")); + + kvView.remove(null, key); + + // Replace non-existing key. + testKeyValueViewOperation(of(RW_READS, WRITES), of(1L, 0L), view -> view.replace(null, key, "value")); + } + + @Test + void conditionalReplace() { + Integer key = 12; + + KeyValueView<Integer, String> kvView = keyValueView(0); + kvView.put(null, key, "value"); + + // Replace existing key. + testKeyValueViewOperation(of(RW_READS, WRITES), of(1L, 0L), view -> view.replace(null, key, "wrong", "replaced")); + testKeyValueViewOperation(of(RW_READS, WRITES), of(1L, 1L), view -> view.replace(null, key, "value", "replaced")); + + kvView.remove(null, key); + + // Replace non-existing key. + testKeyValueViewOperation(of(RW_READS, WRITES), of(1L, 0L), view -> view.replace(null, key, "replaced", "value")); + } + + @Test + void getAndReplace() { + Integer key = 12; + + KeyValueView<Integer, String> kvView = keyValueView(0); + kvView.put(null, key, "value"); + + // Replace existing key. + testKeyValueViewOperation(of(RW_READS, WRITES), of(1L, 1L), view -> view.getAndReplace(null, key, "replaced")); + + kvView.remove(null, key); + + // Replace non-existing key. + testKeyValueViewOperation(of(RW_READS, WRITES), of(1L, 0L), view -> view.replace(null, key, "replaced")); + } + + @Test + void insertAll() { + List<Tuple> keys = of(Tuple.create().set("id", 12), Tuple.create().set("id", 42)); + List<Tuple> recs = keys.stream().map(t -> Tuple.copy(t).set("val", "value_" + t.intValue("id"))).collect(toList()); + + recordView(0).deleteAll(null, keys); + + // Insert non-existing keys. + testRecordViewOperation(of(WRITES, RW_READS), of((long) recs.size(), (long) recs.size()), view -> view.insertAll(null, recs)); + + // Insert existing keys. + testRecordViewOperation(of(WRITES, RW_READS), of(0L, (long) recs.size()), view -> view.insertAll(null, recs)); + + recordView(0).delete(null, keys.get(0)); + + // Insert one non-existing key. + testRecordViewOperation(of(WRITES, RW_READS), of(1L, (long) recs.size()), view -> view.insertAll(null, recs)); + + // Insert non-unique keys. + List<Tuple> nonUniqueKeys = of( + Tuple.create().set("id", 12), + Tuple.create().set("id", 42), + Tuple.create().set("id", 12)); + List<Tuple> nonUniqueValues = nonUniqueKeys + .stream() + .map(t -> Tuple.copy(t).set("val", "value_" + t.intValue("id"))) + .collect(toList()); + + recordView(0).deleteAll(null, keys); + + testRecordViewOperation( + of(WRITES, RW_READS), + of((long) nonUniqueKeys.size() - 1, (long) nonUniqueKeys.size()), + view -> view.insertAll(null, nonUniqueValues)); + } + + @Test + void deleteAll() { + List<Tuple> keys = of(Tuple.create().set("id", 12), Tuple.create().set("id", 42)); + List<Tuple> recs = keys.stream().map(t -> Tuple.copy(t).set("val", "value_" + t.intValue("id"))).collect(toList()); + + recordView(0).upsertAll(null, recs); + + // Delete existing keys. + testRecordViewOperation(WRITES, recs.size(), view -> view.deleteAll(null, keys)); + + // Delete non-existing keys. + testRecordViewOperation(WRITES, 0L, view -> view.deleteAll(null, keys)); + + recordView(0).insert(null, recs.get(0)); + + // Delete one non-existing key. + testRecordViewOperation(WRITES, 1L, view -> view.deleteAll(null, keys)); + + // Non-unique keys. + List<Tuple> nonUniqueKeys = of( + Tuple.create().set("id", 12), + Tuple.create().set("id", 42), + Tuple.create().set("id", 12)); + List<Tuple> nonUniqueRecs = nonUniqueKeys + .stream() + .map(t -> Tuple.copy(t).set("val", "value_" + t.intValue("id"))) + .collect(toList()); + + recordView(0).upsertAll(null, nonUniqueRecs); + + testRecordViewOperation(WRITES, 2L, view -> view.deleteAll(null, nonUniqueKeys)); + } + + @Test + void deleteAllExact() { + List<Tuple> keys = of(Tuple.create().set("id", 12), Tuple.create().set("id", 42)); + List<Tuple> recs = keys.stream().map(t -> Tuple.copy(t).set("val", "value_" + t.intValue("id"))).collect(toList()); + + recordView(0).upsertAll(null, recs); + + // Delete existing keys. + testRecordViewOperation(of(RW_READS, WRITES), of((long) recs.size(), (long) recs.size()), view -> view.deleteAllExact(null, recs)); + + // Delete non-existing keys. + testRecordViewOperation(of(RW_READS, WRITES), of((long) recs.size(), 0L), view -> view.deleteAllExact(null, recs)); + + recordView(0).insert(null, recs.get(0)); + + // Delete one non-existing key. + testRecordViewOperation(of(RW_READS, WRITES), of((long) recs.size(), 1L), view -> view.deleteAllExact(null, recs)); + + recordView(0).upsertAll(null, recs); + List<Tuple> nonExact = keys.stream().map(t -> Tuple.copy(t).set("val", "value_xyz_" + t.intValue("id"))).collect(toList()); + + testRecordViewOperation(of(RW_READS, WRITES), of((long) recs.size(), 0L), view -> view.deleteAllExact(null, nonExact)); + + // Non-unique keys. + List<Tuple> nonUniqueKeys = of( + Tuple.create().set("id", 12), + Tuple.create().set("id", 42), + Tuple.create().set("id", 12)); + List<Tuple> nonUniqueRecs = nonUniqueKeys + .stream() + .map(t -> Tuple.copy(t).set("val", "value_" + t.intValue("id"))) + .collect(toList()); + + recordView(0).upsertAll(null, nonUniqueRecs); + + testRecordViewOperation( + of(RW_READS, WRITES), + of((long) nonUniqueRecs.size(), 2L), + view -> view.deleteAllExact(null, nonUniqueRecs)); + } + + @Test + void scan() { + Map<Integer, String> values = Map.of(12, "12", 15, "15", 17, "17", 19, "19", 23, "23"); + + KeyValueView<Integer, String> kvView = keyValueView(0); + kvView.removeAll(null); + kvView.putAll(null, values); + + testKeyValueViewOperation(of(RO_READS, RW_READS), of(0L, (long) values.size()), view -> { + Transaction tx = node(0).transactions().begin(); + + Object[] emptyArgs = new Object[0]; + sql(0, tx, "select * from " + TABLE_NAME, emptyArgs); + + tx.commit(); + }); + + testKeyValueViewOperation(of(RO_READS, RW_READS), of((long) values.size(), 0L), view -> { + Object[] emptyArgs = new Object[0]; + sql(0, null, "select * from " + TABLE_NAME, emptyArgs); + }); + } + + @Test + void sortedIndexScan() { + Map<Integer, String> values = Map.of(12, "12", 15, "15", 17, "17", 19, "19", 23, "23"); + + KeyValueView<Integer, String> kvView = keyValueView(0); + kvView.removeAll(null); + kvView.putAll(null, values); + + testKeyValueViewOperation(of(RO_READS, RW_READS), of(2L, 0L), view -> { + Object[] emptyArgs = new Object[0]; + sql(0, null, "select /*+ force_index (" + SORTED_IDX + ") */ * from " + TABLE_NAME + " where id > 15 and id < 20", emptyArgs); + }); + + testKeyValueViewOperation(of(RO_READS, RW_READS), of(0L, 2L), view -> { + Transaction tx = node(0).transactions().begin(); + + Object[] emptyArgs = new Object[0]; + sql(0, tx, "select /*+ force_index (" + SORTED_IDX + ") */ * from " + TABLE_NAME + " where id > 15 and id < 20", emptyArgs); + + tx.commit(); + }); + } + + @Test + void hashIndexScan() { + Map<Integer, String> values = Map.of(12, "12", 15, "15", 17, "17", 19, "19", 23, "23"); + + KeyValueView<Integer, String> kvView = keyValueView(0); + kvView.removeAll(null); + kvView.putAll(null, values); + + testKeyValueViewOperation(of(RO_READS, RW_READS), of(1L, 0L), view -> { + Object[] emptyArgs = new Object[0]; + sql(0, null, "select /*+ force_index (" + HASH_IDX + ") */ * from " + TABLE_NAME + " where val = '19'", emptyArgs); + }); + + testKeyValueViewOperation(of(RO_READS, RW_READS), of(0L, 1L), view -> { + Transaction tx = node(0).transactions().begin(); + + Object[] emptyArgs = new Object[0]; + sql(0, tx, "select /*+ force_index (" + HASH_IDX + ") */ * from " + TABLE_NAME + " where val = '19'", emptyArgs); + + tx.commit(); + }); + } + + /** + * Tests that the given operation increases the specified metric by the expected value. + * + * @param metricName Metric name to be checked. + * @param expectedValue Expected value to increase the metric. + * @param op Operation to be executed. + */ + private void testKeyValueViewOperation(String metricName, long expectedValue, Consumer<KeyValueView<Integer, String>> op) { + testKeyValueViewOperation(of(metricName), of(expectedValue), op); + } + + /** + * Tests that the given operation increases the specified metrics by the expected values. + * + * @param metricNames Metric names to be checked. + * @param expectedValues Expected values to increase the metrics. + * @param op Operation to be executed. + */ + private void testKeyValueViewOperation( + List<String> metricNames, + List<Long> expectedValues, + Consumer<KeyValueView<Integer, String>> op + ) { + testOperation(metricNames, expectedValues, () -> op.accept(keyValueView(0))); + } + + /** + * Tests that the given operation increases the specified metric by the expected value. + * + * @param metricName Metric name to be checked. + * @param expectedValue Expected value to increase the metric. + * @param op Operation to be executed. + */ + private void testRecordViewOperation(String metricName, long expectedValue, Consumer<RecordView<Tuple>> op) { + testRecordViewOperation(of(metricName), of(expectedValue), op); + } + + /** + * Tests that the given operation increases the specified metrics by the expected values. + * + * @param metricNames Metric names to be checked. + * @param expectedValues Expected values to increase the metrics. + * @param op Operation to be executed. + */ + private void testRecordViewOperation( + List<String> metricNames, + List<Long> expectedValues, + Consumer<RecordView<Tuple>> op + ) { + testOperation(metricNames, expectedValues, () -> op.accept(recordView(0))); + } + + /** + * Tests that the given operation increases the specified metrics by the expected values. + * + * @param metricNames Metric names to be checked. + * @param expectedValues Expected values to increase the metrics. + * @param op Operation to be executed. + */ + private void testOperation( + List<String> metricNames, + List<Long> expectedValues, + Runnable op + ) { + assertThat(metricNames.size(), is(expectedValues.size())); + + Map<String, Long> initialValues = metricValues(metricNames); + + op.run(); + + Map<String, Long> actualValues = metricValues(metricNames); + + for (int i = 0; i < metricNames.size(); ++i) { + String metricName = metricNames.get(i); + long expectedValue = expectedValues.get(i); + + long initialValue = initialValues.get(metricName); + long actualValue = actualValues.get(metricName); + + assertThat( + "The actual metric value does not match the expected value " + + "[metric=" + metricName + ", initial=" + initialValue + ", actual=" + actualValue + + ", expected=" + (initialValue + expectedValue) + ']', + actualValue, + is(initialValue + expectedValue)); + } + } + + /** + * Returns the sum of the specified metrics on all nodes. + * + * @param metricNames Metric names. + * @return Map of metric names to their values. + */ + private Map<String, Long> metricValues(List<String> metricNames) { + Map<String, Long> values = new HashMap<>(metricNames.size()); + + for (int i = 0; i < initialNodes(); ++i) { + MetricSet tableMetrics = unwrapIgniteImpl(node(i)) + .metricManager() + .metricSnapshot() + .metrics() + .get(METRIC_SOURCE_NAME); + + metricNames.forEach(metricName -> + values.compute(metricName, (k, v) -> { + Metric metric = tableMetrics.get(metricName); + + assertThat("Metric not found [name=" + metricName + ']', metric, is(notNullValue())); + assertThat( + "Metric is not a LongMetric [name=" + metricName + ", class=" + metric.getClass().getSimpleName() + ']', + metric, + instanceOf(LongMetric.class)); + + return (v == null ? 0 : v) + ((LongMetric) metric).value(); + })); + } + + return values; + } +} diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/InternalTable.java b/modules/table/src/main/java/org/apache/ignite/internal/table/InternalTable.java index 5cefe7547f9..44bdbadf5c9 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/InternalTable.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/InternalTable.java @@ -34,6 +34,7 @@ import org.apache.ignite.internal.schema.BinaryTuple; import org.apache.ignite.internal.schema.BinaryTuplePrefix; import org.apache.ignite.internal.storage.MvPartitionStorage; import org.apache.ignite.internal.storage.engine.MvTableStorage; +import org.apache.ignite.internal.table.metrics.TableMetricSource; import org.apache.ignite.internal.tx.InternalTransaction; import org.apache.ignite.internal.tx.storage.state.TxStateStorage; import org.apache.ignite.internal.util.PendingComparableValuesTracker; @@ -552,4 +553,11 @@ public interface InternalTable extends ManuallyCloseable { * @return The id. */ ReplicationGroupId targetReplicationGroupId(int partId); + + /** + * Returns a metric source for this table. + * + * @return Table metrics source. + */ + TableMetricSource metrics(); } diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/TableImpl.java b/modules/table/src/main/java/org/apache/ignite/internal/table/TableImpl.java index 856142d3efd..8e9908b84cf 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/TableImpl.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/TableImpl.java @@ -45,6 +45,7 @@ import org.apache.ignite.internal.table.distributed.PartitionSet; import org.apache.ignite.internal.table.distributed.TableIndexStoragesSupplier; import org.apache.ignite.internal.table.distributed.TableSchemaAwareIndexStorage; import org.apache.ignite.internal.table.distributed.schema.SchemaVersions; +import org.apache.ignite.internal.table.metrics.TableMetricSource; import org.apache.ignite.internal.table.partition.HashPartitionManagerImpl; import org.apache.ignite.internal.tx.LockManager; import org.apache.ignite.sql.IgniteSql; @@ -310,4 +311,9 @@ public class TableImpl implements TableViewInternal { } }); } + + @Override + public TableMetricSource metrics() { + return tbl.metrics(); + } } diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/TableViewInternal.java b/modules/table/src/main/java/org/apache/ignite/internal/table/TableViewInternal.java index 2a40ba63149..3dac8dd01d2 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/TableViewInternal.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/TableViewInternal.java @@ -26,6 +26,7 @@ import org.apache.ignite.internal.storage.index.StorageSortedIndexDescriptor; import org.apache.ignite.internal.table.distributed.IndexLocker; import org.apache.ignite.internal.table.distributed.PartitionSet; import org.apache.ignite.internal.table.distributed.TableIndexStoragesSupplier; +import org.apache.ignite.internal.table.metrics.TableMetricSource; import org.apache.ignite.table.Table; import org.apache.ignite.table.Tuple; import org.apache.ignite.table.mapper.Mapper; @@ -127,4 +128,11 @@ public interface TableViewInternal extends Table { * @param indexId An index id to unregister. */ void unregisterIndex(int indexId); + + /** + * Returns a metric source for this table. + * + * @return Table metrics source. + */ + TableMetricSource metrics(); } diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java index 23cd96c1049..3126ff450ba 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java @@ -147,6 +147,7 @@ import org.apache.ignite.internal.metastorage.MetaStorageManager; import org.apache.ignite.internal.metastorage.Revisions; import org.apache.ignite.internal.metastorage.WatchEvent; import org.apache.ignite.internal.metastorage.WatchListener; +import org.apache.ignite.internal.metrics.MetricManager; import org.apache.ignite.internal.network.InternalClusterNode; import org.apache.ignite.internal.network.MessagingService; import org.apache.ignite.internal.network.TopologyService; @@ -232,6 +233,7 @@ import org.apache.ignite.internal.table.distributed.storage.BrokenTxStateStorage import org.apache.ignite.internal.table.distributed.storage.InternalTableImpl; import org.apache.ignite.internal.table.distributed.storage.NullStorageEngine; import org.apache.ignite.internal.table.distributed.storage.PartitionStorages; +import org.apache.ignite.internal.table.metrics.TableMetricSource; import org.apache.ignite.internal.thread.IgniteThreadFactory; import org.apache.ignite.internal.tx.LockManager; import org.apache.ignite.internal.tx.TxManager; @@ -464,6 +466,8 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent { private final TableAssignmentsService assignmentsService; private final ReliableCatalogVersions reliableCatalogVersions; + private final MetricManager metricManager; + /** * Creates a new table manager. * @@ -494,6 +498,7 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent { * @param partitionReplicaLifecycleManager Partition replica lifecycle manager. * @param minTimeCollectorService Collects minimum required timestamp for each partition. * @param systemDistributedConfiguration System distributed configuration. + * @param metricManager Metric manager. */ public TableManager( String nodeName, @@ -533,7 +538,8 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent { PartitionReplicaLifecycleManager partitionReplicaLifecycleManager, NodeProperties nodeProperties, MinimumRequiredTimeCollectorService minTimeCollectorService, - SystemDistributedConfiguration systemDistributedConfiguration + SystemDistributedConfiguration systemDistributedConfiguration, + MetricManager metricManager ) { this.topologyService = topologyService; this.replicaMgr = replicaMgr; @@ -563,6 +569,7 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent { this.partitionReplicaLifecycleManager = partitionReplicaLifecycleManager; this.nodeProperties = nodeProperties; this.minTimeCollectorService = minTimeCollectorService; + this.metricManager = metricManager; this.executorInclinedSchemaSyncService = new ExecutorInclinedSchemaSyncService(schemaSyncService, partitionOperationsExecutor); this.executorInclinedPlacementDriver = new ExecutorInclinedPlacementDriver(placementDriver, partitionOperationsExecutor); @@ -1171,6 +1178,8 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent { private void onTableDrop(DropTableEventParameters parameters) { inBusyLock(busyLock, () -> { + unregisterMetricsSource(parameters.tableId()); + destructionEventsQueue.enqueue(new DestroyTableEvent(parameters.catalogVersion(), parameters.tableId())); }); } @@ -1533,7 +1542,8 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent { indexMetaStorage, lowWatermark, failureProcessor, - nodeProperties + nodeProperties, + table.metrics() ); } @@ -1804,7 +1814,8 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent { Objects.requireNonNull(streamerReceiverRunner), () -> txCfg.value().readWriteTimeoutMillis(), () -> txCfg.value().readOnlyTimeoutMillis(), - nodeProperties.colocationEnabled() + nodeProperties.colocationEnabled(), + createAndRegisterMetricsSource(tableName) ); return new TableImpl( @@ -3218,6 +3229,8 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent { int tableId = tableDescriptor.id(); if (nextCatalog != null && nextCatalog.table(tableId) == null) { + unregisterMetricsSource(tableId); + destructionEventsQueue.enqueue(new DestroyTableEvent(nextCatalog.version(), tableId)); } @@ -3546,6 +3559,32 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent { } } + private TableMetricSource createAndRegisterMetricsSource(QualifiedName tableName) { + TableMetricSource source = new TableMetricSource(tableName); + + try { + metricManager.registerSource(source); + metricManager.enable(source); + } catch (Exception e) { + LOG.warn("Failed to register metrics source for table [name={}].", e, source.qualifiedTableName()); + } + + return source; + } + + private void unregisterMetricsSource(int tableId) { + try { + TableViewInternal table = startedTables.get(tableId); + if (table == null) { + return; + } + + metricManager.unregisterSource(table.metrics()); + } catch (Exception e) { + LOG.warn("Failed to unregister metrics source for table [tableId={}].", e, tableId); + } + } + private static class TableClosedException extends IgniteInternalException { private static final long serialVersionUID = 1L; diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java index a58ee9bb9fb..669a2a58ccc 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java @@ -196,6 +196,7 @@ import org.apache.ignite.internal.table.distributed.TableSchemaAwareIndexStorage import org.apache.ignite.internal.table.distributed.TableUtils; import org.apache.ignite.internal.table.distributed.index.IndexMetaStorage; import org.apache.ignite.internal.table.distributed.replicator.handlers.BuildIndexReplicaRequestHandler; +import org.apache.ignite.internal.table.metrics.TableMetricSource; import org.apache.ignite.internal.tx.Lock; import org.apache.ignite.internal.tx.LockKey; import org.apache.ignite.internal.tx.LockManager; @@ -346,6 +347,8 @@ public class PartitionReplicaListener implements ReplicaListener, ReplicaTablePr private static final boolean SKIP_UPDATES = getBoolean(IgniteSystemProperties.IGNITE_SKIP_STORAGE_UPDATE_IN_BENCHMARK); + private final TableMetricSource metrics; + private final ReplicaPrimacyEngine replicaPrimacyEngine; private final TableAwareReplicaRequestPreProcessor tableAwareReplicaRequestPreProcessor; private final ReliableCatalogVersions reliableCatalogVersions; @@ -384,6 +387,7 @@ public class PartitionReplicaListener implements ReplicaListener, ReplicaTablePr * @param clusterNodeResolver Node resolver. * @param remotelyTriggeredResourceRegistry Resource registry. * @param indexMetaStorage Index meta storage. + * @param metrics Table metric source. */ public PartitionReplicaListener( MvPartitionStorage mvDataStorage, @@ -412,7 +416,8 @@ public class PartitionReplicaListener implements ReplicaListener, ReplicaTablePr IndexMetaStorage indexMetaStorage, LowWatermark lowWatermark, FailureProcessor failureProcessor, - NodeProperties nodeProperties + NodeProperties nodeProperties, + TableMetricSource metrics ) { this.mvDataStorage = mvDataStorage; this.raftCommandRunner = raftCommandRunner; @@ -435,6 +440,7 @@ public class PartitionReplicaListener implements ReplicaListener, ReplicaTablePr this.replicationGroupId = replicationGroupId; this.tableId = tableId; this.tableLockKey = new TablePartitionId(tableId, replicationGroupId.partitionId()); + this.metrics = metrics; this.schemaCompatValidator = new SchemaCompatibilityValidator(validationSchemasSource, catalogService, schemaSyncService); @@ -771,7 +777,11 @@ public class PartitionReplicaListener implements ReplicaListener, ReplicaTablePr return completedFuture(rows); } else { return validateRwReadAgainstSchemaAfterTakingLocks(req.transactionId()) - .thenApply(ignored -> rows); + .thenApply(ignored -> { + metrics.onRead(rows.size(), false); + + return rows; + }); } }) .whenComplete((rows, err) -> { @@ -840,7 +850,8 @@ public class PartitionReplicaListener implements ReplicaListener, ReplicaTablePr FullyQualifiedResourceId cursorId = cursorId(txId, request.scanId()); - CompletableFuture<Void> safeReadFuture = isPrimaryInTimestamp(isPrimary, readTimestamp) ? nullCompletedFuture() + CompletableFuture<Void> safeReadFuture = isPrimaryInTimestamp(isPrimary, readTimestamp) + ? nullCompletedFuture() : safeTime.waitFor(readTimestamp); if (request.indexToUse() != null) { @@ -853,18 +864,34 @@ public class PartitionReplicaListener implements ReplicaListener, ReplicaTablePr if (request.exactKey() != null) { assert request.lowerBoundPrefix() == null && request.upperBoundPrefix() == null : "Index lookup doesn't allow bounds."; - return safeReadFuture.thenCompose(unused -> lookupIndex(request, indexStorage)); + return safeReadFuture + .thenCompose(unused -> lookupIndex(request, indexStorage)) + .thenApply(rows -> { + metrics.onRead(rows.size(), true); + + return rows; + }); } assert indexStorage.storage() instanceof SortedIndexStorage; - return safeReadFuture.thenCompose(unused -> scanSortedIndex(request, indexStorage)); + return safeReadFuture + .thenCompose(unused -> scanSortedIndex(request, indexStorage)) + .thenApply(rows -> { + metrics.onRead(rows.size(), true); + + return rows; + }); } return safeReadFuture .thenCompose( - unused -> retrieveExactEntriesUntilCursorEmpty(txId, request.coordinatorId(), readTimestamp, cursorId, batchCount) - ); + unused -> retrieveExactEntriesUntilCursorEmpty(txId, request.coordinatorId(), readTimestamp, cursorId, batchCount)) + .thenApply(rows -> { + metrics.onRead(rows.size(), true); + + return rows; + }); } /** @@ -1054,11 +1081,13 @@ public class PartitionReplicaListener implements ReplicaListener, ReplicaTablePr HybridTimestamp readTimestamp = request.readTimestamp(); if (request.requestType() != RO_GET) { - throw new IgniteInternalException(Replicator.REPLICA_COMMON_ERR, + throw new IgniteInternalException( + Replicator.REPLICA_COMMON_ERR, format("Unknown single request [actionType={}]", request.requestType())); } - CompletableFuture<Void> safeReadFuture = isPrimaryInTimestamp(isPrimary, readTimestamp) ? nullCompletedFuture() + CompletableFuture<Void> safeReadFuture = isPrimaryInTimestamp(isPrimary, readTimestamp) + ? nullCompletedFuture() : safeTime.waitFor(request.readTimestamp()); return safeReadFuture.thenCompose(unused -> resolveRowByPkForReadOnly(primaryKey, readTimestamp)); @@ -1090,11 +1119,13 @@ public class PartitionReplicaListener implements ReplicaListener, ReplicaTablePr HybridTimestamp readTimestamp = request.readTimestamp(); if (request.requestType() != RO_GET_ALL) { - throw new IgniteInternalException(Replicator.REPLICA_COMMON_ERR, + throw new IgniteInternalException( + Replicator.REPLICA_COMMON_ERR, format("Unknown single request [actionType={}]", request.requestType())); } - CompletableFuture<Void> safeReadFuture = isPrimaryInTimestamp(isPrimary, readTimestamp) ? nullCompletedFuture() + CompletableFuture<Void> safeReadFuture = isPrimaryInTimestamp(isPrimary, readTimestamp) + ? nullCompletedFuture() : safeTime.waitFor(request.readTimestamp()); return safeReadFuture.thenCompose(unused -> { @@ -1931,10 +1962,14 @@ public class PartitionReplicaListener implements ReplicaListener, ReplicaTablePr // Nothing found in the storage, return null. if (writeIntents.isEmpty() && regularEntries.isEmpty()) { + metrics.onRead(true); + return nullCompletedFuture(); } if (writeIntents.isEmpty()) { + metrics.onRead(true); + // No write intents, then return the committed value. We already know that regularEntries is not empty. return completedFuture(regularEntries.get(0).binaryRow()); } else { @@ -1948,6 +1983,8 @@ public class PartitionReplicaListener implements ReplicaListener, ReplicaTablePr resolveWriteIntentReadability(writeIntent, ts) .thenApply(writeIntentReadable -> inBusyLock(busyLock, () -> { + metrics.onRead(true); + if (writeIntentReadable) { return findAny(writeIntents, wi -> !wi.isEmpty()).map(ReadResult::binaryRow).orElse(null); } else { @@ -2104,6 +2141,8 @@ public class PartitionReplicaListener implements ReplicaListener, ReplicaTablePr } if (rowIdsToDelete.isEmpty()) { + metrics.onRead(searchRows.size(), false); + return completedFuture(new ReplicaResult(result, null)); } @@ -2117,7 +2156,12 @@ public class PartitionReplicaListener implements ReplicaListener, ReplicaTablePr leaseStartTime ) ) - .thenApply(res -> new ReplicaResult(result, res)); + .thenApply(res -> { + metrics.onRead(searchRows.size(), false); + metrics.onWrite(rowIdsToDelete.size()); + + return new ReplicaResult(result, res); + }); }); } case RW_INSERT_ALL: { @@ -2153,6 +2197,8 @@ public class PartitionReplicaListener implements ReplicaListener, ReplicaTablePr } if (rowsToInsert.isEmpty()) { + metrics.onRead(searchRows.size(), false); + return completedFuture(new ReplicaResult(result, null)); } @@ -2185,6 +2231,9 @@ public class PartitionReplicaListener implements ReplicaListener, ReplicaTablePr ) ) .thenApply(res -> { + metrics.onRead(searchRows.size(), false); + metrics.onWrite(rowsToInsert.size()); + // Release short term locks. for (CompletableFuture<IgniteBiTuple<RowId, Collection<Lock>>> insertLockFut : insertLockFuts) { insertLockFut.join().get2() @@ -2222,11 +2271,14 @@ public class PartitionReplicaListener implements ReplicaListener, ReplicaTablePr } } + int uniqueKeysCount = 0; for (int i = 0; i < searchRows.size(); i++) { if (rowIdFuts[i] != null) { continue; // Skip previous row with the same key. } + uniqueKeysCount++; + BinaryRow searchRow = searchRows.get(i); boolean isDelete = deleted != null && deleted.get(i); @@ -2256,6 +2308,8 @@ public class PartitionReplicaListener implements ReplicaListener, ReplicaTablePr }); } + int uniqueKeysCountFinal = uniqueKeysCount; + return allOf(rowIdFuts).thenCompose(ignore -> { Map<UUID, TimedBinaryRowMessage> rowsToUpdate = IgniteUtils.newHashMap(searchRows.size()); List<RowId> rows = new ArrayList<>(); @@ -2282,6 +2336,8 @@ public class PartitionReplicaListener implements ReplicaListener, ReplicaTablePr } if (rowsToUpdate.isEmpty()) { + metrics.onRead(uniqueKeysCountFinal, false); + return completedFuture(new ReplicaResult(null, null)); } @@ -2296,6 +2352,9 @@ public class PartitionReplicaListener implements ReplicaListener, ReplicaTablePr ) ) .thenApply(res -> { + metrics.onRead(uniqueKeysCountFinal, false); + metrics.onWrite(uniqueKeysCountFinal); + // Release short term locks. for (CompletableFuture<IgniteBiTuple<RowId, Collection<Lock>>> rowIdFut : rowIdFuts) { IgniteBiTuple<RowId, Collection<Lock>> futRes = rowIdFut.join(); @@ -2356,11 +2415,17 @@ public class PartitionReplicaListener implements ReplicaListener, ReplicaTablePr } if (allElementsAreNull(result)) { + metrics.onRead(result.size(), false); + return completedFuture(new ReplicaResult(result, null)); } return validateRwReadAgainstSchemaAfterTakingLocks(txId) - .thenApply(unused -> new ReplicaResult(result, null)); + .thenApply(unused -> { + metrics.onRead(result.size(), false); + + return new ReplicaResult(result, null); + }); }); } case RW_DELETE_ALL: { @@ -2422,7 +2487,11 @@ public class PartitionReplicaListener implements ReplicaListener, ReplicaTablePr leaseStartTime ) ) - .thenApply(res -> new ReplicaResult(result, res)); + .thenApply(res -> { + metrics.onWrite(rowIdsToDelete.size()); + + return new ReplicaResult(result, res); + }); }); } default: { @@ -2732,7 +2801,8 @@ public class PartitionReplicaListener implements ReplicaListener, ReplicaTablePr HybridTimestamp readTimestamp = opStartTimestamp; if (request.requestType() != RO_GET) { - throw new IgniteInternalException(Replicator.REPLICA_COMMON_ERR, + throw new IgniteInternalException( + Replicator.REPLICA_COMMON_ERR, format("Unknown single request [actionType={}]", request.requestType())); } @@ -2757,12 +2827,16 @@ public class PartitionReplicaListener implements ReplicaListener, ReplicaTablePr case RW_DELETE_EXACT: { return resolveRowByPk(extractPk(searchRow), txId, (rowId, row, lastCommitTime) -> { if (rowId == null) { + metrics.onRead(false); + return completedFuture(new ReplicaResult(false, null)); } return takeLocksForDeleteExact(searchRow, rowId, row, txId) .thenCompose(validatedRowId -> { if (validatedRowId == null) { + metrics.onRead(false); + return completedFuture(new ReplicaResult(false, null)); } @@ -2778,13 +2852,20 @@ public class PartitionReplicaListener implements ReplicaListener, ReplicaTablePr leaseStartTime ) ) - .thenApply(res -> new ReplicaResult(true, res)); + .thenApply(res -> { + metrics.onRead(false); + metrics.onWrite(); + + return new ReplicaResult(true, res); + }); }); }); } case RW_INSERT: { return resolveRowByPk(extractPk(searchRow), txId, (rowId, row, lastCommitTime) -> { if (rowId != null) { + metrics.onRead(false); + return completedFuture(new ReplicaResult(false, null)); } @@ -2804,6 +2885,9 @@ public class PartitionReplicaListener implements ReplicaListener, ReplicaTablePr ) .thenApply(res -> new IgniteBiTuple<>(res, rowIdLock))) .thenApply(tuple -> { + metrics.onRead(false); + metrics.onWrite(); + // Release short term locks. tuple.get2().get2().forEach(lock -> lockManager.release(lock.txId(), lock.lockKey(), lock.lockMode())); @@ -2836,6 +2920,8 @@ public class PartitionReplicaListener implements ReplicaListener, ReplicaTablePr ) .thenApply(res -> new IgniteBiTuple<>(res, rowIdLock))) .thenApply(tuple -> { + metrics.onWrite(); + // Release short term locks. tuple.get2().get2().forEach(lock -> lockManager.release(lock.txId(), lock.lockKey(), lock.lockMode())); @@ -2868,6 +2954,9 @@ public class PartitionReplicaListener implements ReplicaListener, ReplicaTablePr ) .thenApply(res -> new IgniteBiTuple<>(res, rowIdLock))) .thenApply(tuple -> { + metrics.onRead(false); + metrics.onWrite(); + // Release short term locks. tuple.get2().get2().forEach(lock -> lockManager.release(lock.txId(), lock.lockKey(), lock.lockMode())); @@ -2878,6 +2967,8 @@ public class PartitionReplicaListener implements ReplicaListener, ReplicaTablePr case RW_GET_AND_REPLACE: { return resolveRowByPk(extractPk(searchRow), txId, (rowId, row, lastCommitTime) -> { if (rowId == null) { + metrics.onRead(false); + return completedFuture(new ReplicaResult(null, null)); } @@ -2896,6 +2987,9 @@ public class PartitionReplicaListener implements ReplicaListener, ReplicaTablePr ) .thenApply(res -> new IgniteBiTuple<>(res, rowIdLock))) .thenApply(tuple -> { + metrics.onRead(false); + metrics.onWrite(); + // Release short term locks. tuple.get2().get2().forEach(lock -> lockManager.release(lock.txId(), lock.lockKey(), lock.lockMode())); @@ -2906,6 +3000,8 @@ public class PartitionReplicaListener implements ReplicaListener, ReplicaTablePr case RW_REPLACE_IF_EXIST: { return resolveRowByPk(extractPk(searchRow), txId, (rowId, row, lastCommitTime) -> { if (rowId == null) { + metrics.onRead(false); + return completedFuture(new ReplicaResult(false, null)); } @@ -2924,6 +3020,9 @@ public class PartitionReplicaListener implements ReplicaListener, ReplicaTablePr ) .thenApply(res -> new IgniteBiTuple<>(res, rowIdLock))) .thenApply(tuple -> { + metrics.onRead(false); + metrics.onWrite(); + // Release short term locks. tuple.get2().get2().forEach(lock -> lockManager.release(lock.txId(), lock.lockKey(), lock.lockMode())); @@ -2957,12 +3056,18 @@ public class PartitionReplicaListener implements ReplicaListener, ReplicaTablePr case RW_GET: { return resolveRowByPk(primaryKey, txId, (rowId, row, lastCommitTime) -> { if (rowId == null) { + metrics.onRead(false); + return nullCompletedFuture(); } return takeLocksForGet(rowId, txId) .thenCompose(ignored -> validateRwReadAgainstSchemaAfterTakingLocks(txId)) - .thenApply(ignored -> new ReplicaResult(row, null)); + .thenApply(ignored -> { + metrics.onRead(false); + + return new ReplicaResult(row, null); + }); }); } case RW_DELETE: { @@ -2988,12 +3093,18 @@ public class PartitionReplicaListener implements ReplicaListener, ReplicaTablePr leaseStartTime ) ) - .thenApply(res -> new ReplicaResult(true, res)); + .thenApply(res -> { + metrics.onWrite(); + + return new ReplicaResult(true, res); + }); }); } case RW_GET_AND_DELETE: { return resolveRowByPk(primaryKey, txId, (rowId, row, lastCommitTime) -> { if (rowId == null) { + metrics.onRead(false); + return nullCompletedFuture(); } @@ -3014,7 +3125,12 @@ public class PartitionReplicaListener implements ReplicaListener, ReplicaTablePr leaseStartTime ) ) - .thenApply(res -> new ReplicaResult(row, res)); + .thenApply(res -> { + metrics.onRead(false); + metrics.onWrite(); + + return new ReplicaResult(row, res); + }); }); } default: { @@ -3229,12 +3345,16 @@ public class PartitionReplicaListener implements ReplicaListener, ReplicaTablePr if (request.requestType() == RW_REPLACE) { return resolveRowByPk(extractPk(newRow), txId, (rowId, row, lastCommitTime) -> { if (rowId == null) { + metrics.onRead(false); + return completedFuture(new ReplicaResult(false, null)); } return takeLocksForReplace(expectedRow, row, newRow, rowId, txId) .thenCompose(rowIdLock -> { if (rowIdLock == null) { + metrics.onRead(false); + return completedFuture(new ReplicaResult(false, null)); } @@ -3256,6 +3376,9 @@ public class PartitionReplicaListener implements ReplicaListener, ReplicaTablePr ) .thenApply(res -> new IgniteBiTuple<>(res, rowIdLock)) .thenApply(tuple -> { + metrics.onRead(false); + metrics.onWrite(); + // Release short term locks. tuple.get2().get2() .forEach(lock -> lockManager.release(lock.txId(), lock.lockKey(), lock.lockMode())); diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java index 9fa5cb52364..8f418d59651 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java @@ -119,6 +119,7 @@ import org.apache.ignite.internal.storage.engine.MvTableStorage; import org.apache.ignite.internal.table.InternalTable; import org.apache.ignite.internal.table.StreamerReceiverRunner; import org.apache.ignite.internal.table.distributed.storage.PartitionScanPublisher.InflightBatchRequestTracker; +import org.apache.ignite.internal.table.metrics.TableMetricSource; import org.apache.ignite.internal.tx.InternalTransaction; import org.apache.ignite.internal.tx.PendingTxPartitionEnlistment; import org.apache.ignite.internal.tx.TransactionIds; @@ -211,6 +212,8 @@ public class InternalTableImpl implements InternalTable { private final boolean colocationEnabled; + private final TableMetricSource metrics; + /** * Constructor. * @@ -249,7 +252,8 @@ public class InternalTableImpl implements InternalTable { StreamerReceiverRunner streamerReceiverRunner, Supplier<Long> defaultRwTxTimeout, Supplier<Long> defaultReadTxTimeout, - boolean colocationEnabled + boolean colocationEnabled, + TableMetricSource metrics ) { this.tableName = tableName; this.zoneId = zoneId; @@ -269,6 +273,7 @@ public class InternalTableImpl implements InternalTable { this.defaultRwTxTimeout = defaultRwTxTimeout; this.defaultReadTxTimeout = defaultReadTxTimeout; this.colocationEnabled = colocationEnabled; + this.metrics = metrics; } /** {@inheritDoc} */ @@ -2404,4 +2409,9 @@ public class InternalTableImpl implements InternalTable { Boolean full ); } + + @Override + public TableMetricSource metrics() { + return metrics; + } } diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/metrics/TableMetricSource.java b/modules/table/src/main/java/org/apache/ignite/internal/table/metrics/TableMetricSource.java new file mode 100644 index 00000000000..0348a4f9b79 --- /dev/null +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/metrics/TableMetricSource.java @@ -0,0 +1,222 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.table.metrics; + +import java.util.List; +import org.apache.ignite.internal.metrics.AbstractMetricSource; +import org.apache.ignite.internal.metrics.LongAdderMetric; +import org.apache.ignite.internal.metrics.Metric; +import org.apache.ignite.internal.table.metrics.TableMetricSource.Holder; +import org.apache.ignite.table.QualifiedName; + +/** + * Set of metrics related to a specific table. + * + * <p> + * <b>Metrics affected by key-value and record view operations:</b> + * <table border="1"> + * <caption>Methods and affected metrics</caption> + * <tr> + * <th>Method(s)</th> + * <th>RoReads</th> + * <th>RwReads</th> + * <th>Writes</th> + * </tr> + * <tr> + * <td>get, getOrDefault, contains</td> + * <td>Yes, for read-only transactions</td> + * <td>Yes, for read-write transactions</td> + * <td>No</td> + * </tr> + * <tr> + * <td>getAll, containsAll</td> + * <td>Yes, it is incremented by the number of keys read for read-only transactions</td> + * <td>Yes, it is incremented by the number of keys read for read-write transactions</td> + * <td>No</td> + * </tr> + * <tr> + * <td>put, upsert</td> + * <td>No</td> + * <td>No</td> + * <td>Yes</td> + * </tr> + * <tr> + * <td>putAll, upsertAll</td> + * <td>No</td> + * <td>No</td> + * <td>Yes, it is incremented by the number of keys inserted</td> + * </tr> + * <tr> + * <td>putIfAbsent, insert</td> + * <td>No</td> + * <td>Yes</td> + * <td>Yes, if the method returns true</td> + * </tr> + * <tr> + * <td>insertAll</td> + * <td>No</td> + * <td>Yes, it is incremented by the number of keys read, which is equal to the number of keys provided</td> + * <td>Yes, it is incremented by the number of keys inserted</td> + * </tr> + * <tr> + * <td>getAndPut, replace, getAndReplace</td> + * <td>No</td> + * <td>Yes</td> + * <td>Yes, if the value is inserted / replaced</td> + * </tr> + * <tr> + * <td>remove, delete</td> + * <td>No</td> + * <td>No</td> + * <td>Yes, if the method returns true</td> + * </tr> + * <tr> + * <td>removeAll, getAndRemove, deleteAll</td> + * <td>No</td> + * <td>No</td> + * <td>Yes, it is incremented by the number of keys removed</td> + * </tr> + * <tr> + * <td>conditional remove, deleteExact, deleteAllExact</td> + * <td>No</td> + * <td>Yes, it is incremented by the number of keys read, which is equal to the number of keys provided</td> + * <td>Yes, it is incremented by the number of keys removed</td> + * </tr> + * <tr> + * <td>getAndRemove</td> + * <td>No</td> + * <td>Yes</td> + * <td>Yes, if the value is removed</td> + * </tr> + * </table> + * + * <i>Note: Only synchronous methods are listed. Asynchronous methods affect the same metrics.</i> + */ +public class TableMetricSource extends AbstractMetricSource<Holder> { + /** Source name. */ + public static final String SOURCE_NAME = "tables"; + + /** Metric names. */ + public static final String RO_READS = "RoReads"; + public static final String RW_READS = "RwReads"; + public static final String WRITES = "Writes"; + + private final QualifiedName tableName; + + /** + * Creates a new instance of {@link TableMetricSource}. + * + * @param tableName Qualified table name. + */ + public TableMetricSource(QualifiedName tableName) { + super(SOURCE_NAME + '.' + tableName.toCanonicalForm(), "Table metrics.", "tables"); + this.tableName = tableName; + } + + /** + * Returns the qualified name of the table. + * + * @return Qualified name of the table. + */ + public QualifiedName qualifiedTableName() { + return tableName; + } + + /** + * Increments a counter of reads. + * + * @param readOnly {@code true} if read operation is executed within read-only transaction, and {@code false} otherwise. + */ + public void onRead(boolean readOnly) { + Holder holder = holder(); + + if (holder != null) { + if (readOnly) { + holder.roReads.increment(); + } else { + holder.rwReads.increment(); + } + } + } + + /** + * Adds the given {@code x} to a counter of reads. + * + * @param readOnly {@code true} if read operation is executed within read-only transaction, and {@code false} otherwise. + */ + public void onRead(int x, boolean readOnly) { + Holder holder = holder(); + + if (holder != null) { + if (readOnly) { + holder.roReads.add(x); + } else { + holder.rwReads.add(x); + } + } + } + + /** + * Increments a counter of writes. + */ + public void onWrite() { + Holder holder = holder(); + + if (holder != null) { + holder.writes.increment(); + } + } + + /** + * Adds the given {@code x} to a counter of writes. + */ + public void onWrite(int x) { + Holder holder = holder(); + + if (holder != null) { + holder.writes.add(x); + } + } + + @Override + protected Holder createHolder() { + return new Holder(); + } + + /** Actual metrics holder. */ + protected static class Holder implements AbstractMetricSource.Holder<Holder> { + private final LongAdderMetric roReads = new LongAdderMetric( + RO_READS, + "The total number of reads executed within read-write transactions."); + + private final LongAdderMetric rwReads = new LongAdderMetric( + RW_READS, + "The total number of reads executed within read-only transactions."); + + private final LongAdderMetric writes = new LongAdderMetric( + WRITES, + "The total number of writes executed within read-write transactions."); + + private final List<Metric> metrics = List.of(roReads, rwReads, writes); + + @Override + public Iterable<Metric> metrics() { + return metrics; + } + } +} diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerRecoveryTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerRecoveryTest.java index 64895bf0194..ea9a719d6ab 100644 --- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerRecoveryTest.java +++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerRecoveryTest.java @@ -101,6 +101,7 @@ import org.apache.ignite.internal.metastorage.impl.StandaloneMetaStorageManager; import org.apache.ignite.internal.metastorage.server.ReadOperationForCompactionTracker; import org.apache.ignite.internal.metastorage.server.persistence.RocksDbKeyValueStorage; import org.apache.ignite.internal.metrics.MetricManager; +import org.apache.ignite.internal.metrics.NoOpMetricManager; import org.apache.ignite.internal.network.ClusterService; import org.apache.ignite.internal.network.InternalClusterNode; import org.apache.ignite.internal.network.InternalClusterNodeImpl; @@ -498,7 +499,8 @@ public class TableManagerRecoveryTest extends IgniteAbstractTest { partitionReplicaLifecycleManager, new SystemPropertiesNodeProperties(), minTimeCollectorService, - systemDistributedConfiguration + systemDistributedConfiguration, + new NoOpMetricManager() ) { @Override diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java index 416cf5aaa70..c02f6a47f49 100644 --- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java +++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java @@ -106,6 +106,7 @@ import org.apache.ignite.internal.metastorage.Revisions; import org.apache.ignite.internal.metastorage.impl.MetaStorageRevisionListenerRegistry; import org.apache.ignite.internal.metastorage.impl.StandaloneMetaStorageManager; import org.apache.ignite.internal.metrics.MetricManager; +import org.apache.ignite.internal.metrics.NoOpMetricManager; import org.apache.ignite.internal.network.ClusterService; import org.apache.ignite.internal.network.InternalClusterNode; import org.apache.ignite.internal.network.InternalClusterNodeImpl; @@ -923,7 +924,8 @@ public class TableManagerTest extends IgniteAbstractTest { partitionReplicaLifecycleManager, new SystemPropertiesNodeProperties(), new MinimumRequiredTimeCollectorServiceImpl(), - systemDistributedConfiguration + systemDistributedConfiguration, + new NoOpMetricManager() ) { @Override diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerIndexLockingTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerIndexLockingTest.java index 8992f808425..56407da3710 100644 --- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerIndexLockingTest.java +++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerIndexLockingTest.java @@ -114,6 +114,7 @@ import org.apache.ignite.internal.table.distributed.replicator.TransactionStateR import org.apache.ignite.internal.table.impl.DummyInternalTableImpl; import org.apache.ignite.internal.table.impl.DummySchemaManagerImpl; import org.apache.ignite.internal.table.impl.DummyValidationSchemasSource; +import org.apache.ignite.internal.table.metrics.TableMetricSource; import org.apache.ignite.internal.testframework.IgniteAbstractTest; import org.apache.ignite.internal.tx.Lock; import org.apache.ignite.internal.tx.LockManager; @@ -130,6 +131,7 @@ import org.apache.ignite.internal.type.NativeTypes; import org.apache.ignite.internal.util.Lazy; import org.apache.ignite.internal.util.Pair; import org.apache.ignite.internal.util.PendingComparableValuesTracker; +import org.apache.ignite.table.QualifiedName; import org.hamcrest.CustomMatcher; import org.hamcrest.Matcher; import org.junit.jupiter.api.BeforeAll; @@ -291,7 +293,8 @@ public class PartitionReplicaListenerIndexLockingTest extends IgniteAbstractTest mock(IndexMetaStorage.class), new TestLowWatermark(), new NoOpFailureManager(), - new SystemPropertiesNodeProperties() + new SystemPropertiesNodeProperties(), + new TableMetricSource(QualifiedName.fromSimple("test_table")) ); kvMarshaller = new ReflectionMarshallerFactory().create(schemaDescriptor, Integer.class, Integer.class); diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerSortedIndexLockingTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerSortedIndexLockingTest.java index 1511e38c555..767cec133eb 100644 --- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerSortedIndexLockingTest.java +++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerSortedIndexLockingTest.java @@ -109,6 +109,7 @@ import org.apache.ignite.internal.table.distributed.replicator.TransactionStateR import org.apache.ignite.internal.table.impl.DummyInternalTableImpl; import org.apache.ignite.internal.table.impl.DummySchemaManagerImpl; import org.apache.ignite.internal.table.impl.DummyValidationSchemasSource; +import org.apache.ignite.internal.table.metrics.TableMetricSource; import org.apache.ignite.internal.testframework.IgniteAbstractTest; import org.apache.ignite.internal.tx.Lock; import org.apache.ignite.internal.tx.LockManager; @@ -125,6 +126,7 @@ import org.apache.ignite.internal.type.NativeTypes; import org.apache.ignite.internal.util.Lazy; import org.apache.ignite.internal.util.Pair; import org.apache.ignite.internal.util.PendingComparableValuesTracker; +import org.apache.ignite.table.QualifiedName; import org.hamcrest.CustomMatcher; import org.hamcrest.Matcher; import org.junit.jupiter.api.BeforeAll; @@ -260,7 +262,8 @@ public class PartitionReplicaListenerSortedIndexLockingTest extends IgniteAbstra mock(IndexMetaStorage.class), new TestLowWatermark(), new NoOpFailureManager(), - new SystemPropertiesNodeProperties() + new SystemPropertiesNodeProperties(), + new TableMetricSource(QualifiedName.fromSimple("test_table")) ); kvMarshaller = new ReflectionMarshallerFactory().create(schemaDescriptor, Integer.class, Integer.class); diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java index 1cb08db2bfe..30cddf6ba4e 100644 --- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java +++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java @@ -227,6 +227,7 @@ import org.apache.ignite.internal.table.distributed.replicator.StaleTransactionO import org.apache.ignite.internal.table.distributed.replicator.TransactionStateResolver; import org.apache.ignite.internal.table.impl.DummyInternalTableImpl; import org.apache.ignite.internal.table.impl.DummySchemaManagerImpl; +import org.apache.ignite.internal.table.metrics.TableMetricSource; import org.apache.ignite.internal.testframework.IgniteAbstractTest; import org.apache.ignite.internal.testframework.WithSystemProperty; import org.apache.ignite.internal.tostring.IgniteToStringInclude; @@ -263,6 +264,7 @@ import org.apache.ignite.internal.util.PendingComparableValuesTracker; import org.apache.ignite.lang.ErrorGroups.Transactions; import org.apache.ignite.network.NetworkAddress; import org.apache.ignite.sql.ColumnType; +import org.apache.ignite.table.QualifiedName; import org.apache.ignite.tx.TransactionException; import org.hamcrest.Matcher; import org.jetbrains.annotations.Nullable; @@ -692,7 +694,8 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest { indexMetaStorage, lowWatermark, new NoOpFailureManager(), - new SystemPropertiesNodeProperties() + new SystemPropertiesNodeProperties(), + new TableMetricSource(QualifiedName.fromSimple("test_table")) ); kvMarshaller = marshallerFor(schemaDescriptor); diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/ZonePartitionReplicaListenerTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/ZonePartitionReplicaListenerTest.java index 6e4596ffd7c..097fe41f2b3 100644 --- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/ZonePartitionReplicaListenerTest.java +++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/ZonePartitionReplicaListenerTest.java @@ -192,6 +192,7 @@ import org.apache.ignite.internal.table.distributed.replicator.PartitionReplicaL import org.apache.ignite.internal.table.distributed.replicator.TransactionStateResolver; import org.apache.ignite.internal.table.impl.DummyInternalTableImpl; import org.apache.ignite.internal.table.impl.DummySchemaManagerImpl; +import org.apache.ignite.internal.table.metrics.TableMetricSource; import org.apache.ignite.internal.testframework.IgniteAbstractTest; import org.apache.ignite.internal.testframework.WithSystemProperty; import org.apache.ignite.internal.tostring.IgniteToStringInclude; @@ -226,6 +227,7 @@ import org.apache.ignite.internal.util.SafeTimeValuesTracker; import org.apache.ignite.lang.ErrorGroups.Transactions; import org.apache.ignite.network.NetworkAddress; import org.apache.ignite.sql.ColumnType; +import org.apache.ignite.table.QualifiedName; import org.apache.ignite.tx.TransactionException; import org.jetbrains.annotations.Nullable; import org.junit.jupiter.api.AfterEach; @@ -661,7 +663,8 @@ public class ZonePartitionReplicaListenerTest extends IgniteAbstractTest { indexMetaStorage, lowWatermark, failureManager, - new SystemPropertiesNodeProperties() + new SystemPropertiesNodeProperties(), + new TableMetricSource(QualifiedName.fromSimple("test_table")) ); kvMarshaller = marshallerFor(schemaDescriptor); diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/storage/InternalTableEstimatedSizeTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/storage/InternalTableEstimatedSizeTest.java index a1482277953..631795540dc 100644 --- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/storage/InternalTableEstimatedSizeTest.java +++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/storage/InternalTableEstimatedSizeTest.java @@ -100,6 +100,7 @@ import org.apache.ignite.internal.table.distributed.StorageUpdateHandler; import org.apache.ignite.internal.table.distributed.index.IndexMetaStorage; import org.apache.ignite.internal.table.distributed.replicator.PartitionReplicaListener; import org.apache.ignite.internal.table.distributed.replicator.TransactionStateResolver; +import org.apache.ignite.internal.table.metrics.TableMetricSource; import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest; import org.apache.ignite.internal.testframework.ExecutorServiceExtension; import org.apache.ignite.internal.testframework.InjectExecutorService; @@ -112,6 +113,7 @@ import org.apache.ignite.internal.tx.storage.state.TxStateStorage; import org.apache.ignite.internal.util.Lazy; import org.apache.ignite.internal.util.PendingComparableValuesTracker; import org.apache.ignite.network.NetworkAddress; +import org.apache.ignite.table.QualifiedName; import org.apache.ignite.table.QualifiedNameHelper; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; @@ -235,7 +237,8 @@ public class InternalTableEstimatedSizeTest extends BaseIgniteAbstractTest { mock(StreamerReceiverRunner.class), () -> 10_000L, () -> 10_000L, - colocationEnabled() + colocationEnabled(), + new TableMetricSource(QualifiedName.fromSimple(TABLE_NAME)) ); when(catalogService.catalog(anyInt())).thenReturn(mock(Catalog.class)); @@ -344,7 +347,8 @@ public class InternalTableEstimatedSizeTest extends BaseIgniteAbstractTest { indexMetaStorage, new TestLowWatermark(), new NoOpFailureManager(), - new SystemPropertiesNodeProperties() + new SystemPropertiesNodeProperties(), + new TableMetricSource(QualifiedName.fromSimple("test_table")) ); } diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImplTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImplTest.java index 72620208e3c..082e723189f 100644 --- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImplTest.java +++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImplTest.java @@ -87,6 +87,7 @@ import org.apache.ignite.internal.sql.SqlCommon; import org.apache.ignite.internal.storage.engine.MvTableStorage; import org.apache.ignite.internal.table.InternalTable; import org.apache.ignite.internal.table.StreamerReceiverRunner; +import org.apache.ignite.internal.table.metrics.TableMetricSource; import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest; import org.apache.ignite.internal.testframework.SystemPropertiesExtension; import org.apache.ignite.internal.testframework.WithSystemProperty; @@ -99,6 +100,7 @@ import org.apache.ignite.internal.tx.storage.state.TxStateStorage; import org.apache.ignite.internal.tx.test.TestTransactionIds; import org.apache.ignite.internal.util.PendingComparableValuesTracker; import org.apache.ignite.network.NetworkAddress; +import org.apache.ignite.table.QualifiedName; import org.apache.ignite.table.QualifiedNameHelper; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -247,7 +249,8 @@ public class InternalTableImplTest extends BaseIgniteAbstractTest { mock(StreamerReceiverRunner.class), () -> 10_000L, () -> 10_000L, - colocationEnabled() + colocationEnabled(), + new TableMetricSource(QualifiedName.fromSimple("test")) ); } diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/metrics/TableMetricSourceTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/metrics/TableMetricSourceTest.java new file mode 100644 index 00000000000..7e942f12da0 --- /dev/null +++ b/modules/table/src/test/java/org/apache/ignite/internal/table/metrics/TableMetricSourceTest.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.table.metrics; + +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.CoreMatchers.notNullValue; +import static org.hamcrest.MatcherAssert.assertThat; + +import java.util.HashSet; +import java.util.Set; +import org.apache.ignite.internal.metrics.MetricSet; +import org.apache.ignite.table.QualifiedName; +import org.junit.jupiter.api.Test; + +/** + * Tests metric source name and table metric names. + * If you want to change the name, or add a new metric, please don't forget to update the corresponding documentation. + */ +public class TableMetricSourceTest { + private static final String TABLE_NAME = "test_table"; + + @Test + void testMetricSourceName() { + QualifiedName qualifiedTableName = QualifiedName.fromSimple(TABLE_NAME); + + var metricSource = new TableMetricSource(qualifiedTableName); + + assertThat(TableMetricSource.SOURCE_NAME, is("tables")); + assertThat(metricSource.name(), is("tables." + qualifiedTableName.toCanonicalForm())); + } + + @Test + void testMetricNames() { + var metricSource = new TableMetricSource(QualifiedName.fromSimple(TABLE_NAME)); + + MetricSet set = metricSource.enable(); + + assertThat(set, is(notNullValue())); + + Set<String> expectedMetrics = Set.of( + "RwReads", + "RoReads", + "Writes"); + + var actualMetrics = new HashSet<String>(); + set.forEach(m -> actualMetrics.add(m.name())); + + assertThat(actualMetrics, is(expectedMetrics)); + } +} diff --git a/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java b/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java index 6ea44e27a65..e7082817154 100644 --- a/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java +++ b/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java @@ -168,6 +168,7 @@ import org.apache.ignite.internal.table.distributed.storage.InternalTableImpl; import org.apache.ignite.internal.table.impl.DummyInternalTableImpl; import org.apache.ignite.internal.table.impl.DummySchemaManagerImpl; import org.apache.ignite.internal.table.impl.DummyValidationSchemasSource; +import org.apache.ignite.internal.table.metrics.TableMetricSource; import org.apache.ignite.internal.thread.IgniteThreadFactory; import org.apache.ignite.internal.tx.TxManager; import org.apache.ignite.internal.tx.TxStateMeta; @@ -684,7 +685,8 @@ public class ItTxTestCluster { mock(StreamerReceiverRunner.class), () -> 10_000L, () -> 10_000L, - colocationEnabled() + colocationEnabled(), + new TableMetricSource(QualifiedName.fromSimple(tableName)) ); TableImpl table = new TableImpl( @@ -1121,7 +1123,8 @@ public class ItTxTestCluster { mock(IndexMetaStorage.class), lowWatermark, new NoOpFailureManager(), - new SystemPropertiesNodeProperties() + new SystemPropertiesNodeProperties(), + new TableMetricSource(QualifiedName.fromSimple("test_table")) ); } diff --git a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java index 0cf1f02cfc1..9188f9dc976 100644 --- a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java +++ b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java @@ -128,6 +128,7 @@ import org.apache.ignite.internal.table.distributed.raft.PartitionListener; import org.apache.ignite.internal.table.distributed.replicator.PartitionReplicaListener; import org.apache.ignite.internal.table.distributed.replicator.TransactionStateResolver; import org.apache.ignite.internal.table.distributed.storage.InternalTableImpl; +import org.apache.ignite.internal.table.metrics.TableMetricSource; import org.apache.ignite.internal.thread.IgniteThreadFactory; import org.apache.ignite.internal.tx.InternalTransaction; import org.apache.ignite.internal.tx.TxManager; @@ -143,6 +144,7 @@ import org.apache.ignite.internal.util.Lazy; import org.apache.ignite.internal.util.PendingComparableValuesTracker; import org.apache.ignite.internal.util.SafeTimeValuesTracker; import org.apache.ignite.network.NetworkAddress; +import org.apache.ignite.table.QualifiedName; import org.apache.ignite.table.QualifiedNameHelper; import org.apache.ignite.tx.TransactionException; import org.jetbrains.annotations.Nullable; @@ -308,7 +310,8 @@ public class DummyInternalTableImpl extends InternalTableImpl { mock(StreamerReceiverRunner.class), () -> 10_000L, () -> 10_000L, - colocationEnabled() + colocationEnabled(), + new TableMetricSource(QualifiedName.fromSimple("test")) ); RaftGroupService svc = mock(RaftGroupService.class); @@ -494,7 +497,8 @@ public class DummyInternalTableImpl extends InternalTableImpl { mock(IndexMetaStorage.class), new TestLowWatermark(), mock(FailureProcessor.class), - new SystemPropertiesNodeProperties() + new SystemPropertiesNodeProperties(), + new TableMetricSource(QualifiedName.fromSimple("dummy_table")) ); if (enabledColocation) {