This is an automated email from the ASF dual-hosted git repository.
rpuch pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new b8c9fd07a86 IGNITE-27349 Execute necessary waits before index build
starts (#7273)
b8c9fd07a86 is described below
commit b8c9fd07a86a1ff7d184b4162b5a190018dc30ab
Author: Roman Puchkovskiy <[email protected]>
AuthorDate: Fri Dec 19 20:12:34 2025 +0400
IGNITE-27349 Execute necessary waits before index build starts (#7273)
---
.../index/ItIndexBuildCompletenessTest.java | 6 +-
.../internal/index/IndexBuildController.java | 64 +++++++++++++++++
.../ignite/internal/index/IndexBuildTask.java | 40 ++++++++---
.../apache/ignite/internal/index/IndexBuilder.java | 35 ++++++++--
.../internal/index/IndexBuildingManager.java | 3 +
.../index/IndexAvailabilityControllerTest.java | 15 ++++
.../internal/index/IndexBuildControllerTest.java | 48 +++++++++++--
.../ignite/internal/index/IndexBuilderTest.java | 61 ++++++++++++++--
.../partition/replicator/fixtures/Node.java | 1 +
.../PartitionReplicaLifecycleManager.java | 9 ++-
.../replicator/ReplicaTableProcessor.java | 9 +++
.../partition/replicator/ReplicaTableSegment.java | 46 ++++++++++++
...ocessor.java => TableTxRwOperationTracker.java} | 19 ++---
.../replicator/ZonePartitionReplicaListener.java | 6 ++
.../org/apache/ignite/internal/app/IgniteImpl.java | 1 +
.../table/distributed/index/IndexMeta.java | 2 +-
.../IndexBuilderTxRwOperationTracker.java | 4 +-
.../replicator/PartitionReplicaListener.java | 17 +++--
.../handlers/BuildIndexReplicaRequestHandler.java | 27 +-------
.../replication/PartitionReplicaListenerTest.java | 81 +---------------------
20 files changed, 344 insertions(+), 150 deletions(-)
diff --git
a/modules/index/src/integrationTest/java/org/apache/ignite/internal/index/ItIndexBuildCompletenessTest.java
b/modules/index/src/integrationTest/java/org/apache/ignite/internal/index/ItIndexBuildCompletenessTest.java
index 9080c90d0af..80f5dfce77a 100644
---
a/modules/index/src/integrationTest/java/org/apache/ignite/internal/index/ItIndexBuildCompletenessTest.java
+++
b/modules/index/src/integrationTest/java/org/apache/ignite/internal/index/ItIndexBuildCompletenessTest.java
@@ -43,7 +43,6 @@ import org.apache.ignite.sql.ResultSet;
import org.apache.ignite.sql.SqlRow;
import org.apache.ignite.table.KeyValueView;
import org.apache.ignite.tx.Transaction;
-import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
class ItIndexBuildCompletenessTest extends ClusterPerTestIntegrationTest {
@@ -82,11 +81,10 @@ class ItIndexBuildCompletenessTest extends
ClusterPerTestIntegrationTest {
}
@Test
- @Disabled("https://issues.apache.org/jira/browse/IGNITE-27349")
void
raceBetweenIndexBuildAndWriteFromDeadCoordinatorDoesNotCauseIndexIncompleteness()
{
createTestTable(cluster, 1, 1);
- List<Transaction> transactions = IntStream.range(0, 2)
+ List<Transaction> transactions = IntStream.range(0, 100)
.mapToObj(n -> cluster.node(0).transactions().begin())
.collect(toUnmodifiableList());
@@ -100,7 +98,7 @@ class ItIndexBuildCompletenessTest extends
ClusterPerTestIntegrationTest {
for (int i = 0; i < transactions.size(); i++) {
Transaction tx = transactions.get(i);
try {
- kvView.put(tx, i, 11);
+ kvView.put(tx, i, 42);
tx.commit();
} catch (RuntimeException e) {
if (ExceptionUtils.hasCause(e,
StaleTransactionOperationException.class)) {
diff --git
a/modules/index/src/main/java/org/apache/ignite/internal/index/IndexBuildController.java
b/modules/index/src/main/java/org/apache/ignite/internal/index/IndexBuildController.java
index e919d9457bd..14975611a9f 100644
---
a/modules/index/src/main/java/org/apache/ignite/internal/index/IndexBuildController.java
+++
b/modules/index/src/main/java/org/apache/ignite/internal/index/IndexBuildController.java
@@ -59,6 +59,10 @@ import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.network.ClusterService;
import org.apache.ignite.internal.network.InternalClusterNode;
+import
org.apache.ignite.internal.partition.replicator.PartitionReplicaLifecycleManager;
+import org.apache.ignite.internal.partition.replicator.ReplicaTableSegment;
+import
org.apache.ignite.internal.partition.replicator.ZonePartitionReplicaListener;
+import
org.apache.ignite.internal.partition.replicator.ZoneResourcesManager.ZonePartitionResources;
import org.apache.ignite.internal.placementdriver.PlacementDriver;
import
org.apache.ignite.internal.placementdriver.PrimaryReplicaAwaitTimeoutException;
import org.apache.ignite.internal.placementdriver.ReplicaMeta;
@@ -72,6 +76,7 @@ import org.apache.ignite.internal.storage.index.IndexStorage;
import org.apache.ignite.internal.util.CompletableFutures;
import org.apache.ignite.internal.util.ExceptionUtils;
import org.apache.ignite.internal.util.IgniteSpinBusyLock;
+import org.jetbrains.annotations.Nullable;
/**
* Component is responsible for starting and stopping the building of indexes
on primary replicas.
@@ -103,6 +108,8 @@ class IndexBuildController implements ManuallyCloseable {
private final ClockService clockService;
+ private final PartitionReplicaLifecycleManager
partitionReplicaLifecycleManager;
+
private final FailureProcessor failureProcessor;
private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
@@ -119,6 +126,7 @@ class IndexBuildController implements ManuallyCloseable {
ClusterService clusterService,
PlacementDriver placementDriver,
ClockService clockService,
+ PartitionReplicaLifecycleManager partitionReplicaLifecycleManager,
FailureProcessor failureProcessor
) {
this.indexBuilder = indexBuilder;
@@ -127,6 +135,7 @@ class IndexBuildController implements ManuallyCloseable {
this.clusterService = clusterService;
this.placementDriver = placementDriver;
this.clockService = clockService;
+ this.partitionReplicaLifecycleManager =
partitionReplicaLifecycleManager;
this.failureProcessor = failureProcessor;
}
@@ -409,6 +418,18 @@ class IndexBuildController implements ManuallyCloseable {
long enlistmentConsistencyToken,
HybridTimestamp initialOperationTimestamp
) {
+ ZonePartitionId zonePartitionId = new ZonePartitionId(zoneId,
partitionId);
+ ZonePartitionResources resources =
partitionReplicaLifecycleManager.zonePartitionResourcesOrNull(zonePartitionId);
+ if (resources == null) {
+ // Already stopped/destroyed, ignore.
+ return;
+ }
+
+ @Nullable ReplicaTableSegment segment =
replicaTableSegment(zonePartitionId, tableId, resources, indexDescriptor);
+ if (segment == null) {
+ return;
+ }
+
MvPartitionStorage mvPartition = mvPartitionStorage(mvTableStorage,
zoneId, tableId, partitionId);
IndexStorage indexStorage = indexStorage(mvTableStorage, partitionId,
indexDescriptor);
@@ -420,6 +441,8 @@ class IndexBuildController implements ManuallyCloseable {
indexDescriptor.id(),
indexStorage,
mvPartition,
+ segment.txRwOperationTracker(),
+ segment.safeTime(),
localNode(),
enlistmentConsistencyToken,
initialOperationTimestamp
@@ -435,6 +458,18 @@ class IndexBuildController implements ManuallyCloseable {
MvTableStorage mvTableStorage,
long enlistmentConsistencyToken
) {
+ ZonePartitionId zonePartitionId = new ZonePartitionId(zoneId,
partitionId);
+ ZonePartitionResources resources =
partitionReplicaLifecycleManager.zonePartitionResourcesOrNull(zonePartitionId);
+ if (resources == null) {
+ // Already stopped/destroyed, ignore.
+ return;
+ }
+
+ @Nullable ReplicaTableSegment segment =
replicaTableSegment(zonePartitionId, tableId, resources, indexDescriptor);
+ if (segment == null) {
+ return;
+ }
+
MvPartitionStorage mvPartition = mvPartitionStorage(mvTableStorage,
zoneId, tableId, partitionId);
IndexStorage indexStorage = indexStorage(mvTableStorage, partitionId,
indexDescriptor);
@@ -446,12 +481,41 @@ class IndexBuildController implements ManuallyCloseable {
indexDescriptor.id(),
indexStorage,
mvPartition,
+ segment.txRwOperationTracker(),
+ segment.safeTime(),
localNode(),
enlistmentConsistencyToken,
clockService.current()
);
}
+ private static @Nullable ReplicaTableSegment replicaTableSegment(
+ ZonePartitionId zonePartitionId,
+ int tableId,
+ ZonePartitionResources resources,
+ CatalogIndexDescriptor indexDescriptor
+ ) {
+ CompletableFuture<ZonePartitionReplicaListener> replicaListenerFuture
= resources.replicaListenerFuture();
+ assert replicaListenerFuture.isDone() : "Replica listener future is
not done for [zonePartitionId=" + zonePartitionId + "].";
+
+ ZonePartitionReplicaListener replicaListener =
replicaListenerFuture.join();
+ @Nullable ReplicaTableSegment segment =
replicaListener.segmentFor(tableId);
+
+ if (segment == null) {
+ // Null means that the table has been removed due to table
destruction.
+ LOG.info(
+ "Segment is null, skipping index build scheduling "
+ + "[zoneId={}, tableId={}, partitionId={},
indexId={}]",
+ zonePartitionId.zoneId(),
+ tableId,
+ zonePartitionId.partitionId(),
+ indexDescriptor.id()
+ );
+ }
+
+ return segment;
+ }
+
private InternalClusterNode localNode() {
return IndexManagementUtils.localNode(clusterService);
}
diff --git
a/modules/index/src/main/java/org/apache/ignite/internal/index/IndexBuildTask.java
b/modules/index/src/main/java/org/apache/ignite/internal/index/IndexBuildTask.java
index 4cc94adabdf..0b64a024256 100644
---
a/modules/index/src/main/java/org/apache/ignite/internal/index/IndexBuildTask.java
+++
b/modules/index/src/main/java/org/apache/ignite/internal/index/IndexBuildTask.java
@@ -19,10 +19,10 @@ package org.apache.ignite.internal.index;
import static java.util.concurrent.CompletableFuture.completedFuture;
import static java.util.concurrent.CompletableFuture.failedFuture;
-import static java.util.concurrent.CompletableFuture.supplyAsync;
import static java.util.stream.Collectors.toList;
import static java.util.stream.Collectors.toMap;
import static java.util.stream.Collectors.toUnmodifiableSet;
+import static org.apache.ignite.internal.hlc.HybridTimestamp.hybridTimestamp;
import static
org.apache.ignite.internal.replicator.message.ReplicaMessageUtils.toZonePartitionIdMessage;
import static
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
import static org.apache.ignite.internal.util.ExceptionUtils.hasCause;
@@ -48,6 +48,7 @@ import org.apache.ignite.internal.lang.NodeStoppingException;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.network.InternalClusterNode;
+import
org.apache.ignite.internal.partition.replicator.TableTxRwOperationTracker;
import
org.apache.ignite.internal.partition.replicator.network.PartitionReplicationMessagesFactory;
import
org.apache.ignite.internal.partition.replicator.network.replication.BuildIndexReplicaRequest;
import org.apache.ignite.internal.raft.GroupOverloadedException;
@@ -62,10 +63,12 @@ import org.apache.ignite.internal.storage.RowId;
import org.apache.ignite.internal.storage.RowMeta;
import org.apache.ignite.internal.storage.StorageClosedException;
import org.apache.ignite.internal.storage.index.IndexStorage;
+import
org.apache.ignite.internal.table.distributed.index.MetaIndexStatusChange;
import org.apache.ignite.internal.tx.TransactionIds;
import org.apache.ignite.internal.tx.TxState;
import org.apache.ignite.internal.util.CompletableFutures;
import org.apache.ignite.internal.util.IgniteSpinBusyLock;
+import org.apache.ignite.internal.util.PendingComparableValuesTracker;
import org.apache.ignite.internal.util.TrackerClosedException;
import org.jetbrains.annotations.Nullable;
@@ -80,7 +83,9 @@ class IndexBuildTask {
private final IndexBuildTaskId taskId;
- private final HybridTimestamp indexCreationActivationTs;
+ private final MetaIndexStatusChange indexCreationInfo;
+
+ private final HybridTimestamp indexBuildingStateActivationTimestamp;
private final IndexStorage indexStorage;
@@ -88,6 +93,10 @@ class IndexBuildTask {
private final ReplicaService replicaService;
+ private final TableTxRwOperationTracker txRwOperationTracker;
+
+ private final PendingComparableValuesTracker<HybridTimestamp, Void>
safeTime;
+
private final FailureProcessor failureProcessor;
private final FinalTransactionStateResolver finalTransactionStateResolver;
@@ -120,10 +129,13 @@ class IndexBuildTask {
IndexBuildTask(
IndexBuildTaskId taskId,
- HybridTimestamp indexCreationActivationTs,
+ MetaIndexStatusChange indexCreationInfo,
+ HybridTimestamp indexBuildingStateActivationTimestamp,
IndexStorage indexStorage,
MvPartitionStorage partitionStorage,
ReplicaService replicaService,
+ TableTxRwOperationTracker txRwOperationTracker,
+ PendingComparableValuesTracker<HybridTimestamp, Void> safeTime,
FailureProcessor failureProcessor,
FinalTransactionStateResolver finalTransactionStateResolver,
Executor executor,
@@ -137,10 +149,13 @@ class IndexBuildTask {
IndexBuilderMetricSource indexBuilderMetricSource
) {
this.taskId = taskId;
- this.indexCreationActivationTs = indexCreationActivationTs;
+ this.indexCreationInfo = indexCreationInfo;
+ this.indexBuildingStateActivationTimestamp =
indexBuildingStateActivationTimestamp;
this.indexStorage = indexStorage;
this.partitionStorage = partitionStorage;
this.replicaService = replicaService;
+ this.txRwOperationTracker = txRwOperationTracker;
+ this.safeTime = safeTime;
this.failureProcessor = failureProcessor;
this.finalTransactionStateResolver = finalTransactionStateResolver;
this.executor = executor;
@@ -174,9 +189,16 @@ class IndexBuildTask {
}
try {
- statisticsLoggingListener.onIndexBuildStarted();
-
- supplyAsync(partitionStorage::highestRowId, executor)
+ // Before starting to build the index, we are waiting for all
operations of RW transactions that started before index creation
+ // to make sure that, even if some coordinator has gone while we
were waiting for its pre-index RW transactions to finish,
+ // we still allow operations of those transactions from that
coordinator which are still in-flight to finish, so that we
+ // index the row versions they could create. Otherwise, we might
miss some row versions in the index.
+
txRwOperationTracker.awaitCompleteTxRwOperations(indexCreationInfo.catalogVersion())
+ // This wait is necessary to make sure that all writes
made before the index has switched to the BUILDING state
+ // are visible to the index build process.
+ .thenCompose(unused ->
safeTime.waitFor(indexBuildingStateActivationTimestamp))
+ .thenRun(statisticsLoggingListener::onIndexBuildStarted)
+ .thenApplyAsync(unused -> partitionStorage.highestRowId(),
executor)
.thenApplyAsync(this::handleNextBatch, executor)
.thenCompose(Function.identity())
.whenComplete((unused, throwable) -> {
@@ -296,7 +318,9 @@ class IndexBuildTask {
assert transactionId != null;
// We only care about transactions which began before index
creation.
- if
(TransactionIds.beginTimestamp(transactionId).compareTo(indexCreationActivationTs)
< 0) {
+ HybridTimestamp txBeginTs =
TransactionIds.beginTimestamp(transactionId);
+ HybridTimestamp indexCreationTs =
hybridTimestamp(indexCreationInfo.activationTimestamp());
+ if (txBeginTs.compareTo(indexCreationTs) < 0) {
transactionsToResolve.put(
row.transactionId(),
new CommitPartitionId(row.commitZoneId(),
row.commitPartitionId())
diff --git
a/modules/index/src/main/java/org/apache/ignite/internal/index/IndexBuilder.java
b/modules/index/src/main/java/org/apache/ignite/internal/index/IndexBuilder.java
index 0049a34c440..1d3e8a4556d 100644
---
a/modules/index/src/main/java/org/apache/ignite/internal/index/IndexBuilder.java
+++
b/modules/index/src/main/java/org/apache/ignite/internal/index/IndexBuilder.java
@@ -17,6 +17,7 @@
package org.apache.ignite.internal.index;
+import static org.apache.ignite.internal.hlc.HybridTimestamp.hybridTimestamp;
import static org.apache.ignite.internal.util.IgniteUtils.inBusyLockSafe;
import java.util.Iterator;
@@ -34,6 +35,7 @@ import org.apache.ignite.internal.failure.FailureProcessor;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.metrics.MetricManager;
import org.apache.ignite.internal.network.InternalClusterNode;
+import
org.apache.ignite.internal.partition.replicator.TableTxRwOperationTracker;
import
org.apache.ignite.internal.partition.replicator.network.command.BuildIndexCommand;
import
org.apache.ignite.internal.partition.replicator.network.replication.BuildIndexReplicaRequest;
import org.apache.ignite.internal.replicator.ReplicaService;
@@ -43,7 +45,9 @@ import org.apache.ignite.internal.storage.index.IndexStorage;
import org.apache.ignite.internal.table.distributed.index.IndexMeta;
import org.apache.ignite.internal.table.distributed.index.IndexMetaStorage;
import org.apache.ignite.internal.table.distributed.index.MetaIndexStatus;
+import
org.apache.ignite.internal.table.distributed.index.MetaIndexStatusChange;
import org.apache.ignite.internal.util.IgniteSpinBusyLock;
+import org.apache.ignite.internal.util.PendingComparableValuesTracker;
/**
* Component that is responsible for building an index for a specific
partition.
@@ -124,6 +128,8 @@ class IndexBuilder implements ManuallyCloseable {
* @param indexId Index ID.
* @param indexStorage Index storage to build.
* @param partitionStorage Multi-versioned partition storage.
+ * @param partitionTxRwOperationTracker Partition transaction read-write
operations tracker.
+ * @param partitionSafeTime Partition safe time tracker.
* @param node Node to which requests to build the index will be sent.
* @param enlistmentConsistencyToken Enlistment consistency token is used
to check that the lease is still actual while the message goes
* to the replica.
@@ -135,6 +141,8 @@ class IndexBuilder implements ManuallyCloseable {
int indexId,
IndexStorage indexStorage,
MvPartitionStorage partitionStorage,
+ TableTxRwOperationTracker partitionTxRwOperationTracker,
+ PendingComparableValuesTracker<HybridTimestamp, Void>
partitionSafeTime,
InternalClusterNode node,
long enlistmentConsistencyToken,
HybridTimestamp initialOperationTimestamp
@@ -152,10 +160,13 @@ class IndexBuilder implements ManuallyCloseable {
IndexBuildTask newTask = new IndexBuildTask(
taskId,
- indexCreationActivationTs(indexId),
+ indexCreationInfo(indexId),
+ indexBuildingStateActivationTimestamp(indexId),
indexStorage,
partitionStorage,
replicaService,
+ partitionTxRwOperationTracker,
+ partitionSafeTime,
failureProcessor,
finalTransactionStateResolver,
executor,
@@ -193,6 +204,8 @@ class IndexBuilder implements ManuallyCloseable {
* @param indexId Index ID.
* @param indexStorage Index storage to build.
* @param partitionStorage Multi-versioned partition storage.
+ * @param partitionTxRwOperationTracker Partition transaction read-write
operations tracker.
+ * @param partitionSafeTime Partition safe time tracker.
* @param node Node to which requests to build the index will be sent.
* @param enlistmentConsistencyToken Enlistment consistency token is used
to check that the lease is still actual while the
* message goes to the replica.
@@ -204,6 +217,8 @@ class IndexBuilder implements ManuallyCloseable {
int indexId,
IndexStorage indexStorage,
MvPartitionStorage partitionStorage,
+ TableTxRwOperationTracker partitionTxRwOperationTracker,
+ PendingComparableValuesTracker<HybridTimestamp, Void>
partitionSafeTime,
InternalClusterNode node,
long enlistmentConsistencyToken,
HybridTimestamp initialOperationTimestamp
@@ -217,10 +232,13 @@ class IndexBuilder implements ManuallyCloseable {
IndexBuildTask newTask = new IndexBuildTask(
taskId,
- indexCreationActivationTs(indexId),
+ indexCreationInfo(indexId),
+ indexBuildingStateActivationTimestamp(indexId),
indexStorage,
partitionStorage,
replicaService,
+ partitionTxRwOperationTracker,
+ partitionSafeTime,
failureProcessor,
finalTransactionStateResolver,
executor,
@@ -238,12 +256,19 @@ class IndexBuilder implements ManuallyCloseable {
});
}
- private HybridTimestamp indexCreationActivationTs(int indexId) {
+ private MetaIndexStatusChange indexCreationInfo(int indexId) {
+ return
requiredIndexMeta(indexId).statusChange(MetaIndexStatus.REGISTERED);
+ }
+
+ private IndexMeta requiredIndexMeta(int indexId) {
IndexMeta indexMeta = indexMetaStorage.indexMeta(indexId);
assert indexMeta != null : "Index meta must be present for indexId=" +
indexId;
+ return indexMeta;
+ }
- long tsLong =
indexMeta.statusChange(MetaIndexStatus.REGISTERED).activationTimestamp();
- return HybridTimestamp.hybridTimestamp(tsLong);
+ private HybridTimestamp indexBuildingStateActivationTimestamp(int indexId)
{
+ MetaIndexStatusChange statusChange =
requiredIndexMeta(indexId).statusChange(MetaIndexStatus.BUILDING);
+ return hybridTimestamp(statusChange.activationTimestamp());
}
/**
diff --git
a/modules/index/src/main/java/org/apache/ignite/internal/index/IndexBuildingManager.java
b/modules/index/src/main/java/org/apache/ignite/internal/index/IndexBuildingManager.java
index 237d88b8d14..58dd64b9a35 100644
---
a/modules/index/src/main/java/org/apache/ignite/internal/index/IndexBuildingManager.java
+++
b/modules/index/src/main/java/org/apache/ignite/internal/index/IndexBuildingManager.java
@@ -45,6 +45,7 @@ import
org.apache.ignite.internal.metastorage.MetaStorageManager;
import org.apache.ignite.internal.metastorage.Revisions;
import org.apache.ignite.internal.metrics.MetricManager;
import org.apache.ignite.internal.network.ClusterService;
+import
org.apache.ignite.internal.partition.replicator.PartitionReplicaLifecycleManager;
import org.apache.ignite.internal.placementdriver.PlacementDriver;
import
org.apache.ignite.internal.placementdriver.wrappers.ExecutorInclinedPlacementDriver;
import org.apache.ignite.internal.replicator.ReplicaService;
@@ -95,6 +96,7 @@ public class IndexBuildingManager implements IgniteComponent {
FailureProcessor failureProcessor,
LowWatermark lowWatermark,
TxManager txManager,
+ PartitionReplicaLifecycleManager partitionReplicaLifecycleManager,
MetricManager metricManager
) {
this.metaStorageManager = metaStorageManager;
@@ -139,6 +141,7 @@ public class IndexBuildingManager implements
IgniteComponent {
clusterService,
placementDriver,
clockService,
+ partitionReplicaLifecycleManager,
failureProcessor
);
diff --git
a/modules/index/src/test/java/org/apache/ignite/internal/index/IndexAvailabilityControllerTest.java
b/modules/index/src/test/java/org/apache/ignite/internal/index/IndexAvailabilityControllerTest.java
index 1cc0eb0b67a..45fdab88ae3 100644
---
a/modules/index/src/test/java/org/apache/ignite/internal/index/IndexAvailabilityControllerTest.java
+++
b/modules/index/src/test/java/org/apache/ignite/internal/index/IndexAvailabilityControllerTest.java
@@ -39,6 +39,8 @@ import static org.hamcrest.Matchers.nullValue;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -53,6 +55,7 @@ import
org.apache.ignite.internal.catalog.descriptors.CatalogZoneDescriptor;
import org.apache.ignite.internal.failure.NoOpFailureManager;
import org.apache.ignite.internal.hlc.HybridClock;
import org.apache.ignite.internal.hlc.HybridClockImpl;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.lang.ByteArray;
import org.apache.ignite.internal.manager.ComponentContext;
import org.apache.ignite.internal.metastorage.Entry;
@@ -60,6 +63,7 @@ import
org.apache.ignite.internal.metastorage.impl.MetaStorageManagerImpl;
import
org.apache.ignite.internal.metastorage.impl.StandaloneMetaStorageManager;
import org.apache.ignite.internal.metrics.TestMetricManager;
import org.apache.ignite.internal.network.InternalClusterNode;
+import
org.apache.ignite.internal.partition.replicator.TableTxRwOperationTracker;
import org.apache.ignite.internal.replicator.ReplicaService;
import org.apache.ignite.internal.sql.SqlCommon;
import org.apache.ignite.internal.storage.MvPartitionStorage;
@@ -68,6 +72,7 @@ import org.apache.ignite.internal.storage.index.IndexStorage;
import org.apache.ignite.internal.table.TableTestUtils;
import org.apache.ignite.internal.table.distributed.index.IndexMetaStorage;
import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
+import org.apache.ignite.internal.util.PendingComparableValuesTracker;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
@@ -89,6 +94,10 @@ public class IndexAvailabilityControllerTest extends
BaseIgniteAbstractTest {
private final IndexMetaStorage indexMetaStorage =
mock(IndexMetaStorage.class);
+ private final TableTxRwOperationTracker txRwOperationTracker =
mock(TableTxRwOperationTracker.class);
+
+ private final PendingComparableValuesTracker<HybridTimestamp, Void>
safeTime = mock(PendingComparableValuesTracker.class);
+
private final IndexBuilder indexBuilder = new IndexBuilder(
executorService,
mock(ReplicaService.class, invocation -> nullCompletedFuture()),
@@ -108,6 +117,10 @@ public class IndexAvailabilityControllerTest extends
BaseIgniteAbstractTest {
@BeforeEach
void configureMocks() {
IndexMetaStorageMocks.configureMocksForBuildingPhase(indexMetaStorage);
+
+
when(txRwOperationTracker.awaitCompleteTxRwOperations(anyInt())).thenReturn(nullCompletedFuture());
+
+ when(safeTime.waitFor(any())).thenReturn(nullCompletedFuture());
}
@BeforeEach
@@ -423,6 +436,8 @@ public class IndexAvailabilityControllerTest extends
BaseIgniteAbstractTest {
indexId,
indexStorage,
mock(MvPartitionStorage.class),
+ txRwOperationTracker,
+ safeTime,
mock(InternalClusterNode.class),
ANY_ENLISTMENT_CONSISTENCY_TOKEN,
clock.current()
diff --git
a/modules/index/src/test/java/org/apache/ignite/internal/index/IndexBuildControllerTest.java
b/modules/index/src/test/java/org/apache/ignite/internal/index/IndexBuildControllerTest.java
index 42624bdd6a3..74c702ad005 100644
---
a/modules/index/src/test/java/org/apache/ignite/internal/index/IndexBuildControllerTest.java
+++
b/modules/index/src/test/java/org/apache/ignite/internal/index/IndexBuildControllerTest.java
@@ -41,6 +41,7 @@ import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.clearInvocations;
+import static org.mockito.Mockito.lenient;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
@@ -61,6 +62,11 @@ import org.apache.ignite.internal.hlc.TestClockService;
import org.apache.ignite.internal.manager.ComponentContext;
import org.apache.ignite.internal.network.ClusterService;
import org.apache.ignite.internal.network.TopologyService;
+import
org.apache.ignite.internal.partition.replicator.PartitionReplicaLifecycleManager;
+import org.apache.ignite.internal.partition.replicator.ReplicaTableSegment;
+import
org.apache.ignite.internal.partition.replicator.TableTxRwOperationTracker;
+import
org.apache.ignite.internal.partition.replicator.ZonePartitionReplicaListener;
+import
org.apache.ignite.internal.partition.replicator.ZoneResourcesManager.ZonePartitionResources;
import org.apache.ignite.internal.placementdriver.ReplicaMeta;
import org.apache.ignite.internal.placementdriver.leases.Lease;
import org.apache.ignite.internal.replicator.ZonePartitionId;
@@ -70,11 +76,16 @@ import
org.apache.ignite.internal.storage.engine.MvTableStorage;
import org.apache.ignite.internal.storage.index.IndexStorage;
import org.apache.ignite.internal.table.TableTestUtils;
import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
+import org.apache.ignite.internal.util.PendingComparableValuesTracker;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
/** For {@link IndexBuildController} testing. */
+@ExtendWith(MockitoExtension.class)
public class IndexBuildControllerTest extends BaseIgniteAbstractTest {
private static final int PARTITION_ID = 10;
@@ -90,13 +101,11 @@ public class IndexBuildControllerTest extends
BaseIgniteAbstractTest {
private final ClockService clockService = new TestClockService(clock);
- private IndexManager indexManager = null;
-
@BeforeEach
- void setUp() {
+ void setUp(@Mock PendingComparableValuesTracker<HybridTimestamp, Void>
safeTime) {
indexBuilder = mock(IndexBuilder.class);
- indexManager = mock(IndexManager.class, invocation -> {
+ IndexManager indexManager = mock(IndexManager.class, invocation -> {
MvTableStorage mvTableStorage = mock(MvTableStorage.class);
MvPartitionStorage mvPartitionStorage =
mock(MvPartitionStorage.class);
IndexStorage indexStorage = mock(IndexStorage.class);
@@ -112,6 +121,16 @@ public class IndexBuildControllerTest extends
BaseIgniteAbstractTest {
catalogManager = createCatalogManagerWithTestUpdateLog(NODE_NAME,
clock);
assertThat(catalogManager.startAsync(new ComponentContext()),
willCompleteSuccessfully());
+ PartitionReplicaLifecycleManager partitionReplicaLifecycleManager =
mock(PartitionReplicaLifecycleManager.class);
+ ZonePartitionResources zonePartitionResources =
mock(ZonePartitionResources.class);
+
lenient().doReturn(zonePartitionResources).when(partitionReplicaLifecycleManager).zonePartitionResourcesOrNull(any());
+
+ ZonePartitionReplicaListener replicaListener =
mock(ZonePartitionReplicaListener.class);
+
lenient().doReturn(completedFuture(replicaListener)).when(zonePartitionResources).replicaListenerFuture();
+
+ TableTxRwOperationTracker txRwOperationTracker =
mock(TableTxRwOperationTracker.class);
+ lenient().doReturn(new ReplicaTableSegment(txRwOperationTracker,
safeTime)).when(replicaListener).segmentFor(anyInt());
+
indexBuildController = new IndexBuildController(
indexBuilder,
indexManager,
@@ -119,6 +138,7 @@ public class IndexBuildControllerTest extends
BaseIgniteAbstractTest {
clusterService,
placementDriver,
clockService,
+ partitionReplicaLifecycleManager,
new NoOpFailureManager()
);
@@ -151,6 +171,8 @@ public class IndexBuildControllerTest extends
BaseIgniteAbstractTest {
eq(indexId(INDEX_NAME)),
any(),
any(),
+ any(),
+ any(),
eq(LOCAL_NODE),
anyLong(),
any()
@@ -163,6 +185,8 @@ public class IndexBuildControllerTest extends
BaseIgniteAbstractTest {
eq(indexId(INDEX_NAME)),
any(),
any(),
+ any(),
+ any(),
eq(LOCAL_NODE),
anyLong(),
any()
@@ -186,6 +210,8 @@ public class IndexBuildControllerTest extends
BaseIgniteAbstractTest {
eq(indexId(INDEX_NAME)),
any(),
any(),
+ any(),
+ any(),
eq(LOCAL_NODE),
anyLong(),
any()
@@ -198,6 +224,8 @@ public class IndexBuildControllerTest extends
BaseIgniteAbstractTest {
eq(indexId(INDEX_NAME)),
any(),
any(),
+ any(),
+ any(),
eq(LOCAL_NODE),
anyLong(),
any()
@@ -219,6 +247,8 @@ public class IndexBuildControllerTest extends
BaseIgniteAbstractTest {
eq(indexId(INDEX_NAME)),
any(),
any(),
+ any(),
+ any(),
eq(LOCAL_NODE),
anyLong(),
any()
@@ -231,6 +261,8 @@ public class IndexBuildControllerTest extends
BaseIgniteAbstractTest {
eq(indexId(PK_INDEX_NAME)),
any(),
any(),
+ any(),
+ any(),
eq(LOCAL_NODE),
anyLong(),
any()
@@ -252,6 +284,8 @@ public class IndexBuildControllerTest extends
BaseIgniteAbstractTest {
eq(indexId(pkIndexName(tableName))),
any(),
any(),
+ any(),
+ any(),
eq(LOCAL_NODE),
anyLong(),
any()
@@ -264,6 +298,8 @@ public class IndexBuildControllerTest extends
BaseIgniteAbstractTest {
eq(indexId(pkIndexName(tableName))),
any(),
any(),
+ any(),
+ any(),
eq(LOCAL_NODE),
anyLong(),
any()
@@ -308,6 +344,8 @@ public class IndexBuildControllerTest extends
BaseIgniteAbstractTest {
anyInt(),
any(),
any(),
+ any(),
+ any(),
eq(LOCAL_NODE),
anyLong(),
any()
@@ -320,6 +358,8 @@ public class IndexBuildControllerTest extends
BaseIgniteAbstractTest {
eq(indexId0),
any(),
any(),
+ any(),
+ any(),
eq(LOCAL_NODE),
anyLong(),
any()
diff --git
a/modules/index/src/test/java/org/apache/ignite/internal/index/IndexBuilderTest.java
b/modules/index/src/test/java/org/apache/ignite/internal/index/IndexBuilderTest.java
index 91157b0ded0..8926fa2f9d2 100644
---
a/modules/index/src/test/java/org/apache/ignite/internal/index/IndexBuilderTest.java
+++
b/modules/index/src/test/java/org/apache/ignite/internal/index/IndexBuilderTest.java
@@ -19,7 +19,9 @@ package org.apache.ignite.internal.index;
import static java.util.concurrent.CompletableFuture.failedFuture;
import static java.util.concurrent.Executors.newSingleThreadExecutor;
+import static java.util.concurrent.TimeUnit.SECONDS;
import static java.util.stream.Collectors.toList;
+import static org.apache.ignite.internal.hlc.HybridTimestamp.hybridTimestamp;
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willTimeoutFast;
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
import static
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
@@ -29,7 +31,11 @@ import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.fail;
import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.timeout;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@@ -37,14 +43,15 @@ import static org.mockito.Mockito.when;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
+import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;
import org.apache.ignite.internal.failure.NoOpFailureManager;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.metrics.TestMetricManager;
import org.apache.ignite.internal.network.InternalClusterNode;
+import
org.apache.ignite.internal.partition.replicator.TableTxRwOperationTracker;
import
org.apache.ignite.internal.partition.replicator.network.replication.BuildIndexReplicaRequest;
import org.apache.ignite.internal.replicator.ReplicaService;
import org.apache.ignite.internal.replicator.ZonePartitionId;
@@ -53,11 +60,16 @@ import
org.apache.ignite.internal.replicator.message.ReplicaRequest;
import org.apache.ignite.internal.storage.MvPartitionStorage;
import org.apache.ignite.internal.storage.RowId;
import org.apache.ignite.internal.storage.index.IndexStorage;
+import org.apache.ignite.internal.table.distributed.index.IndexMeta;
import org.apache.ignite.internal.table.distributed.index.IndexMetaStorage;
+import org.apache.ignite.internal.table.distributed.index.MetaIndexStatus;
+import
org.apache.ignite.internal.table.distributed.index.MetaIndexStatusChange;
import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
+import org.apache.ignite.internal.util.PendingComparableValuesTracker;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
+import org.mockito.InOrder;
/** For {@link IndexBuilder} testing. */
public class IndexBuilderTest extends BaseIgniteAbstractTest {
@@ -77,6 +89,12 @@ public class IndexBuilderTest extends BaseIgniteAbstractTest
{
private final IndexMetaStorage indexMetaStorage =
mock(IndexMetaStorage.class);
+ private final MvPartitionStorage mvPartitionStorage =
mock(MvPartitionStorage.class);
+
+ private final TableTxRwOperationTracker txRwOperationTracker =
mock(TableTxRwOperationTracker.class);
+
+ private final PendingComparableValuesTracker<HybridTimestamp, Void>
safeTime = mock(PendingComparableValuesTracker.class);
+
private final TestMetricManager metricManager = new TestMetricManager();
private final IndexBuilder indexBuilder = new IndexBuilder(
@@ -91,14 +109,45 @@ public class IndexBuilderTest extends
BaseIgniteAbstractTest {
@BeforeEach
void configureMocks() {
IndexMetaStorageMocks.configureMocksForBuildingPhase(indexMetaStorage);
+
+
when(txRwOperationTracker.awaitCompleteTxRwOperations(anyInt())).thenReturn(nullCompletedFuture());
+
+ when(safeTime.waitFor(any())).thenReturn(nullCompletedFuture());
}
@AfterEach
void tearDown() throws Exception {
closeAll(
indexBuilder::close,
- () -> shutdownAndAwaitTermination(executorService, 1,
TimeUnit.SECONDS)
+ () -> shutdownAndAwaitTermination(executorService, 1, SECONDS)
+ );
+ }
+
+ @Test
+ void testIndexBuildInvokesNecessaryWaitsBeforeStartingToBuild() {
+ int registerredStateCatalogVersion = 10;
+ long buildingStateActivationTs = 2000L;
+
+ IndexMeta indexMeta = new IndexMeta(
+ 100,
+ INDEX_ID,
+ TABLE_ID,
+ 1,
+ "idx",
+ MetaIndexStatus.BUILDING,
+ Map.of(
+ MetaIndexStatus.REGISTERED, new
MetaIndexStatusChange(registerredStateCatalogVersion, 1000L),
+ MetaIndexStatus.BUILDING, new
MetaIndexStatusChange(20, buildingStateActivationTs)
+ )
);
+ doReturn(indexMeta).when(indexMetaStorage).indexMeta(INDEX_ID);
+
+ scheduleBuildIndex(INDEX_ID, ZONE_ID, TABLE_ID, PARTITION_ID,
List.of(rowId(PARTITION_ID)));
+
+ InOrder inOrder = inOrder(txRwOperationTracker, safeTime,
mvPartitionStorage);
+ inOrder.verify(txRwOperationTracker,
timeout(SECONDS.toMillis(10))).awaitCompleteTxRwOperations(registerredStateCatalogVersion);
+ inOrder.verify(safeTime,
timeout(SECONDS.toMillis(10))).waitFor(hybridTimestamp(buildingStateActivationTs));
+ inOrder.verify(mvPartitionStorage,
timeout(SECONDS.toMillis(10))).highestRowId();
}
@Test
@@ -193,7 +242,9 @@ public class IndexBuilderTest extends
BaseIgniteAbstractTest {
partitionId,
indexId,
indexStorage(nextRowIdsToBuild),
- mock(MvPartitionStorage.class),
+ mvPartitionStorage,
+ txRwOperationTracker,
+ safeTime,
mock(InternalClusterNode.class),
ANY_ENLISTMENT_CONSISTENCY_TOKEN,
mock(HybridTimestamp.class)
@@ -213,7 +264,9 @@ public class IndexBuilderTest extends
BaseIgniteAbstractTest {
partitionId,
indexId,
indexStorage(nextRowIdsToBuild),
- mock(MvPartitionStorage.class),
+ mvPartitionStorage,
+ txRwOperationTracker,
+ safeTime,
mock(InternalClusterNode.class),
ANY_ENLISTMENT_CONSISTENCY_TOKEN,
mock(HybridTimestamp.class)
diff --git
a/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/fixtures/Node.java
b/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/fixtures/Node.java
index c1c4a03a9d0..d18eae051cd 100644
---
a/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/fixtures/Node.java
+++
b/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/fixtures/Node.java
@@ -847,6 +847,7 @@ public class Node {
failureManager,
lowWatermark,
txManager,
+ partitionReplicaLifecycleManager,
metricManager
);
diff --git
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java
index eecdf47a4c9..a423ffea41e 100644
---
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java
+++
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java
@@ -1973,13 +1973,20 @@ public class PartitionReplicaLifecycleManager extends
* Returns resources for the given zone partition.
*/
public ZonePartitionResources zonePartitionResources(ZonePartitionId
zonePartitionId) {
- ZonePartitionResources resources =
zoneResourcesManager.getZonePartitionResources(zonePartitionId);
+ ZonePartitionResources resources =
zonePartitionResourcesOrNull(zonePartitionId);
assert resources != null : String.format("Missing resources for zone
partition [zonePartitionId=%s]", zonePartitionId);
return resources;
}
+ /**
+ * Returns resources for the given zone partition or {@code null} if not
available (because the replica is stopped/destroyed).
+ */
+ public @Nullable ZonePartitionResources
zonePartitionResourcesOrNull(ZonePartitionId zonePartitionId) {
+ return zoneResourcesManager.getZonePartitionResources(zonePartitionId);
+ }
+
/**
* For HA zones: Check that last rebalance was graceful (caused by common
rebalance triggers, like data nodes change, replica factor
* change, etc.) rather than forced (caused by a disaster recovery reset
after losing the majority of nodes).
diff --git
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ReplicaTableProcessor.java
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ReplicaTableProcessor.java
index 3d9f8044010..a8e2e9cb698 100644
---
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ReplicaTableProcessor.java
+++
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ReplicaTableProcessor.java
@@ -19,8 +19,10 @@ package org.apache.ignite.internal.partition.replicator;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.replicator.ReplicaResult;
import org.apache.ignite.internal.replicator.message.ReplicaRequest;
+import org.apache.ignite.internal.util.PendingComparableValuesTracker;
/**
* Processor of replica requests targeted at a particular table.
@@ -38,4 +40,11 @@ public interface ReplicaTableProcessor {
/** Callback on replica shutdown. */
void onShutdown();
+
+ /** Returns the tracker of RW transaction operations. */
+ TableTxRwOperationTracker txRwOperationTracker();
+
+ // TODO: https://issues.apache.org/jira/browse/IGNITE-27405 as safe time
should not depend on the table.
+ /** Returns safe time tracker for the partition. */
+ PendingComparableValuesTracker<HybridTimestamp, Void> safeTime();
}
diff --git
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ReplicaTableSegment.java
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ReplicaTableSegment.java
new file mode 100644
index 00000000000..462210dd989
--- /dev/null
+++
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ReplicaTableSegment.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.partition.replicator;
+
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.util.PendingComparableValuesTracker;
+
+/**
+ * Segment of a partition replica corresponding to a specific table.
+ */
+// TODO: https://issues.apache.org/jira/browse/IGNITE-27405 - remove this as
we'll not need to pass safe time tracker.
+public class ReplicaTableSegment {
+ private final TableTxRwOperationTracker txRwOperationTracker;
+ private final PendingComparableValuesTracker<HybridTimestamp, Void>
safeTime;
+
+ public ReplicaTableSegment(
+ TableTxRwOperationTracker txRwOperationTracker,
+ PendingComparableValuesTracker<HybridTimestamp, Void> safeTime
+ ) {
+ this.txRwOperationTracker = txRwOperationTracker;
+ this.safeTime = safeTime;
+ }
+
+ public TableTxRwOperationTracker txRwOperationTracker() {
+ return txRwOperationTracker;
+ }
+
+ public PendingComparableValuesTracker<HybridTimestamp, Void> safeTime() {
+ return safeTime;
+ }
+}
diff --git
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ReplicaTableProcessor.java
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/TableTxRwOperationTracker.java
similarity index 57%
copy from
modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ReplicaTableProcessor.java
copy to
modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/TableTxRwOperationTracker.java
index 3d9f8044010..eeb5bf99d3d 100644
---
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ReplicaTableProcessor.java
+++
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/TableTxRwOperationTracker.java
@@ -17,25 +17,16 @@
package org.apache.ignite.internal.partition.replicator;
-import java.util.UUID;
import java.util.concurrent.CompletableFuture;
-import org.apache.ignite.internal.replicator.ReplicaResult;
-import org.apache.ignite.internal.replicator.message.ReplicaRequest;
/**
- * Processor of replica requests targeted at a particular table.
+ * Tracks the completion of RW transactions' operations before a dependent
process begins.
*/
-public interface ReplicaTableProcessor {
+public interface TableTxRwOperationTracker {
/**
- * Processes replica request.
+ * Waits for completion of RW transaction operations with catalog versions strictly lower than
the requested catalog version.
*
- * @param request Replica request.
- * @param replicaPrimacy Replica primacy info.
- * @param senderId ID of the node that sent the request.
- * @return Future completed with the result of processing.
+ * @param catalogVersion Catalog version in question.
*/
- CompletableFuture<ReplicaResult> process(ReplicaRequest request,
ReplicaPrimacy replicaPrimacy, UUID senderId);
-
- /** Callback on replica shutdown. */
- void onShutdown();
+ CompletableFuture<Void> awaitCompleteTxRwOperations(int catalogVersion);
}
diff --git
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ZonePartitionReplicaListener.java
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ZonePartitionReplicaListener.java
index 0c2a08f1b87..cc0461378be 100644
---
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ZonePartitionReplicaListener.java
+++
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ZonePartitionReplicaListener.java
@@ -63,6 +63,7 @@ import
org.apache.ignite.internal.tx.message.TxStateCommitPartitionRequest;
import org.apache.ignite.internal.tx.message.VacuumTxStateReplicaRequest;
import org.apache.ignite.internal.tx.message.WriteIntentSwitchReplicaRequest;
import org.apache.ignite.internal.tx.storage.state.TxStatePartitionStorage;
+import org.jetbrains.annotations.Nullable;
import org.jetbrains.annotations.VisibleForTesting;
/**
@@ -317,6 +318,11 @@ public class ZonePartitionReplicaListener implements
ReplicaListener {
replicaProcessors.remove(tableId);
}
+ public @Nullable ReplicaTableSegment segmentFor(int tableId) {
+ ReplicaTableProcessor processor = replicaProcessors.get(tableId);
+ return processor == null ? null : new
ReplicaTableSegment(processor.txRwOperationTracker(), processor.safeTime());
+ }
+
/**
* Return table replicas listeners.
*
diff --git
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
index 89a288788c1..959cbe912f9 100644
---
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
+++
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
@@ -1200,6 +1200,7 @@ public class IgniteImpl implements Ignite {
failureManager,
lowWatermark,
txManager,
+ partitionReplicaLifecycleManager,
metricManager
);
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/index/IndexMeta.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/index/IndexMeta.java
index 732c2b33fb4..101410750f7 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/index/IndexMeta.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/index/IndexMeta.java
@@ -62,7 +62,7 @@ public class IndexMeta {
* @param currentStatus Current status of the index
* @param statusChanges <b>Immutable</b> map of index statuses with change
info (for example catalog version) in which they appeared.
*/
- IndexMeta(
+ public IndexMeta(
int catalogVersion,
int indexId,
int tableId,
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/IndexBuilderTxRwOperationTracker.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/IndexBuilderTxRwOperationTracker.java
index 4613c54275b..cd2fc606321 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/IndexBuilderTxRwOperationTracker.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/IndexBuilderTxRwOperationTracker.java
@@ -29,6 +29,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.ignite.internal.close.ManuallyCloseable;
import org.apache.ignite.internal.lang.NodeStoppingException;
+import
org.apache.ignite.internal.partition.replicator.TableTxRwOperationTracker;
import org.apache.ignite.internal.util.IgniteSpinBusyLock;
/**
@@ -45,7 +46,7 @@ import org.apache.ignite.internal.util.IgniteSpinBusyLock;
* <li>When completing a RW transaction operation, {@link
#decrementOperationCount(int)} must be used.</li>
* </ul>
*/
-public class IndexBuilderTxRwOperationTracker implements ManuallyCloseable {
+public class IndexBuilderTxRwOperationTracker implements
TableTxRwOperationTracker, ManuallyCloseable {
private final AtomicInteger minAllowedCatalogVersionForStartOperation =
new AtomicInteger(-1);
private final NavigableMap<Integer, CompletableFuture<Void>>
minAllowedVersionRaiseFutures = new ConcurrentSkipListMap<>();
@@ -100,6 +101,7 @@ public class IndexBuilderTxRwOperationTracker implements
ManuallyCloseable {
*
* @param catalogVersion Catalog version in which the new index appeared.
*/
+ @Override
public CompletableFuture<Void> awaitCompleteTxRwOperations(int
catalogVersion) {
return inBusyLock(busyLock, () -> {
// This code is needed to avoid races with
updateAllowedCatalogVersionForStartOperation.
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
index 704d3a00558..71e7eaeff54 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
@@ -102,6 +102,7 @@ import
org.apache.ignite.internal.partition.replicator.ReplicaPrimacyEngine;
import org.apache.ignite.internal.partition.replicator.ReplicaTableProcessor;
import
org.apache.ignite.internal.partition.replicator.ReplicationRaftCommandApplicator;
import
org.apache.ignite.internal.partition.replicator.TableAwareReplicaRequestPreProcessor;
+import
org.apache.ignite.internal.partition.replicator.TableTxRwOperationTracker;
import
org.apache.ignite.internal.partition.replicator.exception.OperationLockException;
import
org.apache.ignite.internal.partition.replicator.network.PartitionReplicationMessagesFactory;
import org.apache.ignite.internal.partition.replicator.network.TimedBinaryRow;
@@ -413,11 +414,7 @@ public class PartitionReplicaListener implements
ReplicaListener, ReplicaTablePr
reliableCatalogVersions = new
ReliableCatalogVersions(schemaSyncService, catalogService);
raftCommandApplicator = new
ReplicationRaftCommandApplicator(raftCommandRunner, replicationGroupId);
- buildIndexReplicaRequestHandler = new BuildIndexReplicaRequestHandler(
- indexMetaStorage,
- indexBuildingProcessor.tracker(),
- safeTime,
- raftCommandApplicator);
+ buildIndexReplicaRequestHandler = new
BuildIndexReplicaRequestHandler(indexMetaStorage, raftCommandApplicator);
}
@Override
@@ -3436,6 +3433,16 @@ public class PartitionReplicaListener implements
ReplicaListener, ReplicaTablePr
indexBuildingProcessor.onShutdown();
}
+ @Override
+ public TableTxRwOperationTracker txRwOperationTracker() {
+ return indexBuildingProcessor.tracker();
+ }
+
+ @Override
+ public PendingComparableValuesTracker<HybridTimestamp, Void> safeTime() {
+ return safeTime;
+ }
+
private int partId() {
return replicationGroupId.partitionId();
}
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/handlers/BuildIndexReplicaRequestHandler.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/handlers/BuildIndexReplicaRequestHandler.java
index 0926695a6c2..5476d275b8d 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/handlers/BuildIndexReplicaRequestHandler.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/handlers/BuildIndexReplicaRequestHandler.java
@@ -17,13 +17,10 @@
package org.apache.ignite.internal.table.distributed.replicator.handlers;
-import static org.apache.ignite.internal.hlc.HybridTimestamp.hybridTimestamp;
import static
org.apache.ignite.internal.table.distributed.index.MetaIndexStatus.BUILDING;
-import static
org.apache.ignite.internal.table.distributed.index.MetaIndexStatus.REGISTERED;
import static
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
import java.util.concurrent.CompletableFuture;
-import org.apache.ignite.internal.hlc.HybridTimestamp;
import
org.apache.ignite.internal.partition.replicator.ReplicationRaftCommandApplicator;
import
org.apache.ignite.internal.partition.replicator.network.PartitionReplicationMessagesFactory;
import
org.apache.ignite.internal.partition.replicator.network.command.BuildIndexCommand;
@@ -31,8 +28,6 @@ import
org.apache.ignite.internal.partition.replicator.network.replication.Build
import org.apache.ignite.internal.table.distributed.index.IndexMeta;
import org.apache.ignite.internal.table.distributed.index.IndexMetaStorage;
import
org.apache.ignite.internal.table.distributed.index.MetaIndexStatusChange;
-import
org.apache.ignite.internal.table.distributed.replicator.IndexBuilderTxRwOperationTracker;
-import org.apache.ignite.internal.util.PendingComparableValuesTracker;
/**
* Handler for {@link BuildIndexReplicaRequest}.
@@ -44,12 +39,6 @@ public class BuildIndexReplicaRequestHandler {
private final IndexMetaStorage indexMetaStorage;
- /** Read-write transaction operation tracker for building indexes. */
- private final IndexBuilderTxRwOperationTracker txRwOperationTracker;
-
- /** Partition safe-time tracker. */
- private final PendingComparableValuesTracker<HybridTimestamp, Void>
safeTime;
-
/** Applicator that applies RAFT command that is created by this handler.
*/
private final ReplicationRaftCommandApplicator commandApplicator;
@@ -57,19 +46,10 @@ public class BuildIndexReplicaRequestHandler {
* Creates a new instance of request handler.
*
* @param indexMetaStorage Index meta storage.
- * @param txRwOperationTracker Read-write transaction operation tracker
for building indexes.
- * @param safeTime Partition safe-time tracker.
* @param commandApplicator Applicator that applies RAFT command that is
created by this handler.
*/
- public BuildIndexReplicaRequestHandler(
- IndexMetaStorage indexMetaStorage,
- IndexBuilderTxRwOperationTracker txRwOperationTracker,
- PendingComparableValuesTracker<HybridTimestamp, Void> safeTime,
- ReplicationRaftCommandApplicator commandApplicator
- ) {
+ public BuildIndexReplicaRequestHandler(IndexMetaStorage indexMetaStorage,
ReplicationRaftCommandApplicator commandApplicator) {
this.indexMetaStorage = indexMetaStorage;
- this.txRwOperationTracker = txRwOperationTracker;
- this.safeTime = safeTime;
this.commandApplicator = commandApplicator;
}
@@ -86,12 +66,9 @@ public class BuildIndexReplicaRequestHandler {
return nullCompletedFuture();
}
- MetaIndexStatusChange registeredChangeInfo =
indexMeta.statusChange(REGISTERED);
MetaIndexStatusChange buildingChangeInfo =
indexMeta.statusChange(BUILDING);
- return
txRwOperationTracker.awaitCompleteTxRwOperations(registeredChangeInfo.catalogVersion())
- .thenCompose(unused ->
safeTime.waitFor(hybridTimestamp(buildingChangeInfo.activationTimestamp())))
- .thenCompose(unused ->
commandApplicator.applyCommand(toBuildIndexCommand(request,
buildingChangeInfo)));
+ return commandApplicator.applyCommand(toBuildIndexCommand(request,
buildingChangeInfo));
}
private static BuildIndexCommand
toBuildIndexCommand(BuildIndexReplicaRequest request, MetaIndexStatusChange
buildingChangeInfo) {
diff --git
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
index 95eabbe6712..dd436d029fe 100644
---
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
+++
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
@@ -57,7 +57,6 @@ import static
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFu
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.containsString;
-import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.nullValue;
@@ -137,7 +136,6 @@ import org.apache.ignite.internal.network.MessagingService;
import org.apache.ignite.internal.network.SingleClusterNodeResolver;
import org.apache.ignite.internal.network.TopologyService;
import
org.apache.ignite.internal.partition.replicator.network.PartitionReplicationMessagesFactory;
-import
org.apache.ignite.internal.partition.replicator.network.command.BuildIndexCommand;
import
org.apache.ignite.internal.partition.replicator.network.command.CatalogVersionAware;
import
org.apache.ignite.internal.partition.replicator.network.command.FinishTxCommand;
import
org.apache.ignite.internal.partition.replicator.network.command.UpdateAllCommand;
@@ -2586,11 +2584,11 @@ public class PartitionReplicaListenerTest extends
IgniteAbstractTest {
}
@Test
- void testBuildIndexReplicaRequestWithoutRwTxOperations() {
+ void testBuildIndexReplicaRequest() {
int indexId = hashIndexStorage.id();
int indexCreationCatalogVersion = 1;
int startBuildingIndexCatalogVersion = 2;
- long indexCreationActivationTs =
clock.now().addPhysicalTime(-100).longValue();
+ long indexCreationActivationTs =
clock.now().subtractPhysicalTime(100).longValue();
long startBuildingIndexActivationTs = clock.nowLong();
setIndexMetaInBuildingStatus(
@@ -2601,85 +2599,12 @@ public class PartitionReplicaListenerTest extends
IgniteAbstractTest {
startBuildingIndexActivationTs
);
- CompletableFuture<?> invokeBuildIndexReplicaRequestFuture =
invokeBuildIndexReplicaRequestAsync(indexId);
-
- assertFalse(invokeBuildIndexReplicaRequestFuture.isDone());
-
fireHashIndexStartBuildingEventForStaleTxOperation(indexId,
startBuildingIndexCatalogVersion);
- assertThat(invokeBuildIndexReplicaRequestFuture,
willCompleteSuccessfully());
- assertThat(invokeBuildIndexReplicaRequestAsync(indexId),
willCompleteSuccessfully());
- }
-
- @ParameterizedTest(name = "failCmd = {0}")
- @ValueSource(booleans = {false, true})
- void testBuildIndexReplicaRequest(boolean failCmd) {
- var continueNotBuildIndexCmdFuture = new CompletableFuture<Void>();
- var buildIndexCommandFuture = new
CompletableFuture<BuildIndexCommand>();
-
- when(mockRaftClient.run(any())).thenAnswer(invocation -> {
- Command cmd = invocation.getArgument(0);
-
- if (cmd instanceof BuildIndexCommand) {
- buildIndexCommandFuture.complete((BuildIndexCommand) cmd);
-
- return raftClientFutureClosure.apply(cmd);
- }
-
- return continueNotBuildIndexCmdFuture.thenCompose(unused ->
raftClientFutureClosure.apply(cmd));
- });
-
- UUID txId = newTxId();
- long beginTs = beginTimestamp(txId).longValue();
-
- when(catalogService.activeCatalogVersion(eq(beginTs))).thenReturn(0);
-
- BinaryRow row = binaryRow(0);
-
- CompletableFuture<ReplicaResult> upsertFuture = upsertAsync(txId, row,
true);
-
- int indexId = hashIndexStorage.id();
- int indexCreationCatalogVersion = 1;
- int startBuildingIndexCatalogVersion = 2;
- long indexCreationActivationTs =
clock.now().addPhysicalTime(-100).longValue();
- long startBuildingIndexActivationTs = clock.nowLong();
-
- setIndexMetaInBuildingStatus(
- indexId,
- indexCreationCatalogVersion,
- indexCreationActivationTs,
- startBuildingIndexCatalogVersion,
- startBuildingIndexActivationTs
- );
-
CompletableFuture<?> invokeBuildIndexReplicaRequestFuture =
invokeBuildIndexReplicaRequestAsync(indexId);
- fireHashIndexStartBuildingEventForStaleTxOperation(indexId,
startBuildingIndexCatalogVersion);
-
- assertFalse(upsertFuture.isDone());
- assertFalse(invokeBuildIndexReplicaRequestFuture.isDone());
-
- if (failCmd) {
- continueNotBuildIndexCmdFuture.completeExceptionally(new
RuntimeException("error from test"));
-
- assertThat(upsertFuture, willThrow(RuntimeException.class));
- } else {
- continueNotBuildIndexCmdFuture.complete(null);
-
- assertThat(upsertFuture, willCompleteSuccessfully());
- }
-
assertThat(invokeBuildIndexReplicaRequestFuture,
willCompleteSuccessfully());
-
- HybridTimestamp startBuildingIndexActivationTs0 =
hybridTimestamp(startBuildingIndexActivationTs);
-
- verify(safeTimeClock).waitFor(eq(startBuildingIndexActivationTs0));
-
- assertThat(buildIndexCommandFuture, willCompleteSuccessfully());
-
- BuildIndexCommand buildIndexCommand = buildIndexCommandFuture.join();
- assertThat(buildIndexCommand.indexId(), equalTo(indexId));
- assertThat(buildIndexCommand.requiredCatalogVersion(),
equalTo(startBuildingIndexCatalogVersion));
+ assertThat(invokeBuildIndexReplicaRequestAsync(indexId),
willCompleteSuccessfully());
}
private void fireHashIndexStartBuildingEventForStaleTxOperation(int
indexId, int startBuildingIndexCatalogVersion) {