This is an automated email from the ASF dual-hosted git repository.
apolovtsev pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 26f4e67810 IGNITE-22619 Add `InternalTable#estimatedSize` method
(#4065)
26f4e67810 is described below
commit 26f4e67810570f7168cdaf32f9c12dc0aeb8c55a
Author: Alexander Polovtcev <[email protected]>
AuthorDate: Tue Jul 16 11:54:16 2024 +0300
IGNITE-22619 Add `InternalTable#estimatedSize` method (#4065)
---
.../ignite/client/fakes/FakeInternalTable.java | 5 +
.../impl/StandaloneMetaStorageManager.java | 4 +-
.../network/PartitionReplicationMessageGroup.java | 6 +
.../replication/GetEstimatedSizeRequest.java | 29 ++
modules/table/build.gradle | 1 +
.../ignite/internal/table/ItEstimatedSizeTest.java | 182 ++++++++
.../ignite/internal/table/InternalTable.java | 15 +-
.../replicator/PartitionReplicaListener.java | 9 +
.../distributed/storage/InternalTableImpl.java | 233 ++++++----
.../storage/InternalTableEstimatedSizeTest.java | 474 +++++++++++++++++++++
10 files changed, 873 insertions(+), 85 deletions(-)
diff --git
a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeInternalTable.java
b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeInternalTable.java
index 6d17191759..5ffdd22528 100644
---
a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeInternalTable.java
+++
b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeInternalTable.java
@@ -506,4 +506,9 @@ public class FakeInternalTable implements InternalTable {
return completedFuture(
new ClusterNodeImpl("server-1", "server-1", new
NetworkAddress("localhost", 10800)));
}
+
+ /**
+ * Not supported by this fake table: always throws an {@link IgniteInternalException} synchronously
+ * (wrapping an {@code OperationNotSupportedException}) instead of returning a future.
+ */
+ @Override
+ public CompletableFuture<Long> estimatedSize() {
+ throw new IgniteInternalException(new
OperationNotSupportedException());
+ }
}
diff --git
a/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/impl/StandaloneMetaStorageManager.java
b/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/impl/StandaloneMetaStorageManager.java
index 408119039a..92623da88d 100644
---
a/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/impl/StandaloneMetaStorageManager.java
+++
b/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/impl/StandaloneMetaStorageManager.java
@@ -151,11 +151,11 @@ public class StandaloneMetaStorageManager extends
MetaStorageManagerImpl {
}
private static ClusterService mockClusterService() {
- ClusterService clusterService = mock(ClusterService.class);
+ ClusterService clusterService = mock(ClusterService.class,
LENIENT_SETTINGS);
when(clusterService.nodeName()).thenReturn(TEST_NODE_NAME);
- TopologyService topologyService = mock(TopologyService.class);
+ TopologyService topologyService = mock(TopologyService.class,
LENIENT_SETTINGS);
when(topologyService.localMember()).thenReturn(new
ClusterNodeImpl(TEST_NODE_ID, TEST_NODE_NAME, mock(NetworkAddress.class)));
when(clusterService.topologyService()).thenReturn(topologyService);
diff --git
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/PartitionReplicationMessageGroup.java
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/PartitionReplicationMessageGroup.java
index 05ef537b46..40e4987556 100644
---
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/PartitionReplicationMessageGroup.java
+++
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/PartitionReplicationMessageGroup.java
@@ -41,6 +41,7 @@ import
org.apache.ignite.internal.partition.replicator.network.raft.SnapshotTxDa
import
org.apache.ignite.internal.partition.replicator.network.replication.BinaryRowMessage;
import
org.apache.ignite.internal.partition.replicator.network.replication.BinaryTupleMessage;
import
org.apache.ignite.internal.partition.replicator.network.replication.BuildIndexReplicaRequest;
+import
org.apache.ignite.internal.partition.replicator.network.replication.GetEstimatedSizeRequest;
import
org.apache.ignite.internal.partition.replicator.network.replication.ReadOnlyDirectMultiRowReplicaRequest;
import
org.apache.ignite.internal.partition.replicator.network.replication.ReadOnlyDirectSingleRowReplicaRequest;
import
org.apache.ignite.internal.partition.replicator.network.replication.ReadOnlyMultiRowPkReplicaRequest;
@@ -185,6 +186,11 @@ public interface PartitionReplicationMessageGroup {
*/
short TIMED_BINARY_ROW_MESSAGE = 24;
+ /**
+ * Message type for {@link GetEstimatedSizeRequest}: a request for the estimated size of a partition,
+ * addressed to the partition's primary replica.
+ */
+ short GET_ESTIMATED_SIZE_MESSAGE = 25;
+
/**
* Message types for partition replicator module RAFT commands.
*
diff --git
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/replication/GetEstimatedSizeRequest.java
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/replication/GetEstimatedSizeRequest.java
new file mode 100644
index 0000000000..1beb1a2704
--- /dev/null
+++
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/replication/GetEstimatedSizeRequest.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.partition.replicator.network.replication;
+
+import org.apache.ignite.internal.network.annotations.Transferable;
+import
org.apache.ignite.internal.partition.replicator.network.PartitionReplicationMessageGroup;
+import org.apache.ignite.internal.replicator.message.PrimaryReplicaRequest;
+
+/**
+ * Request for getting an estimated size of a partition.
+ *
+ * <p>The request declares no fields of its own; it only carries what is inherited from {@link PrimaryReplicaRequest}. It is
+ * addressed to the primary replica of the partition, whose replica listener replies with the estimated size of the partition
+ * storage as a {@code Long}.
+ */
+@Transferable(PartitionReplicationMessageGroup.GET_ESTIMATED_SIZE_MESSAGE)
+public interface GetEstimatedSizeRequest extends PrimaryReplicaRequest {
+}
diff --git a/modules/table/build.gradle b/modules/table/build.gradle
index 5807d57196..ba33cc2bed 100644
--- a/modules/table/build.gradle
+++ b/modules/table/build.gradle
@@ -64,6 +64,7 @@ dependencies {
testImplementation project(':ignite-page-memory')
testImplementation project(':ignite-storage-rocksdb')
testImplementation project(':ignite-placement-driver-api')
+ testImplementation project(':ignite-placement-driver')
testImplementation project(':ignite-system-view-api')
testImplementation project(':ignite-failure-handler')
testImplementation(testFixtures(project(':ignite-api')))
diff --git
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItEstimatedSizeTest.java
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItEstimatedSizeTest.java
new file mode 100644
index 0000000000..ed53068a3c
--- /dev/null
+++
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItEstimatedSizeTest.java
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table;
+
+import static
org.apache.ignite.internal.TestDefaultProfilesNames.DEFAULT_TEST_PROFILE_NAME;
+import static org.apache.ignite.internal.TestWrappers.unwrapTableViewInternal;
+import static
org.apache.ignite.internal.catalog.commands.CatalogUtils.IMMEDIATE_TIMER_VALUE;
+import static
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.STABLE_ASSIGNMENTS_PREFIX;
+import static
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
+import static
org.apache.ignite.internal.testframework.flow.TestFlowUtils.subscribeToList;
+import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
+import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.ClusterPerTestIntegrationTest;
+import org.apache.ignite.internal.affinity.Assignments;
+import org.apache.ignite.internal.lang.ByteArray;
+import org.apache.ignite.internal.metastorage.Entry;
+import org.apache.ignite.internal.metastorage.MetaStorageManager;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Integration tests for {@link InternalTable#estimatedSize()} method.
+ *
+ * <p>This class doesn't use the Parameterized Test approach in order to reduce the number of created clusters: instead, every test
+ * iterates over all storage profiles and creates one table per profile within a single zone.
+ */
+public class ItEstimatedSizeTest extends ClusterPerTestIntegrationTest {
+    private static final String TEST_ZONE_NAME = "TestZone";
+
+    private static final String TEST_TABLE_NAME_PREFIX = "Test_";
+
+    private static final long NUM_ROWS = 100;
+
+    private static final String[] ALL_STORAGE_PROFILES = {
+            DEFAULT_TEST_PROFILE_NAME,
+            // TODO: https://issues.apache.org/jira/browse/IGNITE-22616
+            // DEFAULT_AIPERSIST_PROFILE_NAME,
+            // TODO: https://issues.apache.org/jira/browse/IGNITE-22616
+            // DEFAULT_AIMEM_PROFILE_NAME,
+            // TODO: https://issues.apache.org/jira/browse/IGNITE-22617
+            // DEFAULT_ROCKSDB_PROFILE_NAME
+    };
+
+    /**
+     * Creates the test zone: one replica per initial node, immediate data nodes auto-adjust in both directions, and all tested
+     * storage profiles enabled.
+     */
+    @BeforeEach
+    void setUp() {
+        executeSql(String.format(
+                "CREATE ZONE %s WITH "
+                        + "REPLICAS=%d, "
+                        + "DATA_NODES_AUTO_ADJUST_SCALE_UP=%d, "
+                        + "DATA_NODES_AUTO_ADJUST_SCALE_DOWN=%d, "
+                        + "STORAGE_PROFILES='%s'",
+                TEST_ZONE_NAME,
+                initialNodes(),
+                IMMEDIATE_TIMER_VALUE,
+                IMMEDIATE_TIMER_VALUE,
+                String.join(", ", ALL_STORAGE_PROFILES)
+        ));
+    }
+
+    @Test
+    void testEstimatedSize() {
+        for (String profile : ALL_STORAGE_PROFILES) {
+            String tableName = createTableWithData(profile);
+
+            assertThat(tableSize(tableName), willBe(NUM_ROWS));
+
+            for (int i = 0; i < NUM_ROWS / 2; i++) {
+                executeSql(String.format("DELETE FROM %s WHERE key = %d", tableName, i));
+            }
+
+            assertThat(tableSize(tableName), willBe(NUM_ROWS / 2));
+
+            executeSql(String.format("DELETE FROM %s", tableName));
+
+            assertThat(tableSize(tableName), willBe(0L));
+        }
+    }
+
+    @Test
+    void testEstimatedSizeAfterScaleUp() throws InterruptedException {
+        for (String profile : ALL_STORAGE_PROFILES) {
+            String tableName = createTableWithData(profile);
+
+            assertThat(tableSize(tableName), willBe(NUM_ROWS));
+        }
+
+        cluster.startNode(initialNodes());
+
+        // The wait inspects the stable assignments of every table at once, so a single wait covers all profiles.
+        waitForRebalance(initialNodes() + 1);
+
+        for (String profile : ALL_STORAGE_PROFILES) {
+            assertThat(tableSize(tableName(profile)), willBe(NUM_ROWS));
+        }
+    }
+
+    @Test
+    void testEstimatedSizeAfterScaleDown() throws InterruptedException {
+        for (String profile : ALL_STORAGE_PROFILES) {
+            String tableName = createTableWithData(profile);
+
+            assertThat(tableSize(tableName), willBe(NUM_ROWS));
+        }
+
+        cluster.stopNode(initialNodes() - 1);
+
+        // The wait inspects the stable assignments of every table at once, so a single wait covers all profiles.
+        waitForRebalance(initialNodes() - 1);
+
+        for (String profile : ALL_STORAGE_PROFILES) {
+            assertThat(tableSize(tableName(profile)), willBe(NUM_ROWS));
+        }
+    }
+
+    /**
+     * Creates a table in the test zone using the given storage profile and inserts {@link #NUM_ROWS} rows into it.
+     *
+     * @param profile Storage profile of the table.
+     * @return Name of the created table.
+     */
+    private String createTableWithData(String profile) {
+        String tableName = tableName(profile);
+
+        executeSql(String.format(
+                "CREATE TABLE %s (key INT PRIMARY KEY) WITH PRIMARY_ZONE=%s, STORAGE_PROFILE='%s'",
+                tableName,
+                TEST_ZONE_NAME,
+                profile
+        ));
+
+        for (int i = 0; i < NUM_ROWS; i++) {
+            executeSql(String.format("INSERT INTO %s VALUES (%d)", tableName, i));
+        }
+
+        return tableName;
+    }
+
+    /** Returns the name of the test table created for the given storage profile. */
+    private static String tableName(String profile) {
+        return TEST_TABLE_NAME_PREFIX + profile;
+    }
+
+    /** Returns the estimated size of the given table, requested through an alive node of the cluster. */
+    private CompletableFuture<Long> tableSize(String tableName) {
+        return tableViewInternal(tableName).internalTable().estimatedSize();
+    }
+
+    /**
+     * Waits until the stable assignments of all partitions stored in the metastorage, taken together, reference exactly
+     * {@code numNodes} distinct nodes.
+     *
+     * @param numNodes Expected number of distinct assigned nodes.
+     */
+    private void waitForRebalance(int numNodes) throws InterruptedException {
+        MetaStorageManager metaStorageManager = cluster.aliveNode().metaStorageManager();
+
+        var stableAssignmentsPrefix = new ByteArray(STABLE_ASSIGNMENTS_PREFIX);
+
+        assertTrue(waitForCondition(() -> {
+            CompletableFuture<List<Entry>> entriesFuture = subscribeToList(metaStorageManager.prefix(stableAssignmentsPrefix));
+
+            assertThat(entriesFuture, willCompleteSuccessfully());
+
+            long assignedNodesNum = entriesFuture.join().stream()
+                    .map(entry -> Assignments.fromBytes(entry.value()))
+                    .filter(Objects::nonNull)
+                    .flatMap(assignments -> assignments.nodes().stream())
+                    .distinct()
+                    .count();
+
+            return assignedNodesNum == numNodes;
+        }, 10_000));
+    }
+
+    private TableViewInternal tableViewInternal(String tableName) {
+        return unwrapTableViewInternal(cluster.aliveNode().tables().table(tableName));
+    }
+}
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/InternalTable.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/InternalTable.java
index 47d90df7af..cdbf6c6361 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/InternalTable.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/InternalTable.java
@@ -31,9 +31,9 @@ import org.apache.ignite.internal.schema.BinaryRow;
import org.apache.ignite.internal.schema.BinaryRowEx;
import org.apache.ignite.internal.schema.BinaryTuple;
import org.apache.ignite.internal.schema.BinaryTuplePrefix;
+import org.apache.ignite.internal.storage.MvPartitionStorage;
import org.apache.ignite.internal.storage.engine.MvTableStorage;
import org.apache.ignite.internal.tx.InternalTransaction;
-import org.apache.ignite.internal.tx.LockException;
import org.apache.ignite.internal.tx.storage.state.TxStateTableStorage;
import org.apache.ignite.internal.util.PendingComparableValuesTracker;
import org.apache.ignite.internal.utils.PrimaryReplica;
@@ -89,7 +89,6 @@ public interface InternalTable extends ManuallyCloseable {
* @param keyRow Row with key columns set.
* @param tx The transaction.
* @return Future representing pending completion of the operation.
- * @throws LockException If a lock can't be acquired by some reason.
*/
CompletableFuture<BinaryRow> get(BinaryRowEx keyRow, @Nullable
InternalTransaction tx);
@@ -100,7 +99,6 @@ public interface InternalTable extends ManuallyCloseable {
* @param readTimestamp Read timestamp.
* @param recipientNode Cluster node that will handle given get request.
* @return Future representing pending completion of the operation.
- * @throws LockException If a lock can't be acquired by some reason.
*/
CompletableFuture<BinaryRow> get(
BinaryRowEx keyRow,
@@ -494,4 +492,15 @@ public interface InternalTable extends ManuallyCloseable {
* @return Cluster node with primary replica.
*/
CompletableFuture<ClusterNode> partitionLocation(TablePartitionId
partitionId);
+
+ /**
+ * Returns the <em>estimated size</em> of this table.
+ *
+ * <p>It is computed as a sum of estimated sizes of all partitions of this table.
+ *
+ * @return Future that completes with the estimated size of this table.
+ *
+ * @see MvPartitionStorage#estimatedSize
+ */
+ CompletableFuture<Long> estimatedSize();
}
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
index fcffbd1560..3e5c67b2f3 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
@@ -111,6 +111,7 @@ import
org.apache.ignite.internal.partition.replicator.network.command.WriteInte
import
org.apache.ignite.internal.partition.replicator.network.replication.BinaryRowMessage;
import
org.apache.ignite.internal.partition.replicator.network.replication.BinaryTupleMessage;
import
org.apache.ignite.internal.partition.replicator.network.replication.BuildIndexReplicaRequest;
+import
org.apache.ignite.internal.partition.replicator.network.replication.GetEstimatedSizeRequest;
import
org.apache.ignite.internal.partition.replicator.network.replication.ReadOnlyDirectMultiRowReplicaRequest;
import
org.apache.ignite.internal.partition.replicator.network.replication.ReadOnlyDirectSingleRowReplicaRequest;
import
org.apache.ignite.internal.partition.replicator.network.replication.ReadOnlyMultiRowPkReplicaRequest;
@@ -507,6 +508,10 @@ public class PartitionReplicaListener implements
ReplicaListener {
return processCleanupRecoveryMessage((TxCleanupRecoveryRequest)
request);
}
+ if (request instanceof GetEstimatedSizeRequest) {
+ return processGetEstimatedSizeRequest();
+ }
+
HybridTimestamp opTsIfDirectRo = (request instanceof
ReadOnlyDirectReplicaRequest) ? clockService.now() : null;
return validateTableExistence(request, opTsIfDirectRo)
@@ -516,6 +521,10 @@ public class PartitionReplicaListener implements
ReplicaListener {
processOperationRequestWithTxRwCounter(senderId,
request, isPrimary, opTsIfDirectRo, leaseStartTime));
}
+ /**
+ * Handles a {@link GetEstimatedSizeRequest}: replies with the estimated size of this partition's MV data storage.
+ * The dispatcher routes this request here before the table-existence validation applied to other requests.
+ */
+ private CompletableFuture<Long> processGetEstimatedSizeRequest() {
+ return completedFuture(mvDataStorage.estimatedSize());
+ }
+
private CompletableFuture<Void>
processCleanupRecoveryMessage(TxCleanupRecoveryRequest request) {
runPersistentStorageScan();
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
index bbcdced0f3..c9b433700e 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
@@ -18,6 +18,7 @@
package org.apache.ignite.internal.table.distributed.storage;
import static it.unimi.dsi.fastutil.ints.Int2ObjectMaps.emptyMap;
+import static java.util.concurrent.CompletableFuture.allOf;
import static java.util.concurrent.CompletableFuture.completedFuture;
import static java.util.concurrent.CompletableFuture.failedFuture;
import static java.util.concurrent.TimeUnit.SECONDS;
@@ -61,6 +62,7 @@ import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
import java.nio.ByteBuffer;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.BitSet;
import java.util.Collection;
import java.util.HashSet;
@@ -86,6 +88,7 @@ import org.apache.ignite.internal.lang.IgniteBiTuple;
import org.apache.ignite.internal.lang.IgnitePentaFunction;
import org.apache.ignite.internal.lang.IgniteTriFunction;
import org.apache.ignite.internal.network.ClusterNodeResolver;
+import org.apache.ignite.internal.network.UnresolvableConsistentIdException;
import
org.apache.ignite.internal.partition.replicator.network.PartitionReplicationMessagesFactory;
import
org.apache.ignite.internal.partition.replicator.network.replication.BinaryTupleMessage;
import
org.apache.ignite.internal.partition.replicator.network.replication.MultipleRowPkReplicaRequest;
@@ -104,6 +107,9 @@ import
org.apache.ignite.internal.placementdriver.PlacementDriver;
import org.apache.ignite.internal.placementdriver.ReplicaMeta;
import org.apache.ignite.internal.replicator.ReplicaService;
import org.apache.ignite.internal.replicator.TablePartitionId;
+import
org.apache.ignite.internal.replicator.exception.PrimaryReplicaMissException;
+import org.apache.ignite.internal.replicator.exception.ReplicationException;
+import
org.apache.ignite.internal.replicator.exception.ReplicationTimeoutException;
import org.apache.ignite.internal.replicator.message.ReplicaMessagesFactory;
import org.apache.ignite.internal.replicator.message.ReplicaRequest;
import org.apache.ignite.internal.replicator.message.TablePartitionIdMessage;
@@ -122,6 +128,7 @@ import org.apache.ignite.internal.util.CollectionUtils;
import org.apache.ignite.internal.util.IgniteUtils;
import org.apache.ignite.internal.util.PendingComparableValuesTracker;
import org.apache.ignite.internal.utils.PrimaryReplica;
+import org.apache.ignite.lang.IgniteException;
import org.apache.ignite.network.ClusterNode;
import org.apache.ignite.tx.TransactionException;
import org.jetbrains.annotations.Nullable;
@@ -574,7 +581,7 @@ public class InternalTableImpl implements InternalTable {
return postEnlist(fut, false, tx, false);
}
- private @Nullable BinaryTupleMessage binaryTupleMessage(@Nullable
BinaryTupleReader binaryTuple) {
+ private static @Nullable BinaryTupleMessage binaryTupleMessage(@Nullable
BinaryTupleReader binaryTuple) {
if (binaryTuple == null) {
return null;
}
@@ -762,30 +769,24 @@ public class InternalTableImpl implements InternalTable {
TablePartitionId tablePartitionId = new TablePartitionId(tableId,
partId);
- CompletableFuture<ReplicaMeta> primaryReplicaFuture =
placementDriver.awaitPrimaryReplica(
- tablePartitionId,
- tx.startTimestamp(),
- AWAIT_PRIMARY_REPLICA_TIMEOUT,
- SECONDS
- );
-
- CompletableFuture<R> fut =
primaryReplicaFuture.thenCompose(primaryReplica -> {
- try {
- ClusterNode node =
getClusterNode(primaryReplica.getLeaseholder());
-
- return replicaSvc.invoke(node, op.apply(tablePartitionId,
primaryReplica.getStartTime().longValue()));
- } catch (Throwable e) {
- throw new TransactionException(
- INTERNAL_ERR,
- format(
- "Failed to invoke the replica request
[tableName={}, partId={}].",
- tableName,
- partId
- ),
- e
- );
- }
- });
+ CompletableFuture<R> fut = awaitPrimaryReplica(tablePartitionId,
tx.startTimestamp())
+ .thenCompose(primaryReplica -> {
+ try {
+ ClusterNode node = getClusterNode(primaryReplica);
+
+ return replicaSvc.invoke(node,
op.apply(tablePartitionId, enlistmentConsistencyToken(primaryReplica)));
+ } catch (Throwable e) {
+ throw new TransactionException(
+ INTERNAL_ERR,
+ format(
+ "Failed to invoke the replica request
[tableName={}, partId={}].",
+ tableName,
+ partId
+ ),
+ e
+ );
+ }
+ });
return postEvaluate(fut, tx);
}
@@ -808,30 +809,24 @@ public class InternalTableImpl implements InternalTable {
TablePartitionId tablePartitionId = new TablePartitionId(tableId,
partId);
- CompletableFuture<ReplicaMeta> primaryReplicaFuture =
placementDriver.awaitPrimaryReplica(
- tablePartitionId,
- tx.startTimestamp(),
- AWAIT_PRIMARY_REPLICA_TIMEOUT,
- SECONDS
- );
-
- CompletableFuture<R> fut =
primaryReplicaFuture.thenCompose(primaryReplica -> {
- try {
- ClusterNode node =
getClusterNode(primaryReplica.getLeaseholder());
-
- return replicaSvc.invoke(node, op.apply(tablePartitionId,
primaryReplica.getStartTime().longValue()));
- } catch (Throwable e) {
- throw new TransactionException(
- INTERNAL_ERR,
- format(
- "Failed to invoke the replica request
[tableName={}, partId={}].",
- tableName,
- partId
- ),
- e
- );
- }
- });
+ CompletableFuture<R> fut = awaitPrimaryReplica(tablePartitionId,
tx.startTimestamp())
+ .thenCompose(primaryReplica -> {
+ try {
+ ClusterNode node = getClusterNode(primaryReplica);
+
+ return replicaSvc.invoke(node,
op.apply(tablePartitionId, enlistmentConsistencyToken(primaryReplica)));
+ } catch (Throwable e) {
+ throw new TransactionException(
+ INTERNAL_ERR,
+ format(
+ "Failed to invoke the replica request
[tableName={}, partId={}].",
+ tableName,
+ partId
+ ),
+ e
+ );
+ }
+ });
return postEvaluate(fut, tx);
}
@@ -1074,7 +1069,7 @@ public class InternalTableImpl implements InternalTable {
return result;
}
- private TablePartitionIdMessage serializeTablePartitionId(TablePartitionId
id) {
+ /**
+ * Converts a table partition ID into its network message form using the shared replica messages factory.
+ */
+ private static TablePartitionIdMessage
serializeTablePartitionId(TablePartitionId id) {
return toTablePartitionIdMessage(REPLICA_MESSAGES_FACTORY, id);
}
@@ -1918,43 +1913,41 @@ public class InternalTableImpl implements InternalTable
{
TablePartitionId partGroupId = new TablePartitionId(tableId,
partId);
return tx.enlist(partGroupId, new IgniteBiTuple<>(
- getClusterNode(meta.getLeaseholder()),
- meta.getStartTime().longValue())
+ getClusterNode(meta),
+ enlistmentConsistencyToken(meta))
);
});
}
@Override
public CompletableFuture<ClusterNode> partitionLocation(TablePartitionId
tablePartitionId) {
- return partitionMeta(tablePartitionId).thenApply(meta ->
getClusterNode(meta.getLeaseholder()));
+ return partitionMeta(tablePartitionId).thenApply(this::getClusterNode);
}
+ /**
+ * Awaits the primary replica of the given partition as of the current clock time.
+ *
+ * <p>If awaiting fails, the returned future fails with a {@link TransactionException} with code
+ * {@code REPLICA_UNAVAILABLE_ERR}, built via {@code withCause} from the original failure.
+ */
private CompletableFuture<ReplicaMeta> partitionMeta(TablePartitionId
tablePartitionId) {
HybridTimestamp now = clock.now();
- CompletableFuture<ReplicaMeta> primaryReplicaFuture =
placementDriver.awaitPrimaryReplica(
- tablePartitionId,
- now,
- AWAIT_PRIMARY_REPLICA_TIMEOUT,
- SECONDS
- );
-
- return primaryReplicaFuture.handle((primaryReplica, e) -> {
- if (e != null) {
- throw withCause(TransactionException::new,
REPLICA_UNAVAILABLE_ERR, "Failed to get the primary replica"
- + " [tablePartitionId=" + tablePartitionId + ",
awaitTimestamp=" + now + ']', e);
- }
-
- return primaryReplica;
- });
+ return awaitPrimaryReplica(tablePartitionId, now)
+ .exceptionally(e -> {
+ throw withCause(
+ TransactionException::new,
+ REPLICA_UNAVAILABLE_ERR,
+ "Failed to get the primary replica
[tablePartitionId=" + tablePartitionId + ", awaitTimestamp=" + now + ']',
+ e
+ );
+ });
}
- private ClusterNode getClusterNode(@Nullable String leaserHolder) {
- ClusterNode node = clusterNodeResolver.getByConsistentId(leaserHolder);
+    /**
+     * Resolves the cluster node holding the lease described by the given primary replica meta.
+     *
+     * @param replicaMeta Primary replica meta.
+     * @return Lease holder node.
+     * @throws TransactionException With {@code REPLICA_UNAVAILABLE_ERR} if the meta has no lease holder ID or the ID does not
+     *      resolve to a known node.
+     */
+    private ClusterNode getClusterNode(ReplicaMeta replicaMeta) {
+        String leaseHolderId = replicaMeta.getLeaseholderId();
+
+        ClusterNode node = leaseHolderId == null ? null : clusterNodeResolver.getById(leaseHolderId);
if (node == null) {
- throw new TransactionException(REPLICA_UNAVAILABLE_ERR, "Failed to
resolve the primary replica node [consistentId="
- + leaserHolder + ']');
+            // Same message format helper as the rest of this class ("{}" placeholders).
+            throw new TransactionException(
+                    REPLICA_UNAVAILABLE_ERR,
+                    format("Failed to resolve the primary replica node [id={}]", leaseHolderId)
+            );
}
return node;
@@ -2077,9 +2070,8 @@ public class InternalTableImpl implements InternalTable {
*
* @param t An exception which was thrown when entries were
retrieving from the cursor.
* @param intentionallyClose True if the subscription is closed
for the client side.
- * @return Future to complete.
*/
- private void cancel(Throwable t, boolean intentionallyClose) {
+ private void cancel(@Nullable Throwable t, boolean
intentionallyClose) {
if (!canceled.compareAndSet(false, true)) {
return;
}
@@ -2202,7 +2194,7 @@ public class InternalTableImpl implements InternalTable {
protected CompletableFuture<ClusterNode> evaluateReadOnlyRecipientNode(int
partId) {
TablePartitionId tablePartitionId = new TablePartitionId(tableId,
partId);
- return placementDriver.awaitPrimaryReplica(tablePartitionId,
clock.now(), AWAIT_PRIMARY_REPLICA_TIMEOUT, SECONDS)
+ return awaitPrimaryReplica(tablePartitionId, clock.now())
.handle((res, e) -> {
if (e != null) {
throw withCause(TransactionException::new,
REPLICA_UNAVAILABLE_ERR, e);
@@ -2210,7 +2202,7 @@ public class InternalTableImpl implements InternalTable {
if (res == null) {
throw withCause(TransactionException::new,
REPLICA_UNAVAILABLE_ERR, e);
} else {
- return getClusterNode(res.getLeaseholder());
+ return getClusterNode(res);
}
}
});
@@ -2231,6 +2223,78 @@ public class InternalTableImpl implements InternalTable {
return streamerFlushExecutor.get();
}
+    @Override
+    public CompletableFuture<Long> estimatedSize() {
+        HybridTimestamp now = clock.now();
+
+        // How many times a partition request is re-sent after its primary replica turned out to be missing, dead or unresponsive.
+        int maxRetries = 5;
+
+        var invokeFutures = new CompletableFuture<?>[partitions];
+
+        for (int partId = 0; partId < partitions; partId++) {
+            var replicaGroupId = new TablePartitionId(tableId, partId);
+
+            TablePartitionIdMessage partitionIdMessage = serializeTablePartitionId(replicaGroupId);
+
+            // The token depends on the lease, so the request is rebuilt for every primary replica it is sent to.
+            Function<ReplicaMeta, ReplicaRequest> requestFactory = replicaMeta ->
+                    TABLE_MESSAGES_FACTORY.getEstimatedSizeRequest()
+                            .groupId(partitionIdMessage)
+                            .enlistmentConsistencyToken(enlistmentConsistencyToken(replicaMeta))
+                            .build();
+
+            invokeFutures[partId] = sendToPrimaryWithRetry(replicaGroupId, now, maxRetries, requestFactory);
+        }
+
+        return allOf(invokeFutures)
+                .thenApply(v -> Arrays.stream(invokeFutures).mapToLong(f -> (Long) f.join()).sum());
+    }
+
+    /**
+     * Sends a request to the primary replica of the given partition, re-sending it to the next primary replica when the current one
+     * turns out to be unavailable.
+     *
+     * @param tablePartitionId Partition whose primary replica receives the request.
+     * @param hybridTimestamp Timestamp at which the primary replica is awaited.
+     * @param numRetries Number of remaining re-send attempts.
+     * @param requestFactory Builds the request for a given primary replica meta.
+     * @param <T> Response type.
+     * @return Future completed with the replica response. It fails with {@code REPLICA_MISS_ERR} once retries are exhausted, or with
+     *      {@code INTERNAL_ERR} on a non-retryable failure.
+     */
+    private <T> CompletableFuture<T> sendToPrimaryWithRetry(
+            TablePartitionId tablePartitionId,
+            HybridTimestamp hybridTimestamp,
+            int numRetries,
+            Function<ReplicaMeta, ReplicaRequest> requestFactory
+    ) {
+        return awaitPrimaryReplica(tablePartitionId, hybridTimestamp)
+                .thenCompose(replicaMeta -> {
+                    String leaseHolderName = replicaMeta.getLeaseholder();
+
+                    assert leaseHolderName != null;
+
+                    return replicaSvc.<T>invoke(leaseHolderName, requestFactory.apply(replicaMeta))
+                            .handle((response, e) -> {
+                                if (e == null) {
+                                    return completedFuture(response);
+                                }
+
+                                // A failure of a dependent stage arrives wrapped into a CompletionException, so unwrap it first;
+                                // otherwise the instanceof checks below would never match and a retryable failure would be
+                                // reported as INTERNAL_ERR.
+                                Throwable cause = unwrapCause(e);
+
+                                // Replication errors usually carry the actual reason as their cause. A retryable replication
+                                // error is kept as is.
+                                if (!isRetryableReplicaFailure(cause)
+                                        && cause instanceof ReplicationException
+                                        && cause.getCause() != null) {
+                                    cause = cause.getCause();
+                                }
+
+                                // We do a retry for the following conditions:
+                                // 1. Primary Replica has changed between the "awaitPrimaryReplica" and "invoke" calls;
+                                // 2. Primary Replica has died and is no longer available.
+                                // In both cases, we need to wait for the lease to expire and to get a new Primary Replica.
+                                if (!isRetryableReplicaFailure(cause)) {
+                                    throw new IgniteException(INTERNAL_ERR, cause);
+                                }
+
+                                if (numRetries == 0) {
+                                    throw new IgniteException(REPLICA_MISS_ERR, cause);
+                                }
+
+                                // Primary Replica has changed, need to wait for the new replica to appear.
+                                return this.<T>sendToPrimaryWithRetry(
+                                        tablePartitionId,
+                                        replicaMeta.getExpirationTime().tick(),
+                                        numRetries - 1,
+                                        requestFactory
+                                );
+                            });
+                })
+                .thenCompose(identity());
+    }
+
+    /**
+     * Checks whether a replica request failure means that the primary replica was missing, unresolvable or unresponsive, so that the
+     * request may be re-sent to the next primary replica.
+     */
+    private static boolean isRetryableReplicaFailure(Throwable e) {
+        return e instanceof PrimaryReplicaMissException
+                || e instanceof UnresolvableConsistentIdException
+                || e instanceof ReplicationTimeoutException;
+    }
+
/**
* Updates the partition trackers, if there were previous ones, it closes
them.
*
@@ -2277,8 +2341,6 @@ public class InternalTableImpl implements InternalTable {
Long enlistmentConsistencyToken,
boolean full
) {
- assert serializeTablePartitionId(txo.commitPartition()) != null;
-
return readWriteMultiRowReplicaRequest(RW_UPSERT_ALL, keyRows0, null,
txo, groupId, enlistmentConsistencyToken, full);
}
@@ -2290,8 +2352,6 @@ public class InternalTableImpl implements InternalTable {
Long enlistmentConsistencyToken,
boolean full
) {
- assert serializeTablePartitionId(txo.commitPartition()) != null;
-
return readWriteMultiRowReplicaRequest(
RW_UPSERT_ALL, keyRows0, deleted, txo, groupId,
enlistmentConsistencyToken, full);
}
@@ -2305,4 +2365,17 @@ public class InternalTableImpl implements InternalTable {
+    /**
+     * Returns whether the unwrapped cause of the given failure matches one of the error codes {@code ACQUIRE_LOCK_ERR} or
+     * {@code REPLICA_MISS_ERR}.
+     *
+     * <p>NOTE(review): judging by the name, callers use this to decide whether an implicit transaction may be retried;
+     * confirm at the call sites.
+     *
+     * @param e Failure to inspect; {@code unwrapCause} is applied to it before matching.
+     * @return {@code true} if {@code matchAny} reports a match for either error code.
+     */
private static boolean exceptionAllowsImplicitTxRetry(Throwable e) {
return matchAny(unwrapCause(e), ACQUIRE_LOCK_ERR, REPLICA_MISS_ERR);
}
+
+    /**
+     * Asks the placement driver for the primary replica of a partition, waiting at most
+     * {@code AWAIT_PRIMARY_REPLICA_TIMEOUT} seconds.
+     *
+     * @param tablePartitionId Partition whose primary replica is requested.
+     * @param timestamp Timestamp forwarded as-is to {@link PlacementDriver#awaitPrimaryReplica}.
+     * @return Future completed with the primary replica metadata returned by the placement driver.
+     */
+    private CompletableFuture<ReplicaMeta> awaitPrimaryReplica(TablePartitionId tablePartitionId, HybridTimestamp timestamp) {
+        return placementDriver.awaitPrimaryReplica(tablePartitionId, timestamp, AWAIT_PRIMARY_REPLICA_TIMEOUT, SECONDS);
+    }
+
+    /**
+     * Derives an enlistment consistency token from primary replica metadata.
+     *
+     * @param replicaMeta Primary replica (lease) metadata.
+     * @return The {@code longValue()} of the lease start time.
+     */
+    private static long enlistmentConsistencyToken(ReplicaMeta replicaMeta) {
+        var leaseStartTime = replicaMeta.getStartTime();
+
+        return leaseStartTime.longValue();
+    }
}
diff --git
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/storage/InternalTableEstimatedSizeTest.java
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/storage/InternalTableEstimatedSizeTest.java
new file mode 100644
index 0000000000..db2c904828
--- /dev/null
+++
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/storage/InternalTableEstimatedSizeTest.java
@@ -0,0 +1,474 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.storage;
+
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static java.util.concurrent.CompletableFuture.failedFuture;
+import static java.util.stream.Collectors.toList;
+import static
org.apache.ignite.internal.network.utils.ClusterServiceTestUtils.clusterService;
+import static
org.apache.ignite.internal.testframework.IgniteTestUtils.testNodeName;
+import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
+import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
+import static org.apache.ignite.internal.util.IgniteUtils.closeAll;
+import static org.apache.ignite.internal.util.IgniteUtils.startAsync;
+import static org.apache.ignite.internal.util.IgniteUtils.stopAsync;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.lenient;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ForkJoinPool;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeoutException;
+import java.util.stream.IntStream;
+import org.apache.ignite.internal.catalog.CatalogService;
+import
org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
+import
org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
+import org.apache.ignite.internal.hlc.ClockService;
+import org.apache.ignite.internal.hlc.ClockServiceImpl;
+import org.apache.ignite.internal.hlc.ClockWaiter;
+import org.apache.ignite.internal.hlc.HybridClockImpl;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.manager.ComponentContext;
+import org.apache.ignite.internal.manager.IgniteComponent;
+import org.apache.ignite.internal.metastorage.MetaStorageManager;
+import
org.apache.ignite.internal.metastorage.impl.StandaloneMetaStorageManager;
+import
org.apache.ignite.internal.metastorage.server.SimpleInMemoryKeyValueStorage;
+import org.apache.ignite.internal.network.ClusterNodeResolver;
+import org.apache.ignite.internal.network.ClusterService;
+import org.apache.ignite.internal.network.MessagingService;
+import org.apache.ignite.internal.network.StaticNodeFinder;
+import org.apache.ignite.internal.placementdriver.PlacementDriver;
+import org.apache.ignite.internal.placementdriver.leases.Lease;
+import org.apache.ignite.internal.raft.Command;
+import org.apache.ignite.internal.raft.service.RaftCommandRunner;
+import org.apache.ignite.internal.replicator.ReplicaService;
+import org.apache.ignite.internal.replicator.TablePartitionId;
+import
org.apache.ignite.internal.replicator.configuration.ReplicationConfiguration;
+import org.apache.ignite.internal.replicator.message.ReplicaMessagesFactory;
+import org.apache.ignite.internal.replicator.message.ReplicaRequest;
+import org.apache.ignite.internal.schema.SchemaRegistry;
+import org.apache.ignite.internal.storage.MvPartitionStorage;
+import org.apache.ignite.internal.storage.engine.MvTableStorage;
+import org.apache.ignite.internal.table.InternalTable;
+import org.apache.ignite.internal.table.distributed.StorageUpdateHandler;
+import org.apache.ignite.internal.table.distributed.index.IndexMetaStorage;
+import
org.apache.ignite.internal.table.distributed.replicator.PartitionReplicaListener;
+import
org.apache.ignite.internal.table.distributed.replicator.TransactionStateResolver;
+import org.apache.ignite.internal.table.distributed.schema.SchemaSyncService;
+import
org.apache.ignite.internal.table.distributed.schema.ValidationSchemasSource;
+import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
+import org.apache.ignite.internal.tx.HybridTimestampTracker;
+import org.apache.ignite.internal.tx.LockManager;
+import org.apache.ignite.internal.tx.TxManager;
+import org.apache.ignite.internal.tx.impl.RemotelyTriggeredResourceRegistry;
+import org.apache.ignite.internal.tx.impl.TransactionInflights;
+import org.apache.ignite.internal.tx.storage.state.TxStateStorage;
+import org.apache.ignite.internal.tx.storage.state.TxStateTableStorage;
+import org.apache.ignite.internal.util.Lazy;
+import org.apache.ignite.internal.util.PendingComparableValuesTracker;
+import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.network.NetworkAddress;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInfo;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+/**
+ * Tests for distributed aspects of the {@link InternalTable#estimatedSize} method.
+ *
+ * <p>The table is backed by one {@link PartitionReplicaListener} per partition, each over a mocked {@link MvPartitionStorage}.
+ * Replica requests that the (spied) {@link MessagingService} sends to the local node are routed directly to the listener of the
+ * target partition.
+ */
+@ExtendWith(MockitoExtension.class)
+@ExtendWith(ConfigurationExtension.class)
+public class InternalTableEstimatedSizeTest extends BaseIgniteAbstractTest {
+    private static final String TABLE_NAME = "TEST";
+
+    private static final int TABLE_ID = 1;
+
+    private static final int PARTITIONS_NUM = 3;
+
+    private static final List<TablePartitionId> TABLE_PARTITION_IDS = IntStream.range(0, PARTITIONS_NUM)
+            .mapToObj(i -> new TablePartitionId(TABLE_ID, i))
+            .collect(toList());
+
+    private ClusterNode node;
+
+    private InternalTableImpl table;
+
+    private MessagingService messagingService;
+
+    @Mock
+    private PlacementDriver placementDriver;
+
+    private final HybridClockImpl clock = new HybridClockImpl();
+
+    private final ComponentContext componentContext = new ComponentContext();
+
+    /** Mocked partition storages, in partition ID order (filled by {@link #createPartitionReplicaListener}). */
+    private final List<MvPartitionStorage> partitionStorages = new ArrayList<>(PARTITIONS_NUM);
+
+    /** Components started in {@link #setUp} and stopped in reverse order in {@link #tearDown}. */
+    private final List<IgniteComponent> components = new ArrayList<>();
+
+    /**
+     * Starts the cluster service, the clock waiter and a standalone meta storage, then creates the table under test and one
+     * replica listener per partition. Finally, it routes replica requests addressed to the local node straight to those listeners.
+     */
+    @BeforeEach
+    void setUp(
+            TestInfo testInfo,
+            @Mock TxManager txManager,
+            @Mock LockManager lockManager,
+            @Mock MvTableStorage tableStorage,
+            @Mock TxStateTableStorage txStateTableStorage,
+            @Mock TxStateStorage txStateStorage,
+            @Mock TableRaftServiceImpl tableRaftService,
+            @Mock TransactionStateResolver transactionStateResolver,
+            @Mock StorageUpdateHandler storageUpdateHandler,
+            @Mock ValidationSchemasSource validationSchemasSource,
+            @Mock SchemaSyncService schemaSyncService,
+            @Mock CatalogService catalogService,
+            @Mock RemotelyTriggeredResourceRegistry remotelyTriggeredResourceRegistry,
+            @Mock SchemaRegistry schemaRegistry,
+            @Mock IndexMetaStorage indexMetaStorage,
+            @InjectConfiguration ReplicationConfiguration replicationConfiguration
+    ) {
+        String nodeName = testNodeName(testInfo, 0);
+
+        var addr = new NetworkAddress("localhost", 10_000);
+
+        ClusterService clusterService = spy(clusterService(nodeName, addr.port(), new StaticNodeFinder(List.of(addr))));
+
+        messagingService = spy(clusterService.messagingService());
+
+        when(clusterService.messagingService()).thenReturn(messagingService);
+
+        components.add(clusterService);
+
+        var clockWaiter = new ClockWaiter(nodeName, clock);
+
+        components.add(clockWaiter);
+
+        MetaStorageManager metaStorageManager = StandaloneMetaStorageManager.create(
+                new SimpleInMemoryKeyValueStorage(nodeName),
+                clock
+        );
+
+        components.add(metaStorageManager);
+
+        assertThat(startAsync(componentContext, components), willCompleteSuccessfully());
+
+        assertThat(metaStorageManager.deployWatches(), willCompleteSuccessfully());
+
+        node = clusterService.topologyService().localMember();
+
+        var clockService = new ClockServiceImpl(clock, clockWaiter, () -> 0);
+
+        table = new InternalTableImpl(
+                TABLE_NAME,
+                TABLE_ID,
+                PARTITIONS_NUM,
+                clusterService.topologyService(),
+                txManager,
+                tableStorage,
+                txStateTableStorage,
+                new ReplicaService(clusterService.messagingService(), clock, replicationConfiguration),
+                clock,
+                new HybridTimestampTracker(),
+                placementDriver,
+                tableRaftService,
+                new TransactionInflights(placementDriver, clockService),
+                0,
+                0,
+                () -> null
+        );
+
+        List<PartitionReplicaListener> partitionReplicaListeners = IntStream.range(0, PARTITIONS_NUM)
+                .mapToObj(partId -> createPartitionReplicaListener(
+                        partId,
+                        txManager,
+                        lockManager,
+                        clockService,
+                        txStateStorage,
+                        transactionStateResolver,
+                        storageUpdateHandler,
+                        validationSchemasSource,
+                        node,
+                        schemaSyncService,
+                        catalogService,
+                        placementDriver,
+                        clusterService.topologyService(),
+                        remotelyTriggeredResourceRegistry,
+                        schemaRegistry,
+                        indexMetaStorage
+                ))
+                .collect(toList());
+
+        // Requests addressed to the local node are answered by the listener of the target partition instead of the real
+        // messaging service.
+        lenient().doAnswer(invocation -> {
+            ReplicaRequest request = invocation.getArgument(1);
+
+            var tablePartitionId = (TablePartitionId) request.groupId().asReplicationGroupId();
+
+            return partitionReplicaListeners.get(tablePartitionId.partitionId())
+                    .invoke(request, node.id())
+                    .thenApply(replicaResult -> new ReplicaMessagesFactory()
+                            .replicaResponse()
+                            .result(replicaResult.result())
+                            .build()
+                    );
+        }).when(messagingService).invoke(eq(nodeName), any(ReplicaRequest.class), anyLong());
+    }
+
+    @AfterEach
+    void tearDown() throws Exception {
+        Collections.reverse(components);
+
+        closeAll(components.stream().map(c -> c::beforeNodeStop));
+
+        assertThat(stopAsync(componentContext, components), willCompleteSuccessfully());
+    }
+
+    /**
+     * Creates the replica listener for a single partition.
+     *
+     * <p>The listener gets a fresh mocked {@link MvPartitionStorage}, which is also appended to {@link #partitionStorages}. Its
+     * Raft command runner does not execute anything: every command completes with {@code null}.
+     */
+    private PartitionReplicaListener createPartitionReplicaListener(
+            int partId,
+            TxManager txManager,
+            LockManager lockManager,
+            ClockService clockService,
+            TxStateStorage txStateStorage,
+            TransactionStateResolver transactionStateResolver,
+            StorageUpdateHandler storageUpdateHandler,
+            ValidationSchemasSource validationSchemasSource,
+            ClusterNode node,
+            SchemaSyncService schemaSyncService,
+            CatalogService catalogService,
+            PlacementDriver placementDriver,
+            ClusterNodeResolver clusterNodeResolver,
+            RemotelyTriggeredResourceRegistry remotelyTriggeredResourceRegistry,
+            SchemaRegistry schemaRegistry,
+            IndexMetaStorage indexMetaStorage
+    ) {
+        MvPartitionStorage partitionStorage = mock(MvPartitionStorage.class);
+
+        partitionStorages.add(partitionStorage);
+
+        return new PartitionReplicaListener(
+                partitionStorage,
+                new RaftCommandRunner() {
+                    @Override
+                    public <R> CompletableFuture<R> run(Command cmd) {
+                        return nullCompletedFuture();
+                    }
+                },
+                txManager,
+                lockManager,
+                ForkJoinPool.commonPool(),
+                partId,
+                TABLE_ID,
+                Map::of,
+                new Lazy<>(() -> null),
+                Map::of,
+                clockService,
+                new PendingComparableValuesTracker<>(HybridTimestamp.MIN_VALUE),
+                txStateStorage,
+                transactionStateResolver,
+                storageUpdateHandler,
+                validationSchemasSource,
+                node,
+                schemaSyncService,
+                catalogService,
+                placementDriver,
+                clusterNodeResolver,
+                remotelyTriggeredResourceRegistry,
+                schemaRegistry,
+                indexMetaStorage
+        );
+    }
+
+    /**
+     * Validates that {@link InternalTable#estimatedSize} works correctly using the test setup.
+     */
+    @Test
+    void testHappyCase() {
+        HybridTimestamp startTime = HybridTimestamp.MIN_VALUE;
+        HybridTimestamp expireTime = HybridTimestamp.MAX_VALUE;
+
+        for (TablePartitionId groupId : TABLE_PARTITION_IDS) {
+            mockPrimaryReplica(groupId, new Lease(node.name(), node.id(), startTime, expireTime, groupId));
+        }
+
+        validateEstimatedSize();
+
+        // One request per partition.
+        verify(messagingService, times(PARTITIONS_NUM)).invoke(anyString(), any(ReplicaRequest.class), anyLong());
+    }
+
+    /**
+     * Tests that a retry will happen when a Primary Replica lease expires during the call to {@link InternalTable#estimatedSize}.
+     */
+    @Test
+    void testRetryOnReplicaMiss() {
+        HybridTimestamp startTime = HybridTimestamp.MIN_VALUE;
+        HybridTimestamp expireTime = clock.now().addPhysicalTime(10_000);
+
+        for (int i = 0; i < TABLE_PARTITION_IDS.size(); i++) {
+            TablePartitionId groupId = TABLE_PARTITION_IDS.get(i);
+
+            var replicaMeta = new Lease(node.name(), node.id(), startTime, expireTime, groupId);
+
+            // Emulate lease expiration on the first replica (getPrimaryReplica will return null). We then expect that
+            // a second request will be sent with a different timestamp.
+            if (i == 0) {
+                var newReplicaMeta = new Lease(node.name(), node.id(), expireTime, HybridTimestamp.MAX_VALUE, groupId);
+
+                when(placementDriver.awaitPrimaryReplica(eq(groupId), any(), anyLong(), any()))
+                        .thenReturn(completedFuture(replicaMeta));
+
+                when(placementDriver.awaitPrimaryReplica(eq(groupId), eq(expireTime.tick()), anyLong(), any()))
+                        .thenReturn(completedFuture(newReplicaMeta));
+
+                when(placementDriver.getPrimaryReplica(eq(groupId), any()))
+                        .thenReturn(nullCompletedFuture())
+                        .thenReturn(completedFuture(newReplicaMeta));
+            } else {
+                mockPrimaryReplica(groupId, replicaMeta);
+            }
+        }
+
+        validateEstimatedSize();
+
+        // One request per partition + one retry for the first partition.
+        verify(messagingService, times(PARTITIONS_NUM + 1)).invoke(anyString(), any(ReplicaRequest.class), anyLong());
+    }
+
+    /**
+     * Tests that a retry will happen when the Primary Replica dies during the call to {@link InternalTable#estimatedSize}.
+     */
+    @Test
+    void testRetryOnReplicaUnavailable() {
+        HybridTimestamp startTime = HybridTimestamp.MIN_VALUE;
+        HybridTimestamp expireTime = clock.now().addPhysicalTime(10_000);
+
+        for (int i = 0; i < TABLE_PARTITION_IDS.size(); i++) {
+            TablePartitionId groupId = TABLE_PARTITION_IDS.get(i);
+
+            // Emulate Primary Replica death by issuing a lease for a non-existent node. We then expect that a retry to the correct
+            // node will be issued.
+            if (i == 0) {
+                var fakeReplicaMeta = new Lease("Dead node name", "Dead node id", startTime, expireTime, groupId);
+
+                var newReplicaMeta = new Lease(node.name(), node.id(), expireTime, HybridTimestamp.MAX_VALUE, groupId);
+
+                when(placementDriver.awaitPrimaryReplica(eq(groupId), any(), anyLong(), any()))
+                        .thenReturn(completedFuture(fakeReplicaMeta));
+
+                when(placementDriver.awaitPrimaryReplica(eq(groupId), eq(expireTime.tick()), anyLong(), any()))
+                        .thenReturn(completedFuture(newReplicaMeta));
+
+                when(placementDriver.getPrimaryReplica(eq(groupId), any()))
+                        .thenReturn(completedFuture(newReplicaMeta));
+            } else {
+                mockPrimaryReplica(groupId, new Lease(node.name(), node.id(), startTime, expireTime, groupId));
+            }
+        }
+
+        validateEstimatedSize();
+
+        // One request per partition + one retry for the first partition.
+        verify(messagingService, times(PARTITIONS_NUM + 1)).invoke(anyString(), any(ReplicaRequest.class), anyLong());
+    }
+
+    /**
+     * Tests that a retry will happen when a request to the Primary Replica times out during the call to
+     * {@link InternalTable#estimatedSize}.
+     */
+    @Test
+    void testRetryOnTimeout() {
+        HybridTimestamp startTime = HybridTimestamp.MIN_VALUE;
+        HybridTimestamp expireTime = clock.now().addPhysicalTime(10_000);
+
+        for (int i = 0; i < TABLE_PARTITION_IDS.size(); i++) {
+            TablePartitionId groupId = TABLE_PARTITION_IDS.get(i);
+
+            // Emulate a network timeout. A lease for a fake node is used so that this stubbing does not override the existing
+            // messagingService stubbing for the real node.
+            if (i == 0) {
+                var fakeReplicaMeta = new Lease("Dead node name", "Dead node id", startTime, expireTime, groupId);
+
+                var newReplicaMeta = new Lease(node.name(), node.id(), expireTime, HybridTimestamp.MAX_VALUE, groupId);
+
+                doReturn(failedFuture(new TimeoutException()))
+                        .when(messagingService).invoke(eq(fakeReplicaMeta.getLeaseholder()), any(), anyLong());
+
+                when(placementDriver.awaitPrimaryReplica(eq(groupId), any(), anyLong(), any()))
+                        .thenReturn(completedFuture(fakeReplicaMeta));
+
+                when(placementDriver.awaitPrimaryReplica(eq(groupId), eq(expireTime.tick()), anyLong(), any()))
+                        .thenReturn(completedFuture(newReplicaMeta));
+
+                when(placementDriver.getPrimaryReplica(eq(groupId), any()))
+                        .thenReturn(completedFuture(newReplicaMeta));
+            } else {
+                mockPrimaryReplica(groupId, new Lease(node.name(), node.id(), startTime, expireTime, groupId));
+            }
+        }
+
+        validateEstimatedSize();
+
+        // One request per partition + one retry for the first partition.
+        verify(messagingService, times(PARTITIONS_NUM + 1)).invoke(anyString(), any(ReplicaRequest.class), anyLong());
+    }
+
+    /**
+     * Makes the placement driver report the given lease as the primary replica of the given partition. The lease is returned
+     * both from {@link PlacementDriver#awaitPrimaryReplica} and from {@link PlacementDriver#getPrimaryReplica}.
+     */
+    private void mockPrimaryReplica(TablePartitionId groupId, Lease lease) {
+        when(placementDriver.awaitPrimaryReplica(eq(groupId), any(), anyLong(), any()))
+                .thenReturn(completedFuture(lease));
+        when(placementDriver.getPrimaryReplica(eq(groupId), any()))
+                .thenReturn(completedFuture(lease));
+    }
+
+    /**
+     * Assigns a random estimated size in {@code [0, 100)} to every partition storage. It then asserts that
+     * {@link InternalTable#estimatedSize} completes with the sum of those sizes.
+     */
+    private void validateEstimatedSize() {
+        long expectedSum = 0;
+
+        for (MvPartitionStorage partitionStorage : partitionStorages) {
+            long size = ThreadLocalRandom.current().nextLong(100);
+
+            when(partitionStorage.estimatedSize()).thenReturn(size);
+
+            expectedSum += size;
+        }
+
+        assertThat(table.estimatedSize(), willBe(expectedSum));
+    }
+}