This is an automated email from the ASF dual-hosted git repository.
sk0x50 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 4dddc4d3516 IGNITE-27668 Refactor TableMetricSource to use common
interface for writable entities (#7479)
4dddc4d3516 is described below
commit 4dddc4d351607c90e36c2d4ebda1c611cb638e2b
Author: Phillippko <[email protected]>
AuthorDate: Tue Jan 27 17:06:24 2026 +0700
IGNITE-27668 Refactor TableMetricSource to use common interface for
writable entities (#7479)
---
.../ignite/client/fakes/FakeInternalTable.java | 4 +-
.../internal/table/metrics/ItTableMetricsTest.java | 38 ++++++---
.../ignite/internal/table/InternalTable.java | 4 +-
.../apache/ignite/internal/table/TableImpl.java | 4 +-
.../ignite/internal/table/TableViewInternal.java | 4 +-
.../internal/table/distributed/TableManager.java | 14 ++--
.../replicator/PartitionReplicaListener.java | 92 ++++++++++++----------
.../distributed/storage/InternalTableImpl.java | 8 +-
.../table/metrics/ReadWriteMetricSource.java | 59 ++++++++++++++
.../internal/table/metrics/TableMetricSource.java | 48 +++++------
10 files changed, 176 insertions(+), 99 deletions(-)
diff --git
a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeInternalTable.java
b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeInternalTable.java
index 2512b566314..ed7c997a7df 100644
---
a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeInternalTable.java
+++
b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeInternalTable.java
@@ -58,7 +58,7 @@ import org.apache.ignite.internal.table.IndexScanCriteria;
import org.apache.ignite.internal.table.InternalTable;
import org.apache.ignite.internal.table.OperationContext;
import org.apache.ignite.internal.table.StreamerReceiverRunner;
-import org.apache.ignite.internal.table.metrics.TableMetricSource;
+import org.apache.ignite.internal.table.metrics.ReadWriteMetricSource;
import org.apache.ignite.internal.tx.InternalTransaction;
import org.apache.ignite.network.NetworkAddress;
import org.apache.ignite.table.DataStreamerReceiverDescriptor;
@@ -524,7 +524,7 @@ public class FakeInternalTable implements InternalTable,
StreamerReceiverRunner
}
@Override
- public TableMetricSource metrics() {
+ public ReadWriteMetricSource metrics() {
return null;
}
}
diff --git
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/metrics/ItTableMetricsTest.java
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/metrics/ItTableMetricsTest.java
index 7d18249ec38..31787fe49a5 100644
---
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/metrics/ItTableMetricsTest.java
+++
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/metrics/ItTableMetricsTest.java
@@ -171,14 +171,14 @@ public class ItTableMetricsTest extends
ClusterPerClassIntegrationTest {
@Test
void put() {
- testKeyValueViewOperation(WRITES, 1, view -> view.put(null, 42,
"value_42"));
+ testKeyValueViewOperation(of(RO_READS, RW_READS, WRITES), of(0L, 0L,
1L), view -> view.put(null, 42, "value_42"));
}
@Test
void putAll() {
Map<Integer, String> values = Map.of(12, "12", 15, "15", 17, "17", 19,
"19", 23, "23");
- testKeyValueViewOperation(WRITES, values.size(), view ->
view.putAll(null, values));
+ testKeyValueViewOperation(of(RO_READS, RW_READS, WRITES), of(0L, 0L,
(long) values.size()), view -> view.putAll(null, values));
}
@Test
@@ -193,10 +193,10 @@ public class ItTableMetricsTest extends
ClusterPerClassIntegrationTest {
kvView.put(null, key, "value_42");
// Remove existing key.
- testKeyValueViewOperation(WRITES, 1, view -> view.remove(null, key));
+ testKeyValueViewOperation(of(RO_READS, RW_READS, WRITES), of(0L, 0L,
1L), view -> view.remove(null, key));
// Remove non existing key.
- testKeyValueViewOperation(WRITES, 0, view -> view.remove(null, key));
+ testKeyValueViewOperation(of(RO_READS, RW_READS, WRITES), of(0L, 0L,
0L), view -> view.remove(null, key));
}
@Test
@@ -223,7 +223,13 @@ public class ItTableMetricsTest extends
ClusterPerClassIntegrationTest {
kvView.removeAll(null);
kvView.putAll(null, values);
- testKeyValueViewOperation(WRITES, values.size(), view ->
view.removeAll(null));
+ // TODO https://issues.apache.org/jira/browse/IGNITE-27670 Fix
removeAll effect on read metrics.
+ // Reads happen when batch is retrieved, even though removeAll
shouldn't update read metrics.
+ testKeyValueViewOperation(
+ of(RO_READS, RW_READS, WRITES),
+ of(0L, (long) values.size(), (long) values.size()),
+ view -> view.removeAll(null)
+ );
}
@Test
@@ -234,16 +240,24 @@ public class ItTableMetricsTest extends
ClusterPerClassIntegrationTest {
kvView.putAll(null, values);
// Remove existing keys.
- testKeyValueViewOperation(WRITES, values.size(), view ->
view.removeAll(null, values.keySet()));
+ testKeyValueViewOperation(
+ of(RO_READS, RW_READS, WRITES),
+ of(0L, 0L, (long) values.size()),
+ view -> view.removeAll(null, values.keySet())
+ );
// Remove non-existing keys.
- testKeyValueViewOperation(WRITES, 0, view -> view.removeAll(null,
values.keySet()));
+ testKeyValueViewOperation(of(RO_READS, RW_READS, WRITES), of(0L, 0L,
0L), view -> view.removeAll(null, values.keySet()));
kvView.putAll(null, values);
// Remove non-unique keys.
List<Integer> nonUniqueKeys = of(12, 15, 12, 17, 19, 23);
- testKeyValueViewOperation(WRITES, nonUniqueKeys.size() - 1, view ->
view.removeAll(null, nonUniqueKeys));
+ testKeyValueViewOperation(
+ of(RO_READS, RW_READS, WRITES),
+ of(0L, 0L, nonUniqueKeys.size() - 1L),
+ view -> view.removeAll(null, nonUniqueKeys)
+ );
}
@Test
@@ -367,15 +381,15 @@ public class ItTableMetricsTest extends
ClusterPerClassIntegrationTest {
recordView(0).upsertAll(null, recs);
// Delete existing keys.
- testRecordViewOperation(WRITES, recs.size(), view ->
view.deleteAll(null, keys));
+ testRecordViewOperation(of(RO_READS, RW_READS, WRITES), of(0L, 0L,
((long) recs.size())), view -> view.deleteAll(null, keys));
// Delete non-existing keys.
- testRecordViewOperation(WRITES, 0L, view -> view.deleteAll(null,
keys));
+ testRecordViewOperation(of(RO_READS, RW_READS, WRITES), of(0L, 0L,
0L), view -> view.deleteAll(null, keys));
recordView(0).insert(null, recs.get(0));
// Delete one non-existing key.
- testRecordViewOperation(WRITES, 1L, view -> view.deleteAll(null,
keys));
+ testRecordViewOperation(of(RO_READS, RW_READS, WRITES), of(0L, 0L,
1L), view -> view.deleteAll(null, keys));
// Non-unique keys.
List<Tuple> nonUniqueKeys = of(
@@ -389,7 +403,7 @@ public class ItTableMetricsTest extends
ClusterPerClassIntegrationTest {
recordView(0).upsertAll(null, nonUniqueRecs);
- testRecordViewOperation(WRITES, 2L, view -> view.deleteAll(null,
nonUniqueKeys));
+ testRecordViewOperation(of(RO_READS, RW_READS, WRITES), of(0L, 0L,
2L), view -> view.deleteAll(null, nonUniqueKeys));
}
@Test
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/InternalTable.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/InternalTable.java
index bfd2e21bff1..64ccd5e9f9b 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/InternalTable.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/InternalTable.java
@@ -32,7 +32,7 @@ import org.apache.ignite.internal.schema.BinaryRow;
import org.apache.ignite.internal.schema.BinaryRowEx;
import org.apache.ignite.internal.storage.MvPartitionStorage;
import org.apache.ignite.internal.storage.engine.MvTableStorage;
-import org.apache.ignite.internal.table.metrics.TableMetricSource;
+import org.apache.ignite.internal.table.metrics.ReadWriteMetricSource;
import org.apache.ignite.internal.tx.InternalTransaction;
import org.apache.ignite.table.QualifiedName;
import org.apache.ignite.tx.TransactionException;
@@ -454,5 +454,5 @@ public interface InternalTable extends ManuallyCloseable {
*
* @return Table metrics source.
*/
- TableMetricSource metrics();
+ ReadWriteMetricSource metrics();
}
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/TableImpl.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/TableImpl.java
index 26a163aa705..d5cb5d3bd9c 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/TableImpl.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/TableImpl.java
@@ -47,7 +47,7 @@ import
org.apache.ignite.internal.table.distributed.TableIndexStoragesSupplier;
import
org.apache.ignite.internal.table.distributed.TableSchemaAwareIndexStorage;
import
org.apache.ignite.internal.table.distributed.TableStatsStalenessConfiguration;
import org.apache.ignite.internal.table.distributed.schema.SchemaVersions;
-import org.apache.ignite.internal.table.metrics.TableMetricSource;
+import org.apache.ignite.internal.table.metrics.ReadWriteMetricSource;
import org.apache.ignite.internal.table.partition.HashPartitionManagerImpl;
import org.apache.ignite.internal.tx.LockManager;
import org.apache.ignite.sql.IgniteSql;
@@ -321,7 +321,7 @@ public class TableImpl implements TableViewInternal {
}
@Override
- public TableMetricSource metrics() {
+ public ReadWriteMetricSource metrics() {
return tbl.metrics();
}
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/TableViewInternal.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/TableViewInternal.java
index 7ef9edccb2a..6729fc53354 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/TableViewInternal.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/TableViewInternal.java
@@ -27,7 +27,7 @@ import
org.apache.ignite.internal.table.distributed.IndexLocker;
import org.apache.ignite.internal.table.distributed.PartitionSet;
import org.apache.ignite.internal.table.distributed.TableIndexStoragesSupplier;
import
org.apache.ignite.internal.table.distributed.TableStatsStalenessConfiguration;
-import org.apache.ignite.internal.table.metrics.TableMetricSource;
+import org.apache.ignite.internal.table.metrics.ReadWriteMetricSource;
import org.apache.ignite.table.Table;
import org.apache.ignite.table.Tuple;
import org.apache.ignite.table.mapper.Mapper;
@@ -136,7 +136,7 @@ public interface TableViewInternal extends Table {
*
* @return Table metrics source.
*/
- TableMetricSource metrics();
+ ReadWriteMetricSource metrics();
/**
* Updates staleness configuration with provided parameters.
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index 39b917c0505..0de3cbddb2d 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -168,6 +168,7 @@ import
org.apache.ignite.internal.table.distributed.schema.SchemaVersions;
import org.apache.ignite.internal.table.distributed.schema.SchemaVersionsImpl;
import org.apache.ignite.internal.table.distributed.storage.InternalTableImpl;
import org.apache.ignite.internal.table.distributed.storage.NullStorageEngine;
+import org.apache.ignite.internal.table.metrics.ReadWriteMetricSource;
import org.apache.ignite.internal.table.metrics.TableMetricSource;
import org.apache.ignite.internal.thread.IgniteThreadFactory;
import org.apache.ignite.internal.tx.LockManager;
@@ -1192,7 +1193,7 @@ public class TableManager implements
IgniteTablesInternal, IgniteComponent {
Objects.requireNonNull(streamerReceiverRunner),
() -> txCfg.value().readWriteTimeoutMillis(),
() -> txCfg.value().readOnlyTimeoutMillis(),
-
createAndRegisterMetricsSource(tableStorage.getTableDescriptor(), tableName)
+ createAndRegisterMetricsSource(tableStorage, tableName)
);
CatalogTableProperties descProps = tableDescriptor.properties();
@@ -2009,11 +2010,14 @@ public class TableManager implements
IgniteTablesInternal, IgniteComponent {
}
}
- private TableMetricSource
createAndRegisterMetricsSource(StorageTableDescriptor tableDescriptor,
QualifiedName tableName) {
+ private ReadWriteMetricSource
createAndRegisterMetricsSource(MvTableStorage tableStorage, QualifiedName
tableName) {
+ StorageTableDescriptor tableDescriptor =
tableStorage.getTableDescriptor();
+
+ CatalogTableDescriptor catalogTableDescriptor =
catalogService.latestCatalog().table(tableDescriptor.getId());
+
// The table might be created during the recovery phase.
// In that case, we should only register the metric source for the
actual tables that exist in the latest catalog.
- boolean registrationNeeded =
- catalogService.latestCatalog().table(tableDescriptor.getId())
!= null;
+ boolean registrationNeeded = catalogTableDescriptor != null;
StorageEngine engine =
dataStorageMgr.engineByStorageProfile(tableDescriptor.getStorageProfile());
@@ -2032,7 +2036,7 @@ public class TableManager implements
IgniteTablesInternal, IgniteComponent {
}
}
- TableMetricSource source = new TableMetricSource(tableName);
+ ReadWriteMetricSource source = new TableMetricSource(tableName);
if (registrationNeeded) {
try {
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
index 9bd4da24b5f..3d7e5a0ab50 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
@@ -177,7 +177,7 @@ import
org.apache.ignite.internal.table.distributed.TableSchemaAwareIndexStorage
import org.apache.ignite.internal.table.distributed.TableUtils;
import org.apache.ignite.internal.table.distributed.index.IndexMetaStorage;
import
org.apache.ignite.internal.table.distributed.replicator.handlers.BuildIndexReplicaRequestHandler;
-import org.apache.ignite.internal.table.metrics.TableMetricSource;
+import org.apache.ignite.internal.table.metrics.ReadWriteMetricSource;
import org.apache.ignite.internal.tx.DelayedAckException;
import org.apache.ignite.internal.tx.Lock;
import org.apache.ignite.internal.tx.LockException;
@@ -320,7 +320,7 @@ public class PartitionReplicaListener implements
ReplicaTableProcessor {
private static final boolean SKIP_UPDATES =
getBoolean(IgniteSystemProperties.IGNITE_SKIP_STORAGE_UPDATE_IN_BENCHMARK);
- private final TableMetricSource metrics;
+ private final ReadWriteMetricSource metrics;
private final TableAwareReplicaRequestPreProcessor
tableAwareReplicaRequestPreProcessor;
private final ReliableCatalogVersions reliableCatalogVersions;
@@ -380,7 +380,7 @@ public class PartitionReplicaListener implements
ReplicaTableProcessor {
IndexMetaStorage indexMetaStorage,
LowWatermark lowWatermark,
FailureProcessor failureProcessor,
- TableMetricSource metrics
+ ReadWriteMetricSource metrics
) {
this.mvDataStorage = mvDataStorage;
this.txManager = txManager;
@@ -582,7 +582,7 @@ public class PartitionReplicaListener implements
ReplicaTableProcessor {
} else {
return
validateRwReadAgainstSchemaAfterTakingLocks(req.transactionId())
.thenApply(ignored -> {
- metrics.onRead(rows.size(), false);
+ metrics.onRead(rows.size(), false,
true);
return rows;
});
@@ -651,7 +651,7 @@ public class PartitionReplicaListener implements
ReplicaTableProcessor {
return safeReadFuture
.thenCompose(unused -> lookupIndex(request,
indexStorage))
.thenApply(rows -> {
- metrics.onRead(rows.size(), true);
+ metrics.onRead(rows.size(), true, true);
return rows;
});
@@ -662,7 +662,7 @@ public class PartitionReplicaListener implements
ReplicaTableProcessor {
return safeReadFuture
.thenCompose(unused -> scanSortedIndex(request,
indexStorage))
.thenApply(rows -> {
- metrics.onRead(rows.size(), true);
+ metrics.onRead(rows.size(), true, true);
return rows;
});
@@ -672,7 +672,7 @@ public class PartitionReplicaListener implements
ReplicaTableProcessor {
.thenCompose(
unused -> retrieveExactEntriesUntilCursorEmpty(txId,
request.coordinatorId(), readTimestamp, cursorId, batchCount))
.thenApply(rows -> {
- metrics.onRead(rows.size(), true);
+ metrics.onRead(rows.size(), true, true);
return rows;
});
@@ -1683,13 +1683,13 @@ public class PartitionReplicaListener implements
ReplicaTableProcessor {
// Nothing found in the storage, return null.
if (writeIntents.isEmpty() && regularEntries.isEmpty()) {
- metrics.onRead(true);
+ metrics.onRead(true, false);
return nullCompletedFuture();
}
if (writeIntents.isEmpty()) {
- metrics.onRead(true);
+ metrics.onRead(true, true);
// No write intents, then return the committed value. We
already know that regularEntries is not empty.
return completedFuture(regularEntries.get(0).binaryRow());
@@ -1704,7 +1704,7 @@ public class PartitionReplicaListener implements
ReplicaTableProcessor {
resolveWriteIntentReadability(writeIntent, ts)
.thenApply(writeIntentReadable ->
inBusyLock(busyLock, () -> {
- metrics.onRead(true);
+ metrics.onRead(true, true);
if (writeIntentReadable) {
return findAny(writeIntents,
wi -> !wi.isEmpty()).map(ReadResult::binaryRow).orElse(null);
@@ -1861,7 +1861,7 @@ public class PartitionReplicaListener implements
ReplicaTableProcessor {
}
if (rowIdsToDelete.isEmpty()) {
- metrics.onRead(searchRows.size(), false);
+ metrics.onRead(searchRows.size(), false, false);
return completedFuture(new ReplicaResult(result,
null));
}
@@ -1877,8 +1877,8 @@ public class PartitionReplicaListener implements
ReplicaTableProcessor {
)
)
.thenApply(res -> {
- metrics.onRead(searchRows.size(), false);
- metrics.onWrite(rowIdsToDelete.size());
+ metrics.onRead(searchRows.size(), false, true);
+ metrics.onRemoval(rowIdsToDelete.size());
return new ReplicaResult(result, res);
});
@@ -1917,8 +1917,7 @@ public class PartitionReplicaListener implements
ReplicaTableProcessor {
}
if (rowsToInsert.isEmpty()) {
- metrics.onRead(searchRows.size(), false);
-
+ metrics.onRead(searchRows.size(), false, false);
return completedFuture(new ReplicaResult(result,
null));
}
@@ -1951,7 +1950,7 @@ public class PartitionReplicaListener implements
ReplicaTableProcessor {
)
)
.thenApply(res -> {
- metrics.onRead(searchRows.size(), false);
+ metrics.onRead(searchRows.size(), false, true);
metrics.onWrite(rowsToInsert.size());
// Release short term locks.
@@ -2056,8 +2055,6 @@ public class PartitionReplicaListener implements
ReplicaTableProcessor {
}
if (rowsToUpdate.isEmpty()) {
- metrics.onRead(uniqueKeysCountFinal, false);
-
return completedFuture(new ReplicaResult(null, null));
}
@@ -2072,7 +2069,6 @@ public class PartitionReplicaListener implements
ReplicaTableProcessor {
)
)
.thenApply(res -> {
- metrics.onRead(uniqueKeysCountFinal, false);
metrics.onWrite(uniqueKeysCountFinal);
// Release short term locks.
@@ -2134,15 +2130,24 @@ public class PartitionReplicaListener implements
ReplicaTableProcessor {
result.add(rowFut.join());
}
- if (allElementsAreNull(result)) {
- metrics.onRead(result.size(), false);
+ int hits = 0;
+ for (BinaryRow row : result) {
+ if (row != null) {
+ hits++;
+ }
+ }
+
+ if (hits == 0) {
+ metrics.onRead(result.size(), false, false);
return completedFuture(new
ReplicaResult(result, null));
}
+ int finalHits = hits;
return
validateRwReadAgainstSchemaAfterTakingLocks(txId)
.thenApply(unused -> {
- metrics.onRead(result.size(), false);
+ metrics.onRead(result.size() -
finalHits, false, false);
+ metrics.onRead(finalHits, false, true);
return new ReplicaResult(result, null);
});
@@ -2208,7 +2213,7 @@ public class PartitionReplicaListener implements
ReplicaTableProcessor {
)
)
.thenApply(res -> {
- metrics.onWrite(rowIdsToDelete.size());
+ metrics.onRemoval(rowIdsToDelete.size());
return new ReplicaResult(result, res);
});
@@ -2747,7 +2752,7 @@ public class PartitionReplicaListener implements
ReplicaTableProcessor {
case RW_DELETE_EXACT: {
return resolveRowByPk(extractPk(searchRow), txId, (rowId, row,
lastCommitTime) -> {
if (rowId == null) {
- metrics.onRead(false);
+ metrics.onRead(false, false);
return completedFuture(new ReplicaResult(false, null));
}
@@ -2755,7 +2760,7 @@ public class PartitionReplicaListener implements
ReplicaTableProcessor {
return takeLocksForDeleteExact(searchRow, rowId, row, txId)
.thenCompose(validatedRowId -> {
if (validatedRowId == null) {
- metrics.onRead(false);
+ metrics.onRead(false, false);
return completedFuture(new
ReplicaResult(false, null));
}
@@ -2773,8 +2778,8 @@ public class PartitionReplicaListener implements
ReplicaTableProcessor {
)
)
.thenApply(res -> {
- metrics.onRead(false);
- metrics.onWrite();
+ metrics.onRead(false, true);
+ metrics.onRemoval();
return new ReplicaResult(true,
res);
});
@@ -2784,7 +2789,7 @@ public class PartitionReplicaListener implements
ReplicaTableProcessor {
case RW_INSERT: {
return resolveRowByPk(extractPk(searchRow), txId, (rowId, row,
lastCommitTime) -> {
if (rowId != null) {
- metrics.onRead(false);
+ metrics.onRead(false, true);
return completedFuture(new ReplicaResult(false, null));
}
@@ -2805,7 +2810,7 @@ public class PartitionReplicaListener implements
ReplicaTableProcessor {
)
.thenApply(res -> new IgniteBiTuple<>(res,
rowIdLock)))
.thenApply(tuple -> {
- metrics.onRead(false);
+ metrics.onRead(false, true);
metrics.onWrite();
// Release short term locks.
@@ -2874,7 +2879,7 @@ public class PartitionReplicaListener implements
ReplicaTableProcessor {
)
.thenApply(res -> new IgniteBiTuple<>(res,
rowIdLock)))
.thenApply(tuple -> {
- metrics.onRead(false);
+ metrics.onRead(false, true);
metrics.onWrite();
// Release short term locks.
@@ -2887,7 +2892,7 @@ public class PartitionReplicaListener implements
ReplicaTableProcessor {
case RW_GET_AND_REPLACE: {
return resolveRowByPk(extractPk(searchRow), txId, (rowId, row,
lastCommitTime) -> {
if (rowId == null) {
- metrics.onRead(false);
+ metrics.onRead(false, false);
return completedFuture(new ReplicaResult(null, null));
}
@@ -2907,7 +2912,8 @@ public class PartitionReplicaListener implements
ReplicaTableProcessor {
)
.thenApply(res -> new IgniteBiTuple<>(res,
rowIdLock)))
.thenApply(tuple -> {
- metrics.onRead(false);
+ metrics.onRead(false, rowId != null);
+
metrics.onWrite();
// Release short term locks.
@@ -2920,7 +2926,7 @@ public class PartitionReplicaListener implements
ReplicaTableProcessor {
case RW_REPLACE_IF_EXIST: {
return resolveRowByPk(extractPk(searchRow), txId, (rowId, row,
lastCommitTime) -> {
if (rowId == null) {
- metrics.onRead(false);
+ metrics.onRead(false, false);
return completedFuture(new ReplicaResult(false, null));
}
@@ -2940,7 +2946,7 @@ public class PartitionReplicaListener implements
ReplicaTableProcessor {
)
.thenApply(res -> new IgniteBiTuple<>(res,
rowIdLock)))
.thenApply(tuple -> {
- metrics.onRead(false);
+ metrics.onRead(false, false);
metrics.onWrite();
// Release short term locks.
@@ -2976,7 +2982,7 @@ public class PartitionReplicaListener implements
ReplicaTableProcessor {
case RW_GET: {
return resolveRowByPk(primaryKey, txId, (rowId, row,
lastCommitTime) -> {
if (rowId == null) {
- metrics.onRead(false);
+ metrics.onRead(false, false);
return nullCompletedFuture();
}
@@ -2984,7 +2990,7 @@ public class PartitionReplicaListener implements
ReplicaTableProcessor {
return takeLocksForGet(rowId, txId)
.thenCompose(ignored ->
validateRwReadAgainstSchemaAfterTakingLocks(txId))
.thenApply(ignored -> {
- metrics.onRead(false);
+ metrics.onRead(false, true);
return new ReplicaResult(row, null);
});
@@ -3014,7 +3020,7 @@ public class PartitionReplicaListener implements
ReplicaTableProcessor {
)
)
.thenApply(res -> {
- metrics.onWrite();
+ metrics.onRemoval();
return new ReplicaResult(true, res);
});
@@ -3023,7 +3029,7 @@ public class PartitionReplicaListener implements
ReplicaTableProcessor {
case RW_GET_AND_DELETE: {
return resolveRowByPk(primaryKey, txId, (rowId, row,
lastCommitTime) -> {
if (rowId == null) {
- metrics.onRead(false);
+ metrics.onRead(false, false);
return nullCompletedFuture();
}
@@ -3046,8 +3052,8 @@ public class PartitionReplicaListener implements
ReplicaTableProcessor {
)
)
.thenApply(res -> {
- metrics.onRead(false);
- metrics.onWrite();
+ metrics.onRead(false, true);
+ metrics.onRemoval();
return new ReplicaResult(row, res);
});
@@ -3265,7 +3271,7 @@ public class PartitionReplicaListener implements
ReplicaTableProcessor {
if (request.requestType() == RW_REPLACE) {
return resolveRowByPk(extractPk(newRow), txId, (rowId, row,
lastCommitTime) -> {
if (rowId == null) {
- metrics.onRead(false);
+ metrics.onRead(false, false);
return completedFuture(new ReplicaResult(false, null));
}
@@ -3273,7 +3279,7 @@ public class PartitionReplicaListener implements
ReplicaTableProcessor {
return takeLocksForReplace(expectedRow, row, newRow, rowId,
txId)
.thenCompose(rowIdLock -> {
if (rowIdLock == null) {
- metrics.onRead(false);
+ metrics.onRead(false, false);
return completedFuture(new
ReplicaResult(false, null));
}
@@ -3296,7 +3302,7 @@ public class PartitionReplicaListener implements
ReplicaTableProcessor {
)
.thenApply(res -> new IgniteBiTuple<>(res,
rowIdLock))
.thenApply(tuple -> {
- metrics.onRead(false);
+ metrics.onRead(false, true);
metrics.onWrite();
// Release short term locks.
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
index 320c2193d73..fc5cfc416bd 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
@@ -122,7 +122,7 @@ import org.apache.ignite.internal.table.OperationContext;
import org.apache.ignite.internal.table.StreamerReceiverRunner;
import org.apache.ignite.internal.table.TxContext;
import
org.apache.ignite.internal.table.distributed.storage.PartitionScanPublisher.InflightBatchRequestTracker;
-import org.apache.ignite.internal.table.metrics.TableMetricSource;
+import org.apache.ignite.internal.table.metrics.ReadWriteMetricSource;
import org.apache.ignite.internal.tx.InternalTransaction;
import org.apache.ignite.internal.tx.PendingTxPartitionEnlistment;
import org.apache.ignite.internal.tx.TransactionIds;
@@ -198,7 +198,7 @@ public class InternalTableImpl implements InternalTable {
/** Default read-only transaction timeout. */
private final Supplier<Long> defaultReadTxTimeout;
- private final TableMetricSource metrics;
+ private final ReadWriteMetricSource metrics;
/**
* Constructor.
@@ -236,7 +236,7 @@ public class InternalTableImpl implements InternalTable {
StreamerReceiverRunner streamerReceiverRunner,
Supplier<Long> defaultRwTxTimeout,
Supplier<Long> defaultReadTxTimeout,
- TableMetricSource metrics
+ ReadWriteMetricSource metrics
) {
this.tableName = tableName;
this.zoneId = zoneId;
@@ -2323,7 +2323,7 @@ public class InternalTableImpl implements InternalTable {
}
@Override
- public TableMetricSource metrics() {
+ public ReadWriteMetricSource metrics() {
return metrics;
}
}
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/metrics/ReadWriteMetricSource.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/metrics/ReadWriteMetricSource.java
new file mode 100644
index 00000000000..687baa52ca5
--- /dev/null
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/metrics/ReadWriteMetricSource.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.metrics;
+
+import org.apache.ignite.internal.metrics.MetricSource;
+
+/** Common interface for reads and writes to tables and caches. */
+public interface ReadWriteMetricSource extends MetricSource {
+ /**
+ * Called after get request.
+ *
+ * @param readOnly {@code true} if read operation is executed within
read-only transaction, and {@code false} otherwise.
+ * @param hit {@code true} if row was found, {@code false} otherwise.
+ */
+ void onRead(boolean readOnly, boolean hit);
+
+ /**
+ * Called after get request for multiple rows.
+ *
+ * @param readOnly {@code true} if read operation is executed within
read-only transaction, and {@code false} otherwise.
+ * @param hit {code true} if row was found, {@code false} otherwise.
+ */
+ void onRead(int x, boolean readOnly, boolean hit);
+
+ /**
+ * Increments a counter of writes.
+ */
+ void onWrite();
+
+ /**
+ * Adds the given {@code x} to a counter of writes.
+ */
+ void onWrite(int x);
+
+ /**
+ * Should be called instead of {@link #onWrite} if row was removed.
+ */
+ void onRemoval();
+
+ /**
+ * Should be called instead of {@link #onWrite} if row was removed.
+ */
+ void onRemoval(int x);
+}
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/metrics/TableMetricSource.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/metrics/TableMetricSource.java
index a8800ee4c52..4c5bde703f4 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/metrics/TableMetricSource.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/metrics/TableMetricSource.java
@@ -107,7 +107,7 @@ import org.apache.ignite.table.QualifiedName;
*
* <i>Note: Only synchronous methods are listed. Asynchronous methods affect
the same metrics.</i>
*/
-public class TableMetricSource extends AbstractMetricSource<Holder> {
+public class TableMetricSource extends AbstractMetricSource<Holder> implements
ReadWriteMetricSource {
/** Source name. */
public static final String SOURCE_NAME = "tables";
@@ -147,29 +147,17 @@ public class TableMetricSource extends
AbstractMetricSource<Holder> {
return tableName;
}
- /**
- * Increments a counter of reads.
- *
- * @param readOnly {@code true} if read operation is executed within
read-only transaction, and {@code false} otherwise.
- */
- public void onRead(boolean readOnly) {
- Holder holder = holder();
+ @Override
+ public void onRead(boolean readOnly, boolean hit) {
+ onRead(1, readOnly);
+ }
- if (holder != null) {
- if (readOnly) {
- holder.roReads.increment();
- } else {
- holder.rwReads.increment();
- }
- }
+ @Override
+ public void onRead(int x, boolean readOnly, boolean hit) {
+ onRead(x, readOnly);
}
- /**
- * Adds the given {@code x} to a counter of reads.
- *
- * @param readOnly {@code true} if read operation is executed within
read-only transaction, and {@code false} otherwise.
- */
- public void onRead(int x, boolean readOnly) {
+ private void onRead(int x, boolean readOnly) {
Holder holder = holder();
if (holder != null) {
@@ -181,9 +169,7 @@ public class TableMetricSource extends
AbstractMetricSource<Holder> {
}
}
- /**
- * Increments a counter of writes.
- */
+ @Override
public void onWrite() {
Holder holder = holder();
@@ -192,9 +178,7 @@ public class TableMetricSource extends
AbstractMetricSource<Holder> {
}
}
- /**
- * Adds the given {@code x} to a counter of writes.
- */
+ @Override
public void onWrite(int x) {
Holder holder = holder();
@@ -203,6 +187,16 @@ public class TableMetricSource extends
AbstractMetricSource<Holder> {
}
}
+ @Override
+ public void onRemoval() {
+ onWrite();
+ }
+
+ @Override
+ public void onRemoval(int x) {
+ onWrite(x);
+ }
+
@Override
protected Holder createHolder() {
return new Holder();