This is an automated email from the ASF dual-hosted git repository.
rpuch pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new fe4bdb0809c IGNITE-21546 Add write intent resolution to index backfill process (#6810)
fe4bdb0809c is described below
commit fe4bdb0809cd29ce96456d2d4bddfd9e7d4badc8
Author: Roman Puchkovskiy <[email protected]>
AuthorDate: Wed Oct 22 12:16:19 2025 +0400
IGNITE-21546 Add write intent resolution to index backfill process (#6810)
---
.../RaftCommandsUnitCompatibilityArchTest.java | 13 ++-
.../internal/index/ItBuildIndexOneNodeTest.java | 2 -
.../ignite/internal/index/ItBuildIndexTest.java | 3 -
.../index/FinalTransactionStateResolver.java | 37 +++++++
.../ignite/internal/index/IndexBuildTask.java | 107 ++++++++++++++++++---
.../apache/ignite/internal/index/IndexBuilder.java | 30 +++++-
.../internal/index/IndexBuildingManager.java | 25 ++++-
.../RetryingFinalTransactionStateResolver.java | 74 ++++++++++++++
.../CommittedFinalTransactionStateResolver.java | 33 +++++++
.../index/IndexAvailabilityControllerTest.java | 12 ++-
.../ignite/internal/index/IndexBuilderTest.java | 13 ++-
.../internal/index/IndexMetaStorageMocks.java | 43 +++++++++
.../partition/replicator/fixtures/Node.java | 3 +-
.../network/PartitionReplicationMessageGroup.java | 4 +
.../BuildIndexCommandV3.java} | 29 ++----
.../replication/BuildIndexReplicaRequest.java | 4 +
.../PartitionCommandsCompatibilityTest.java | 25 +++++
.../raft/BaseCommandsCompatibilityTest.java | 4 +
.../org/apache/ignite/internal/app/IgniteImpl.java | 3 +-
.../internal/storage/MvPartitionStorage.java | 8 ++
.../apache/ignite/internal/storage/RowMeta.java | 70 ++++++++++++++
.../storage/ThreadAssertingMvPartitionStorage.java | 7 ++
.../AbstractMvPartitionStorageConcurrencyTest.java | 3 +
.../storage/AbstractMvPartitionStorageTest.java | 42 ++++++++
.../storage/AbstractMvTableStorageTest.java | 2 +
.../storage/impl/TestMvPartitionStorage.java | 18 +++-
.../mv/AbstractPageMemoryMvPartitionStorage.java | 32 +++++-
.../storage/rocksdb/RocksDbMvPartitionStorage.java | 43 +++++++++
.../distributed/index/MetaIndexStatusChange.java | 2 +-
.../table/distributed/raft/PartitionListener.java | 15 ++-
.../raft/handlers/BuildIndexCommandHandler.java | 12 ++-
.../raft/handlers/BuildIndexRowVersionChooser.java | 17 +++-
.../replicator/TransactionStateResolver.java | 69 +++++++++++--
.../handlers/BuildIndexReplicaRequestHandler.java | 5 +-
.../handlers/BuildIndexRowVersionChooserTest.java | 32 +++++-
.../replication/PartitionReplicaListenerTest.java | 1 +
.../internal/tx/impl/PlacementDriverHelper.java | 33 ++++++-
37 files changed, 789 insertions(+), 86 deletions(-)
diff --git a/modules/arch-test/src/test/java/org/apache/ignite/internal/RaftCommandsUnitCompatibilityArchTest.java b/modules/arch-test/src/test/java/org/apache/ignite/internal/RaftCommandsUnitCompatibilityArchTest.java
index 0f0f521e12d..46a874f3055 100644
--- a/modules/arch-test/src/test/java/org/apache/ignite/internal/RaftCommandsUnitCompatibilityArchTest.java
+++ b/modules/arch-test/src/test/java/org/apache/ignite/internal/RaftCommandsUnitCompatibilityArchTest.java
@@ -21,8 +21,8 @@ import static
com.tngtech.archunit.core.importer.ImportOption.Predefined.DO_NOT_
import static
com.tngtech.archunit.core.importer.ImportOption.Predefined.DO_NOT_INCLUDE_TEST_FIXTURES;
import static java.util.stream.Collectors.toSet;
import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.empty;
+import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.not;
import com.tngtech.archunit.core.domain.JavaClass;
@@ -59,11 +59,14 @@ public class RaftCommandsUnitCompatibilityArchTest {
Set<String> testedRaftCommands = collectTestedRaftCommands(classes);
+ Set<String> notTestedRaftCommands = raftCommands.stream()
+ .filter(cmd -> !testedRaftCommands.contains(cmd))
+ .collect(toSet());
+
assertThat(
- "There are still some raft commands that haven't been tested;
for example, see the successors of "
- + BaseCommandsCompatibilityTest.class.getName(),
- testedRaftCommands,
- containsInAnyOrder(raftCommands.toArray(new String[0]))
+ "There are still some raft commands that haven't been tested",
+ notTestedRaftCommands,
+ is(empty())
);
}
diff --git a/modules/index/src/integrationTest/java/org/apache/ignite/internal/index/ItBuildIndexOneNodeTest.java b/modules/index/src/integrationTest/java/org/apache/ignite/internal/index/ItBuildIndexOneNodeTest.java
index 324eaa86622..6b38736040e 100644
--- a/modules/index/src/integrationTest/java/org/apache/ignite/internal/index/ItBuildIndexOneNodeTest.java
+++ b/modules/index/src/integrationTest/java/org/apache/ignite/internal/index/ItBuildIndexOneNodeTest.java
@@ -68,7 +68,6 @@ import org.apache.ignite.tx.Transaction;
import org.apache.ignite.tx.TransactionOptions;
import org.jetbrains.annotations.Nullable;
import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
/** Integration test for testing the building of an index in a single node
cluster. */
@@ -369,7 +368,6 @@ public class ItBuildIndexOneNodeTest extends
BaseSqlIntegrationTest {
}
@Test
- @Disabled("https://issues.apache.org/jira/browse/IGNITE-21546")
void writeIntentFromAbortedTxShouldNotBeIndexed() throws Exception {
createZoneAndTable(ZONE_NAME, TABLE_NAME, 1, 1);
diff --git a/modules/index/src/integrationTest/java/org/apache/ignite/internal/index/ItBuildIndexTest.java b/modules/index/src/integrationTest/java/org/apache/ignite/internal/index/ItBuildIndexTest.java
index b7d7b0753f8..a29b96fa0ed 100644
--- a/modules/index/src/integrationTest/java/org/apache/ignite/internal/index/ItBuildIndexTest.java
+++ b/modules/index/src/integrationTest/java/org/apache/ignite/internal/index/ItBuildIndexTest.java
@@ -85,7 +85,6 @@ import org.apache.ignite.tx.Transaction;
import org.jetbrains.annotations.Nullable;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
@@ -224,7 +223,6 @@ public class ItBuildIndexTest extends
BaseSqlIntegrationTest {
}
@Test
- @Disabled("https://issues.apache.org/jira/browse/IGNITE-21546")
void writeIntentFromTxAbandonedBeforeShouldNotBeIndexed() throws Exception
{
createTable(1, 1);
@@ -246,7 +244,6 @@ public class ItBuildIndexTest extends
BaseSqlIntegrationTest {
}
@Test
- @Disabled("https://issues.apache.org/jira/browse/IGNITE-21546")
void
writeIntentFromTxAbandonedWhileWaitingForTransactionsToFinishShouldNotBeIndexed()
throws Exception {
createTable(1, 1);
diff --git a/modules/index/src/main/java/org/apache/ignite/internal/index/FinalTransactionStateResolver.java b/modules/index/src/main/java/org/apache/ignite/internal/index/FinalTransactionStateResolver.java
new file mode 100644
index 00000000000..cd98537b087
--- /dev/null
+++ b/modules/index/src/main/java/org/apache/ignite/internal/index/FinalTransactionStateResolver.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.index;
+
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.replicator.ReplicationGroupId;
+import org.apache.ignite.internal.tx.TxState;
+
+/**
+ * Resolves the final state of a transaction.
+ */
+@SuppressWarnings("InterfaceMayBeAnnotatedFunctional")
+public interface FinalTransactionStateResolver {
+ /**
+ * Resolves the final state of a transaction. Cannot return a non-final
state.
+ *
+ * @param transactionId Transaction ID.
+ * @param commitGroupId Commit partition ID.
+ */
+ CompletableFuture<TxState> resolveFinalTxState(UUID transactionId,
ReplicationGroupId commitGroupId);
+}
diff --git a/modules/index/src/main/java/org/apache/ignite/internal/index/IndexBuildTask.java b/modules/index/src/main/java/org/apache/ignite/internal/index/IndexBuildTask.java
index 799a5fd892f..cbbaee2b13a 100644
--- a/modules/index/src/main/java/org/apache/ignite/internal/index/IndexBuildTask.java
+++ b/modules/index/src/main/java/org/apache/ignite/internal/index/IndexBuildTask.java
@@ -20,6 +20,8 @@ package org.apache.ignite.internal.index;
import static java.util.concurrent.CompletableFuture.failedFuture;
import static java.util.concurrent.CompletableFuture.supplyAsync;
import static java.util.stream.Collectors.toList;
+import static java.util.stream.Collectors.toMap;
+import static java.util.stream.Collectors.toUnmodifiableSet;
import static
org.apache.ignite.internal.replicator.message.ReplicaMessageUtils.toTablePartitionIdMessage;
import static
org.apache.ignite.internal.replicator.message.ReplicaMessageUtils.toZonePartitionIdMessage;
import static
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
@@ -27,7 +29,12 @@ import static
org.apache.ignite.internal.util.ExceptionUtils.hasCause;
import static org.apache.ignite.internal.util.ExceptionUtils.unwrapRootCause;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -45,6 +52,7 @@ import
org.apache.ignite.internal.partition.replicator.network.PartitionReplicat
import
org.apache.ignite.internal.partition.replicator.network.replication.BuildIndexReplicaRequest;
import org.apache.ignite.internal.raft.GroupOverloadedException;
import org.apache.ignite.internal.replicator.ReplicaService;
+import org.apache.ignite.internal.replicator.ReplicationGroupId;
import org.apache.ignite.internal.replicator.TablePartitionId;
import org.apache.ignite.internal.replicator.ZonePartitionId;
import
org.apache.ignite.internal.replicator.exception.PrimaryReplicaMissException;
@@ -53,11 +61,15 @@ import
org.apache.ignite.internal.replicator.message.ReplicaMessagesFactory;
import org.apache.ignite.internal.replicator.message.ReplicationGroupIdMessage;
import org.apache.ignite.internal.storage.MvPartitionStorage;
import org.apache.ignite.internal.storage.RowId;
+import org.apache.ignite.internal.storage.RowMeta;
import org.apache.ignite.internal.storage.StorageClosedException;
import org.apache.ignite.internal.storage.index.IndexStorage;
+import org.apache.ignite.internal.tx.TransactionIds;
+import org.apache.ignite.internal.tx.TxState;
import org.apache.ignite.internal.util.CompletableFutures;
import org.apache.ignite.internal.util.IgniteSpinBusyLock;
import org.apache.ignite.internal.util.TrackerClosedException;
+import org.jetbrains.annotations.Nullable;
/** Task of building a table index. */
class IndexBuildTask {
@@ -70,6 +82,8 @@ class IndexBuildTask {
private final IndexBuildTaskId taskId;
+ private final HybridTimestamp indexCreationActivationTs;
+
private final IndexStorage indexStorage;
private final MvPartitionStorage partitionStorage;
@@ -80,6 +94,8 @@ class IndexBuildTask {
private final NodeProperties nodeProperties;
+ private final FinalTransactionStateResolver finalTransactionStateResolver;
+
private final Executor executor;
private final IgniteSpinBusyLock busyLock;
@@ -104,11 +120,13 @@ class IndexBuildTask {
IndexBuildTask(
IndexBuildTaskId taskId,
+ HybridTimestamp indexCreationActivationTs,
IndexStorage indexStorage,
MvPartitionStorage partitionStorage,
ReplicaService replicaService,
FailureProcessor failureProcessor,
NodeProperties nodeProperties,
+ FinalTransactionStateResolver finalTransactionStateResolver,
Executor executor,
IgniteSpinBusyLock busyLock,
int batchSize,
@@ -119,11 +137,13 @@ class IndexBuildTask {
HybridTimestamp initialOperationTimestamp
) {
this.taskId = taskId;
+ this.indexCreationActivationTs = indexCreationActivationTs;
this.indexStorage = indexStorage;
this.partitionStorage = partitionStorage;
this.replicaService = replicaService;
this.failureProcessor = failureProcessor;
this.nodeProperties = nodeProperties;
+ this.finalTransactionStateResolver = finalTransactionStateResolver;
this.executor = executor;
this.busyLock = busyLock;
this.batchSize = batchSize;
@@ -206,9 +226,10 @@ class IndexBuildTask {
}
try {
- List<RowId> batchRowIds = createBatchRowIds();
-
- return replicaService.invoke(node,
createBuildIndexReplicaRequest(batchRowIds, initialOperationTimestamp))
+ return createBatchToIndex()
+ .thenCompose(batch -> {
+ return replicaService.invoke(node,
createBuildIndexReplicaRequest(batch, initialOperationTimestamp));
+ })
.handleAsync((unused, throwable) -> {
if (throwable != null) {
Throwable cause = unwrapRootCause(throwable);
@@ -236,27 +257,68 @@ class IndexBuildTask {
}
}
- private List<RowId> createBatchRowIds() {
+ private CompletableFuture<BatchToIndex> createBatchToIndex() {
RowId nextRowIdToBuild = indexStorage.getNextRowIdToBuild();
- List<RowId> batch = new ArrayList<>(batchSize);
+ List<RowId> rowIds = new ArrayList<>(batchSize);
+ Map<UUID, CommitPartitionId> transactionsToResolve = new HashMap<>();
- for (int i = 0; i < batchSize && nextRowIdToBuild != null; i++) {
- nextRowIdToBuild = partitionStorage.closestRowId(nextRowIdToBuild);
+ while (rowIds.size() < batchSize && nextRowIdToBuild != null) {
+ @Nullable RowMeta row =
partitionStorage.closestRow(nextRowIdToBuild);
- if (nextRowIdToBuild == null) {
+ if (row == null) {
break;
}
- batch.add(nextRowIdToBuild);
+ rowIds.add(row.rowId());
+
+ if (row.isWriteIntent()) {
+ UUID transactionId = row.transactionId();
+ assert transactionId != null;
+
+ // We only care about transactions which began before index
creation.
+ if
(TransactionIds.beginTimestamp(transactionId).compareTo(indexCreationActivationTs)
< 0) {
+ transactionsToResolve.put(
+ row.transactionId(),
+ new CommitPartitionId(row.commitTableOrZoneId(),
row.commitPartitionId())
+ );
+ }
+ }
- nextRowIdToBuild = nextRowIdToBuild.increment();
+ nextRowIdToBuild = row.rowId().increment();
}
- return batch;
+ Map<UUID, CompletableFuture<TxState>> txStateResolveFutures =
transactionsToResolve.entrySet().stream()
+ .map(entry -> Map.entry(entry.getKey(),
resolveFinalTxStateIfNeeded(entry.getKey(), entry.getValue())))
+ .collect(toMap(Map.Entry::getKey, Map.Entry::getValue));
+
+ return CompletableFutures.allOf(txStateResolveFutures.values())
+ .thenApply(unused -> {
+ Set<UUID> abortedTransactionIds =
txStateResolveFutures.entrySet().stream()
+ .filter(entry -> entry.getValue().join() ==
TxState.ABORTED)
+ .map(Entry::getKey)
+ .collect(toUnmodifiableSet());
+
+ return new BatchToIndex(rowIds, abortedTransactionIds);
+ });
+ }
+
+ private CompletableFuture<TxState> resolveFinalTxStateIfNeeded(UUID
transactionId, CommitPartitionId commitPartitionId) {
+ assert commitPartitionId.commitTableOrZoneId != null;
+
+ ReplicationGroupId commitGroupId =
targetGroupId(commitPartitionId.commitTableOrZoneId,
commitPartitionId.commitPartitionId);
+
+ return
finalTransactionStateResolver.resolveFinalTxState(transactionId, commitGroupId);
+ }
+
+ private ReplicationGroupId targetGroupId(int tableOrZoneId, int
partitionIndex) {
+ return nodeProperties.colocationEnabled()
+ ? new ZonePartitionId(tableOrZoneId, partitionIndex)
+ : new TablePartitionId(tableOrZoneId, partitionIndex);
}
- private BuildIndexReplicaRequest
createBuildIndexReplicaRequest(List<RowId> rowIds, HybridTimestamp
initialOperationTimestamp) {
+ private BuildIndexReplicaRequest
createBuildIndexReplicaRequest(BatchToIndex batch, HybridTimestamp
initialOperationTimestamp) {
+ List<RowId> rowIds = batch.rowIds;
boolean finish = rowIds.size() < batchSize;
ReplicationGroupIdMessage groupIdMessage =
nodeProperties.colocationEnabled()
@@ -269,6 +331,7 @@ class IndexBuildTask {
.indexId(taskId.getIndexId())
.rowIds(rowIds.stream().map(RowId::uuid).collect(toList()))
.finish(finish)
+ .abortedTransactionIds(batch.abortedTransactionIds)
.enlistmentConsistencyToken(enlistmentConsistencyToken)
.timestamp(initialOperationTimestamp)
.build();
@@ -298,4 +361,24 @@ class IndexBuildTask {
}
}
}
+
+ private static class BatchToIndex {
+ private final List<RowId> rowIds;
+ private final Set<UUID> abortedTransactionIds;
+
+ private BatchToIndex(List<RowId> rowIds, Set<UUID>
abortedTransactionIds) {
+ this.rowIds = rowIds;
+ this.abortedTransactionIds = abortedTransactionIds;
+ }
+ }
+
+ private static class CommitPartitionId {
+ private final @Nullable Integer commitTableOrZoneId;
+ private final int commitPartitionId;
+
+ private CommitPartitionId(@Nullable Integer commitTableOrZoneId, int
commitPartitionId) {
+ this.commitTableOrZoneId = commitTableOrZoneId;
+ this.commitPartitionId = commitPartitionId;
+ }
+ }
}
diff --git a/modules/index/src/main/java/org/apache/ignite/internal/index/IndexBuilder.java b/modules/index/src/main/java/org/apache/ignite/internal/index/IndexBuilder.java
index 5241ea5dd18..23fe48ce1cb 100644
--- a/modules/index/src/main/java/org/apache/ignite/internal/index/IndexBuilder.java
+++ b/modules/index/src/main/java/org/apache/ignite/internal/index/IndexBuilder.java
@@ -40,6 +40,9 @@ import org.apache.ignite.internal.replicator.ReplicaService;
import org.apache.ignite.internal.storage.MvPartitionStorage;
import org.apache.ignite.internal.storage.RowId;
import org.apache.ignite.internal.storage.index.IndexStorage;
+import org.apache.ignite.internal.table.distributed.index.IndexMeta;
+import org.apache.ignite.internal.table.distributed.index.IndexMetaStorage;
+import org.apache.ignite.internal.table.distributed.index.MetaIndexStatus;
import org.apache.ignite.internal.util.IgniteSpinBusyLock;
/**
@@ -71,6 +74,10 @@ class IndexBuilder implements ManuallyCloseable {
private final NodeProperties nodeProperties;
+ private final FinalTransactionStateResolver finalTransactionStateResolver;
+
+ private final IndexMetaStorage indexMetaStorage;
+
private final Map<IndexBuildTaskId, IndexBuildTask> indexBuildTaskById =
new ConcurrentHashMap<>();
private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
@@ -80,11 +87,20 @@ class IndexBuilder implements ManuallyCloseable {
private final List<IndexBuildCompletionListener> listeners = new
CopyOnWriteArrayList<>();
/** Constructor. */
- IndexBuilder(Executor executor, ReplicaService replicaService,
FailureProcessor failureProcessor, NodeProperties nodeProperties) {
+ IndexBuilder(
+ Executor executor,
+ ReplicaService replicaService,
+ FailureProcessor failureProcessor,
+ NodeProperties nodeProperties,
+ FinalTransactionStateResolver finalTransactionStateResolver,
+ IndexMetaStorage indexMetaStorage
+ ) {
this.executor = executor;
this.replicaService = replicaService;
this.failureProcessor = failureProcessor;
this.nodeProperties = nodeProperties;
+ this.finalTransactionStateResolver = finalTransactionStateResolver;
+ this.indexMetaStorage = indexMetaStorage;
}
/**
@@ -133,11 +149,13 @@ class IndexBuilder implements ManuallyCloseable {
IndexBuildTask newTask = new IndexBuildTask(
taskId,
+ indexCreationActivationTs(indexId),
indexStorage,
partitionStorage,
replicaService,
failureProcessor,
nodeProperties,
+ finalTransactionStateResolver,
executor,
busyLock,
BATCH_SIZE,
@@ -196,11 +214,13 @@ class IndexBuilder implements ManuallyCloseable {
IndexBuildTask newTask = new IndexBuildTask(
taskId,
+ indexCreationActivationTs(indexId),
indexStorage,
partitionStorage,
replicaService,
failureProcessor,
nodeProperties,
+ finalTransactionStateResolver,
executor,
busyLock,
BATCH_SIZE,
@@ -215,6 +235,14 @@ class IndexBuilder implements ManuallyCloseable {
});
}
+ private HybridTimestamp indexCreationActivationTs(int indexId) {
+ IndexMeta indexMeta = indexMetaStorage.indexMeta(indexId);
+ assert indexMeta != null : "Index meta must be present for indexId=" +
indexId;
+
+ long tsLong =
indexMeta.statusChange(MetaIndexStatus.REGISTERED).activationTimestamp();
+ return HybridTimestamp.hybridTimestamp(tsLong);
+ }
+
/**
* Stops building all indexes (for a table partition) if they are in
progress.
*
diff --git a/modules/index/src/main/java/org/apache/ignite/internal/index/IndexBuildingManager.java b/modules/index/src/main/java/org/apache/ignite/internal/index/IndexBuildingManager.java
index a474d5b9bfd..8aef37cf8af 100644
--- a/modules/index/src/main/java/org/apache/ignite/internal/index/IndexBuildingManager.java
+++ b/modules/index/src/main/java/org/apache/ignite/internal/index/IndexBuildingManager.java
@@ -46,9 +46,13 @@ import
org.apache.ignite.internal.metastorage.MetaStorageManager;
import org.apache.ignite.internal.metastorage.Revisions;
import org.apache.ignite.internal.network.ClusterService;
import org.apache.ignite.internal.placementdriver.PlacementDriver;
+import
org.apache.ignite.internal.placementdriver.wrappers.ExecutorInclinedPlacementDriver;
import org.apache.ignite.internal.replicator.ReplicaService;
import org.apache.ignite.internal.table.distributed.index.IndexMetaStorage;
+import
org.apache.ignite.internal.table.distributed.replicator.TransactionStateResolver;
import org.apache.ignite.internal.thread.IgniteThreadFactory;
+import org.apache.ignite.internal.tx.TxManager;
+import org.apache.ignite.internal.tx.impl.TxMessageSender;
import org.apache.ignite.internal.util.IgniteSpinBusyLock;
/**
@@ -90,7 +94,8 @@ public class IndexBuildingManager implements IgniteComponent {
ClockService clockService,
FailureProcessor failureProcessor,
NodeProperties nodeProperties,
- LowWatermark lowWatermark
+ LowWatermark lowWatermark,
+ TxManager txManager
) {
this.metaStorageManager = metaStorageManager;
@@ -107,7 +112,23 @@ public class IndexBuildingManager implements
IgniteComponent {
executor.allowCoreThreadTimeOut(true);
- indexBuilder = new IndexBuilder(executor, replicaService,
failureProcessor, nodeProperties);
+ TransactionStateResolver transactionStateResolver = new
TransactionStateResolver(
+ txManager,
+ clockService,
+ clusterService.topologyService(),
+ clusterService.messagingService(),
+ new ExecutorInclinedPlacementDriver(placementDriver, executor),
+ new TxMessageSender(clusterService.messagingService(),
replicaService, clockService)
+ );
+
+ indexBuilder = new IndexBuilder(
+ executor,
+ replicaService,
+ failureProcessor,
+ nodeProperties,
+ new
RetryingFinalTransactionStateResolver(transactionStateResolver, executor),
+ indexMetaStorage
+ );
indexAvailabilityController = new
IndexAvailabilityController(catalogManager, metaStorageManager,
failureProcessor, indexBuilder);
diff --git a/modules/index/src/main/java/org/apache/ignite/internal/index/RetryingFinalTransactionStateResolver.java b/modules/index/src/main/java/org/apache/ignite/internal/index/RetryingFinalTransactionStateResolver.java
new file mode 100644
index 00000000000..61ff3483327
--- /dev/null
+++ b/modules/index/src/main/java/org/apache/ignite/internal/index/RetryingFinalTransactionStateResolver.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.index;
+
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static java.util.concurrent.CompletableFuture.supplyAsync;
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static java.util.function.Function.identity;
+
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.concurrent.TimeUnit;
+import
org.apache.ignite.internal.placementdriver.PrimaryReplicaAwaitTimeoutException;
+import org.apache.ignite.internal.replicator.ReplicationGroupId;
+import
org.apache.ignite.internal.table.distributed.replicator.TransactionStateResolver;
+import org.apache.ignite.internal.tx.TxState;
+import org.apache.ignite.internal.util.CompletableFutures;
+
+/**
+ * FinalTransactionStateResolver implementation using retries. Makes sense
when the coordinator of the transaction is known
+ * to have left the topology.
+ */
+public class RetryingFinalTransactionStateResolver implements
FinalTransactionStateResolver {
+ private final TransactionStateResolver stateResolver;
+
+ private final Executor delayedExecutor;
+
+ /** Constructor. */
+ public RetryingFinalTransactionStateResolver(TransactionStateResolver
stateResolver, Executor executor) {
+ this.stateResolver = stateResolver;
+ delayedExecutor = CompletableFuture.delayedExecutor(1000,
TimeUnit.MILLISECONDS, executor);
+ }
+
+ @Override
+ public CompletableFuture<TxState> resolveFinalTxState(UUID transactionId,
ReplicationGroupId commitGroupId) {
+ return stateResolver.resolveTxState(transactionId, commitGroupId,
null, Long.MAX_VALUE, SECONDS)
+ .thenCompose(txMeta -> {
+ if (txMeta != null &&
TxState.isFinalState(txMeta.txState())) {
+ return completedFuture(txMeta.txState());
+ }
+
+ return retryResolve(transactionId, commitGroupId);
+ })
+ .handle((res, ex) -> {
+ if (ex instanceof PrimaryReplicaAwaitTimeoutException) {
+ return retryResolve(transactionId, commitGroupId);
+ }
+
+ return CompletableFutures.completedOrFailedFuture(res, ex);
+ })
+ .thenCompose(identity());
+ }
+
+ private CompletableFuture<TxState> retryResolve(UUID transactionId,
ReplicationGroupId commitGroupId) {
+ return supplyAsync(() -> resolveFinalTxState(transactionId,
commitGroupId), delayedExecutor)
+ .thenCompose(identity());
+ }
+}
diff --git a/modules/index/src/test/java/org/apache/ignite/internal/index/CommittedFinalTransactionStateResolver.java b/modules/index/src/test/java/org/apache/ignite/internal/index/CommittedFinalTransactionStateResolver.java
new file mode 100644
index 00000000000..be884cacb2a
--- /dev/null
+++ b/modules/index/src/test/java/org/apache/ignite/internal/index/CommittedFinalTransactionStateResolver.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.index;
+
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.replicator.ReplicationGroupId;
+import org.apache.ignite.internal.tx.TxState;
+
+/**
+ * FinalTransactionStateResolver implementation that always returns COMMITTED
state immediately.
+ */
+public class CommittedFinalTransactionStateResolver implements
FinalTransactionStateResolver {
+ @Override
+ public CompletableFuture<TxState> resolveFinalTxState(UUID transactionId,
ReplicationGroupId commitGroupId) {
+ return CompletableFuture.completedFuture(TxState.COMMITTED);
+ }
+}
diff --git a/modules/index/src/test/java/org/apache/ignite/internal/index/IndexAvailabilityControllerTest.java b/modules/index/src/test/java/org/apache/ignite/internal/index/IndexAvailabilityControllerTest.java
index dcac1821417..4e62755f83a 100644
--- a/modules/index/src/test/java/org/apache/ignite/internal/index/IndexAvailabilityControllerTest.java
+++ b/modules/index/src/test/java/org/apache/ignite/internal/index/IndexAvailabilityControllerTest.java
@@ -66,6 +66,7 @@ import org.apache.ignite.internal.storage.MvPartitionStorage;
import org.apache.ignite.internal.storage.RowId;
import org.apache.ignite.internal.storage.index.IndexStorage;
import org.apache.ignite.internal.table.TableTestUtils;
+import org.apache.ignite.internal.table.distributed.index.IndexMetaStorage;
import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
@@ -86,11 +87,15 @@ public class IndexAvailabilityControllerTest extends
BaseIgniteAbstractTest {
private final ExecutorService executorService = newSingleThreadExecutor();
+ private final IndexMetaStorage indexMetaStorage =
mock(IndexMetaStorage.class);
+
private final IndexBuilder indexBuilder = new IndexBuilder(
executorService,
mock(ReplicaService.class, invocation -> nullCompletedFuture()),
new NoOpFailureManager(),
- new SystemPropertiesNodeProperties()
+ new SystemPropertiesNodeProperties(),
+ new CommittedFinalTransactionStateResolver(),
+ indexMetaStorage
);
private final IndexAvailabilityController indexAvailabilityController =
new IndexAvailabilityController(
@@ -100,6 +105,11 @@ public class IndexAvailabilityControllerTest extends
BaseIgniteAbstractTest {
indexBuilder
);
+ @BeforeEach
+ void configureMocks() {
+ IndexMetaStorageMocks.configureMocksForBuildingPhase(indexMetaStorage);
+ }
+
@BeforeEach
void setUp() {
ComponentContext context = new ComponentContext();
diff --git a/modules/index/src/test/java/org/apache/ignite/internal/index/IndexBuilderTest.java b/modules/index/src/test/java/org/apache/ignite/internal/index/IndexBuilderTest.java
index 4f517daaeba..9fb01cf1e65 100644
--- a/modules/index/src/test/java/org/apache/ignite/internal/index/IndexBuilderTest.java
+++ b/modules/index/src/test/java/org/apache/ignite/internal/index/IndexBuilderTest.java
@@ -53,8 +53,10 @@ import
org.apache.ignite.internal.replicator.message.ReplicaRequest;
import org.apache.ignite.internal.storage.MvPartitionStorage;
import org.apache.ignite.internal.storage.RowId;
import org.apache.ignite.internal.storage.index.IndexStorage;
+import org.apache.ignite.internal.table.distributed.index.IndexMetaStorage;
import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
/** For {@link IndexBuilder} testing. */
@@ -73,13 +75,22 @@ public class IndexBuilderTest extends
BaseIgniteAbstractTest {
private final ExecutorService executorService = newSingleThreadExecutor();
+ private final IndexMetaStorage indexMetaStorage =
mock(IndexMetaStorage.class);
+
private final IndexBuilder indexBuilder = new IndexBuilder(
executorService,
replicaService,
new NoOpFailureManager(),
- new SystemPropertiesNodeProperties()
+ new SystemPropertiesNodeProperties(),
+ new CommittedFinalTransactionStateResolver(),
+ indexMetaStorage
);
+    /** Configures the {@link IndexMetaStorage} mock for the index building phase before each test. */
+    @BeforeEach
+    void stubIndexMetaStorageForBuildingPhase() {
+        IndexMetaStorageMocks.configureMocksForBuildingPhase(indexMetaStorage);
+    }
+
@AfterEach
void tearDown() throws Exception {
closeAll(
diff --git
a/modules/index/src/test/java/org/apache/ignite/internal/index/IndexMetaStorageMocks.java
b/modules/index/src/test/java/org/apache/ignite/internal/index/IndexMetaStorageMocks.java
new file mode 100644
index 00000000000..1dd8f16cf46
--- /dev/null
+++
b/modules/index/src/test/java/org/apache/ignite/internal/index/IndexMetaStorageMocks.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.index;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.table.distributed.index.IndexMeta;
+import org.apache.ignite.internal.table.distributed.index.IndexMetaStorage;
+import
org.apache.ignite.internal.table.distributed.index.MetaIndexStatusChange;
+
+/** Helpers for stubbing {@link IndexMetaStorage} mocks in index building tests. */
+final class IndexMetaStorageMocks {
+    private IndexMetaStorageMocks() {
+        // Utility class, not meant to be instantiated.
+    }
+
+    /**
+     * Stubs {@code indexMeta(...)} of the given mock for any index ID.
+     *
+     * <p>Each call returns a fresh mocked {@link IndexMeta}. That mock reports the requested index ID. For any
+     * requested status it returns the same status change: catalog version 1, activated at
+     * {@link HybridTimestamp#MIN_VALUE}.
+     *
+     * @param indexMetaStorage Mock to configure.
+     */
+    static void configureMocksForBuildingPhase(IndexMetaStorage indexMetaStorage) {
+        doAnswer(invocation -> {
+            int indexId = invocation.getArgument(0);
+
+            IndexMeta indexMeta = mock(IndexMeta.class);
+            when(indexMeta.indexId()).thenReturn(indexId);
+            when(indexMeta.statusChange(any())).thenReturn(new MetaIndexStatusChange(1, HybridTimestamp.MIN_VALUE.longValue()));
+
+            return indexMeta;
+        }).when(indexMetaStorage).indexMeta(anyInt());
+    }
+}
diff --git
a/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/fixtures/Node.java
b/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/fixtures/Node.java
index 5233c985ce6..e8c616a44dd 100644
---
a/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/fixtures/Node.java
+++
b/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/fixtures/Node.java
@@ -858,7 +858,8 @@ public class Node {
clockService,
failureManager,
nodeProperties,
- lowWatermark
+ lowWatermark,
+ txManager
);
systemViewManager = new SystemViewManagerImpl(name, catalogManager,
failureManager);
diff --git
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/PartitionReplicationMessageGroup.java
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/PartitionReplicationMessageGroup.java
index 09c2f55e802..7564fb07d5e 100644
---
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/PartitionReplicationMessageGroup.java
+++
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/PartitionReplicationMessageGroup.java
@@ -22,6 +22,7 @@ import static
org.apache.ignite.internal.partition.replicator.network.PartitionR
import org.apache.ignite.internal.network.annotations.MessageGroup;
import
org.apache.ignite.internal.partition.replicator.network.command.BuildIndexCommand;
import
org.apache.ignite.internal.partition.replicator.network.command.BuildIndexCommandV2;
+import
org.apache.ignite.internal.partition.replicator.network.command.BuildIndexCommandV3;
import
org.apache.ignite.internal.partition.replicator.network.command.FinishTxCommandV1;
import
org.apache.ignite.internal.partition.replicator.network.command.FinishTxCommandV2;
import
org.apache.ignite.internal.partition.replicator.network.command.TimedBinaryRowMessage;
@@ -276,6 +277,9 @@ public interface PartitionReplicationMessageGroup {
/** Message type for {@link FinishTxCommandV2}. */
short FINISH_TX_V2 = 50;
+
+ /** Message type for {@link BuildIndexCommandV3}. */
+ short BUILD_INDEX_V3 = 51;
}
/**
diff --git
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/replication/BuildIndexReplicaRequest.java
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/command/BuildIndexCommandV3.java
similarity index 54%
copy from
modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/replication/BuildIndexReplicaRequest.java
copy to
modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/command/BuildIndexCommandV3.java
index 71dee513a9e..7bc88921f5a 100644
---
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/replication/BuildIndexReplicaRequest.java
+++
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/command/BuildIndexCommandV3.java
@@ -15,29 +15,16 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.partition.replicator.network.replication;
+package org.apache.ignite.internal.partition.replicator.network.command;
-import java.util.List;
+import java.util.Set;
import java.util.UUID;
import org.apache.ignite.internal.network.annotations.Transferable;
-import
org.apache.ignite.internal.partition.replicator.network.PartitionReplicationMessageGroup;
-import
org.apache.ignite.internal.replicator.exception.PrimaryReplicaMissException;
-import org.apache.ignite.internal.replicator.message.PrimaryReplicaRequest;
-import org.apache.ignite.internal.replicator.message.TableAware;
+import
org.apache.ignite.internal.partition.replicator.network.PartitionReplicationMessageGroup.Commands;
-/**
- * Replica request to build a table index.
- *
- * <p>It is possible to receive a {@link PrimaryReplicaMissException} in
response to message processing if the leaseholder changes.</p>
- */
-@Transferable(PartitionReplicationMessageGroup.BUILD_INDEX_REPLICA_REQUEST)
-public interface BuildIndexReplicaRequest extends PrimaryReplicaRequest,
TableAware {
- /** Returns index ID. */
- int indexId();
-
- /** Returns row IDs for which to build indexes. */
- List<UUID> rowIds();
-
- /** Returns {@code true} if this batch is the last one. */
- boolean finish();
+/**
+ * Version 3 of the build index command.
+ *
+ * <p>It is {@link BuildIndexCommandV2} plus the IDs of transactions that are known to have been aborted. The new
+ * field lives in a new command version with its own message type, rather than in {@link BuildIndexCommandV2}
+ * itself, to keep backward compatibility.
+ */
+@Transferable(Commands.BUILD_INDEX_V3)
+public interface BuildIndexCommandV3 extends BuildIndexCommandV2 {
+    /** Returns IDs of the transactions that own write intents in this batch and are known to have been aborted. */
+    Set<UUID> abortedTransactionIds();
}
diff --git
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/replication/BuildIndexReplicaRequest.java
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/replication/BuildIndexReplicaRequest.java
index 71dee513a9e..acd88b7c086 100644
---
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/replication/BuildIndexReplicaRequest.java
+++
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/replication/BuildIndexReplicaRequest.java
@@ -18,6 +18,7 @@
package org.apache.ignite.internal.partition.replicator.network.replication;
import java.util.List;
+import java.util.Set;
import java.util.UUID;
import org.apache.ignite.internal.network.annotations.Transferable;
import
org.apache.ignite.internal.partition.replicator.network.PartitionReplicationMessageGroup;
@@ -40,4 +41,7 @@ public interface BuildIndexReplicaRequest extends
PrimaryReplicaRequest, TableAw
/** Returns {@code true} if this batch is the last one. */
boolean finish();
+
+    /** Returns IDs of the transactions that own write intents in this batch and are known to have been aborted. */
+    Set<UUID> abortedTransactionIds();
}
diff --git
a/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/network/command/PartitionCommandsCompatibilityTest.java
b/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/network/command/PartitionCommandsCompatibilityTest.java
index b34b2609704..909e5adac99 100644
---
a/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/network/command/PartitionCommandsCompatibilityTest.java
+++
b/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/network/command/PartitionCommandsCompatibilityTest.java
@@ -17,6 +17,8 @@
package org.apache.ignite.internal.partition.replicator.network.command;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -66,6 +68,7 @@ public class PartitionCommandsCompatibilityTest extends
BaseCommandsCompatibilit
return List.of(
createBuildIndexCommand(),
createBuildIndexCommandV2(),
+ createBuildIndexCommandV3(),
createFinishTxCommandV1(),
createFinishTxCommandV2(),
createUpdateAllCommand(),
@@ -99,6 +102,18 @@ public class PartitionCommandsCompatibilityTest extends
BaseCommandsCompatibilit
assertEquals(7, command.tableId());
}
+    @Test
+    @TestForCommand(BuildIndexCommandV3.class)
+    void testBuildIndexCommandV3() {
+        String serialized = "CjQDAP/////////W/////////7sAAAAAAAAAACoAAAAAAAAARQFGAgAAAAAAAAAAKgAAAAAAAABFCA==";
+
+        BuildIndexCommandV3 decoded = decodeCommand(serialized);
+
+        // Fields inherited from the previous command versions.
+        assertEquals(7, decoded.tableId());
+        assertEquals(69, decoded.indexId());
+        assertEquals(List.of(uuid()), decoded.rowIds());
+        assertTrue(decoded.finish());
+
+        // Field introduced in V3.
+        assertThat(decoded.abortedTransactionIds(), containsInAnyOrder(uuid(), anotherUuid()));
+    }
+
@Test
@TestForCommand(FinishTxCommandV1.class)
void testFinishTxCommandV1() {
@@ -390,6 +405,16 @@ public class PartitionCommandsCompatibilityTest extends
BaseCommandsCompatibilit
.build();
}
+    /**
+     * Creates a sample {@link BuildIndexCommandV3}. Its field values match the ones asserted in
+     * {@code testBuildIndexCommandV3}.
+     */
+    private BuildIndexCommandV3 createBuildIndexCommandV3() {
+        return commandFactory.buildIndexCommandV3()
+                .indexId(69)
+                .rowIds(List.of(uuid()))
+                .finish(true)
+                .tableId(7)
+                .abortedTransactionIds(Set.of(uuid(), anotherUuid()))
+                .build();
+    }
+
private BuildIndexCommand createBuildIndexCommand() {
return commandFactory.buildIndexCommand()
.indexId(69)
diff --git
a/modules/raft/src/testFixtures/java/org/apache/ignite/internal/raft/BaseCommandsCompatibilityTest.java
b/modules/raft/src/testFixtures/java/org/apache/ignite/internal/raft/BaseCommandsCompatibilityTest.java
index 4075ffefa68..fe61f6d617b 100644
---
a/modules/raft/src/testFixtures/java/org/apache/ignite/internal/raft/BaseCommandsCompatibilityTest.java
+++
b/modules/raft/src/testFixtures/java/org/apache/ignite/internal/raft/BaseCommandsCompatibilityTest.java
@@ -86,6 +86,10 @@ public abstract class BaseCommandsCompatibilityTest extends
BaseIgniteAbstractTe
return new UUID(42, 69);
}
+    /** Returns a second fixed UUID, distinct from the one returned by {@code uuid()}. */
+    protected static UUID anotherUuid() {
+        long mostSigBits = -42L;
+        long leastSigBits = -69L;
+
+        return new UUID(mostSigBits, leastSigBits);
+    }
+
protected static HybridTimestamp initiatorTime() {
return HybridTimestamp.hybridTimestamp(70);
}
diff --git
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
index b4086f773a5..f318532845d 100644
---
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
+++
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
@@ -1198,7 +1198,8 @@ public class IgniteImpl implements Ignite {
clockService,
failureManager,
nodeProperties,
- lowWatermark
+ lowWatermark,
+ txManager
);
qryEngine = new SqlQueryProcessor(
diff --git
a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/MvPartitionStorage.java
b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/MvPartitionStorage.java
index 58aa1241949..7342f178cec 100644
---
a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/MvPartitionStorage.java
+++
b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/MvPartitionStorage.java
@@ -269,6 +269,14 @@ public interface MvPartitionStorage extends
ManuallyCloseable {
*/
@Nullable RowId closestRowId(RowId lowerBound) throws StorageException;
+    /**
+     * Returns metadata of the closest row existing in the storage whose ID is greater than or equal to the lower
+     * bound.
+     *
+     * @param lowerBound Lower bound (inclusive).
+     * @return Metadata of the found row, or {@code null} if there is no such row.
+     * @throws StorageException If failed to read data from the storage.
+     */
+    @Nullable RowMeta closestRow(RowId lowerBound) throws StorageException;
+
/**
* Returns the head of GC queue.
*
diff --git
a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/RowMeta.java
b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/RowMeta.java
new file mode 100644
index 00000000000..51030c8c93f
--- /dev/null
+++
b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/RowMeta.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage;
+
+import java.util.Objects;
+import java.util.UUID;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Metadata of a row stored in the storage.
+ *
+ * <p>It holds the row ID. If the row has a write intent, it also holds the ID of the transaction owning the write
+ * intent, plus that transaction's commit table (or zone) ID and commit partition ID.
+ *
+ * <p>Instances are compared by value (see {@link #equals(Object)}).
+ */
+public class RowMeta {
+    private final RowId rowId;
+
+    private final @Nullable UUID transactionId;
+
+    private final @Nullable Integer commitTableOrZoneId;
+
+    private final int commitPartitionId;
+
+    /**
+     * Creates metadata for a row without a write intent.
+     *
+     * @param rowId Row ID.
+     * @return Metadata with no transaction ID, no commit table or zone ID, and
+     *      {@link ReadResult#UNDEFINED_COMMIT_PARTITION_ID} as the commit partition ID.
+     */
+    public static RowMeta withoutWriteIntent(RowId rowId) {
+        return new RowMeta(rowId, null, null, ReadResult.UNDEFINED_COMMIT_PARTITION_ID);
+    }
+
+    /**
+     * Constructor.
+     *
+     * @param rowId Row ID.
+     * @param transactionId ID of the transaction owning the row's write intent, or {@code null} if there is no
+     *      write intent.
+     * @param commitTableOrZoneId Commit table or zone ID of that transaction, or {@code null} if there is no write
+     *      intent.
+     * @param commitPartitionId Commit partition ID of that transaction, or
+     *      {@link ReadResult#UNDEFINED_COMMIT_PARTITION_ID} if there is no write intent.
+     */
+    public RowMeta(RowId rowId, @Nullable UUID transactionId, @Nullable Integer commitTableOrZoneId, int commitPartitionId) {
+        this.rowId = rowId;
+        this.transactionId = transactionId;
+        this.commitTableOrZoneId = commitTableOrZoneId;
+        this.commitPartitionId = commitPartitionId;
+    }
+
+    /** Returns the row ID. */
+    public RowId rowId() {
+        return rowId;
+    }
+
+    /** Returns the transaction ID if this row is a write intent, or {@code null} otherwise. */
+    public @Nullable UUID transactionId() {
+        return transactionId;
+    }
+
+    /** Returns {@code true} if this row is a write intent. */
+    public boolean isWriteIntent() {
+        return transactionId != null;
+    }
+
+    /** Returns the commit table or zone ID if this row has a write intent, or {@code null} otherwise. */
+    public @Nullable Integer commitTableOrZoneId() {
+        return commitTableOrZoneId;
+    }
+
+    /** Returns the commit partition ID. If the row has no write intent, it's {@link ReadResult#UNDEFINED_COMMIT_PARTITION_ID}. */
+    public int commitPartitionId() {
+        return commitPartitionId;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+
+        RowMeta that = (RowMeta) o;
+
+        return commitPartitionId == that.commitPartitionId
+                && Objects.equals(rowId, that.rowId)
+                && Objects.equals(transactionId, that.transactionId)
+                && Objects.equals(commitTableOrZoneId, that.commitTableOrZoneId);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(rowId, transactionId, commitTableOrZoneId, commitPartitionId);
+    }
+
+    @Override
+    public String toString() {
+        return "RowMeta [rowId=" + rowId
+                + ", transactionId=" + transactionId
+                + ", commitTableOrZoneId=" + commitTableOrZoneId
+                + ", commitPartitionId=" + commitPartitionId + ']';
+    }
+}
diff --git
a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/ThreadAssertingMvPartitionStorage.java
b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/ThreadAssertingMvPartitionStorage.java
index 0e558f2d973..d9d0746e628 100644
---
a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/ThreadAssertingMvPartitionStorage.java
+++
b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/ThreadAssertingMvPartitionStorage.java
@@ -150,6 +150,13 @@ public class ThreadAssertingMvPartitionStorage implements
MvPartitionStorage, Wr
return partitionStorage.closestRowId(lowerBound);
}
+    @Override
+    public @Nullable RowMeta closestRow(RowId lowerBound) throws StorageException {
+        // Verify that the current thread may read, then delegate to the wrapped storage.
+        assertThreadAllowsToRead();
+        return partitionStorage.closestRow(lowerBound);
+    }
+
@Override
public @Nullable GcEntry peek(HybridTimestamp lowWatermark) {
assertThreadAllowsToRead();
diff --git
a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageConcurrencyTest.java
b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageConcurrencyTest.java
index e9f7b361ef9..6a1d2a49abf 100644
---
a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageConcurrencyTest.java
+++
b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageConcurrencyTest.java
@@ -154,6 +154,7 @@ public abstract class
AbstractMvPartitionStorageConcurrencyTest extends BaseMvPa
abortWrite(ROW_ID, txId);
assertNull(storage.closestRowId(ROW_ID));
+ assertNull(storage.closestRow(ROW_ID));
}
}
@@ -196,6 +197,7 @@ public abstract class
AbstractMvPartitionStorageConcurrencyTest extends BaseMvPa
);
assertNull(storage.closestRowId(ROW_ID));
+ assertNull(storage.closestRow(ROW_ID));
}
}
@@ -222,6 +224,7 @@ public abstract class
AbstractMvPartitionStorageConcurrencyTest extends BaseMvPa
assertNull(pollForVacuum(HybridTimestamp.MAX_VALUE));
assertNull(storage.closestRowId(ROW_ID));
+ assertNull(storage.closestRow(ROW_ID));
assertThat(rows, empty());
}
diff --git
a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageTest.java
b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageTest.java
index 826f9f7af57..90d0d358db9 100644
---
a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageTest.java
+++
b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageTest.java
@@ -1519,6 +1519,48 @@ public abstract class AbstractMvPartitionStorageTest
extends BaseMvPartitionStor
assertNull(storage.closestRowId(rowId2.increment()));
}
+    @Test
+    void testClosestRow() {
+        RowId beforeFirst = new RowId(PARTITION_ID, 1, -1);
+        RowId first = new RowId(PARTITION_ID, 1, 0);
+        RowId second = new RowId(PARTITION_ID, 1, 1);
+
+        addWrite(first, binaryRow, txId);
+        addWrite(second, binaryRow2, txId);
+
+        RowMeta firstMeta = new RowMeta(first, txId, COMMIT_TABLE_ID, PARTITION_ID);
+        RowMeta secondMeta = new RowMeta(second, txId, COMMIT_TABLE_ID, PARTITION_ID);
+
+        // Lower bounds up to and including the first row resolve to the first row.
+        assertRowMetaEquals(firstMeta, storage.closestRow(beforeFirst));
+        assertRowMetaEquals(firstMeta, storage.closestRow(beforeFirst.increment()));
+        assertRowMetaEquals(firstMeta, storage.closestRow(first));
+
+        // An exact match on the last row resolves to it; a bound past it finds nothing.
+        assertRowMetaEquals(secondMeta, storage.closestRow(second));
+        assertNull(storage.closestRow(second.increment()));
+    }
+
+    /**
+     * Asserts that {@code actual} is not {@code null} and matches {@code expected} field by field.
+     *
+     * @param expected Expected row metadata.
+     * @param actual Actual row metadata, as returned by the storage.
+     */
+    private static void assertRowMetaEquals(RowMeta expected, RowMeta actual) {
+        assertNotNull(actual);
+
+        assertEquals(expected.rowId(), actual.rowId());
+        assertEquals(expected.transactionId(), actual.transactionId());
+        assertEquals(expected.commitTableOrZoneId(), actual.commitTableOrZoneId());
+        assertEquals(expected.commitPartitionId(), actual.commitPartitionId());
+    }
+
+    @Test
+    void testClosestRowReconstruction() {
+        // Both halves of the ID use distinctive 64-bit patterns, presumably to check that the row ID is read back
+        // from storage exactly.
+        RowId rowId = new RowId(PARTITION_ID, 0x1234567890ABCDEFL, 0xFEDCBA0987654321L);
+
+        addWrite(rowId, binaryRow, txId);
+
+        RowMeta found = storage.closestRow(RowId.lowestRowId(PARTITION_ID));
+
+        assertRowMetaEquals(new RowMeta(rowId, txId, COMMIT_TABLE_ID, PARTITION_ID), found);
+    }
+
@Test
public void addWriteCommittedAddsCommittedVersion() {
RowId rowId = new RowId(PARTITION_ID);
diff --git
a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvTableStorageTest.java
b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvTableStorageTest.java
index f56f92ae8e8..5708e7f2b7f 100644
---
a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvTableStorageTest.java
+++
b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvTableStorageTest.java
@@ -607,6 +607,7 @@ public abstract class AbstractMvTableStorageTest extends
BaseMvTableStorageTest
assertThrows(StorageDestroyedException.class, () ->
storage.scanVersions(rowId));
assertThrows(StorageDestroyedException.class, () ->
storage.closestRowId(rowId));
+ assertThrows(StorageDestroyedException.class, () ->
storage.closestRow(rowId));
}
@Test
@@ -1558,6 +1559,7 @@ public abstract class AbstractMvTableStorageTest extends
BaseMvTableStorageTest
assertThrows(StorageRebalanceException.class, () ->
storage.scanVersions(rowId));
assertThrows(StorageRebalanceException.class, () ->
storage.scan(clock.now()));
assertThrows(StorageRebalanceException.class, () ->
storage.closestRowId(rowId));
+ assertThrows(StorageRebalanceException.class, () ->
storage.closestRow(rowId));
return null;
});
diff --git
a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvPartitionStorage.java
b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvPartitionStorage.java
index e27244157c6..622ed7fe43c 100644
---
a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvPartitionStorage.java
+++
b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvPartitionStorage.java
@@ -22,6 +22,7 @@ import static
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFu
import java.util.Arrays;
import java.util.Iterator;
+import java.util.Map.Entry;
import java.util.NavigableSet;
import java.util.NoSuchElementException;
import java.util.UUID;
@@ -41,6 +42,7 @@ import org.apache.ignite.internal.storage.MvPartitionStorage;
import org.apache.ignite.internal.storage.PartitionTimestampCursor;
import org.apache.ignite.internal.storage.ReadResult;
import org.apache.ignite.internal.storage.RowId;
+import org.apache.ignite.internal.storage.RowMeta;
import org.apache.ignite.internal.storage.StorageClosedException;
import org.apache.ignite.internal.storage.StorageDestroyedException;
import org.apache.ignite.internal.storage.StorageException;
@@ -138,7 +140,7 @@ public class TestMvPartitionStorage implements
MvPartitionStorage {
return new VersionChain(rowId, row, null, txId, commitTableId,
commitPartitionId, next);
}
- static VersionChain forCommitted(RowId rowId, @Nullable
HybridTimestamp timestamp, VersionChain uncommittedVersionChain) {
+ static VersionChain forCommitted(RowId rowId, HybridTimestamp
timestamp, VersionChain uncommittedVersionChain) {
return new VersionChain(rowId, uncommittedVersionChain.row,
timestamp, null, null,
ReadResult.UNDEFINED_COMMIT_PARTITION_ID,
uncommittedVersionChain.next);
}
@@ -628,6 +630,20 @@ public class TestMvPartitionStorage implements
MvPartitionStorage {
return map.ceilingKey(lowerBound);
}
+    @Override
+    public @Nullable RowMeta closestRow(RowId lowerBound) throws StorageException {
+        checkStorageClosedOrInProcessOfRebalance();
+
+        Entry<RowId, VersionChain> ceiling = map.ceilingEntry(lowerBound);
+
+        // No row at or after the lower bound.
+        if (ceiling == null) {
+            return null;
+        }
+
+        VersionChain chain = ceiling.getValue();
+
+        // Committed chains have null txId and commitTableId, and UNDEFINED_COMMIT_PARTITION_ID as their commit
+        // partition (see forCommitted). That matches RowMeta's representation of a row without a write intent.
+        return new RowMeta(chain.rowId, chain.txId, chain.commitTableId, chain.commitPartitionId);
+    }
+
@Override
public synchronized @Nullable GcEntry peek(HybridTimestamp lowWatermark) {
assert THREAD_LOCAL_LOCKER.get() != null;
diff --git
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorage.java
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorage.java
index f7acc6dd576..fe9582d44d6 100644
---
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorage.java
+++
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorage.java
@@ -52,6 +52,7 @@ import org.apache.ignite.internal.storage.MvPartitionStorage;
import org.apache.ignite.internal.storage.PartitionTimestampCursor;
import org.apache.ignite.internal.storage.ReadResult;
import org.apache.ignite.internal.storage.RowId;
+import org.apache.ignite.internal.storage.RowMeta;
import org.apache.ignite.internal.storage.StorageClosedException;
import org.apache.ignite.internal.storage.StorageException;
import org.apache.ignite.internal.storage.StorageRebalanceException;
@@ -258,7 +259,11 @@ public abstract class AbstractPageMemoryMvPartitionStorage
implements MvPartitio
}
ReadResult findLatestRowVersion(VersionChain versionChain) {
- RowVersion rowVersion = readRowVersion(versionChain.headLink(),
ALWAYS_LOAD_VALUE);
+ return findLatestRowVersion(versionChain, ALWAYS_LOAD_VALUE);
+ }
+
+ private ReadResult findLatestRowVersion(VersionChain versionChain,
Predicate<HybridTimestamp> loadValue) {
+ RowVersion rowVersion = readRowVersion(versionChain.headLink(),
loadValue);
if (versionChain.isUncommitted()) {
assert versionChain.transactionId() != null;
@@ -267,7 +272,7 @@ public abstract class AbstractPageMemoryMvPartitionStorage
implements MvPartitio
if (versionChain.hasCommittedVersions()) {
long newestCommitLink = versionChain.newestCommittedLink();
- newestCommitTs = readRowVersion(newestCommitLink,
ALWAYS_LOAD_VALUE).timestamp();
+ newestCommitTs = readRowVersion(newestCommitLink,
loadValue).timestamp();
}
return writeIntentToResult(versionChain, rowVersion,
newestCommitTs);
@@ -613,6 +618,29 @@ public abstract class AbstractPageMemoryMvPartitionStorage
implements MvPartitio
});
}
+    @Override
+    public @Nullable RowMeta closestRow(RowId lowerBound) throws StorageException {
+        return busy(() -> {
+            throwExceptionIfStorageNotInRunnableState();
+
+            // Cursor starting from the lower bound with no upper bound; only its first element is needed.
+            try (Cursor<VersionChain> chains = renewableState.versionChainTree().find(new VersionChainKey(lowerBound), null)) {
+                if (!chains.hasNext()) {
+                    return null;
+                }
+
+                VersionChain chain = chains.next();
+
+                return new RowMeta(chain.rowId(), chain.transactionId(), chain.commitTableId(), chain.commitPartitionId());
+            } catch (Exception e) {
+                throw new StorageException("Error occurred while trying to read a row id", e);
+            }
+        });
+    }
+
@Override
public void close() {
if (!transitionToClosedState(state, this::createStorageInfo)) {
diff --git
a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java
b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java
index 8d6134813b5..3abd1e1953d 100644
---
a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java
+++
b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java
@@ -72,6 +72,7 @@ import org.apache.ignite.internal.storage.MvPartitionStorage;
import org.apache.ignite.internal.storage.PartitionTimestampCursor;
import org.apache.ignite.internal.storage.ReadResult;
import org.apache.ignite.internal.storage.RowId;
+import org.apache.ignite.internal.storage.RowMeta;
import org.apache.ignite.internal.storage.StorageException;
import org.apache.ignite.internal.storage.StorageRebalanceException;
import org.apache.ignite.internal.storage.engine.MvPartitionMeta;
@@ -1113,6 +1114,48 @@ public class RocksDbMvPartitionStorage implements
MvPartitionStorage {
});
}
+ /**
+ * Seeks to the first key at or after the row ID prefix of {@code lowerBound} in the partition column family and
+ * returns that row's metadata, or {@code null} if the iterator finds no such key.
+ */
+ @Override
+ public @Nullable RowMeta closestRow(RowId lowerBound) throws
StorageException {
+ return busy(() -> {
+ throwExceptionIfStorageInProgressOfRebalance(state.get(),
this::createStorageInfo);
+
+ // Seek key: only the row ID prefix of the lower bound (ROW_PREFIX_SIZE bytes) is used.
+ ByteBuffer keyBuf = prepareDirectDataIdKeyBuf(lowerBound)
+ .position(0)
+ .limit(ROW_PREFIX_SIZE);
+
+ try (RocksIterator it = db.newIterator(helper.partCf,
helper.scanReadOpts)) {
+ it.seek(keyBuf);
+
+ if (!it.isValid()) {
+ // An invalid iterator means either that there are no more keys or that iteration failed; status() throws
+ // RocksDBException in the latter case, otherwise there is simply no row at or after the bound.
+ it.status();
+
+ return null;
+ }
+
+ // Reuse the seek buffer to read the found key. it.key(...) returns the full key length even when only the
+ // first ROW_PREFIX_SIZE bytes fit into the buffer.
+ keyBuf.rewind();
+ int keyLength = it.key(keyBuf);
+ // A key consisting of the row ID prefix only is treated as a write intent; longer keys as committed versions
+ // (NOTE(review): presumably row ID prefix + commit timestamp - confirm against the key layout).
+ boolean isWriteIntent = keyLength == ROW_PREFIX_SIZE;
+
+ RowId rowId = getRowId(keyBuf);
+
+ if (isWriteIntent) {
+ // Value layout, in the order it is read below: data ID, transaction ID (two longs), commit table ID (int),
+ // commit partition ID (unsigned short).
+ ByteBuffer transactionState = ByteBuffer.wrap(it.value());
+
+ // Return value unused: presumably called only to advance the buffer position past the data ID.
+ readDataIdFromTxState(transactionState);
+ UUID txId = new UUID(transactionState.getLong(),
transactionState.getLong());
+ int commitTableId = transactionState.getInt();
+ int commitPartitionId =
Short.toUnsignedInt(transactionState.getShort());
+
+ return new RowMeta(rowId, txId, commitTableId,
commitPartitionId);
+ } else {
+ return RowMeta.withoutWriteIntent(rowId);
+ }
+ } catch (RocksDBException e) {
+ throw new IgniteRocksDbException("Error finding closest Row
ID", e);
+ }
+ });
+ }
+
private static void incrementRowId(ByteBuffer buf) {
long lsb = 1 + buf.getLong(ROW_ID_OFFSET + Long.BYTES);
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/index/MetaIndexStatusChange.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/index/MetaIndexStatusChange.java
index 72040807de1..448627869b9 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/index/MetaIndexStatusChange.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/index/MetaIndexStatusChange.java
@@ -27,7 +27,7 @@ public class MetaIndexStatusChange {
private final long activationTs;
/** Constructor. */
- MetaIndexStatusChange(int catalogVersion, long activationTs) {
+ public MetaIndexStatusChange(int catalogVersion, long activationTs) {
this.catalogVersion = catalogVersion;
this.activationTs = activationTs;
}
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
index da83a602324..3491323c4dc 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
@@ -21,6 +21,7 @@ import static java.lang.Math.max;
import static
org.apache.ignite.internal.hlc.HybridTimestamp.NULL_HYBRID_TIMESTAMP;
import static
org.apache.ignite.internal.partition.replicator.network.PartitionReplicationMessageGroup.Commands.BUILD_INDEX_V1;
import static
org.apache.ignite.internal.partition.replicator.network.PartitionReplicationMessageGroup.Commands.BUILD_INDEX_V2;
+import static
org.apache.ignite.internal.partition.replicator.network.PartitionReplicationMessageGroup.Commands.BUILD_INDEX_V3;
import static
org.apache.ignite.internal.partition.replicator.network.PartitionReplicationMessageGroup.Commands.FINISH_TX_V1;
import static
org.apache.ignite.internal.partition.replicator.network.PartitionReplicationMessageGroup.Commands.FINISH_TX_V2;
import static
org.apache.ignite.internal.partition.replicator.network.PartitionReplicationMessageGroup.Commands.UPDATE_MINIMUM_ACTIVE_TX_TIME_COMMAND;
@@ -193,18 +194,16 @@ public class PartitionListener implements
RaftGroupListener, RaftTableProcessor
tablePartitionId,
minTimeCollectorService
));
- commandHandlersBuilder.addHandler(GROUP_TYPE, BUILD_INDEX_V1, new
BuildIndexCommandHandler(
- storage,
- indexMetaStorage,
- storageUpdateHandler,
- schemaRegistry
- ));
- commandHandlersBuilder.addHandler(GROUP_TYPE, BUILD_INDEX_V2, new
BuildIndexCommandHandler(
+
+ BuildIndexCommandHandler buildIndexCommandHandler = new
BuildIndexCommandHandler(
storage,
indexMetaStorage,
storageUpdateHandler,
schemaRegistry
- ));
+ );
+ commandHandlersBuilder.addHandler(GROUP_TYPE, BUILD_INDEX_V1,
buildIndexCommandHandler);
+ commandHandlersBuilder.addHandler(GROUP_TYPE, BUILD_INDEX_V2,
buildIndexCommandHandler);
+ commandHandlersBuilder.addHandler(GROUP_TYPE, BUILD_INDEX_V3,
buildIndexCommandHandler);
if (!nodeProperties.colocationEnabled()) {
commandHandlersBuilder.addHandler(
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/handlers/BuildIndexCommandHandler.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/handlers/BuildIndexCommandHandler.java
index 96b442b5432..5598113e6db 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/handlers/BuildIndexCommandHandler.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/handlers/BuildIndexCommandHandler.java
@@ -28,6 +28,7 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
+import java.util.Set;
import java.util.UUID;
import java.util.stream.Stream;
import org.apache.ignite.internal.hlc.HybridTimestamp;
@@ -36,6 +37,7 @@ import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import
org.apache.ignite.internal.partition.replicator.network.command.BuildIndexCommand;
import
org.apache.ignite.internal.partition.replicator.network.command.BuildIndexCommandV2;
+import
org.apache.ignite.internal.partition.replicator.network.command.BuildIndexCommandV3;
import org.apache.ignite.internal.partition.replicator.raft.CommandResult;
import
org.apache.ignite.internal.partition.replicator.raft.handlers.AbstractCommandHandler;
import
org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionDataStorage;
@@ -110,7 +112,10 @@ public class BuildIndexCommandHandler extends
AbstractCommandHandler<BuildIndexC
return EMPTY_APPLIED_RESULT;
}
- BuildIndexRowVersionChooser rowVersionChooser =
createBuildIndexRowVersionChooser(indexMeta);
+ Set<UUID> abortedTransactionIds = command instanceof
BuildIndexCommandV3
+ ? ((BuildIndexCommandV3) command).abortedTransactionIds()
+ : Set.of();
+ BuildIndexRowVersionChooser rowVersionChooser =
createBuildIndexRowVersionChooser(indexMeta, abortedTransactionIds);
BinaryRowUpgrader binaryRowUpgrader =
createBinaryRowUpgrader(indexMeta);
@@ -146,14 +151,15 @@ public class BuildIndexCommandHandler extends
AbstractCommandHandler<BuildIndexC
return EMPTY_APPLIED_RESULT;
}
- private BuildIndexRowVersionChooser
createBuildIndexRowVersionChooser(IndexMeta indexMeta) {
+ private BuildIndexRowVersionChooser
createBuildIndexRowVersionChooser(IndexMeta indexMeta, Set<UUID>
abortedTransactionIds) {
MetaIndexStatusChange registeredChangeInfo =
indexMeta.statusChange(REGISTERED);
MetaIndexStatusChange buildingChangeInfo =
indexMeta.statusChange(BUILDING);
return new BuildIndexRowVersionChooser(
storage,
registeredChangeInfo.activationTimestamp(),
- buildingChangeInfo.activationTimestamp()
+ buildingChangeInfo.activationTimestamp(),
+ abortedTransactionIds
);
}
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/handlers/BuildIndexRowVersionChooser.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/handlers/BuildIndexRowVersionChooser.java
index 33b51a51827..a2514d5e5a0 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/handlers/BuildIndexRowVersionChooser.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/handlers/BuildIndexRowVersionChooser.java
@@ -21,6 +21,8 @@ import static
org.apache.ignite.internal.tx.TransactionIds.beginTimestamp;
import java.util.ArrayList;
import java.util.List;
+import java.util.Set;
+import java.util.UUID;
import org.apache.ignite.internal.catalog.descriptors.CatalogIndexStatus;
import
org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionDataStorage;
import org.apache.ignite.internal.storage.BinaryRowAndRowId;
@@ -38,6 +40,8 @@ class BuildIndexRowVersionChooser {
/** Timestamp of activation of the catalog version in which the index
start building (get {@link CatalogIndexStatus#BUILDING}). */
private final long startBuildingIndexActivationTs;
+ private final Set<UUID> abortedTransactionIds;
+
/**
* Constructor.
*
@@ -45,11 +49,18 @@ class BuildIndexRowVersionChooser {
* @param createIndexActivationTs Timestamp of activation of the catalog
version in which the index created.
* @param startBuildingIndexActivationTs Timestamp of activation of the
catalog version in which the index start building
* (get {@link CatalogIndexStatus#BUILDING}).
+ * @param abortedTransactionIds IDs of transactions that are known to have
been aborted.
*/
- BuildIndexRowVersionChooser(PartitionDataStorage storage, long
createIndexActivationTs, long startBuildingIndexActivationTs) {
+ BuildIndexRowVersionChooser(
+ PartitionDataStorage storage,
+ long createIndexActivationTs,
+ long startBuildingIndexActivationTs,
+ Set<UUID> abortedTransactionIds
+ ) {
this.storage = storage;
this.createIndexActivationTs = createIndexActivationTs;
this.startBuildingIndexActivationTs = startBuildingIndexActivationTs;
+ this.abortedTransactionIds = Set.copyOf(abortedTransactionIds);
}
/**
@@ -78,7 +89,8 @@ class BuildIndexRowVersionChooser {
}
if (readResult.isWriteIntent()) {
- if (beginTs(readResult) >= createIndexActivationTs) {
+ if (beginTs(readResult) >= createIndexActivationTs
+ ||
abortedTransactionIds.contains(readResult.transactionId())) {
continue;
}
} else {
@@ -89,7 +101,6 @@ class BuildIndexRowVersionChooser {
}
}
- // TODO: IGNITE-21546 It may be necessary to return ReadResult
for write intent resolution
result.add(new BinaryRowAndRowId(readResult.binaryRow(),
rowId));
if (takenLatestVersionOfWriteCommitted) {
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/TransactionStateResolver.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/TransactionStateResolver.java
index da3cc3dab03..00189c63500 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/TransactionStateResolver.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/TransactionStateResolver.java
@@ -18,16 +18,19 @@
package org.apache.ignite.internal.table.distributed.replicator;
import static java.util.concurrent.CompletableFuture.completedFuture;
+import static java.util.concurrent.TimeUnit.SECONDS;
import static org.apache.ignite.internal.tx.TxState.ABANDONED;
import static org.apache.ignite.internal.tx.TxState.FINISHING;
import static org.apache.ignite.internal.tx.TxState.PENDING;
import static org.apache.ignite.internal.tx.TxState.isFinalState;
+import static
org.apache.ignite.internal.tx.impl.PlacementDriverHelper.AWAIT_PRIMARY_REPLICA_TIMEOUT;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
import org.apache.ignite.internal.hlc.ClockService;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.network.ClusterNodeResolver;
@@ -140,6 +143,26 @@ public class TransactionStateResolver {
UUID txId,
ReplicationGroupId commitGrpId,
@Nullable HybridTimestamp timestamp
+ ) {
+ return resolveTxState(txId, commitGrpId, timestamp,
AWAIT_PRIMARY_REPLICA_TIMEOUT, SECONDS);
+ }
+
+ /**
+ * Resolves transaction state locally, if possible, or distributively, if
needed.
+ *
+ * @param txId Transaction id.
+ * @param commitGrpId Commit partition group id.
+ * @param timestamp Timestamp.
+ * @param awaitCommitPartitionAvailabilityTimeout Timeout for awaiting
commit partition primary replica.
+ * @param awaitCommitPartitionAvailabilityTimeUnit Time unit for awaiting
commit partition primary replica timeout.
+ * @return Future with the transaction state meta as a result.
+ */
+ public CompletableFuture<TransactionMeta> resolveTxState(
+ UUID txId,
+ ReplicationGroupId commitGrpId,
+ @Nullable HybridTimestamp timestamp,
+ long awaitCommitPartitionAvailabilityTimeout,
+ TimeUnit awaitCommitPartitionAvailabilityTimeUnit
) {
TxStateMeta localMeta = txManager.stateMeta(txId);
@@ -151,7 +174,15 @@ public class TransactionStateResolver {
if (v == null) {
v = new CompletableFuture<>();
- resolveDistributiveTxState(txId, localMeta, commitGrpId,
timestamp, v);
+ resolveDistributiveTxState(
+ txId,
+ localMeta,
+ commitGrpId,
+ timestamp,
+ awaitCommitPartitionAvailabilityTimeout,
+ awaitCommitPartitionAvailabilityTimeUnit,
+ v
+ );
}
return v;
@@ -169,6 +200,8 @@ public class TransactionStateResolver {
* @param localMeta Local tx meta.
* @param commitGrpId Commit partition group id.
* @param timestamp Timestamp to pass to target node.
+ * @param awaitCommitPartitionAvailabilityTimeout Timeout for awaiting
commit partition primary replica.
+ * @param awaitCommitPartitionAvailabilityTimeUnit Time unit for awaiting
commit partition primary replica timeout.
* @param txMetaFuture Tx meta future to complete with the result.
*/
private void resolveDistributiveTxState(
@@ -176,6 +209,8 @@ public class TransactionStateResolver {
@Nullable TxStateMeta localMeta,
ReplicationGroupId commitGrpId,
@Nullable HybridTimestamp timestamp,
+ long awaitCommitPartitionAvailabilityTimeout,
+ TimeUnit awaitCommitPartitionAvailabilityTimeUnit,
CompletableFuture<TransactionMeta> txMetaFuture
) {
assert localMeta == null || !isFinalState(localMeta.txState()) :
"Unexpected tx meta [txId" + txId + ", meta=" + localMeta + ']';
@@ -184,7 +219,13 @@ public class TransactionStateResolver {
if (localMeta == null) {
// Fallback to commit partition path, because we don't have
coordinator id.
- resolveTxStateFromCommitPartition(txId, commitGrpId, txMetaFuture);
+ resolveTxStateFromCommitPartition(
+ txId,
+ commitGrpId,
+ awaitCommitPartitionAvailabilityTimeout,
+ awaitCommitPartitionAvailabilityTimeUnit,
+ txMetaFuture
+ );
} else if (localMeta.txState() == PENDING) {
resolveTxStateFromTxCoordinator(txId, localMeta.txCoordinatorId(),
commitGrpId, timestamp0, txMetaFuture);
} else if (localMeta.txState() == FINISHING) {
@@ -241,10 +282,20 @@ public class TransactionStateResolver {
UUID txId,
ReplicationGroupId commitGrpId,
CompletableFuture<TransactionMeta> txMetaFuture
+ ) {
+ resolveTxStateFromCommitPartition(txId, commitGrpId,
AWAIT_PRIMARY_REPLICA_TIMEOUT, SECONDS, txMetaFuture);
+ }
+
+ private void resolveTxStateFromCommitPartition(
+ UUID txId,
+ ReplicationGroupId commitGrpId,
+ long awaitPrimaryReplicaTimeout,
+ TimeUnit awaitPrimaryReplicaTimeUnit,
+ CompletableFuture<TransactionMeta> txMetaFuture
) {
updateLocalTxMapAfterDistributedStateResolved(txId, txMetaFuture);
- sendAndRetry(txMetaFuture, commitGrpId, txId);
+ sendAndRetry(txMetaFuture, commitGrpId, txId,
awaitPrimaryReplicaTimeout, awaitPrimaryReplicaTimeUnit);
}
/**
@@ -271,8 +322,14 @@ public class TransactionStateResolver {
* @param replicaGrp Replication group id.
* @param txId Transaction id.
*/
- private void sendAndRetry(CompletableFuture<TransactionMeta> resFut,
ReplicationGroupId replicaGrp, UUID txId) {
-
placementDriverHelper.awaitPrimaryReplicaWithExceptionHandling(replicaGrp)
+ private void sendAndRetry(
+ CompletableFuture<TransactionMeta> resFut,
+ ReplicationGroupId replicaGrp,
+ UUID txId,
+ long awaitPrimaryReplicaTimeout,
+ TimeUnit awaitPrimaryReplicaTimeUnit
+ ) {
+
placementDriverHelper.awaitPrimaryReplicaWithExceptionHandling(replicaGrp,
awaitPrimaryReplicaTimeout, awaitPrimaryReplicaTimeUnit)
.thenCompose(replicaMeta ->
txMessageSender.resolveTxStateFromCommitPartition(
replicaMeta.getLeaseholder(),
@@ -287,7 +344,7 @@ public class TransactionStateResolver {
resFut.complete(txMeta);
} else {
if (e instanceof PrimaryReplicaMissException) {
- sendAndRetry(resFut, replicaGrp, txId);
+ sendAndRetry(resFut, replicaGrp, txId,
awaitPrimaryReplicaTimeout, awaitPrimaryReplicaTimeUnit);
} else {
resFut.completeExceptionally(e);
}
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/handlers/BuildIndexReplicaRequestHandler.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/handlers/BuildIndexReplicaRequestHandler.java
index 0511007b50f..0926695a6c2 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/handlers/BuildIndexReplicaRequestHandler.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/handlers/BuildIndexReplicaRequestHandler.java
@@ -95,12 +95,13 @@ public class BuildIndexReplicaRequestHandler {
}
private static BuildIndexCommand
toBuildIndexCommand(BuildIndexReplicaRequest request, MetaIndexStatusChange
buildingChangeInfo) {
- return PARTITION_REPLICATION_MESSAGES_FACTORY.buildIndexCommandV2()
+ return PARTITION_REPLICATION_MESSAGES_FACTORY.buildIndexCommandV3()
.indexId(request.indexId())
.tableId(request.tableId())
.rowIds(request.rowIds())
.finish(request.finish())
- // We are sure that there will be no error here since the
primary replica is sent the request to itself.
+ .abortedTransactionIds(request.abortedTransactionIds())
+ // We are sure that there will be no error here since the
primary replica is sending the request to itself.
.requiredCatalogVersion(buildingChangeInfo.catalogVersion())
.build();
}
diff --git
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/handlers/BuildIndexRowVersionChooserTest.java
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/handlers/BuildIndexRowVersionChooserTest.java
index 48cf66c1a8b..6537e46632a 100644
---
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/handlers/BuildIndexRowVersionChooserTest.java
+++
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/handlers/BuildIndexRowVersionChooserTest.java
@@ -17,6 +17,7 @@
package org.apache.ignite.internal.table.distributed.raft.handlers;
+import static java.util.Collections.emptySet;
import static org.apache.ignite.internal.hlc.HybridTimestamp.hybridTimestamp;
import static
org.apache.ignite.internal.storage.BinaryRowAndRowIdMatcher.equalToBinaryRowAndRowId;
import static org.apache.ignite.internal.storage.RowId.lowestRowId;
@@ -25,8 +26,10 @@ import static
org.apache.ignite.internal.type.NativeTypes.INT32;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.empty;
+import static org.hamcrest.Matchers.is;
import static org.mockito.Mockito.spy;
+import java.util.Set;
import java.util.UUID;
import org.apache.ignite.distributed.TestPartitionDataStorage;
import
org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionDataStorage;
@@ -63,11 +66,16 @@ public class BuildIndexRowVersionChooserTest extends
IgniteAbstractTest {
private final PartitionDataStorage partitionDataStorage = spy(new
TestPartitionDataStorage(TABLE_ID, PARTITION_ID, mvPartitionStorage));
- private final BuildIndexRowVersionChooser chooser = new
BuildIndexRowVersionChooser(
- partitionDataStorage,
- CREATE_INDEX_ACTIVATION_TS_MILLS,
- START_BUILDING_INDEX_ACTIVATION_TS_MILLS
- );
+ private final BuildIndexRowVersionChooser chooser =
createChooser(emptySet());
+
+ private BuildIndexRowVersionChooser createChooser(Set<UUID>
abortedTransactionIds) {
+ return new BuildIndexRowVersionChooser(
+ partitionDataStorage,
+ CREATE_INDEX_ACTIVATION_TS_MILLS,
+ START_BUILDING_INDEX_ACTIVATION_TS_MILLS,
+ abortedTransactionIds
+ );
+ }
@Test
void testEmptyStorage() {
@@ -107,6 +115,20 @@ public class BuildIndexRowVersionChooserTest extends
IgniteAbstractTest {
assertThat(chooser.chooseForBuildIndex(rowId),
contains(expBinaryRowAndRowId(rowId, row)));
}
+ @Test
+ void testWriteIntentBelongingToAbortedTransaction() {
+ UUID txId = txId(CREATE_INDEX_ACTIVATION_TS_MILLS - 1);
+
+ var customizedChooser = createChooser(Set.of(txId));
+
+ RowId rowId = lowestRowId(PARTITION_ID);
+ BinaryRow row = binaryRow(1, 2);
+
+ addWrite(rowId, row, txId);
+
+ assertThat(customizedChooser.chooseForBuildIndex(rowId), is(empty()));
+ }
+
@Test
void testWriteCommittedAfterStartBuildingActivationTs() {
RowId rowId = lowestRowId(PARTITION_ID);
diff --git
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
index fe0968f689d..461039c473f 100644
---
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
+++
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
@@ -3374,6 +3374,7 @@ public class PartitionReplicaListenerTest extends
IgniteAbstractTest {
.indexId(indexId)
.enlistmentConsistencyToken(ANY_ENLISTMENT_CONSISTENCY_TOKEN)
.rowIds(List.of())
+ .abortedTransactionIds(Set.of())
.timestamp(clock.current())
.build();
diff --git
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/PlacementDriverHelper.java
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/PlacementDriverHelper.java
index faa80e005f8..d4a83cff41e 100644
---
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/PlacementDriverHelper.java
+++
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/PlacementDriverHelper.java
@@ -32,6 +32,7 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
import java.util.function.BiFunction;
import org.apache.ignite.internal.hlc.ClockService;
import org.apache.ignite.internal.hlc.HybridTimestamp;
@@ -49,7 +50,8 @@ public class PlacementDriverHelper {
/** The logger. */
private static final IgniteLogger LOG =
Loggers.forClass(PlacementDriverHelper.class);
- private static final int AWAIT_PRIMARY_REPLICA_TIMEOUT = 10;
+ /** Default timeout in seconds to await primary replica. */
+ public static final int AWAIT_PRIMARY_REPLICA_TIMEOUT = 10;
/** Placement driver. */
private final PlacementDriver placementDriver;
@@ -75,16 +77,35 @@ public class PlacementDriverHelper {
* appeared during the await timeout.
*/
public CompletableFuture<ReplicaMeta>
awaitPrimaryReplicaWithExceptionHandling(ReplicationGroupId partitionId) {
+ return awaitPrimaryReplicaWithExceptionHandling(partitionId,
AWAIT_PRIMARY_REPLICA_TIMEOUT, SECONDS);
+ }
+
+ /**
+ * Wait for primary replica to appear for the provided partition.
+ *
+ * @param partitionId Partition id.
+ * @param timeout Timeout duration.
+ * @param timeUnit Timeout time unit.
+ * @return Future that completes with node id that is a primary for the
provided partition, or completes with exception if no primary
+ * appeared during the await timeout.
+ */
+ public CompletableFuture<ReplicaMeta>
awaitPrimaryReplicaWithExceptionHandling(
+ ReplicationGroupId partitionId,
+ long timeout,
+ TimeUnit timeUnit
+ ) {
HybridTimestamp timestamp = clockService.now();
- return awaitPrimaryReplicaWithExceptionHandling(partitionId,
timestamp);
+ return awaitPrimaryReplicaWithExceptionHandling(partitionId,
timestamp, timeout, timeUnit);
}
private CompletableFuture<ReplicaMeta>
awaitPrimaryReplicaWithExceptionHandling(
ReplicationGroupId partitionId,
- HybridTimestamp timestamp
+ HybridTimestamp timestamp,
+ long timeout,
+ TimeUnit timeUnit
) {
- return placementDriver.awaitPrimaryReplica(partitionId, timestamp,
AWAIT_PRIMARY_REPLICA_TIMEOUT, SECONDS)
+ return placementDriver.awaitPrimaryReplica(partitionId, timestamp,
timeout, timeUnit)
.handle((primaryReplica, e) -> {
if (e != null) {
LOG.debug("Failed to retrieve primary replica for
partition {}", partitionId, e);
@@ -121,7 +142,9 @@ public class PlacementDriverHelper {
* @return A future that completes with a map of node to the partitions
the node is primary for.
*/
public CompletableFuture<Map<String, Set<ReplicationGroupId>>>
awaitPrimaryReplicas(Collection<ReplicationGroupId> partitions) {
- return computePrimaryReplicas(partitions,
this::awaitPrimaryReplicaWithExceptionHandling)
+ BiFunction<ReplicationGroupId, HybridTimestamp,
CompletableFuture<ReplicaMeta>> action = (groupId, timestamp)
+ -> awaitPrimaryReplicaWithExceptionHandling(groupId,
timestamp, AWAIT_PRIMARY_REPLICA_TIMEOUT, SECONDS);
+ return computePrimaryReplicas(partitions, action)
.thenApply(partitionData -> partitionData.partitionsByNode);
}