This is an automated email from the ASF dual-hosted git repository.

apolovtsev pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 26f4e67810 IGNITE-22619 Add `InternalTable#estimatedSize` method 
(#4065)
26f4e67810 is described below

commit 26f4e67810570f7168cdaf32f9c12dc0aeb8c55a
Author: Alexander Polovtcev <[email protected]>
AuthorDate: Tue Jul 16 11:54:16 2024 +0300

    IGNITE-22619 Add `InternalTable#estimatedSize` method (#4065)
---
 .../ignite/client/fakes/FakeInternalTable.java     |   5 +
 .../impl/StandaloneMetaStorageManager.java         |   4 +-
 .../network/PartitionReplicationMessageGroup.java  |   6 +
 .../replication/GetEstimatedSizeRequest.java       |  29 ++
 modules/table/build.gradle                         |   1 +
 .../ignite/internal/table/ItEstimatedSizeTest.java | 182 ++++++++
 .../ignite/internal/table/InternalTable.java       |  15 +-
 .../replicator/PartitionReplicaListener.java       |   9 +
 .../distributed/storage/InternalTableImpl.java     | 233 ++++++----
 .../storage/InternalTableEstimatedSizeTest.java    | 474 +++++++++++++++++++++
 10 files changed, 873 insertions(+), 85 deletions(-)

diff --git 
a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeInternalTable.java
 
b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeInternalTable.java
index 6d17191759..5ffdd22528 100644
--- 
a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeInternalTable.java
+++ 
b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeInternalTable.java
@@ -506,4 +506,9 @@ public class FakeInternalTable implements InternalTable {
         return completedFuture(
                 new ClusterNodeImpl("server-1", "server-1", new 
NetworkAddress("localhost", 10800)));
     }
+
+    @Override
+    public CompletableFuture<Long> estimatedSize() {
+        throw new IgniteInternalException(new 
OperationNotSupportedException());
+    }
 }
diff --git 
a/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/impl/StandaloneMetaStorageManager.java
 
b/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/impl/StandaloneMetaStorageManager.java
index 408119039a..92623da88d 100644
--- 
a/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/impl/StandaloneMetaStorageManager.java
+++ 
b/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/impl/StandaloneMetaStorageManager.java
@@ -151,11 +151,11 @@ public class StandaloneMetaStorageManager extends 
MetaStorageManagerImpl {
     }
 
     private static ClusterService mockClusterService() {
-        ClusterService clusterService = mock(ClusterService.class);
+        ClusterService clusterService = mock(ClusterService.class, 
LENIENT_SETTINGS);
 
         when(clusterService.nodeName()).thenReturn(TEST_NODE_NAME);
 
-        TopologyService topologyService = mock(TopologyService.class);
+        TopologyService topologyService = mock(TopologyService.class, 
LENIENT_SETTINGS);
         when(topologyService.localMember()).thenReturn(new 
ClusterNodeImpl(TEST_NODE_ID, TEST_NODE_NAME, mock(NetworkAddress.class)));
 
         when(clusterService.topologyService()).thenReturn(topologyService);
diff --git 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/PartitionReplicationMessageGroup.java
 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/PartitionReplicationMessageGroup.java
index 05ef537b46..40e4987556 100644
--- 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/PartitionReplicationMessageGroup.java
+++ 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/PartitionReplicationMessageGroup.java
@@ -41,6 +41,7 @@ import 
org.apache.ignite.internal.partition.replicator.network.raft.SnapshotTxDa
 import 
org.apache.ignite.internal.partition.replicator.network.replication.BinaryRowMessage;
 import 
org.apache.ignite.internal.partition.replicator.network.replication.BinaryTupleMessage;
 import 
org.apache.ignite.internal.partition.replicator.network.replication.BuildIndexReplicaRequest;
+import 
org.apache.ignite.internal.partition.replicator.network.replication.GetEstimatedSizeRequest;
 import 
org.apache.ignite.internal.partition.replicator.network.replication.ReadOnlyDirectMultiRowReplicaRequest;
 import 
org.apache.ignite.internal.partition.replicator.network.replication.ReadOnlyDirectSingleRowReplicaRequest;
 import 
org.apache.ignite.internal.partition.replicator.network.replication.ReadOnlyMultiRowPkReplicaRequest;
@@ -185,6 +186,11 @@ public interface PartitionReplicationMessageGroup {
      */
     short TIMED_BINARY_ROW_MESSAGE = 24;
 
+    /**
+     * Message type for {@link GetEstimatedSizeRequest}.
+     */
+    short GET_ESTIMATED_SIZE_MESSAGE = 25;
+
     /**
      * Message types for partition replicator module RAFT commands.
      *
diff --git 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/replication/GetEstimatedSizeRequest.java
 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/replication/GetEstimatedSizeRequest.java
new file mode 100644
index 0000000000..1beb1a2704
--- /dev/null
+++ 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/replication/GetEstimatedSizeRequest.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.partition.replicator.network.replication;
+
+import org.apache.ignite.internal.network.annotations.Transferable;
+import 
org.apache.ignite.internal.partition.replicator.network.PartitionReplicationMessageGroup;
+import org.apache.ignite.internal.replicator.message.PrimaryReplicaRequest;
+
+/**
+ * Request for getting an estimated size of a partition.
+ */
+@Transferable(PartitionReplicationMessageGroup.GET_ESTIMATED_SIZE_MESSAGE)
+public interface GetEstimatedSizeRequest extends PrimaryReplicaRequest {
+}
diff --git a/modules/table/build.gradle b/modules/table/build.gradle
index 5807d57196..ba33cc2bed 100644
--- a/modules/table/build.gradle
+++ b/modules/table/build.gradle
@@ -64,6 +64,7 @@ dependencies {
     testImplementation project(':ignite-page-memory')
     testImplementation project(':ignite-storage-rocksdb')
     testImplementation project(':ignite-placement-driver-api')
+    testImplementation project(':ignite-placement-driver')
     testImplementation project(':ignite-system-view-api')
     testImplementation project(':ignite-failure-handler')
     testImplementation(testFixtures(project(':ignite-api')))
diff --git 
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItEstimatedSizeTest.java
 
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItEstimatedSizeTest.java
new file mode 100644
index 0000000000..ed53068a3c
--- /dev/null
+++ 
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItEstimatedSizeTest.java
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table;
+
+import static 
org.apache.ignite.internal.TestDefaultProfilesNames.DEFAULT_TEST_PROFILE_NAME;
+import static org.apache.ignite.internal.TestWrappers.unwrapTableViewInternal;
+import static 
org.apache.ignite.internal.catalog.commands.CatalogUtils.IMMEDIATE_TIMER_VALUE;
+import static 
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.STABLE_ASSIGNMENTS_PREFIX;
+import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
+import static 
org.apache.ignite.internal.testframework.flow.TestFlowUtils.subscribeToList;
+import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
+import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.ClusterPerTestIntegrationTest;
+import org.apache.ignite.internal.affinity.Assignments;
+import org.apache.ignite.internal.lang.ByteArray;
+import org.apache.ignite.internal.metastorage.Entry;
+import org.apache.ignite.internal.metastorage.MetaStorageManager;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Integration tests for {@link InternalTable#estimatedSize()} method.
+ *
+ * <p>This class doesn't use the Parameterized Test approach in order to 
reduce the number of created clusters.
+ */
+public class ItEstimatedSizeTest extends ClusterPerTestIntegrationTest {
+    private static final String TEST_ZONE_NAME = "TestZone";
+
+    private static final String TEST_TABLE_NAME_PREFIX = "Test_";
+
+    private static final long NUM_ROWS = 100;
+
+    private static final String[] ALL_STORAGE_PROFILES = {
+            DEFAULT_TEST_PROFILE_NAME,
+            // TODO: https://issues.apache.org/jira/browse/IGNITE-22616
+            // DEFAULT_AIPERSIST_PROFILE_NAME,
+            // TODO: https://issues.apache.org/jira/browse/IGNITE-22616
+            // DEFAULT_AIMEM_PROFILE_NAME,
+            // TODO: https://issues.apache.org/jira/browse/IGNITE-22617
+            // DEFAULT_ROCKSDB_PROFILE_NAME
+    };
+
+    @BeforeEach
+    void setUp() {
+        executeSql(String.format(
+                "CREATE ZONE %s WITH "
+                        + "REPLICAS=%d, "
+                        + "DATA_NODES_AUTO_ADJUST_SCALE_UP=%d, "
+                        + "DATA_NODES_AUTO_ADJUST_SCALE_DOWN=%d, "
+                        + "STORAGE_PROFILES='%s'",
+                TEST_ZONE_NAME,
+                initialNodes(),
+                IMMEDIATE_TIMER_VALUE,
+                IMMEDIATE_TIMER_VALUE,
+                String.join(", ", ALL_STORAGE_PROFILES)
+        ));
+    }
+
+    @Test
+    void testEstimatedSize() {
+        for (String profile : ALL_STORAGE_PROFILES) {
+            String tableName = createTableWithData(profile);
+
+            assertThat(tableSize(tableName), willBe(NUM_ROWS));
+
+            for (int i = 0; i < NUM_ROWS / 2; i++) {
+                executeSql(String.format("DELETE FROM %s WHERE key = %d", 
tableName, i));
+            }
+
+            assertThat(tableSize(tableName), willBe(NUM_ROWS / 2));
+
+            executeSql(String.format("DELETE FROM %s", tableName));
+
+            assertThat(tableSize(tableName), willBe(0L));
+        }
+    }
+
+    @Test
+    void testEstimatedSizeAfterScaleUp() throws InterruptedException {
+        for (String profile : ALL_STORAGE_PROFILES) {
+            String tableName = createTableWithData(profile);
+
+            assertThat(tableSize(tableName), willBe(NUM_ROWS));
+        }
+
+        cluster.startNode(initialNodes());
+
+        for (String profile : ALL_STORAGE_PROFILES) {
+            waitForRebalance(initialNodes() + 1);
+
+            assertThat(tableSize(tableName(profile)), willBe(NUM_ROWS));
+        }
+    }
+
+    @Test
+    void testEstimatedAfterScaleDown() throws InterruptedException {
+        for (String profile : ALL_STORAGE_PROFILES) {
+            String tableName = createTableWithData(profile);
+
+            assertThat(tableSize(tableName), willBe(NUM_ROWS));
+        }
+
+        cluster.stopNode(initialNodes() - 1);
+
+        for (String profile : ALL_STORAGE_PROFILES) {
+            waitForRebalance(initialNodes() - 1);
+
+            assertThat(tableSize(tableName(profile)), willBe(NUM_ROWS));
+        }
+    }
+
+    private String createTableWithData(String profile) {
+        String tableName = tableName(profile);
+
+        executeSql(String.format(
+                "CREATE TABLE %s (key INT PRIMARY KEY) WITH PRIMARY_ZONE=%s, 
STORAGE_PROFILE='%s'",
+                tableName,
+                TEST_ZONE_NAME,
+                profile
+        ));
+
+        for (int i = 0; i < NUM_ROWS; i++) {
+            executeSql(String.format("INSERT INTO %s VALUES (%d)", tableName, 
i));
+        }
+
+        return tableName;
+    }
+
+    private static String tableName(String profile) {
+        return TEST_TABLE_NAME_PREFIX + profile;
+    }
+
+    private CompletableFuture<Long> tableSize(String tableName) {
+        return tableViewInternal(tableName).internalTable().estimatedSize();
+    }
+
+    private void waitForRebalance(int numNodes) throws InterruptedException {
+        MetaStorageManager metaStorageManager = 
cluster.aliveNode().metaStorageManager();
+
+        var stableAssignmentsPrefix = new ByteArray(STABLE_ASSIGNMENTS_PREFIX);
+
+        assertTrue(waitForCondition(() -> {
+            CompletableFuture<List<Entry>> entriesFuture = 
subscribeToList(metaStorageManager.prefix(stableAssignmentsPrefix));
+
+            assertThat(entriesFuture, willCompleteSuccessfully());
+
+            long assignedNodesNum = entriesFuture.join().stream()
+                    .map(entry -> Assignments.fromBytes(entry.value()))
+                    .filter(Objects::nonNull)
+                    .flatMap(assignments -> assignments.nodes().stream())
+                    .distinct()
+                    .count();
+
+            return assignedNodesNum == numNodes;
+        }, 10_000));
+    }
+
+    private TableViewInternal tableViewInternal(String tableName) {
+        return 
unwrapTableViewInternal(cluster.aliveNode().tables().table(tableName));
+    }
+}
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/InternalTable.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/InternalTable.java
index 47d90df7af..cdbf6c6361 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/InternalTable.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/InternalTable.java
@@ -31,9 +31,9 @@ import org.apache.ignite.internal.schema.BinaryRow;
 import org.apache.ignite.internal.schema.BinaryRowEx;
 import org.apache.ignite.internal.schema.BinaryTuple;
 import org.apache.ignite.internal.schema.BinaryTuplePrefix;
+import org.apache.ignite.internal.storage.MvPartitionStorage;
 import org.apache.ignite.internal.storage.engine.MvTableStorage;
 import org.apache.ignite.internal.tx.InternalTransaction;
-import org.apache.ignite.internal.tx.LockException;
 import org.apache.ignite.internal.tx.storage.state.TxStateTableStorage;
 import org.apache.ignite.internal.util.PendingComparableValuesTracker;
 import org.apache.ignite.internal.utils.PrimaryReplica;
@@ -89,7 +89,6 @@ public interface InternalTable extends ManuallyCloseable {
      * @param keyRow Row with key columns set.
      * @param tx     The transaction.
      * @return Future representing pending completion of the operation.
-     * @throws LockException If a lock can't be acquired by some reason.
      */
     CompletableFuture<BinaryRow> get(BinaryRowEx keyRow, @Nullable 
InternalTransaction tx);
 
@@ -100,7 +99,6 @@ public interface InternalTable extends ManuallyCloseable {
      * @param readTimestamp Read timestamp.
      * @param recipientNode Cluster node that will handle given get request.
      * @return Future representing pending completion of the operation.
-     * @throws LockException If a lock can't be acquired by some reason.
      */
     CompletableFuture<BinaryRow> get(
             BinaryRowEx keyRow,
@@ -494,4 +492,15 @@ public interface InternalTable extends ManuallyCloseable {
      * @return Cluster node with primary replica.
      */
     CompletableFuture<ClusterNode> partitionLocation(TablePartitionId 
partitionId);
+
+    /**
+     * Returns the <em>estimated size</em> of this table.
+     *
+     * <p>It is computed as a sum of estimated sizes of all partitions of this 
table.
+     *
+     * @return Estimated size of this table.
+     *
+     * @see MvPartitionStorage#estimatedSize
+     */
+    CompletableFuture<Long> estimatedSize();
 }
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
index fcffbd1560..3e5c67b2f3 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
@@ -111,6 +111,7 @@ import 
org.apache.ignite.internal.partition.replicator.network.command.WriteInte
 import 
org.apache.ignite.internal.partition.replicator.network.replication.BinaryRowMessage;
 import 
org.apache.ignite.internal.partition.replicator.network.replication.BinaryTupleMessage;
 import 
org.apache.ignite.internal.partition.replicator.network.replication.BuildIndexReplicaRequest;
+import 
org.apache.ignite.internal.partition.replicator.network.replication.GetEstimatedSizeRequest;
 import 
org.apache.ignite.internal.partition.replicator.network.replication.ReadOnlyDirectMultiRowReplicaRequest;
 import 
org.apache.ignite.internal.partition.replicator.network.replication.ReadOnlyDirectSingleRowReplicaRequest;
 import 
org.apache.ignite.internal.partition.replicator.network.replication.ReadOnlyMultiRowPkReplicaRequest;
@@ -507,6 +508,10 @@ public class PartitionReplicaListener implements 
ReplicaListener {
             return processCleanupRecoveryMessage((TxCleanupRecoveryRequest) 
request);
         }
 
+        if (request instanceof GetEstimatedSizeRequest) {
+            return processGetEstimatedSizeRequest();
+        }
+
         HybridTimestamp opTsIfDirectRo = (request instanceof 
ReadOnlyDirectReplicaRequest) ? clockService.now() : null;
 
         return validateTableExistence(request, opTsIfDirectRo)
@@ -516,6 +521,10 @@ public class PartitionReplicaListener implements 
ReplicaListener {
                         processOperationRequestWithTxRwCounter(senderId, 
request, isPrimary, opTsIfDirectRo, leaseStartTime));
     }
 
+    private CompletableFuture<Long> processGetEstimatedSizeRequest() {
+        return completedFuture(mvDataStorage.estimatedSize());
+    }
+
     private CompletableFuture<Void> 
processCleanupRecoveryMessage(TxCleanupRecoveryRequest request) {
         runPersistentStorageScan();
 
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
index bbcdced0f3..c9b433700e 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.internal.table.distributed.storage;
 
 import static it.unimi.dsi.fastutil.ints.Int2ObjectMaps.emptyMap;
+import static java.util.concurrent.CompletableFuture.allOf;
 import static java.util.concurrent.CompletableFuture.completedFuture;
 import static java.util.concurrent.CompletableFuture.failedFuture;
 import static java.util.concurrent.TimeUnit.SECONDS;
@@ -61,6 +62,7 @@ import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
 import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.BitSet;
 import java.util.Collection;
 import java.util.HashSet;
@@ -86,6 +88,7 @@ import org.apache.ignite.internal.lang.IgniteBiTuple;
 import org.apache.ignite.internal.lang.IgnitePentaFunction;
 import org.apache.ignite.internal.lang.IgniteTriFunction;
 import org.apache.ignite.internal.network.ClusterNodeResolver;
+import org.apache.ignite.internal.network.UnresolvableConsistentIdException;
 import 
org.apache.ignite.internal.partition.replicator.network.PartitionReplicationMessagesFactory;
 import 
org.apache.ignite.internal.partition.replicator.network.replication.BinaryTupleMessage;
 import 
org.apache.ignite.internal.partition.replicator.network.replication.MultipleRowPkReplicaRequest;
@@ -104,6 +107,9 @@ import 
org.apache.ignite.internal.placementdriver.PlacementDriver;
 import org.apache.ignite.internal.placementdriver.ReplicaMeta;
 import org.apache.ignite.internal.replicator.ReplicaService;
 import org.apache.ignite.internal.replicator.TablePartitionId;
+import 
org.apache.ignite.internal.replicator.exception.PrimaryReplicaMissException;
+import org.apache.ignite.internal.replicator.exception.ReplicationException;
+import 
org.apache.ignite.internal.replicator.exception.ReplicationTimeoutException;
 import org.apache.ignite.internal.replicator.message.ReplicaMessagesFactory;
 import org.apache.ignite.internal.replicator.message.ReplicaRequest;
 import org.apache.ignite.internal.replicator.message.TablePartitionIdMessage;
@@ -122,6 +128,7 @@ import org.apache.ignite.internal.util.CollectionUtils;
 import org.apache.ignite.internal.util.IgniteUtils;
 import org.apache.ignite.internal.util.PendingComparableValuesTracker;
 import org.apache.ignite.internal.utils.PrimaryReplica;
+import org.apache.ignite.lang.IgniteException;
 import org.apache.ignite.network.ClusterNode;
 import org.apache.ignite.tx.TransactionException;
 import org.jetbrains.annotations.Nullable;
@@ -574,7 +581,7 @@ public class InternalTableImpl implements InternalTable {
         return postEnlist(fut, false, tx, false);
     }
 
-    private @Nullable BinaryTupleMessage binaryTupleMessage(@Nullable 
BinaryTupleReader binaryTuple) {
+    private static @Nullable BinaryTupleMessage binaryTupleMessage(@Nullable 
BinaryTupleReader binaryTuple) {
         if (binaryTuple == null) {
             return null;
         }
@@ -762,30 +769,24 @@ public class InternalTableImpl implements InternalTable {
 
         TablePartitionId tablePartitionId = new TablePartitionId(tableId, 
partId);
 
-        CompletableFuture<ReplicaMeta> primaryReplicaFuture = 
placementDriver.awaitPrimaryReplica(
-                tablePartitionId,
-                tx.startTimestamp(),
-                AWAIT_PRIMARY_REPLICA_TIMEOUT,
-                SECONDS
-        );
-
-        CompletableFuture<R> fut = 
primaryReplicaFuture.thenCompose(primaryReplica -> {
-            try {
-                ClusterNode node = 
getClusterNode(primaryReplica.getLeaseholder());
-
-                return replicaSvc.invoke(node, op.apply(tablePartitionId, 
primaryReplica.getStartTime().longValue()));
-            } catch (Throwable e) {
-                throw new TransactionException(
-                        INTERNAL_ERR,
-                        format(
-                                "Failed to invoke the replica request 
[tableName={}, partId={}].",
-                                tableName,
-                                partId
-                        ),
-                        e
-                );
-            }
-        });
+        CompletableFuture<R> fut = awaitPrimaryReplica(tablePartitionId, 
tx.startTimestamp())
+                .thenCompose(primaryReplica -> {
+                    try {
+                        ClusterNode node = getClusterNode(primaryReplica);
+
+                        return replicaSvc.invoke(node, 
op.apply(tablePartitionId, enlistmentConsistencyToken(primaryReplica)));
+                    } catch (Throwable e) {
+                        throw new TransactionException(
+                                INTERNAL_ERR,
+                                format(
+                                        "Failed to invoke the replica request 
[tableName={}, partId={}].",
+                                        tableName,
+                                        partId
+                                ),
+                                e
+                        );
+                    }
+                });
 
         return postEvaluate(fut, tx);
     }
@@ -808,30 +809,24 @@ public class InternalTableImpl implements InternalTable {
 
         TablePartitionId tablePartitionId = new TablePartitionId(tableId, 
partId);
 
-        CompletableFuture<ReplicaMeta> primaryReplicaFuture = 
placementDriver.awaitPrimaryReplica(
-                tablePartitionId,
-                tx.startTimestamp(),
-                AWAIT_PRIMARY_REPLICA_TIMEOUT,
-                SECONDS
-        );
-
-        CompletableFuture<R> fut = 
primaryReplicaFuture.thenCompose(primaryReplica -> {
-            try {
-                ClusterNode node = 
getClusterNode(primaryReplica.getLeaseholder());
-
-                return replicaSvc.invoke(node, op.apply(tablePartitionId, 
primaryReplica.getStartTime().longValue()));
-            } catch (Throwable e) {
-                throw new TransactionException(
-                        INTERNAL_ERR,
-                        format(
-                                "Failed to invoke the replica request 
[tableName={}, partId={}].",
-                                tableName,
-                                partId
-                        ),
-                        e
-                );
-            }
-        });
+        CompletableFuture<R> fut = awaitPrimaryReplica(tablePartitionId, 
tx.startTimestamp())
+                .thenCompose(primaryReplica -> {
+                    try {
+                        ClusterNode node = getClusterNode(primaryReplica);
+
+                        return replicaSvc.invoke(node, 
op.apply(tablePartitionId, enlistmentConsistencyToken(primaryReplica)));
+                    } catch (Throwable e) {
+                        throw new TransactionException(
+                                INTERNAL_ERR,
+                                format(
+                                        "Failed to invoke the replica request 
[tableName={}, partId={}].",
+                                        tableName,
+                                        partId
+                                ),
+                                e
+                        );
+                    }
+                });
 
         return postEvaluate(fut, tx);
     }
@@ -1074,7 +1069,7 @@ public class InternalTableImpl implements InternalTable {
         return result;
     }
 
-    private TablePartitionIdMessage serializeTablePartitionId(TablePartitionId 
id) {
+    private static TablePartitionIdMessage 
serializeTablePartitionId(TablePartitionId id) {
         return toTablePartitionIdMessage(REPLICA_MESSAGES_FACTORY, id);
     }
 
@@ -1918,43 +1913,41 @@ public class InternalTableImpl implements InternalTable 
{
             TablePartitionId partGroupId = new TablePartitionId(tableId, 
partId);
 
             return tx.enlist(partGroupId, new IgniteBiTuple<>(
-                    getClusterNode(meta.getLeaseholder()),
-                    meta.getStartTime().longValue())
+                    getClusterNode(meta),
+                    enlistmentConsistencyToken(meta))
             );
         });
     }
 
     @Override
     public CompletableFuture<ClusterNode> partitionLocation(TablePartitionId 
tablePartitionId) {
-        return partitionMeta(tablePartitionId).thenApply(meta -> 
getClusterNode(meta.getLeaseholder()));
+        return partitionMeta(tablePartitionId).thenApply(this::getClusterNode);
     }
 
     private CompletableFuture<ReplicaMeta> partitionMeta(TablePartitionId 
tablePartitionId) {
         HybridTimestamp now = clock.now();
 
-        CompletableFuture<ReplicaMeta> primaryReplicaFuture = 
placementDriver.awaitPrimaryReplica(
-                tablePartitionId,
-                now,
-                AWAIT_PRIMARY_REPLICA_TIMEOUT,
-                SECONDS
-        );
-
-        return primaryReplicaFuture.handle((primaryReplica, e) -> {
-            if (e != null) {
-                throw withCause(TransactionException::new, 
REPLICA_UNAVAILABLE_ERR, "Failed to get the primary replica"
-                        + " [tablePartitionId=" + tablePartitionId + ", 
awaitTimestamp=" + now + ']', e);
-            }
-
-            return primaryReplica;
-        });
+        return awaitPrimaryReplica(tablePartitionId, now)
+                .exceptionally(e -> {
+                    throw withCause(
+                            TransactionException::new,
+                            REPLICA_UNAVAILABLE_ERR,
+                            "Failed to get the primary replica 
[tablePartitionId=" + tablePartitionId + ", awaitTimestamp=" + now + ']',
+                            e
+                    );
+                });
     }
 
-    private ClusterNode getClusterNode(@Nullable String leaserHolder) {
-        ClusterNode node = clusterNodeResolver.getByConsistentId(leaserHolder);
+    private ClusterNode getClusterNode(ReplicaMeta replicaMeta) {
+        String leaseHolderId = replicaMeta.getLeaseholderId();
+
+        ClusterNode node = leaseHolderId == null ? null : 
clusterNodeResolver.getById(leaseHolderId);
 
         if (node == null) {
-            throw new TransactionException(REPLICA_UNAVAILABLE_ERR, "Failed to 
resolve the primary replica node [consistentId="
-                    + leaserHolder + ']');
+            throw new TransactionException(
+                    REPLICA_UNAVAILABLE_ERR,
+                    String.format("Failed to resolve the primary replica node 
[id=%s]", leaseHolderId)
+            );
         }
 
         return node;
@@ -2077,9 +2070,8 @@ public class InternalTableImpl implements InternalTable {
              *
              * @param t An exception which was thrown while entries were being 
retrieved from the cursor.
              * @param intentionallyClose True if the subscription is closed 
for the client side.
-             * @return Future to complete.
              */
-            private void cancel(Throwable t, boolean intentionallyClose) {
+            private void cancel(@Nullable Throwable t, boolean 
intentionallyClose) {
                 if (!canceled.compareAndSet(false, true)) {
                     return;
                 }
@@ -2202,7 +2194,7 @@ public class InternalTableImpl implements InternalTable {
     protected CompletableFuture<ClusterNode> evaluateReadOnlyRecipientNode(int partId) {
         TablePartitionId tablePartitionId = new TablePartitionId(tableId, partId);
 
-        return placementDriver.awaitPrimaryReplica(tablePartitionId, clock.now(), AWAIT_PRIMARY_REPLICA_TIMEOUT, SECONDS)
+        return awaitPrimaryReplica(tablePartitionId, clock.now())
                 .handle((res, e) -> {
                     if (e != null) {
                         throw withCause(TransactionException::new, REPLICA_UNAVAILABLE_ERR, e);
@@ -2210,7 +2202,7 @@ public class InternalTableImpl implements InternalTable {
                         if (res == null) {
                             throw withCause(TransactionException::new, REPLICA_UNAVAILABLE_ERR, e);
                         } else {
-                            return getClusterNode(res.getLeaseholder());
+                            return getClusterNode(res);
                         }
                     }
                 });
@@ -2231,6 +2223,78 @@ public class InternalTableImpl implements InternalTable {
         return streamerFlushExecutor.get();
     }
 
+    @Override
+    public CompletableFuture<Long> estimatedSize() {
+        HybridTimestamp now = clock.now();
+
+        var invokeFutures = new CompletableFuture<?>[partitions];
+
+        for (int partId = 0; partId < partitions; partId++) {
+            var replicaGroupId = new TablePartitionId(tableId, partId);
+
+            TablePartitionIdMessage partitionIdMessage = serializeTablePartitionId(replicaGroupId);
+
+            Function<ReplicaMeta, ReplicaRequest> requestFactory = replicaMeta ->
+                    TABLE_MESSAGES_FACTORY.getEstimatedSizeRequest()
+                            .groupId(partitionIdMessage)
+                            .enlistmentConsistencyToken(enlistmentConsistencyToken(replicaMeta))
+                            .build();
+
+            invokeFutures[partId] = sendToPrimaryWithRetry(replicaGroupId, now, 5, requestFactory);
+        }
+
+        return allOf(invokeFutures)
+                .thenApply(v -> Arrays.stream(invokeFutures).mapToLong(f -> (Long) f.join()).sum());
+    }
+
+    private <T> CompletableFuture<T> sendToPrimaryWithRetry(
+            TablePartitionId tablePartitionId,
+            HybridTimestamp hybridTimestamp,
+            int numRetries,
+            Function<ReplicaMeta, ReplicaRequest> requestFactory
+    ) {
+        return awaitPrimaryReplica(tablePartitionId, hybridTimestamp)
+                .thenCompose(replicaMeta -> {
+                    String leaseHolderName = replicaMeta.getLeaseholder();
+
+                    assert leaseHolderName != null;
+
+                    return replicaSvc.<T>invoke(leaseHolderName, requestFactory.apply(replicaMeta))
+                            .handle((response, e) -> {
+                                if (e == null) {
+                                    return completedFuture(response);
+                                }
+
+                                if (e instanceof ReplicationException && e.getCause() != null) {
+                                    e = e.getCause();
+                                }
+
+                                // We do a retry for the following conditions:
+                                // 1. Primary Replica has changed between the "awaitPrimaryReplica" and "invoke" calls;
+                                // 2. Primary Replica has died and is no longer available.
+                                // In both cases, we need to wait for the lease to expire and to get a new Primary Replica.
+                                if (e instanceof PrimaryReplicaMissException
+                                        || e instanceof UnresolvableConsistentIdException
+                                        || e instanceof ReplicationTimeoutException) {
+                                    if (numRetries == 0) {
+                                        throw new IgniteException(REPLICA_MISS_ERR, e);
+                                    }
+
+                                    // Primary Replica has changed, need to wait for the new replica to appear.
+                                    return this.<T>sendToPrimaryWithRetry(
+                                            tablePartitionId,
+                                            replicaMeta.getExpirationTime().tick(),
+                                            numRetries - 1,
+                                            requestFactory
+                                    );
+                                } else {
+                                    throw new IgniteException(INTERNAL_ERR, e);
+                                }
+                            });
+                })
+                .thenCompose(identity());
+    }
+
     /**
      * Updates the partition trackers, if there were previous ones, it closes them.
      *
@@ -2277,8 +2341,6 @@ public class InternalTableImpl implements InternalTable {
             Long enlistmentConsistencyToken,
             boolean full
     ) {
-        assert serializeTablePartitionId(txo.commitPartition()) != null;
-
         return readWriteMultiRowReplicaRequest(RW_UPSERT_ALL, keyRows0, null, txo, groupId, enlistmentConsistencyToken, full);
     }
 
@@ -2290,8 +2352,6 @@ public class InternalTableImpl implements InternalTable {
             Long enlistmentConsistencyToken,
             boolean full
     ) {
-        assert serializeTablePartitionId(txo.commitPartition()) != null;
-
         return readWriteMultiRowReplicaRequest(
                 RW_UPSERT_ALL, keyRows0, deleted, txo, groupId, enlistmentConsistencyToken, full);
     }
@@ -2305,4 +2365,17 @@ public class InternalTableImpl implements InternalTable {
     private static boolean exceptionAllowsImplicitTxRetry(Throwable e) {
         return matchAny(unwrapCause(e), ACQUIRE_LOCK_ERR, REPLICA_MISS_ERR);
     }
+
+    private CompletableFuture<ReplicaMeta> awaitPrimaryReplica(TablePartitionId tablePartitionId, HybridTimestamp timestamp) {
+        return placementDriver.awaitPrimaryReplica(
+                tablePartitionId,
+                timestamp,
+                AWAIT_PRIMARY_REPLICA_TIMEOUT,
+                SECONDS
+        );
+    }
+
+    private static long enlistmentConsistencyToken(ReplicaMeta replicaMeta) {
+        return replicaMeta.getStartTime().longValue();
+    }
 }
diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/storage/InternalTableEstimatedSizeTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/storage/InternalTableEstimatedSizeTest.java
new file mode 100644
index 0000000000..db2c904828
--- /dev/null
+++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/storage/InternalTableEstimatedSizeTest.java
@@ -0,0 +1,474 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.storage;
+
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static java.util.concurrent.CompletableFuture.failedFuture;
+import static java.util.stream.Collectors.toList;
+import static org.apache.ignite.internal.network.utils.ClusterServiceTestUtils.clusterService;
+import static org.apache.ignite.internal.testframework.IgniteTestUtils.testNodeName;
+import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
+import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
+import static org.apache.ignite.internal.util.IgniteUtils.closeAll;
+import static org.apache.ignite.internal.util.IgniteUtils.startAsync;
+import static org.apache.ignite.internal.util.IgniteUtils.stopAsync;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.lenient;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ForkJoinPool;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeoutException;
+import java.util.stream.IntStream;
+import org.apache.ignite.internal.catalog.CatalogService;
+import org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
+import org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
+import org.apache.ignite.internal.hlc.ClockService;
+import org.apache.ignite.internal.hlc.ClockServiceImpl;
+import org.apache.ignite.internal.hlc.ClockWaiter;
+import org.apache.ignite.internal.hlc.HybridClockImpl;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.manager.ComponentContext;
+import org.apache.ignite.internal.manager.IgniteComponent;
+import org.apache.ignite.internal.metastorage.MetaStorageManager;
+import org.apache.ignite.internal.metastorage.impl.StandaloneMetaStorageManager;
+import org.apache.ignite.internal.metastorage.server.SimpleInMemoryKeyValueStorage;
+import org.apache.ignite.internal.network.ClusterNodeResolver;
+import org.apache.ignite.internal.network.ClusterService;
+import org.apache.ignite.internal.network.MessagingService;
+import org.apache.ignite.internal.network.StaticNodeFinder;
+import org.apache.ignite.internal.placementdriver.PlacementDriver;
+import org.apache.ignite.internal.placementdriver.leases.Lease;
+import org.apache.ignite.internal.raft.Command;
+import org.apache.ignite.internal.raft.service.RaftCommandRunner;
+import org.apache.ignite.internal.replicator.ReplicaService;
+import org.apache.ignite.internal.replicator.TablePartitionId;
+import org.apache.ignite.internal.replicator.configuration.ReplicationConfiguration;
+import org.apache.ignite.internal.replicator.message.ReplicaMessagesFactory;
+import org.apache.ignite.internal.replicator.message.ReplicaRequest;
+import org.apache.ignite.internal.schema.SchemaRegistry;
+import org.apache.ignite.internal.storage.MvPartitionStorage;
+import org.apache.ignite.internal.storage.engine.MvTableStorage;
+import org.apache.ignite.internal.table.InternalTable;
+import org.apache.ignite.internal.table.distributed.StorageUpdateHandler;
+import org.apache.ignite.internal.table.distributed.index.IndexMetaStorage;
+import org.apache.ignite.internal.table.distributed.replicator.PartitionReplicaListener;
+import org.apache.ignite.internal.table.distributed.replicator.TransactionStateResolver;
+import org.apache.ignite.internal.table.distributed.schema.SchemaSyncService;
+import org.apache.ignite.internal.table.distributed.schema.ValidationSchemasSource;
+import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
+import org.apache.ignite.internal.tx.HybridTimestampTracker;
+import org.apache.ignite.internal.tx.LockManager;
+import org.apache.ignite.internal.tx.TxManager;
+import org.apache.ignite.internal.tx.impl.RemotelyTriggeredResourceRegistry;
+import org.apache.ignite.internal.tx.impl.TransactionInflights;
+import org.apache.ignite.internal.tx.storage.state.TxStateStorage;
+import org.apache.ignite.internal.tx.storage.state.TxStateTableStorage;
+import org.apache.ignite.internal.util.Lazy;
+import org.apache.ignite.internal.util.PendingComparableValuesTracker;
+import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.network.NetworkAddress;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInfo;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+/**
+ * Tests for distributed aspects of the {@link InternalTable#estimatedSize} method.
+ */
+@ExtendWith(MockitoExtension.class)
+@ExtendWith(ConfigurationExtension.class)
+public class InternalTableEstimatedSizeTest extends BaseIgniteAbstractTest {
+    private static final String TABLE_NAME = "TEST";
+
+    private static final int TABLE_ID = 1;
+
+    private static final int PARTITIONS_NUM = 3;
+
+    private static final List<TablePartitionId> TABLE_PARTITION_IDS = 
IntStream.range(0, PARTITIONS_NUM)
+            .mapToObj(i -> new TablePartitionId(TABLE_ID, i))
+            .collect(toList());
+
+    private ClusterNode node;
+
+    private InternalTableImpl table;
+
+    private MessagingService messagingService;
+
+    @Mock
+    private PlacementDriver placementDriver;
+
+    private final HybridClockImpl clock = new HybridClockImpl();
+
+    private final ComponentContext componentContext = new ComponentContext();
+
+    private final List<MvPartitionStorage> partitionStorages = new 
ArrayList<>(PARTITIONS_NUM);
+
+    private final List<IgniteComponent> components = new ArrayList<>();
+
+    @BeforeEach
+    void setUp(
+            TestInfo testInfo,
+            @Mock TxManager txManager,
+            @Mock LockManager lockManager,
+            @Mock MvTableStorage tableStorage,
+            @Mock TxStateTableStorage txStateTableStorage,
+            @Mock TxStateStorage txStateStorage,
+            @Mock TableRaftServiceImpl tableRaftService,
+            @Mock TransactionStateResolver transactionStateResolver,
+            @Mock StorageUpdateHandler storageUpdateHandler,
+            @Mock ValidationSchemasSource validationSchemasSource,
+            @Mock SchemaSyncService schemaSyncService,
+            @Mock CatalogService catalogService,
+            @Mock RemotelyTriggeredResourceRegistry 
remotelyTriggeredResourceRegistry,
+            @Mock SchemaRegistry schemaRegistry,
+            @Mock IndexMetaStorage indexMetaStorage,
+            @InjectConfiguration ReplicationConfiguration 
replicationConfiguration
+    ) {
+        String nodeName = testNodeName(testInfo, 0);
+
+        var addr = new NetworkAddress("localhost", 10_000);
+
+        ClusterService clusterService = spy(clusterService(nodeName, 
addr.port(), new StaticNodeFinder(List.of(addr))));
+
+        messagingService = spy(clusterService.messagingService());
+
+        when(clusterService.messagingService()).thenReturn(messagingService);
+
+        components.add(clusterService);
+
+        var clockWaiter = new ClockWaiter(nodeName, clock);
+
+        components.add(clockWaiter);
+
+        MetaStorageManager metaStorageManager = 
StandaloneMetaStorageManager.create(
+                new SimpleInMemoryKeyValueStorage(nodeName),
+                clock
+        );
+
+        components.add(metaStorageManager);
+
+        assertThat(startAsync(componentContext, components), 
willCompleteSuccessfully());
+
+        assertThat(metaStorageManager.deployWatches(), 
willCompleteSuccessfully());
+
+        node = clusterService.topologyService().localMember();
+
+        var clockService = new ClockServiceImpl(clock, clockWaiter, () -> 0);
+
+        table = new InternalTableImpl(
+                TABLE_NAME,
+                TABLE_ID,
+                PARTITIONS_NUM,
+                clusterService.topologyService(),
+                txManager,
+                tableStorage,
+                txStateTableStorage,
+                new ReplicaService(clusterService.messagingService(), clock, 
replicationConfiguration),
+                clock,
+                new HybridTimestampTracker(),
+                placementDriver,
+                tableRaftService,
+                new TransactionInflights(placementDriver, clockService),
+                0,
+                0,
+                () -> null
+        );
+
+        List<PartitionReplicaListener> partitionReplicaListeners = 
IntStream.range(0, PARTITIONS_NUM)
+                .mapToObj(partId -> createPartitionReplicaListener(
+                        partId,
+                        txManager,
+                        lockManager,
+                        clockService,
+                        txStateStorage,
+                        transactionStateResolver,
+                        storageUpdateHandler,
+                        validationSchemasSource,
+                        node,
+                        schemaSyncService,
+                        catalogService,
+                        placementDriver,
+                        clusterService.topologyService(),
+                        remotelyTriggeredResourceRegistry,
+                        schemaRegistry,
+                        indexMetaStorage
+                ))
+                .collect(toList());
+
+        lenient().doAnswer(invocation -> {
+            ReplicaRequest request = invocation.getArgument(1);
+
+            var tablePartitionId = (TablePartitionId) 
request.groupId().asReplicationGroupId();
+
+            return 
partitionReplicaListeners.get(tablePartitionId.partitionId())
+                    .invoke(request, node.id())
+                    .thenApply(replicaResult -> new ReplicaMessagesFactory()
+                            .replicaResponse()
+                            .result(replicaResult.result())
+                            .build()
+                    );
+        }).when(messagingService).invoke(eq(nodeName), 
any(ReplicaRequest.class), anyLong());
+    }
+
+    @AfterEach
+    void tearDown() throws Exception {
+        Collections.reverse(components);
+
+        closeAll(components.stream().map(c -> c::beforeNodeStop));
+
+        assertThat(stopAsync(componentContext, components), 
willCompleteSuccessfully());
+    }
+
+    private PartitionReplicaListener createPartitionReplicaListener(
+            int partId,
+            TxManager txManager,
+            LockManager lockManager,
+            ClockService clockService,
+            TxStateStorage txStateStorage,
+            TransactionStateResolver transactionStateResolver,
+            StorageUpdateHandler storageUpdateHandler,
+            ValidationSchemasSource validationSchemasSource,
+            ClusterNode node,
+            SchemaSyncService schemaSyncService,
+            CatalogService catalogService,
+            PlacementDriver placementDriver,
+            ClusterNodeResolver clusterNodeResolver,
+            RemotelyTriggeredResourceRegistry 
remotelyTriggeredResourceRegistry,
+            SchemaRegistry schemaRegistry,
+            IndexMetaStorage indexMetaStorage
+    ) {
+        MvPartitionStorage partitionStorage = mock(MvPartitionStorage.class);
+
+        partitionStorages.add(partitionStorage);
+
+        return new PartitionReplicaListener(
+                partitionStorage,
+                new RaftCommandRunner() {
+                    @Override
+                    public <R> CompletableFuture<R> run(Command cmd) {
+                        return nullCompletedFuture();
+                    }
+                },
+                txManager,
+                lockManager,
+                ForkJoinPool.commonPool(),
+                partId,
+                TABLE_ID,
+                Map::of,
+                new Lazy<>(() -> null),
+                Map::of,
+                clockService,
+                new 
PendingComparableValuesTracker<>(HybridTimestamp.MIN_VALUE),
+                txStateStorage,
+                transactionStateResolver,
+                storageUpdateHandler,
+                validationSchemasSource,
+                node,
+                schemaSyncService,
+                catalogService,
+                placementDriver,
+                clusterNodeResolver,
+                remotelyTriggeredResourceRegistry,
+                schemaRegistry,
+                indexMetaStorage
+        );
+    }
+
+    /**
+     * Validates that {@link InternalTable#estimatedSize} works correctly using the test setup.
+     */
+    @Test
+    void testHappyCase() {
+        HybridTimestamp startTime = HybridTimestamp.MIN_VALUE;
+        HybridTimestamp expireTime = HybridTimestamp.MAX_VALUE;
+
+        TABLE_PARTITION_IDS.forEach(groupId -> {
+            var replicaMeta = new Lease(node.name(), node.id(), startTime, 
expireTime, groupId);
+
+            when(placementDriver.awaitPrimaryReplica(eq(groupId), any(), 
anyLong(), any()))
+                    .thenReturn(completedFuture(replicaMeta));
+            when(placementDriver.getPrimaryReplica(eq(groupId), any()))
+                    .thenReturn(completedFuture(replicaMeta));
+        });
+
+        validateEstimatedSize();
+
+        // One request per partition.
+        verify(messagingService, times(3)).invoke(anyString(), 
any(ReplicaRequest.class), anyLong());
+    }
+
+    /**
+     * Tests that a retry will happen when a Primary Replica lease expires during the call to {@link InternalTable#estimatedSize}.
+     */
+    @Test
+    void testRetryOnReplicaMiss() {
+        HybridTimestamp startTime = HybridTimestamp.MIN_VALUE;
+        HybridTimestamp expireTime = clock.now().addPhysicalTime(10_000);
+
+        for (int i = 0; i < TABLE_PARTITION_IDS.size(); i++) {
+            TablePartitionId groupId = TABLE_PARTITION_IDS.get(i);
+
+            var replicaMeta = new Lease(node.name(), node.id(), startTime, 
expireTime, groupId);
+
+            // Emulate lease expiration on the first replica (getPrimaryReplica will return null). We then expect that
+            // a second request will be sent with a different timestamp.
+            if (i == 0) {
+                var newReplicaMeta = new Lease(node.name(), node.id(), 
expireTime, HybridTimestamp.MAX_VALUE, groupId);
+
+                when(placementDriver.awaitPrimaryReplica(eq(groupId), any(), 
anyLong(), any()))
+                        .thenReturn(completedFuture(replicaMeta));
+
+                when(placementDriver.awaitPrimaryReplica(eq(groupId), 
eq(expireTime.tick()), anyLong(), any()))
+                        .thenReturn(completedFuture(newReplicaMeta));
+
+                when(placementDriver.getPrimaryReplica(eq(groupId), any()))
+                        .thenReturn(nullCompletedFuture())
+                        .thenReturn(completedFuture(newReplicaMeta));
+            } else {
+                when(placementDriver.awaitPrimaryReplica(eq(groupId), any(), 
anyLong(), any()))
+                        .thenReturn(completedFuture(replicaMeta));
+                when(placementDriver.getPrimaryReplica(eq(groupId), any()))
+                        .thenReturn(completedFuture(replicaMeta));
+            }
+        }
+
+        validateEstimatedSize();
+
+        // One request per partition + one retry for the first partition.
+        verify(messagingService, times(4)).invoke(anyString(), 
any(ReplicaRequest.class), anyLong());
+    }
+
+    /**
+     * Tests that a retry will happen when the Primary Replica dies during the call to {@link InternalTable#estimatedSize}.
+     */
+    @Test
+    void testRetryOnReplicaUnavailable() {
+        HybridTimestamp startTime = HybridTimestamp.MIN_VALUE;
+        HybridTimestamp expireTime = clock.now().addPhysicalTime(10_000);
+
+        for (int i = 0; i < TABLE_PARTITION_IDS.size(); i++) {
+            TablePartitionId groupId = TABLE_PARTITION_IDS.get(i);
+
+            var replicaMeta = new Lease(node.name(), node.id(), startTime, 
expireTime, groupId);
+
+            // Emulate Primary Replica death by issuing a lease for a non-existent node. We then expect that a retry to the correct node
+            // will be issued.
+            if (i == 0) {
+                var fakeReplicaMeta = new Lease("Dead node name", "Dead node id", HybridTimestamp.MIN_VALUE, expireTime, groupId);
+
+                var newReplicaMeta = new Lease(node.name(), node.id(), 
expireTime, HybridTimestamp.MAX_VALUE, groupId);
+
+                when(placementDriver.awaitPrimaryReplica(eq(groupId), any(), 
anyLong(), any()))
+                        .thenReturn(completedFuture(fakeReplicaMeta));
+
+                when(placementDriver.awaitPrimaryReplica(eq(groupId), 
eq(expireTime.tick()), anyLong(), any()))
+                        .thenReturn(completedFuture(newReplicaMeta));
+
+                when(placementDriver.getPrimaryReplica(eq(groupId), any()))
+                        .thenReturn(completedFuture(newReplicaMeta));
+            } else {
+                when(placementDriver.awaitPrimaryReplica(eq(groupId), any(), 
anyLong(), any()))
+                        .thenReturn(completedFuture(replicaMeta));
+                when(placementDriver.getPrimaryReplica(eq(groupId), any()))
+                        .thenReturn(completedFuture(replicaMeta));
+            }
+        }
+
+        validateEstimatedSize();
+
+        // One request per partition + one retry for the first partition.
+        verify(messagingService, times(4)).invoke(anyString(), 
any(ReplicaRequest.class), anyLong());
+    }
+
+    /**
+     * Tests that a retry will happen when the Primary Replica becomes unavailable during the call to {@link InternalTable#estimatedSize}.
+     */
+    @Test
+    void testRetryOnTimeout() {
+        HybridTimestamp startTime = HybridTimestamp.MIN_VALUE;
+        HybridTimestamp expireTime = clock.now().addPhysicalTime(10_000);
+
+        for (int i = 0; i < TABLE_PARTITION_IDS.size(); i++) {
+            TablePartitionId groupId = TABLE_PARTITION_IDS.get(i);
+
+            var replicaMeta = new Lease(node.name(), node.id(), startTime, 
expireTime, groupId);
+
+            // Emulate a network timeout, we use a fake lease to not override an existing mocking on the messagingService.
+            if (i == 0) {
+                var fakeReplicaMeta = new Lease("Dead node name", "Dead node id", HybridTimestamp.MIN_VALUE, expireTime, groupId);
+
+                var newReplicaMeta = new Lease(node.name(), node.id(), 
expireTime, HybridTimestamp.MAX_VALUE, groupId);
+
+                doReturn(failedFuture(new TimeoutException()))
+                        .when(messagingService).invoke(eq(fakeReplicaMeta.getLeaseholder()), any(), anyLong());
+
+                when(placementDriver.awaitPrimaryReplica(eq(groupId), any(), 
anyLong(), any()))
+                        .thenReturn(completedFuture(fakeReplicaMeta));
+
+                when(placementDriver.awaitPrimaryReplica(eq(groupId), 
eq(expireTime.tick()), anyLong(), any()))
+                        .thenReturn(completedFuture(newReplicaMeta));
+
+                when(placementDriver.getPrimaryReplica(eq(groupId), any()))
+                        .thenReturn(completedFuture(newReplicaMeta));
+            } else {
+                when(placementDriver.awaitPrimaryReplica(eq(groupId), any(), 
anyLong(), any()))
+                        .thenReturn(completedFuture(replicaMeta));
+                when(placementDriver.getPrimaryReplica(eq(groupId), any()))
+                        .thenReturn(completedFuture(replicaMeta));
+            }
+        }
+
+        validateEstimatedSize();
+
+        // One request per partition + one retry for the first partition.
+        verify(messagingService, times(4)).invoke(anyString(), 
any(ReplicaRequest.class), anyLong());
+    }
+
+    private void validateEstimatedSize() {
+        long expectedSum = 0;
+
+        for (MvPartitionStorage partitionStorage : partitionStorages) {
+            long size = ThreadLocalRandom.current().nextLong(100);
+
+            when(partitionStorage.estimatedSize()).thenReturn(size);
+
+            expectedSum += size;
+        }
+
+        assertThat(table.estimatedSize(), willBe(expectedSum));
+    }
+}

Reply via email to