This is an automated email from the ASF dual-hosted git repository.
sk0x50 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 68af1f91397 IGNITE-27178 Add a total number of unresolved write
intents as a metric (#7428)
68af1f91397 is described below
commit 68af1f913972ba144d81c7521cc9ce72e63854a4
Author: Anton Laletin <[email protected]>
AuthorDate: Fri Feb 6 12:12:14 2026 +0400
IGNITE-27178 Add a total number of unresolved write intents as a metric
(#7428)
---
.../apache/ignite/client/fakes/FakeTxManager.java | 7 ++
.../internal/metrics/AbstractMetricSource.java | 2 +-
...artitionTableStatsMetricConfigurationTest.java} | 4 +-
...t.java => ItPartitionTableStatsMetricTest.java} | 18 ++--
.../PartitionModificationCounterMetricSource.java | 81 ----------------
.../PartitionTableStatsMetricSource.java | 105 +++++++++++++++++++++
.../table/distributed/StorageUpdateHandler.java | 9 ++
.../internal/table/distributed/TableManager.java | 65 +++++++------
.../table/distributed/replicator/PendingRows.java | 11 +++
.../distributed/TableManagerRecoveryTest.java | 2 +
.../table/distributed/TableManagerTest.java | 3 +
.../internal/tx/ItTransactionMetricsTest.java | 67 +++++++++++++
.../org/apache/ignite/internal/tx/TxManager.java | 6 ++
.../ignite/internal/tx/impl/TxManagerImpl.java | 5 +
.../tx/metrics/TransactionMetricsSource.java | 54 ++++++++---
.../tx/metrics/TransactionMetricSourceTest.java | 3 +-
16 files changed, 307 insertions(+), 135 deletions(-)
diff --git
a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeTxManager.java
b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeTxManager.java
index 2f0b4851375..139c939ae3b 100644
---
a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeTxManager.java
+++
b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeTxManager.java
@@ -42,6 +42,7 @@ import org.apache.ignite.internal.tx.TxState;
import org.apache.ignite.internal.tx.TxStateMeta;
import org.apache.ignite.internal.tx.impl.EnlistedPartitionGroup;
import org.apache.ignite.internal.tx.metrics.ResourceVacuumMetrics;
+import org.apache.ignite.internal.tx.metrics.TransactionMetricsSource;
import org.apache.ignite.tx.TransactionException;
import org.jetbrains.annotations.Nullable;
@@ -55,6 +56,12 @@ public class FakeTxManager implements TxManager {
this.clock = clock;
}
+ @Override
+ public @Nullable TransactionMetricsSource transactionMetricsSource() {
+ // No-op: this fake does not expose a transaction metrics source.
+ return null;
+ }
+
@Override
public CompletableFuture<Void> startAsync(ComponentContext
componentContext) {
// No-op.
diff --git
a/modules/metrics/src/main/java/org/apache/ignite/internal/metrics/AbstractMetricSource.java
b/modules/metrics/src/main/java/org/apache/ignite/internal/metrics/AbstractMetricSource.java
index f3e06b9e412..2e3115cb782 100644
---
a/modules/metrics/src/main/java/org/apache/ignite/internal/metrics/AbstractMetricSource.java
+++
b/modules/metrics/src/main/java/org/apache/ignite/internal/metrics/AbstractMetricSource.java
@@ -113,7 +113,7 @@ public abstract class AbstractMetricSource<T extends
AbstractMetricSource.Holder
}
/**
- * Returns metric instances' holder. Use this on order to avoid metric
lookup from map-like data structures.
+ * Returns metric instances' holder. Use this in order to avoid metric
lookup from map-like data structures.
* Returned value is {@code null} if metrics are disabled.
*
* @return Metrics holder instance if metrics are enabled, otherwise -
{@code null}.
diff --git
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItPartitionModificationCounterConfigurationTest.java
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItPartitionTableStatsMetricConfigurationTest.java
similarity index 93%
rename from
modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItPartitionModificationCounterConfigurationTest.java
rename to
modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItPartitionTableStatsMetricConfigurationTest.java
index 4a683582632..cdcd2ca1a92 100644
---
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItPartitionModificationCounterConfigurationTest.java
+++
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItPartitionTableStatsMetricConfigurationTest.java
@@ -19,7 +19,7 @@ package org.apache.ignite.internal.table;
import static
org.apache.ignite.internal.catalog.CatalogService.DEFAULT_STORAGE_PROFILE;
import static org.apache.ignite.internal.lang.IgniteStringFormatter.format;
-import static
org.apache.ignite.internal.table.ItPartitionModificationCounterMetricsTest.expectNextMilestone;
+import static
org.apache.ignite.internal.table.ItPartitionTableStatsMetricTest.expectNextMilestone;
import org.apache.ignite.InitParametersBuilder;
import org.apache.ignite.internal.sql.BaseSqlIntegrationTest;
@@ -28,7 +28,7 @@ import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
/** Checks that partition modification counter settings depend on
configuration settings. */
-public class ItPartitionModificationCounterConfigurationTest extends
BaseSqlIntegrationTest {
+public class ItPartitionTableStatsMetricConfigurationTest extends
BaseSqlIntegrationTest {
private static final String ZONE_1_PART_NO_REPLICAS =
"zone_single_partition_no_replicas";
private static final int MIN_STALE_ROWS = 2;
private static final double STALE_ROWS_FRACTION = 1.0d;
diff --git
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItPartitionModificationCounterMetricsTest.java
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItPartitionTableStatsMetricTest.java
similarity index 95%
rename from
modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItPartitionModificationCounterMetricsTest.java
rename to
modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItPartitionTableStatsMetricTest.java
index b0094debcde..8b547a05717 100644
---
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItPartitionModificationCounterMetricsTest.java
+++
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItPartitionTableStatsMetricTest.java
@@ -20,9 +20,9 @@ package org.apache.ignite.internal.table;
import static org.apache.ignite.internal.TestWrappers.unwrapIgniteImpl;
import static
org.apache.ignite.internal.catalog.commands.CatalogUtils.DEFAULT_MIN_STALE_ROWS_COUNT;
import static org.apache.ignite.internal.lang.IgniteStringFormatter.format;
-import static
org.apache.ignite.internal.table.distributed.PartitionModificationCounterMetricSource.METRIC_COUNTER;
-import static
org.apache.ignite.internal.table.distributed.PartitionModificationCounterMetricSource.METRIC_LAST_MILESTONE_TIMESTAMP;
-import static
org.apache.ignite.internal.table.distributed.PartitionModificationCounterMetricSource.METRIC_NEXT_MILESTONE;
+import static
org.apache.ignite.internal.table.distributed.PartitionTableStatsMetricSource.METRIC_COUNTER;
+import static
org.apache.ignite.internal.table.distributed.PartitionTableStatsMetricSource.METRIC_LAST_MILESTONE_TIMESTAMP;
+import static
org.apache.ignite.internal.table.distributed.PartitionTableStatsMetricSource.METRIC_NEXT_MILESTONE;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.greaterThan;
@@ -37,7 +37,7 @@ import org.apache.ignite.internal.metrics.MetricManager;
import org.apache.ignite.internal.metrics.MetricSet;
import org.apache.ignite.internal.sql.BaseSqlIntegrationTest;
import
org.apache.ignite.internal.table.distributed.PartitionModificationCounter;
-import
org.apache.ignite.internal.table.distributed.PartitionModificationCounterMetricSource;
+import
org.apache.ignite.internal.table.distributed.PartitionTableStatsMetricSource;
import org.apache.ignite.table.KeyValueView;
import org.apache.ignite.table.QualifiedName;
import org.apache.ignite.tx.Transaction;
@@ -47,9 +47,11 @@ import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
/**
- * Tests for {@link PartitionModificationCounter partition modification
counter} metrics.
+ * Integration tests for table partition statistics metrics exposed via {@link
PartitionTableStatsMetricSource}.
+ *
+ * <p>Includes {@link PartitionModificationCounter partition modification
counter} metrics.
*/
-public class ItPartitionModificationCounterMetricsTest extends
BaseSqlIntegrationTest {
+public class ItPartitionTableStatsMetricTest extends BaseSqlIntegrationTest {
private static final String ZONE_1_PART_NO_REPLICAS =
"zone_single_partition_no_replicas";
private static final String ZONE_1_PART_REPLICAS = "zone_single_partition";
private static final String ZONE_8_PART_NO_REPLICAS =
"zone_multi_partition";
@@ -297,7 +299,7 @@ public class ItPartitionModificationCounterMetricsTest
extends BaseSqlIntegratio
int tableId = tableIdByName(QualifiedName.parse(tableName));
String metricSourceName =
-
PartitionModificationCounterMetricSource.formatSourceName(tableId, part);
+
PartitionTableStatsMetricSource.formatSourceName(tableId, part);
boolean metricFound = false;
@@ -346,7 +348,7 @@ public class ItPartitionModificationCounterMetricsTest
extends BaseSqlIntegratio
int tableId = tableIdByName(QualifiedName.parse(tableName));
String metricSourceName =
-
PartitionModificationCounterMetricSource.formatSourceName(tableId, partId);
+ PartitionTableStatsMetricSource.formatSourceName(tableId,
partId);
MetricManager metricManager =
unwrapIgniteImpl(node(nodeIdx)).metricManager();
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/PartitionModificationCounterMetricSource.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/PartitionModificationCounterMetricSource.java
deleted file mode 100644
index 0d77f00d2f8..00000000000
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/PartitionModificationCounterMetricSource.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.table.distributed;
-
-import java.util.HashMap;
-import java.util.Map;
-import org.apache.ignite.internal.lang.IgniteStringFormatter;
-import org.apache.ignite.internal.metrics.Metric;
-import org.apache.ignite.internal.metrics.MetricSet;
-import org.apache.ignite.internal.metrics.MetricSource;
-import org.jetbrains.annotations.Nullable;
-
-/**
- * Metrics related to {@link PartitionModificationCounter partition
modification counter}.
- */
-public class PartitionModificationCounterMetricSource implements MetricSource {
- public static final String METRIC_COUNTER = "modificationCount";
- public static final String METRIC_NEXT_MILESTONE = "nextMilestone";
- public static final String METRIC_LAST_MILESTONE_TIMESTAMP =
"lastMilestoneTimestamp";
-
- private final Map<String, Metric> metrics = new HashMap<>();
- private final String metricSourceName;
-
- private boolean enabled;
-
- public PartitionModificationCounterMetricSource(int tableId, int
partitionId) {
- this.metricSourceName = formatSourceName(tableId, partitionId);
- }
-
- @Override
- public String name() {
- return metricSourceName;
- }
-
- @Override
- public @Nullable MetricSet enable() {
- if (enabled) {
- return null;
- }
-
- enabled = true;
-
- return new MetricSet(metricSourceName, Map.copyOf(metrics));
- }
-
- @Override
- public void disable() {
- enabled = false;
- }
-
- @Override
- public boolean enabled() {
- return enabled;
- }
-
- /** Adds a metric to the source. */
- public void addMetric(Metric metric) {
- assert !enabled : "Metrics can be added only before enabling the
metric source";
-
- metrics.put(metric.name(), metric);
- }
-
- public static String formatSourceName(int tableId, int partitionId) {
- return
IgniteStringFormatter.format("partition.statistics.table.{}.partition.{}",
tableId, partitionId);
- }
-}
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/PartitionTableStatsMetricSource.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/PartitionTableStatsMetricSource.java
new file mode 100644
index 00000000000..03a03e2c560
--- /dev/null
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/PartitionTableStatsMetricSource.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed;
+
+import java.util.List;
+import org.apache.ignite.internal.lang.IgniteStringFormatter;
+import org.apache.ignite.internal.metrics.AbstractMetricSource;
+import org.apache.ignite.internal.metrics.LongGauge;
+import org.apache.ignite.internal.metrics.Metric;
+
+/**
+ * Metrics related to table partition statistics.
+ *
+ * <p>This source includes {@link PartitionModificationCounter partition
modification counter} metrics.
+ */
+public class PartitionTableStatsMetricSource extends
AbstractMetricSource<PartitionTableStatsMetricSource.Holder> {
+ public static final String METRIC_COUNTER = "modificationCount";
+ public static final String METRIC_NEXT_MILESTONE = "nextMilestone";
+ public static final String METRIC_LAST_MILESTONE_TIMESTAMP =
"lastMilestoneTimestamp";
+
+ private final PartitionModificationCounter counter;
+
+ /**
+ * Creates a new metric source for a specific table partition.
+ *
+ * @param tableId Table id.
+ * @param partitionId Partition id.
+ * @param counter Partition modification counter.
+ */
+ public PartitionTableStatsMetricSource(int tableId, int partitionId,
PartitionModificationCounter counter) {
+ super(formatSourceName(tableId, partitionId), "Metrics related to
table partition statistics.");
+
+ this.counter = counter;
+ }
+
+ /**
+ * Returns the metric source name for a given table partition.
+ *
+ * @param tableId Table id.
+ * @param partitionId Partition id.
+ * @return Metric source name.
+ */
+ public static String formatSourceName(int tableId, int partitionId) {
+ return
IgniteStringFormatter.format("partition.statistics.table.{}.partition.{}",
tableId, partitionId);
+ }
+
+ @Override
+ protected Holder createHolder() {
+ return new Holder(counter);
+ }
+
+ /** Holder of the partition statistics metric instances. */
+ protected static class Holder implements
AbstractMetricSource.Holder<Holder> {
+ private final LongGauge counterValue;
+ private final LongGauge nextMilestone;
+ private final LongGauge lastMilestoneTimestamp;
+
+ private final List<Metric> metrics;
+
+ private Holder(PartitionModificationCounter counter) {
+ counterValue = new LongGauge(
+ METRIC_COUNTER,
+ "The value of the volatile counter of partition
modifications. "
+ + "This value is used to determine staleness of
the related SQL statistics.",
+ counter::value
+ );
+
+ nextMilestone = new LongGauge(
+ METRIC_NEXT_MILESTONE,
+ "The value of the next milestone for the number of
partition modifications. "
+ + "This value is used to determine staleness of
the related SQL statistics.",
+ counter::nextMilestone
+ );
+
+ lastMilestoneTimestamp = new LongGauge(
+ METRIC_LAST_MILESTONE_TIMESTAMP,
+ "The timestamp value representing the commit time of the
last modification operation that "
+ + "reached the milestone. This value is used to
determine staleness of the related SQL statistics.",
+ () -> counter.lastMilestoneTimestamp().longValue()
+ );
+
+ metrics = List.of(counterValue, nextMilestone,
lastMilestoneTimestamp);
+ }
+
+ @Override
+ public Iterable<Metric> metrics() {
+ return metrics;
+ }
+ }
+}
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/StorageUpdateHandler.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/StorageUpdateHandler.java
index 313a151da87..a3a652a1cac 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/StorageUpdateHandler.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/StorageUpdateHandler.java
@@ -534,6 +534,15 @@ public class StorageUpdateHandler {
return indexUpdateHandler;
}
+ /**
+ * Returns the total number of unresolved write intents across all
transactions.
+ *
+ * @return Total number of pending row IDs.
+ */
+ public long getPendingRowCount() {
+ return pendingRows.getPendingRowCount();
+ }
+
/**
* Performs add of the committed row version. If a write intent is
detected on the first attempt and {@code lastCommitTs} is not
* {@code null}, it will be cleared before the second attempt. Otherwise,
{@link StorageException} will be thrown.
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index a29febd91d8..e19a1b5abaa 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -72,6 +72,7 @@ import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
+import java.util.function.LongSupplier;
import java.util.function.Supplier;
import org.apache.ignite.internal.catalog.Catalog;
import org.apache.ignite.internal.catalog.CatalogService;
@@ -111,7 +112,6 @@ import org.apache.ignite.internal.manager.IgniteComponent;
import org.apache.ignite.internal.marshaller.ReflectionMarshallersProvider;
import org.apache.ignite.internal.metastorage.MetaStorageManager;
import org.apache.ignite.internal.metastorage.Revisions;
-import org.apache.ignite.internal.metrics.LongGauge;
import org.apache.ignite.internal.metrics.MetricManager;
import org.apache.ignite.internal.network.InternalClusterNode;
import org.apache.ignite.internal.network.MessagingService;
@@ -178,6 +178,7 @@ import
org.apache.ignite.internal.tx.configuration.TransactionConfiguration;
import org.apache.ignite.internal.tx.impl.RemotelyTriggeredResourceRegistry;
import org.apache.ignite.internal.tx.impl.TransactionInflights;
import org.apache.ignite.internal.tx.impl.TransactionStateResolver;
+import org.apache.ignite.internal.tx.metrics.TransactionMetricsSource;
import
org.apache.ignite.internal.tx.storage.state.rocksdb.TxStateRocksDbSharedStorage;
import org.apache.ignite.internal.util.CompletableFutures;
import org.apache.ignite.internal.util.IgniteSpinBusyLock;
@@ -363,8 +364,10 @@ public class TableManager implements IgniteTablesInternal,
IgniteComponent {
private final EventListener<ChangeLowWatermarkEventParameters>
onLowWatermarkChangedListener = this::onLwmChanged;
private final MetricManager metricManager;
+
private final PartitionModificationCounterFactory
partitionModificationCounterFactory;
- private final Map<TablePartitionId,
PartitionModificationCounterMetricSource> partModCounterMetricSources = new
ConcurrentHashMap<>();
+ private final Map<TablePartitionId, PartitionTableStatsMetricSource>
partModCounterMetricSources = new ConcurrentHashMap<>();
+ private final Map<TablePartitionId, LongSupplier>
pendingWriteIntentsSuppliers = new ConcurrentHashMap<>();
/**
* Creates a new table manager.
@@ -520,6 +523,9 @@ public class TableManager implements IgniteTablesInternal,
IgniteComponent {
@Override
public CompletableFuture<Void> startAsync(ComponentContext
componentContext) {
return inBusyLockAsync(busyLock, () -> {
+ TransactionMetricsSource transactionMetricsSource =
txManager.transactionMetricsSource();
+
transactionMetricsSource.setPendingWriteIntentsSupplier(this::totalPendingWriteIntents);
+
mvGc.start();
fullStateTransferIndexChooser.start();
@@ -1108,6 +1114,10 @@ public class TableManager implements
IgniteTablesInternal, IgniteComponent {
if (!stopGuard.compareAndSet(false, true)) {
return nullCompletedFuture();
}
+ TransactionMetricsSource transactionMetricsSource =
txManager.transactionMetricsSource();
+ if (transactionMetricsSource != null) {
+ transactionMetricsSource.setPendingWriteIntentsSupplier(null);
+ }
partitionReplicaLifecycleManager.removeListener(AFTER_REPLICA_DESTROYED,
onZoneReplicaDestroyedListener);
partitionReplicaLifecycleManager.removeListener(AFTER_REPLICA_STOPPED,
onZoneReplicaStoppedListener);
@@ -1603,7 +1613,8 @@ public class TableManager implements
IgniteTablesInternal, IgniteComponent {
// In case of colocation there shouldn't be any table replica and thus
it shouldn't be stopped.
minTimeCollectorService.removePartition(tablePartitionId);
- PartitionModificationCounterMetricSource metricSource =
partModCounterMetricSources.remove(tablePartitionId);
+ PartitionTableStatsMetricSource metricSource =
partModCounterMetricSources.remove(tablePartitionId);
+ pendingWriteIntentsSuppliers.remove(tablePartitionId);
if (metricSource != null) {
try {
metricManager.unregisterSource(metricSource);
@@ -1663,8 +1674,6 @@ public class TableManager implements
IgniteTablesInternal, IgniteComponent {
PartitionModificationCounter modificationCounter =
partitionModificationCounterFactory.create(partSizeSupplier,
table::stalenessConfiguration, table.tableId(), partitionId);
- registerPartitionModificationCounterMetrics(table, partitionId,
modificationCounter);
-
StorageUpdateHandler storageUpdateHandler = new StorageUpdateHandler(
partitionId,
partitionDataStorage,
@@ -1673,50 +1682,46 @@ public class TableManager implements
IgniteTablesInternal, IgniteComponent {
modificationCounter,
txManager
);
+
storageUpdateHandler.start(onNodeRecovery);
+ registerPartitionTableStatsMetrics(table, partitionId,
modificationCounter);
+
+ pendingWriteIntentsSuppliers.put(new TablePartitionId(table.tableId(),
partitionId), storageUpdateHandler::getPendingRowCount);
+
return new PartitionUpdateHandlers(storageUpdateHandler,
indexUpdateHandler, gcUpdateHandler);
}
- private void registerPartitionModificationCounterMetrics(
+ private void registerPartitionTableStatsMetrics(
TableViewInternal table,
int partitionId,
PartitionModificationCounter counter
) {
- PartitionModificationCounterMetricSource metricSource =
- new PartitionModificationCounterMetricSource(table.tableId(),
partitionId);
-
- metricSource.addMetric(new LongGauge(
- PartitionModificationCounterMetricSource.METRIC_COUNTER,
- "The value of the volatile counter of partition modifications.
"
- + "This value is used to determine staleness of the
related SQL statistics.",
- counter::value
- ));
-
- metricSource.addMetric(new LongGauge(
- PartitionModificationCounterMetricSource.METRIC_NEXT_MILESTONE,
- "The value of the next milestone for the number of partition
modifications. "
- + "This value is used to determine staleness of the
related SQL statistics.",
- counter::nextMilestone
- ));
-
- metricSource.addMetric(new LongGauge(
-
PartitionModificationCounterMetricSource.METRIC_LAST_MILESTONE_TIMESTAMP,
- "The timestamp value representing the commit time of the last
modification operation that "
- + "reached the milestone. This value is used to
determine staleness of the related SQL statistics.",
- () -> counter.lastMilestoneTimestamp().longValue()
- ));
+ PartitionTableStatsMetricSource metricSource =
+ new PartitionTableStatsMetricSource(table.tableId(),
partitionId, counter);
try {
metricManager.registerSource(metricSource);
metricManager.enable(metricSource);
- partModCounterMetricSources.put(new
TablePartitionId(table.tableId(), partitionId), metricSource);
+ TablePartitionId tablePartitionId = new
TablePartitionId(table.tableId(), partitionId);
+
+ partModCounterMetricSources.put(tablePartitionId, metricSource);
} catch (Exception e) {
LOG.warn("Failed to register metrics source for table [name={},
partitionId={}].", e, table.name(), partitionId);
}
}
+ private long totalPendingWriteIntents() {
+ long sum = 0;
+
+ for (LongSupplier supplier : pendingWriteIntentsSuppliers.values()) {
+ sum += supplier.getAsLong();
+ }
+
+ return sum;
+ }
+
/**
* Returns a cached table instance if it exists, {@code null} otherwise.
Can return a table that is being stopped.
*
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PendingRows.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PendingRows.java
index 110d45144de..889f3eaa24f 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PendingRows.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PendingRows.java
@@ -88,4 +88,15 @@ public class PendingRows {
return pendingRows == null ? EMPTY_SET : pendingRows;
}
+ /**
+ * Returns the total number of unresolved write intents across all
transactions.
+ *
+ * @return Total number of pending row IDs.
+ */
+ public long getPendingRowCount() {
+ return txsPendingRowIds.values().stream()
+ .mapToLong(Set::size)
+ .sum();
+ }
+
}
diff --git
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerRecoveryTest.java
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerRecoveryTest.java
index 25c1309e1c3..b6fbc264fcb 100644
---
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerRecoveryTest.java
+++
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerRecoveryTest.java
@@ -159,6 +159,7 @@ import
org.apache.ignite.internal.tx.configuration.TransactionConfiguration;
import org.apache.ignite.internal.tx.impl.RemotelyTriggeredResourceRegistry;
import org.apache.ignite.internal.tx.impl.TransactionInflights;
import org.apache.ignite.internal.tx.impl.VolatileTxStateMetaStorage;
+import org.apache.ignite.internal.tx.metrics.TransactionMetricsSource;
import
org.apache.ignite.internal.tx.storage.state.rocksdb.TxStateRocksDbSharedStorage;
import org.apache.ignite.lang.IgniteException;
import org.apache.ignite.network.NetworkAddress;
@@ -427,6 +428,7 @@ public class TableManagerRecoveryTest extends
IgniteAbstractTest {
when(clusterService.topologyService()).thenReturn(topologyService);
when(topologyService.localMember()).thenReturn(node);
when(distributionZoneManager.dataNodes(any(), anyInt(),
anyInt())).thenReturn(completedFuture(Set.of(NODE_NAME)));
+
when(txManager.transactionMetricsSource()).thenReturn(mock(TransactionMetricsSource.class));
PlacementDriver placementDriver = new TestPlacementDriver(node);
ClockService clockService = new TestClockService(clock);
diff --git
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
index b4d367fbbcd..a3068cd7042 100644
---
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
+++
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
@@ -123,6 +123,7 @@ import org.apache.ignite.internal.tx.TxManager;
import org.apache.ignite.internal.tx.configuration.TransactionConfiguration;
import org.apache.ignite.internal.tx.impl.RemotelyTriggeredResourceRegistry;
import org.apache.ignite.internal.tx.impl.TransactionInflights;
+import org.apache.ignite.internal.tx.metrics.TransactionMetricsSource;
import
org.apache.ignite.internal.tx.storage.state.rocksdb.TxStateRocksDbSharedStorage;
import org.apache.ignite.network.NetworkAddress;
import org.apache.ignite.sql.IgniteSql;
@@ -265,6 +266,8 @@ public class TableManagerTest extends IgniteAbstractTest {
when(clusterService.messagingService()).thenReturn(mock(MessagingService.class));
+
when(tm.transactionMetricsSource()).thenReturn(mock(TransactionMetricsSource.class));
+
tblManagerFut = new CompletableFuture<>();
mockMetastore();
diff --git
a/modules/transactions/src/integrationTest/java/org/apache/ignite/internal/tx/ItTransactionMetricsTest.java
b/modules/transactions/src/integrationTest/java/org/apache/ignite/internal/tx/ItTransactionMetricsTest.java
index b3f4499b17a..17891a7ba16 100644
---
a/modules/transactions/src/integrationTest/java/org/apache/ignite/internal/tx/ItTransactionMetricsTest.java
+++
b/modules/transactions/src/integrationTest/java/org/apache/ignite/internal/tx/ItTransactionMetricsTest.java
@@ -25,6 +25,7 @@ import static
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCo
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
+import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
@@ -46,6 +47,7 @@ import org.apache.ignite.table.Tuple;
import org.apache.ignite.tx.Transaction;
import org.apache.ignite.tx.TransactionException;
import org.apache.ignite.tx.TransactionOptions;
+import org.awaitility.Awaitility;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
@@ -407,4 +409,69 @@ public class ItTransactionMetricsTest extends
ClusterPerClassIntegrationTest {
assertThat(actualMetrics0.get("RoCommits"),
is(metrics0.get("RoCommits") + 1));
assertThat(actualMetrics0.get("RwCommits"),
is(metrics0.get("RwCommits") + 1));
}
+
+ @Test
+ void globalPendingWriteIntentsMetric() throws Exception {
+ String zoneName = "zone_single_partition_no_replicas_tx_metrics";
+
+ String table1 = "test_table_pending_wi_1";
+ String table2 = "test_table_pending_wi_2";
+
+ sql("CREATE ZONE " + zoneName + " (PARTITIONS 1, REPLICAS 1) storage
profiles ['default']");
+
+ sql("CREATE TABLE " + table1 + "(id INT PRIMARY KEY, val INT) ZONE " +
zoneName);
+ sql("CREATE TABLE " + table2 + "(id INT PRIMARY KEY, val INT) ZONE " +
zoneName);
+
+ Transaction tx = node(0).transactions().begin();
+
+ int table1Inserts = 3;
+ int table2Inserts = 5;
+
+ try {
+ for (int i = 0; i < table1Inserts; i++) {
+ sql(tx, "INSERT INTO " + table1 + " VALUES(?, ?)", i, i);
+ }
+
+ for (int i = 0; i < table2Inserts; i++) {
+ sql(tx, "INSERT INTO " + table2 + " VALUES(?, ?)", i, i);
+ }
+
+ Awaitility.await()
+ .atMost(Duration.ofSeconds(10))
+ .untilAsserted(() ->
assertThat(totalPendingWriteIntents(), is((long) table1Inserts +
table2Inserts)));
+ } finally {
+ tx.commit();
+ }
+
+ Awaitility.await()
+ .atMost(Duration.ofSeconds(10))
+ .untilAsserted(() -> assertThat(totalPendingWriteIntents(),
is(0L)));
+ }
+
+ private static long totalPendingWriteIntents() {
+ long sum = 0;
+
+ for (int i = 0; i < CLUSTER.nodes().size(); i++) {
+ sum += pendingWriteIntentsOnNode(i);
+ }
+
+ return sum;
+ }
+
+ private static long pendingWriteIntentsOnNode(int nodeIdx) {
+ MetricSet metrics = unwrapIgniteImpl(node(nodeIdx))
+ .metricManager()
+ .metricSnapshot()
+ .metrics()
+ .get(TransactionMetricsSource.SOURCE_NAME);
+
+ assertThat("Transaction metrics must be present on node " + nodeIdx,
metrics != null, is(true));
+
+ LongMetric metric =
metrics.get(TransactionMetricsSource.METRIC_PENDING_WRITE_INTENTS);
+
+ assertThat("Metric must be present: "
+ + TransactionMetricsSource.METRIC_PENDING_WRITE_INTENTS,
metric != null, is(true));
+
+ return metric.value();
+ }
}
diff --git
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxManager.java
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxManager.java
index bab8f783c8e..49c9d151b41 100644
---
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxManager.java
+++
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxManager.java
@@ -29,6 +29,7 @@ import org.apache.ignite.internal.manager.IgniteComponent;
import org.apache.ignite.internal.replicator.ZonePartitionId;
import org.apache.ignite.internal.tx.impl.EnlistedPartitionGroup;
import org.apache.ignite.internal.tx.metrics.ResourceVacuumMetrics;
+import org.apache.ignite.internal.tx.metrics.TransactionMetricsSource;
import org.jetbrains.annotations.Nullable;
import org.jetbrains.annotations.TestOnly;
@@ -36,6 +37,11 @@ import org.jetbrains.annotations.TestOnly;
* A transaction manager.
*/
public interface TxManager extends IgniteComponent {
+ /**
+ * Returns the transaction metrics source exposed by this transaction manager.
+ *
+ * @return Transaction metrics source.
+ */
+ TransactionMetricsSource transactionMetricsSource();
+
/**
* Starts an implicit read-write transaction coordinated by a local node.
*
diff --git
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
index 913cd44ec04..d169255c381 100644
---
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
+++
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
@@ -411,6 +411,11 @@ public class TxManagerImpl implements TxManager,
NetworkMessageHandler, SystemVi
txMetrics = new TransactionMetricsSource(clockService);
}
+ /** Returns the metrics source instance held in the {@code txMetrics} field of this manager. */
+ @Override
+ public TransactionMetricsSource transactionMetricsSource() {
+ return txMetrics;
+ }
+
private CompletableFuture<Boolean> primaryReplicaEventListener(
PrimaryReplicaEventParameters eventParameters,
Consumer<ZonePartitionId> action
diff --git
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/metrics/TransactionMetricsSource.java
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/metrics/TransactionMetricsSource.java
index 84d5b6d9de5..34723e35b78 100644
---
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/metrics/TransactionMetricsSource.java
+++
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/metrics/TransactionMetricsSource.java
@@ -21,18 +21,23 @@ import static
org.apache.ignite.internal.tx.TransactionIds.beginTimestamp;
import java.util.List;
import java.util.UUID;
+import java.util.function.LongSupplier;
import org.apache.ignite.internal.hlc.ClockService;
import org.apache.ignite.internal.metrics.AbstractMetricSource;
import org.apache.ignite.internal.metrics.DistributionMetric;
import org.apache.ignite.internal.metrics.LongAdderMetric;
+import org.apache.ignite.internal.metrics.LongGauge;
import org.apache.ignite.internal.metrics.Metric;
-import org.apache.ignite.internal.tx.metrics.TransactionMetricsSource.Holder;
+import org.jetbrains.annotations.Nullable;
import org.jetbrains.annotations.TestOnly;
/**
* Transaction metric source, that contains a set of transaction metrics.
**/
-public class TransactionMetricsSource extends AbstractMetricSource<Holder> {
+public class TransactionMetricsSource extends
AbstractMetricSource<TransactionMetricsSource.Holder> {
+ /** Metric name for unresolved (uncommitted) write intents. */
+ public static final String METRIC_PENDING_WRITE_INTENTS =
"PendingWriteIntents";
+
/** Histogram buckets for duration metrics in milliseconds. */
private static final long[] HISTOGRAM_BUCKETS =
{1, 2, 4, 8, 16, 25, 50, 75, 100, 250, 500, 750, 1000, 3000, 5000,
10000, 25000, 60000};
@@ -43,6 +48,8 @@ public class TransactionMetricsSource extends
AbstractMetricSource<Holder> {
/** Clock service to calculate a timestamp for rolled back transactions. */
private final ClockService clockService;
+ /**
+ * Supplier of the pending write intents value; initially a supplier that always returns {@code 0}.
+ * Declared {@code volatile} and reassigned by {@link #setPendingWriteIntentsSupplier(LongSupplier)}.
+ */
+ private volatile LongSupplier pendingWriteIntentsSupplier = () -> 0L;
+
/**
* Creates a new instance of {@link TransactionMetricsSource}.
*/
@@ -52,6 +59,15 @@ public class TransactionMetricsSource extends
AbstractMetricSource<Holder> {
this.clockService = clockService;
}
+    /**
+     * Installs the supplier that provides the total number of unresolved (uncommitted) write intents local to this node.
+     *
+     * @param supplier New supplier; passing {@code null} restores the default supplier that always yields {@code 0}.
+     */
+    public void setPendingWriteIntentsSupplier(@Nullable LongSupplier supplier) {
+        if (supplier != null) {
+            pendingWriteIntentsSupplier = supplier;
+
+            return;
+        }
+
+        pendingWriteIntentsSupplier = () -> 0L;
+    }
+
/**
* Updates read-write related metrics.
*
@@ -145,7 +161,7 @@ public class TransactionMetricsSource extends
AbstractMetricSource<Holder> {
@Override
protected Holder createHolder() {
- return new Holder();
+ // The lambda dereferences the volatile field on every call, so a supplier installed later through
+ // setPendingWriteIntentsSupplier() is picked up; passing the field value itself would pin the current supplier.
+ return new Holder(() -> pendingWriteIntentsSupplier.getAsLong());
}
private long calculateTransactionDuration(UUID transactionId) {
@@ -193,15 +209,29 @@ public class TransactionMetricsSource extends
AbstractMetricSource<Holder> {
"Active",
"Number of running transactions.");
- private final List<Metric> metrics = List.of(
- totalCommits,
- rwCommits,
- roCommits,
- totalRollbacks,
- rwRollbacks,
- roRollbacks,
- rwDuration,
- roDuration);
+        /** Gauge whose value comes from the supplier handed to the constructor. */
+        private final LongGauge pendingWriteIntents;
+
+        /** Metric instances grouped by this holder, including the pending write intents gauge. */
+        private final List<Metric> metrics;
+
+        /**
+         * Creates a holder whose pending write intents gauge is backed by the given supplier.
+         *
+         * @param pendingWriteIntentsSupplier Supplier of the gauge value.
+         */
+        private Holder(LongSupplier pendingWriteIntentsSupplier) {
+            String description =
+                    "Total number of unresolved (uncommitted) write intents across all local partitions on this node.";
+
+            this.pendingWriteIntents = new LongGauge(METRIC_PENDING_WRITE_INTENTS, description, pendingWriteIntentsSupplier);
+
+            this.metrics = List.of(
+                    totalCommits, rwCommits, roCommits,
+                    totalRollbacks, rwRollbacks, roRollbacks,
+                    rwDuration, roDuration,
+                    pendingWriteIntents
+            );
+        }
@Override
public Iterable<Metric> metrics() {
diff --git
a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/metrics/TransactionMetricSourceTest.java
b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/metrics/TransactionMetricSourceTest.java
index c480405d20c..5075e713dbc 100644
---
a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/metrics/TransactionMetricSourceTest.java
+++
b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/metrics/TransactionMetricSourceTest.java
@@ -61,7 +61,8 @@ public class TransactionMetricSourceTest extends
BaseIgniteAbstractTest {
"RoCommits",
"RoRollbacks",
"RwDuration",
- "RoDuration");
+ "RoDuration",
+ "PendingWriteIntents");
var actualMetrics = new HashSet<String>();
set.forEach(m -> actualMetrics.add(m.name()));