This is an automated email from the ASF dual-hosted git repository.

sk0x50 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 68af1f91397 IGNITE-27178 Add a total number of unresolved write 
intents as a metric (#7428)
68af1f91397 is described below

commit 68af1f913972ba144d81c7521cc9ce72e63854a4
Author: Anton Laletin <[email protected]>
AuthorDate: Fri Feb 6 12:12:14 2026 +0400

    IGNITE-27178 Add a total number of unresolved write intents as a metric 
(#7428)
---
 .../apache/ignite/client/fakes/FakeTxManager.java  |   7 ++
 .../internal/metrics/AbstractMetricSource.java     |   2 +-
 ...artitionTableStatsMetricConfigurationTest.java} |   4 +-
 ...t.java => ItPartitionTableStatsMetricTest.java} |  18 ++--
 .../PartitionModificationCounterMetricSource.java  |  81 ----------------
 .../PartitionTableStatsMetricSource.java           | 105 +++++++++++++++++++++
 .../table/distributed/StorageUpdateHandler.java    |   9 ++
 .../internal/table/distributed/TableManager.java   |  65 +++++++------
 .../table/distributed/replicator/PendingRows.java  |  11 +++
 .../distributed/TableManagerRecoveryTest.java      |   2 +
 .../table/distributed/TableManagerTest.java        |   3 +
 .../internal/tx/ItTransactionMetricsTest.java      |  67 +++++++++++++
 .../org/apache/ignite/internal/tx/TxManager.java   |   6 ++
 .../ignite/internal/tx/impl/TxManagerImpl.java     |   5 +
 .../tx/metrics/TransactionMetricsSource.java       |  54 ++++++++---
 .../tx/metrics/TransactionMetricSourceTest.java    |   3 +-
 16 files changed, 307 insertions(+), 135 deletions(-)

diff --git 
a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeTxManager.java
 
b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeTxManager.java
index 2f0b4851375..139c939ae3b 100644
--- 
a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeTxManager.java
+++ 
b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeTxManager.java
@@ -42,6 +42,7 @@ import org.apache.ignite.internal.tx.TxState;
 import org.apache.ignite.internal.tx.TxStateMeta;
 import org.apache.ignite.internal.tx.impl.EnlistedPartitionGroup;
 import org.apache.ignite.internal.tx.metrics.ResourceVacuumMetrics;
+import org.apache.ignite.internal.tx.metrics.TransactionMetricsSource;
 import org.apache.ignite.tx.TransactionException;
 import org.jetbrains.annotations.Nullable;
 
@@ -55,6 +56,12 @@ public class FakeTxManager implements TxManager {
         this.clock = clock;
     }
 
+    @Override
+    public @Nullable TransactionMetricsSource transactionMetricsSource() {
+        // No-op
+        return null;
+    }
+
     @Override
     public CompletableFuture<Void> startAsync(ComponentContext 
componentContext) {
         // No-op.
diff --git 
a/modules/metrics/src/main/java/org/apache/ignite/internal/metrics/AbstractMetricSource.java
 
b/modules/metrics/src/main/java/org/apache/ignite/internal/metrics/AbstractMetricSource.java
index f3e06b9e412..2e3115cb782 100644
--- 
a/modules/metrics/src/main/java/org/apache/ignite/internal/metrics/AbstractMetricSource.java
+++ 
b/modules/metrics/src/main/java/org/apache/ignite/internal/metrics/AbstractMetricSource.java
@@ -113,7 +113,7 @@ public abstract class AbstractMetricSource<T extends 
AbstractMetricSource.Holder
     }
 
     /**
-     * Returns metric instances' holder. Use this on order to avoid metric 
lookup from map-like data structures.
+     * Returns metric instances' holder. Use this in order to avoid metric 
lookup from map-like data structures.
      * Returned value is {@code null} if metrics are disabled.
      *
      * @return Metrics holder instance if metrics are enabled, otherwise - 
{@code null}.
diff --git 
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItPartitionModificationCounterConfigurationTest.java
 
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItPartitionTableStatsMetricConfigurationTest.java
similarity index 93%
rename from 
modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItPartitionModificationCounterConfigurationTest.java
rename to 
modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItPartitionTableStatsMetricConfigurationTest.java
index 4a683582632..cdcd2ca1a92 100644
--- 
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItPartitionModificationCounterConfigurationTest.java
+++ 
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItPartitionTableStatsMetricConfigurationTest.java
@@ -19,7 +19,7 @@ package org.apache.ignite.internal.table;
 
 import static 
org.apache.ignite.internal.catalog.CatalogService.DEFAULT_STORAGE_PROFILE;
 import static org.apache.ignite.internal.lang.IgniteStringFormatter.format;
-import static 
org.apache.ignite.internal.table.ItPartitionModificationCounterMetricsTest.expectNextMilestone;
+import static 
org.apache.ignite.internal.table.ItPartitionTableStatsMetricTest.expectNextMilestone;
 
 import org.apache.ignite.InitParametersBuilder;
 import org.apache.ignite.internal.sql.BaseSqlIntegrationTest;
@@ -28,7 +28,7 @@ import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
 /** Check that partition modification counter settings are depends on 
configuration settings. */
-public class ItPartitionModificationCounterConfigurationTest extends 
BaseSqlIntegrationTest {
+public class ItPartitionTableStatsMetricConfigurationTest extends 
BaseSqlIntegrationTest {
     private static final String ZONE_1_PART_NO_REPLICAS = 
"zone_single_partition_no_replicas";
     private static final int MIN_STALE_ROWS = 2;
     private static final double STALE_ROWS_FRACTION = 1.0d;
diff --git 
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItPartitionModificationCounterMetricsTest.java
 
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItPartitionTableStatsMetricTest.java
similarity index 95%
rename from 
modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItPartitionModificationCounterMetricsTest.java
rename to 
modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItPartitionTableStatsMetricTest.java
index b0094debcde..8b547a05717 100644
--- 
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItPartitionModificationCounterMetricsTest.java
+++ 
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItPartitionTableStatsMetricTest.java
@@ -20,9 +20,9 @@ package org.apache.ignite.internal.table;
 import static org.apache.ignite.internal.TestWrappers.unwrapIgniteImpl;
 import static 
org.apache.ignite.internal.catalog.commands.CatalogUtils.DEFAULT_MIN_STALE_ROWS_COUNT;
 import static org.apache.ignite.internal.lang.IgniteStringFormatter.format;
-import static 
org.apache.ignite.internal.table.distributed.PartitionModificationCounterMetricSource.METRIC_COUNTER;
-import static 
org.apache.ignite.internal.table.distributed.PartitionModificationCounterMetricSource.METRIC_LAST_MILESTONE_TIMESTAMP;
-import static 
org.apache.ignite.internal.table.distributed.PartitionModificationCounterMetricSource.METRIC_NEXT_MILESTONE;
+import static 
org.apache.ignite.internal.table.distributed.PartitionTableStatsMetricSource.METRIC_COUNTER;
+import static 
org.apache.ignite.internal.table.distributed.PartitionTableStatsMetricSource.METRIC_LAST_MILESTONE_TIMESTAMP;
+import static 
org.apache.ignite.internal.table.distributed.PartitionTableStatsMetricSource.METRIC_NEXT_MILESTONE;
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.greaterThan;
@@ -37,7 +37,7 @@ import org.apache.ignite.internal.metrics.MetricManager;
 import org.apache.ignite.internal.metrics.MetricSet;
 import org.apache.ignite.internal.sql.BaseSqlIntegrationTest;
 import 
org.apache.ignite.internal.table.distributed.PartitionModificationCounter;
-import 
org.apache.ignite.internal.table.distributed.PartitionModificationCounterMetricSource;
+import 
org.apache.ignite.internal.table.distributed.PartitionTableStatsMetricSource;
 import org.apache.ignite.table.KeyValueView;
 import org.apache.ignite.table.QualifiedName;
 import org.apache.ignite.tx.Transaction;
@@ -47,9 +47,11 @@ import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
 /**
- * Tests for {@link PartitionModificationCounter partition modification 
counter} metrics.
+ * Integration tests for table partition statistics metrics exposed via {@link 
PartitionTableStatsMetricSource}.
+ *
+ * <p>Includes {@link PartitionModificationCounter partition modification 
counter} metrics.
  */
-public class ItPartitionModificationCounterMetricsTest extends 
BaseSqlIntegrationTest {
+public class ItPartitionTableStatsMetricTest extends BaseSqlIntegrationTest {
     private static final String ZONE_1_PART_NO_REPLICAS = 
"zone_single_partition_no_replicas";
     private static final String ZONE_1_PART_REPLICAS = "zone_single_partition";
     private static final String ZONE_8_PART_NO_REPLICAS = 
"zone_multi_partition";
@@ -297,7 +299,7 @@ public class ItPartitionModificationCounterMetricsTest 
extends BaseSqlIntegratio
                 int tableId = tableIdByName(QualifiedName.parse(tableName));
 
                 String metricSourceName =
-                        
PartitionModificationCounterMetricSource.formatSourceName(tableId, part);
+                        
PartitionTableStatsMetricSource.formatSourceName(tableId, part);
 
                 boolean metricFound = false;
 
@@ -346,7 +348,7 @@ public class ItPartitionModificationCounterMetricsTest 
extends BaseSqlIntegratio
         int tableId = tableIdByName(QualifiedName.parse(tableName));
 
         String metricSourceName =
-                
PartitionModificationCounterMetricSource.formatSourceName(tableId, partId);
+                PartitionTableStatsMetricSource.formatSourceName(tableId, 
partId);
 
         MetricManager metricManager = 
unwrapIgniteImpl(node(nodeIdx)).metricManager();
 
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/PartitionModificationCounterMetricSource.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/PartitionModificationCounterMetricSource.java
deleted file mode 100644
index 0d77f00d2f8..00000000000
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/PartitionModificationCounterMetricSource.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.table.distributed;
-
-import java.util.HashMap;
-import java.util.Map;
-import org.apache.ignite.internal.lang.IgniteStringFormatter;
-import org.apache.ignite.internal.metrics.Metric;
-import org.apache.ignite.internal.metrics.MetricSet;
-import org.apache.ignite.internal.metrics.MetricSource;
-import org.jetbrains.annotations.Nullable;
-
-/**
- * Metrics related to {@link PartitionModificationCounter partition 
modification counter}.
- */
-public class PartitionModificationCounterMetricSource implements MetricSource {
-    public static final String METRIC_COUNTER = "modificationCount";
-    public static final String METRIC_NEXT_MILESTONE = "nextMilestone";
-    public static final String METRIC_LAST_MILESTONE_TIMESTAMP = 
"lastMilestoneTimestamp";
-
-    private final Map<String, Metric> metrics = new HashMap<>();
-    private final String metricSourceName;
-
-    private boolean enabled;
-
-    public PartitionModificationCounterMetricSource(int tableId, int 
partitionId) {
-        this.metricSourceName = formatSourceName(tableId, partitionId);
-    }
-
-    @Override
-    public String name() {
-        return metricSourceName;
-    }
-
-    @Override
-    public @Nullable MetricSet enable() {
-        if (enabled) {
-            return null;
-        }
-
-        enabled = true;
-
-        return new MetricSet(metricSourceName, Map.copyOf(metrics));
-    }
-
-    @Override
-    public void disable() {
-        enabled = false;
-    }
-
-    @Override
-    public boolean enabled() {
-        return enabled;
-    }
-
-    /** Adds a metric to the source. */
-    public void addMetric(Metric metric) {
-        assert !enabled : "Metrics can be added only before enabling the 
metric source";
-
-        metrics.put(metric.name(), metric);
-    }
-
-    public static String formatSourceName(int tableId, int partitionId) {
-        return 
IgniteStringFormatter.format("partition.statistics.table.{}.partition.{}", 
tableId, partitionId);
-    }
-}
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/PartitionTableStatsMetricSource.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/PartitionTableStatsMetricSource.java
new file mode 100644
index 00000000000..03a03e2c560
--- /dev/null
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/PartitionTableStatsMetricSource.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed;
+
+import java.util.List;
+import org.apache.ignite.internal.lang.IgniteStringFormatter;
+import org.apache.ignite.internal.metrics.AbstractMetricSource;
+import org.apache.ignite.internal.metrics.LongGauge;
+import org.apache.ignite.internal.metrics.Metric;
+
+/**
+ * Metrics related to table partition statistics.
+ *
+ * <p>This source includes {@link PartitionModificationCounter partition 
modification counter} metrics.
+ */
+public class PartitionTableStatsMetricSource extends 
AbstractMetricSource<PartitionTableStatsMetricSource.Holder> {
+    public static final String METRIC_COUNTER = "modificationCount";
+    public static final String METRIC_NEXT_MILESTONE = "nextMilestone";
+    public static final String METRIC_LAST_MILESTONE_TIMESTAMP = 
"lastMilestoneTimestamp";
+
+    private final PartitionModificationCounter counter;
+
+    /**
+     * Creates a new metric source for a specific table partition.
+     *
+     * @param tableId Table id.
+     * @param partitionId Partition id.
+     * @param counter Partition modification counter.
+     */
+    public PartitionTableStatsMetricSource(int tableId, int partitionId, 
PartitionModificationCounter counter) {
+        super(formatSourceName(tableId, partitionId), "Metrics related to 
table partition statistics.");
+
+        this.counter = counter;
+    }
+
+    /**
+     * Returns the metric source name for a given table partition.
+     *
+     * @param tableId Table id.
+     * @param partitionId Partition id.
+     * @return Metric source name.
+     */
+    public static String formatSourceName(int tableId, int partitionId) {
+        return 
IgniteStringFormatter.format("partition.statistics.table.{}.partition.{}", 
tableId, partitionId);
+    }
+
+    @Override
+    protected Holder createHolder() {
+        return new Holder(counter);
+    }
+
+    /** Holder. */
+    protected static class Holder implements 
AbstractMetricSource.Holder<Holder> {
+        private final LongGauge counterValue;
+        private final LongGauge nextMilestone;
+        private final LongGauge lastMilestoneTimestamp;
+
+        private final List<Metric> metrics;
+
+        private Holder(PartitionModificationCounter counter) {
+            counterValue = new LongGauge(
+                    METRIC_COUNTER,
+                    "The value of the volatile counter of partition 
modifications. "
+                            + "This value is used to determine staleness of 
the related SQL statistics.",
+                    counter::value
+            );
+
+            nextMilestone = new LongGauge(
+                    METRIC_NEXT_MILESTONE,
+                    "The value of the next milestone for the number of 
partition modifications. "
+                            + "This value is used to determine staleness of 
the related SQL statistics.",
+                    counter::nextMilestone
+            );
+
+            lastMilestoneTimestamp = new LongGauge(
+                    METRIC_LAST_MILESTONE_TIMESTAMP,
+                    "The timestamp value representing the commit time of the 
last modification operation that "
+                            + "reached the milestone. This value is used to 
determine staleness of the related SQL statistics.",
+                    () -> counter.lastMilestoneTimestamp().longValue()
+            );
+
+            metrics = List.of(counterValue, nextMilestone, 
lastMilestoneTimestamp);
+        }
+
+        @Override
+        public Iterable<Metric> metrics() {
+            return metrics;
+        }
+    }
+}
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/StorageUpdateHandler.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/StorageUpdateHandler.java
index 313a151da87..a3a652a1cac 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/StorageUpdateHandler.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/StorageUpdateHandler.java
@@ -534,6 +534,15 @@ public class StorageUpdateHandler {
         return indexUpdateHandler;
     }
 
+    /**
+     * Returns the total number of unresolved write intents across all 
transactions.
+     *
+     * @return Total number of pending row IDs.
+     */
+    public long getPendingRowCount() {
+        return pendingRows.getPendingRowCount();
+    }
+
     /**
      * Performs add of the committed row version. If a write intent is 
detected on the first attempt and {@code lastCommitTs} is not
      * {@code null}, it will be cleared before the second attempt. Otherwise, 
{@link StorageException} will be thrown.
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index a29febd91d8..e19a1b5abaa 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -72,6 +72,7 @@ import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.Function;
+import java.util.function.LongSupplier;
 import java.util.function.Supplier;
 import org.apache.ignite.internal.catalog.Catalog;
 import org.apache.ignite.internal.catalog.CatalogService;
@@ -111,7 +112,6 @@ import org.apache.ignite.internal.manager.IgniteComponent;
 import org.apache.ignite.internal.marshaller.ReflectionMarshallersProvider;
 import org.apache.ignite.internal.metastorage.MetaStorageManager;
 import org.apache.ignite.internal.metastorage.Revisions;
-import org.apache.ignite.internal.metrics.LongGauge;
 import org.apache.ignite.internal.metrics.MetricManager;
 import org.apache.ignite.internal.network.InternalClusterNode;
 import org.apache.ignite.internal.network.MessagingService;
@@ -178,6 +178,7 @@ import 
org.apache.ignite.internal.tx.configuration.TransactionConfiguration;
 import org.apache.ignite.internal.tx.impl.RemotelyTriggeredResourceRegistry;
 import org.apache.ignite.internal.tx.impl.TransactionInflights;
 import org.apache.ignite.internal.tx.impl.TransactionStateResolver;
+import org.apache.ignite.internal.tx.metrics.TransactionMetricsSource;
 import 
org.apache.ignite.internal.tx.storage.state.rocksdb.TxStateRocksDbSharedStorage;
 import org.apache.ignite.internal.util.CompletableFutures;
 import org.apache.ignite.internal.util.IgniteSpinBusyLock;
@@ -363,8 +364,10 @@ public class TableManager implements IgniteTablesInternal, 
IgniteComponent {
     private final EventListener<ChangeLowWatermarkEventParameters> 
onLowWatermarkChangedListener = this::onLwmChanged;
 
     private final MetricManager metricManager;
+
     private final PartitionModificationCounterFactory 
partitionModificationCounterFactory;
-    private final Map<TablePartitionId, 
PartitionModificationCounterMetricSource> partModCounterMetricSources = new 
ConcurrentHashMap<>();
+    private final Map<TablePartitionId, PartitionTableStatsMetricSource> 
partModCounterMetricSources = new ConcurrentHashMap<>();
+    private final Map<TablePartitionId, LongSupplier> 
pendingWriteIntentsSuppliers = new ConcurrentHashMap<>();
 
     /**
      * Creates a new table manager.
@@ -520,6 +523,9 @@ public class TableManager implements IgniteTablesInternal, 
IgniteComponent {
     @Override
     public CompletableFuture<Void> startAsync(ComponentContext 
componentContext) {
         return inBusyLockAsync(busyLock, () -> {
+            TransactionMetricsSource transactionMetricsSource = 
txManager.transactionMetricsSource();
+            
transactionMetricsSource.setPendingWriteIntentsSupplier(this::totalPendingWriteIntents);
+
             mvGc.start();
 
             fullStateTransferIndexChooser.start();
@@ -1108,6 +1114,10 @@ public class TableManager implements 
IgniteTablesInternal, IgniteComponent {
         if (!stopGuard.compareAndSet(false, true)) {
             return nullCompletedFuture();
         }
+        TransactionMetricsSource transactionMetricsSource = 
txManager.transactionMetricsSource();
+        if (transactionMetricsSource != null) {
+            transactionMetricsSource.setPendingWriteIntentsSupplier(null);
+        }
 
         
partitionReplicaLifecycleManager.removeListener(AFTER_REPLICA_DESTROYED, 
onZoneReplicaDestroyedListener);
         partitionReplicaLifecycleManager.removeListener(AFTER_REPLICA_STOPPED, 
onZoneReplicaStoppedListener);
@@ -1603,7 +1613,8 @@ public class TableManager implements 
IgniteTablesInternal, IgniteComponent {
         // In case of colocation there shouldn't be any table replica and thus 
it shouldn't be stopped.
         minTimeCollectorService.removePartition(tablePartitionId);
 
-        PartitionModificationCounterMetricSource metricSource = 
partModCounterMetricSources.remove(tablePartitionId);
+        PartitionTableStatsMetricSource metricSource = 
partModCounterMetricSources.remove(tablePartitionId);
+        pendingWriteIntentsSuppliers.remove(tablePartitionId);
         if (metricSource != null) {
             try {
                 metricManager.unregisterSource(metricSource);
@@ -1663,8 +1674,6 @@ public class TableManager implements 
IgniteTablesInternal, IgniteComponent {
         PartitionModificationCounter modificationCounter =
                 partitionModificationCounterFactory.create(partSizeSupplier, 
table::stalenessConfiguration, table.tableId(), partitionId);
 
-        registerPartitionModificationCounterMetrics(table, partitionId, 
modificationCounter);
-
         StorageUpdateHandler storageUpdateHandler = new StorageUpdateHandler(
                 partitionId,
                 partitionDataStorage,
@@ -1673,50 +1682,46 @@ public class TableManager implements 
IgniteTablesInternal, IgniteComponent {
                 modificationCounter,
                 txManager
         );
+
         storageUpdateHandler.start(onNodeRecovery);
 
+        registerPartitionTableStatsMetrics(table, partitionId, 
modificationCounter);
+
+        pendingWriteIntentsSuppliers.put(new TablePartitionId(table.tableId(), 
partitionId), storageUpdateHandler::getPendingRowCount);
+
         return new PartitionUpdateHandlers(storageUpdateHandler, 
indexUpdateHandler, gcUpdateHandler);
     }
 
-    private void registerPartitionModificationCounterMetrics(
+    private void registerPartitionTableStatsMetrics(
             TableViewInternal table,
             int partitionId,
             PartitionModificationCounter counter
     ) {
-        PartitionModificationCounterMetricSource metricSource =
-                new PartitionModificationCounterMetricSource(table.tableId(), 
partitionId);
-
-        metricSource.addMetric(new LongGauge(
-                PartitionModificationCounterMetricSource.METRIC_COUNTER,
-                "The value of the volatile counter of partition modifications. 
"
-                        + "This value is used to determine staleness of the 
related SQL statistics.",
-                counter::value
-        ));
-
-        metricSource.addMetric(new LongGauge(
-                PartitionModificationCounterMetricSource.METRIC_NEXT_MILESTONE,
-                "The value of the next milestone for the number of partition 
modifications. "
-                        + "This value is used to determine staleness of the 
related SQL statistics.",
-                counter::nextMilestone
-        ));
-
-        metricSource.addMetric(new LongGauge(
-                
PartitionModificationCounterMetricSource.METRIC_LAST_MILESTONE_TIMESTAMP,
-                "The timestamp value representing the commit time of the last 
modification operation that "
-                        + "reached the milestone. This value is used to 
determine staleness of the related SQL statistics.",
-                () -> counter.lastMilestoneTimestamp().longValue()
-        ));
+        PartitionTableStatsMetricSource metricSource =
+                new PartitionTableStatsMetricSource(table.tableId(), 
partitionId, counter);
 
         try {
             metricManager.registerSource(metricSource);
             metricManager.enable(metricSource);
 
-            partModCounterMetricSources.put(new 
TablePartitionId(table.tableId(), partitionId), metricSource);
+            TablePartitionId tablePartitionId = new 
TablePartitionId(table.tableId(), partitionId);
+
+            partModCounterMetricSources.put(tablePartitionId, metricSource);
         } catch (Exception e) {
             LOG.warn("Failed to register metrics source for table [name={}, 
partitionId={}].", e, table.name(), partitionId);
         }
     }
 
+    private long totalPendingWriteIntents() {
+        long sum = 0;
+
+        for (LongSupplier supplier : pendingWriteIntentsSuppliers.values()) {
+            sum += supplier.getAsLong();
+        }
+
+        return sum;
+    }
+
     /**
      * Returns a cached table instance if it exists, {@code null} otherwise. 
Can return a table that is being stopped.
      *
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PendingRows.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PendingRows.java
index 110d45144de..889f3eaa24f 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PendingRows.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PendingRows.java
@@ -88,4 +88,15 @@ public class PendingRows {
         return pendingRows == null ? EMPTY_SET : pendingRows;
     }
 
+    /**
+     * Returns the total number of unresolved write intents across all 
transactions.
+     *
+     * @return Total number of pending row IDs.
+     */
+    public long getPendingRowCount() {
+        return txsPendingRowIds.values().stream()
+                .mapToLong(Set::size)
+                .sum();
+    }
+
 }
diff --git 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerRecoveryTest.java
 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerRecoveryTest.java
index 25c1309e1c3..b6fbc264fcb 100644
--- 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerRecoveryTest.java
+++ 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerRecoveryTest.java
@@ -159,6 +159,7 @@ import 
org.apache.ignite.internal.tx.configuration.TransactionConfiguration;
 import org.apache.ignite.internal.tx.impl.RemotelyTriggeredResourceRegistry;
 import org.apache.ignite.internal.tx.impl.TransactionInflights;
 import org.apache.ignite.internal.tx.impl.VolatileTxStateMetaStorage;
+import org.apache.ignite.internal.tx.metrics.TransactionMetricsSource;
 import 
org.apache.ignite.internal.tx.storage.state.rocksdb.TxStateRocksDbSharedStorage;
 import org.apache.ignite.lang.IgniteException;
 import org.apache.ignite.network.NetworkAddress;
@@ -427,6 +428,7 @@ public class TableManagerRecoveryTest extends 
IgniteAbstractTest {
         when(clusterService.topologyService()).thenReturn(topologyService);
         when(topologyService.localMember()).thenReturn(node);
         when(distributionZoneManager.dataNodes(any(), anyInt(), 
anyInt())).thenReturn(completedFuture(Set.of(NODE_NAME)));
+        
when(txManager.transactionMetricsSource()).thenReturn(mock(TransactionMetricsSource.class));
 
         PlacementDriver placementDriver = new TestPlacementDriver(node);
         ClockService clockService = new TestClockService(clock);
diff --git 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
index b4d367fbbcd..a3068cd7042 100644
--- 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
+++ 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
@@ -123,6 +123,7 @@ import org.apache.ignite.internal.tx.TxManager;
 import org.apache.ignite.internal.tx.configuration.TransactionConfiguration;
 import org.apache.ignite.internal.tx.impl.RemotelyTriggeredResourceRegistry;
 import org.apache.ignite.internal.tx.impl.TransactionInflights;
+import org.apache.ignite.internal.tx.metrics.TransactionMetricsSource;
 import 
org.apache.ignite.internal.tx.storage.state.rocksdb.TxStateRocksDbSharedStorage;
 import org.apache.ignite.network.NetworkAddress;
 import org.apache.ignite.sql.IgniteSql;
@@ -265,6 +266,8 @@ public class TableManagerTest extends IgniteAbstractTest {
 
         
when(clusterService.messagingService()).thenReturn(mock(MessagingService.class));
 
+        
when(tm.transactionMetricsSource()).thenReturn(mock(TransactionMetricsSource.class));
+
         tblManagerFut = new CompletableFuture<>();
 
         mockMetastore();
diff --git 
a/modules/transactions/src/integrationTest/java/org/apache/ignite/internal/tx/ItTransactionMetricsTest.java
 
b/modules/transactions/src/integrationTest/java/org/apache/ignite/internal/tx/ItTransactionMetricsTest.java
index b3f4499b17a..17891a7ba16 100644
--- 
a/modules/transactions/src/integrationTest/java/org/apache/ignite/internal/tx/ItTransactionMetricsTest.java
+++ 
b/modules/transactions/src/integrationTest/java/org/apache/ignite/internal/tx/ItTransactionMetricsTest.java
@@ -25,6 +25,7 @@ import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCo
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.is;
 
+import java.time.Duration;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Set;
@@ -46,6 +47,7 @@ import org.apache.ignite.table.Tuple;
 import org.apache.ignite.tx.Transaction;
 import org.apache.ignite.tx.TransactionException;
 import org.apache.ignite.tx.TransactionOptions;
+import org.awaitility.Awaitility;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
@@ -407,4 +409,69 @@ public class ItTransactionMetricsTest extends 
ClusterPerClassIntegrationTest {
         assertThat(actualMetrics0.get("RoCommits"), 
is(metrics0.get("RoCommits") + 1));
         assertThat(actualMetrics0.get("RwCommits"), 
is(metrics0.get("RwCommits") + 1));
     }
+
+    @Test
+    void globalPendingWriteIntentsMetric() throws Exception {
+        String zoneName = "zone_single_partition_no_replicas_tx_metrics";
+
+        String table1 = "test_table_pending_wi_1";
+        String table2 = "test_table_pending_wi_2";
+
+        sql("CREATE ZONE " + zoneName + " (PARTITIONS 1, REPLICAS 1) storage 
profiles ['default']");
+
+        sql("CREATE TABLE " + table1 + "(id INT PRIMARY KEY, val INT) ZONE " + 
zoneName);
+        sql("CREATE TABLE " + table2 + "(id INT PRIMARY KEY, val INT) ZONE " + 
zoneName);
+
+        Transaction tx = node(0).transactions().begin();
+
+        int table1Inserts = 3;
+        int table2Inserts = 5;
+
+        try {
+            for (int i = 0; i < table1Inserts; i++) {
+                sql(tx, "INSERT INTO " + table1 + " VALUES(?, ?)", i, i);
+            }
+
+            for (int i = 0; i < table2Inserts; i++) {
+                sql(tx, "INSERT INTO " + table2 + " VALUES(?, ?)", i, i);
+            }
+
+            Awaitility.await()
+                    .atMost(Duration.ofSeconds(10))
+                    .untilAsserted(() -> 
assertThat(totalPendingWriteIntents(), is((long) table1Inserts + 
table2Inserts)));
+        } finally {
+            tx.commit();
+        }
+
+        Awaitility.await()
+                .atMost(Duration.ofSeconds(10))
+                .untilAsserted(() -> assertThat(totalPendingWriteIntents(), 
is(0L)));
+    }
+
+    private static long totalPendingWriteIntents() {
+        long sum = 0;
+
+        for (int i = 0; i < CLUSTER.nodes().size(); i++) {
+            sum += pendingWriteIntentsOnNode(i);
+        }
+
+        return sum;
+    }
+
+    private static long pendingWriteIntentsOnNode(int nodeIdx) {
+        MetricSet metrics = unwrapIgniteImpl(node(nodeIdx))
+                .metricManager()
+                .metricSnapshot()
+                .metrics()
+                .get(TransactionMetricsSource.SOURCE_NAME);
+
+        assertThat("Transaction metrics must be present on node " + nodeIdx, 
metrics != null, is(true));
+
+        LongMetric metric = 
metrics.get(TransactionMetricsSource.METRIC_PENDING_WRITE_INTENTS);
+
+        assertThat("Metric must be present: "
+                + TransactionMetricsSource.METRIC_PENDING_WRITE_INTENTS, 
metric != null, is(true));
+
+        return metric.value();
+    }
 }
diff --git 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxManager.java
 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxManager.java
index bab8f783c8e..49c9d151b41 100644
--- 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxManager.java
+++ 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxManager.java
@@ -29,6 +29,7 @@ import org.apache.ignite.internal.manager.IgniteComponent;
 import org.apache.ignite.internal.replicator.ZonePartitionId;
 import org.apache.ignite.internal.tx.impl.EnlistedPartitionGroup;
 import org.apache.ignite.internal.tx.metrics.ResourceVacuumMetrics;
+import org.apache.ignite.internal.tx.metrics.TransactionMetricsSource;
 import org.jetbrains.annotations.Nullable;
 import org.jetbrains.annotations.TestOnly;
 
@@ -36,6 +37,11 @@ import org.jetbrains.annotations.TestOnly;
  * A transaction manager.
  */
 public interface TxManager extends IgniteComponent {
+    /**
+     * Returns transaction metrics source.
+     */
+    TransactionMetricsSource transactionMetricsSource();
+
     /**
      * Starts an implicit read-write transaction coordinated by a local node.
      *
diff --git 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
index 913cd44ec04..d169255c381 100644
--- 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
+++ 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
@@ -411,6 +411,11 @@ public class TxManagerImpl implements TxManager, 
NetworkMessageHandler, SystemVi
         txMetrics = new TransactionMetricsSource(clockService);
     }
 
+    @Override
+    public TransactionMetricsSource transactionMetricsSource() {
+        return txMetrics;
+    }
+
     private CompletableFuture<Boolean> primaryReplicaEventListener(
             PrimaryReplicaEventParameters eventParameters,
             Consumer<ZonePartitionId> action
diff --git 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/metrics/TransactionMetricsSource.java
 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/metrics/TransactionMetricsSource.java
index 84d5b6d9de5..34723e35b78 100644
--- 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/metrics/TransactionMetricsSource.java
+++ 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/metrics/TransactionMetricsSource.java
@@ -21,18 +21,23 @@ import static 
org.apache.ignite.internal.tx.TransactionIds.beginTimestamp;
 
 import java.util.List;
 import java.util.UUID;
+import java.util.function.LongSupplier;
 import org.apache.ignite.internal.hlc.ClockService;
 import org.apache.ignite.internal.metrics.AbstractMetricSource;
 import org.apache.ignite.internal.metrics.DistributionMetric;
 import org.apache.ignite.internal.metrics.LongAdderMetric;
+import org.apache.ignite.internal.metrics.LongGauge;
 import org.apache.ignite.internal.metrics.Metric;
-import org.apache.ignite.internal.tx.metrics.TransactionMetricsSource.Holder;
+import org.jetbrains.annotations.Nullable;
 import org.jetbrains.annotations.TestOnly;
 
 /**
  * Transaction metric source, that contains a set of transaction metrics.
  **/
-public class TransactionMetricsSource extends AbstractMetricSource<Holder> {
+public class TransactionMetricsSource extends 
AbstractMetricSource<TransactionMetricsSource.Holder> {
+    /** Metric name for unresolved (uncommitted) write intents. */
+    public static final String METRIC_PENDING_WRITE_INTENTS = 
"PendingWriteIntents";
+
     /** Histogram buckets for duration metrics in milliseconds. */
     private static final long[] HISTOGRAM_BUCKETS =
             {1, 2, 4, 8, 16, 25, 50, 75, 100, 250, 500, 750, 1000, 3000, 5000, 
10000, 25000, 60000};
@@ -43,6 +48,8 @@ public class TransactionMetricsSource extends 
AbstractMetricSource<Holder> {
     /** Clock service to calculate a timestamp for rolled back transactions. */
     private final ClockService clockService;
 
+    private volatile LongSupplier pendingWriteIntentsSupplier = () -> 0L;
+
     /**
      * Creates a new instance of {@link TransactionMetricsSource}.
      */
@@ -52,6 +59,15 @@ public class TransactionMetricsSource extends 
AbstractMetricSource<Holder> {
         this.clockService = clockService;
     }
 
+    /**
+     * Sets a supplier of the total number of unresolved (uncommitted) write 
intents local to this node.
+     *
+     * @param supplier Supplier, or {@code null} to reset to default (always 
returns {@code 0}).
+     */
+    public void setPendingWriteIntentsSupplier(@Nullable LongSupplier 
supplier) {
+        pendingWriteIntentsSupplier = supplier == null ? () -> 0L : supplier;
+    }
+
     /**
      * Updates read-write related metrics.
      *
@@ -145,7 +161,7 @@ public class TransactionMetricsSource extends 
AbstractMetricSource<Holder> {
 
     @Override
     protected Holder createHolder() {
-        return new Holder();
+        return new Holder(() -> pendingWriteIntentsSupplier.getAsLong());
     }
 
     private long calculateTransactionDuration(UUID transactionId) {
@@ -193,15 +209,29 @@ public class TransactionMetricsSource extends 
AbstractMetricSource<Holder> {
                 "Active",
                 "Number of running transactions.");
 
-        private final List<Metric> metrics = List.of(
-                totalCommits,
-                rwCommits,
-                roCommits,
-                totalRollbacks,
-                rwRollbacks,
-                roRollbacks,
-                rwDuration,
-                roDuration);
+        private final LongGauge pendingWriteIntents;
+
+        private final List<Metric> metrics;
+
+        private Holder(LongSupplier pendingWriteIntentsSupplier) {
+            pendingWriteIntents = new LongGauge(
+                    METRIC_PENDING_WRITE_INTENTS,
+                    "Total number of unresolved (uncommitted) write intents 
across all local partitions on this node.",
+                    pendingWriteIntentsSupplier
+            );
+
+            metrics = List.of(
+                    totalCommits,
+                    rwCommits,
+                    roCommits,
+                    totalRollbacks,
+                    rwRollbacks,
+                    roRollbacks,
+                    rwDuration,
+                    roDuration,
+                    pendingWriteIntents
+            );
+        }
 
         @Override
         public Iterable<Metric> metrics() {
diff --git 
a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/metrics/TransactionMetricSourceTest.java
 
b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/metrics/TransactionMetricSourceTest.java
index c480405d20c..5075e713dbc 100644
--- 
a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/metrics/TransactionMetricSourceTest.java
+++ 
b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/metrics/TransactionMetricSourceTest.java
@@ -61,7 +61,8 @@ public class TransactionMetricSourceTest extends 
BaseIgniteAbstractTest {
                 "RoCommits",
                 "RoRollbacks",
                 "RwDuration",
-                "RoDuration");
+                "RoDuration",
+                "PendingWriteIntents");
 
         var actualMetrics = new HashSet<String>();
         set.forEach(m -> actualMetrics.add(m.name()));


Reply via email to