This is an automated email from the ASF dual-hosted git repository.
rpuch pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 19add5bd95d IGNITE-24574 Implement required catalog version selection
for WriteIntentSwitch requests handling (#5323)
19add5bd95d is described below
commit 19add5bd95d26a8c5f80429d4216551626c373d0
Author: Roman Puchkovskiy <[email protected]>
AuthorDate: Mon Mar 3 11:23:51 2025 +0400
IGNITE-24574 Implement required catalog version selection for
WriteIntentSwitch requests handling (#5323)
---
.../replicator/ReliableCatalogVersions.java | 23 +++++++++++-
.../handlers/WriteIntentSwitchRequestHandler.java | 6 +---
.../replicator/PartitionReplicaListener.java | 11 ++----
.../replication/PartitionReplicaListenerTest.java | 42 ++++++++++++++++++++--
4 files changed, 64 insertions(+), 18 deletions(-)
diff --git
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ReliableCatalogVersions.java
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ReliableCatalogVersions.java
index 45c691eebd6..744fd36d4dd 100644
---
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ReliableCatalogVersions.java
+++
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ReliableCatalogVersions.java
@@ -18,6 +18,7 @@
package org.apache.ignite.internal.partition.replicator;
import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.catalog.CatalogNotFoundException;
import org.apache.ignite.internal.catalog.CatalogService;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.schema.SchemaSyncService;
@@ -36,7 +37,8 @@ public class ReliableCatalogVersions {
}
/**
- * Returns Catalog version corresponding to the given timestamp.
+ * Returns Catalog version corresponding to the given timestamp. Must only
be called if there is a guarantee that
+ * the corresponding version was not yet compacted.
*
* <p>This should only be used when the startup procedure is complete as
it relies on the catalog to be started.
*
@@ -46,4 +48,23 @@ public class ReliableCatalogVersions {
return schemaSyncService.waitForMetadataCompleteness(ts)
.thenApply(unused ->
catalogService.activeCatalogVersion(ts.longValue()));
}
+
+ /**
+ * Returns the Catalog version corresponding to the given timestamp, or the
earliest available Catalog version (a later one)
+ * if the exactly matching version has already been compacted out.
+ *
+ * <p>This should only be used when the startup procedure is complete as
it relies on the catalog to be started.
+ *
+ * @param ts Timestamp.
+ */
+ public CompletableFuture<Integer>
safeReliableCatalogVersionFor(HybridTimestamp ts) {
+ return schemaSyncService.waitForMetadataCompleteness(ts)
+ .thenApply(unused -> {
+ try {
+ return
catalogService.activeCatalogVersion(ts.longValue());
+ } catch (CatalogNotFoundException e) {
+ return catalogService.earliestCatalogVersion();
+ }
+ });
+ }
}
diff --git
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/handlers/WriteIntentSwitchRequestHandler.java
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/handlers/WriteIntentSwitchRequestHandler.java
index 062b8eb1c93..e649d88ab49 100644
---
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/handlers/WriteIntentSwitchRequestHandler.java
+++
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/handlers/WriteIntentSwitchRequestHandler.java
@@ -117,12 +117,8 @@ public class WriteIntentSwitchRequestHandler {
.map(tableId ->
invokeTableWriteIntentSwitchReplicaRequest(tableId, request,
clockService.current(), senderId))
.collect(toList());
- // We choose current() to try to avoid compaction of the chosen
version (and we make sure it's not below commitTs [if it's a commit]
- // or txBeginTs [if it's an abort]). But there seems to be no
guarantee that the compactor will not remove this version.
- // TODO: IGNITE-24574 Introduce a mechanism to save the chosen catalog
version from being compacted too early.
@Nullable HybridTimestamp commitTimestamp = request.commitTimestamp();
HybridTimestamp commandTimestamp = commitTimestamp != null ?
commitTimestamp : beginTimestamp(request.txId());
- HybridTimestamp finalCommandTimestamp =
HybridTimestamp.max(commandTimestamp, clockService.current());
return allOf(futures)
.thenCompose(unused -> {
@@ -136,7 +132,7 @@ public class WriteIntentSwitchRequestHandler {
return completedFuture(new
ReplicaResult(writeIntentSwitchReplicationInfoFor(request), null));
}
- return
reliableCatalogVersions.reliableCatalogVersionFor(finalCommandTimestamp)
+ return
reliableCatalogVersions.safeReliableCatalogVersionFor(commandTimestamp)
.thenApply(catalogVersion -> {
CompletableFuture<WriteIntentSwitchReplicatedInfo> commandReplicatedFuture =
applyCommandToGroup(request,
catalogVersion)
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
index 23bcec259b0..81752d2123e 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
@@ -1684,19 +1684,12 @@ public class PartitionReplicaListener implements
ReplicaListener {
WriteIntentSwitchReplicatedInfo result =
writeIntentSwitchReplicatedInfoFor(request);
- if (enabledColocation()) {
- // We don't need to apply Raft command as zone replication
listener will do it for us.
- return completedFuture(result);
- }
+ assert !enabledColocation() : request;
- // We choose current() to try to avoid compaction of the chosen
version (and we make sure it's not below commitTs [if it's a commit]
- // or txBeginTs [if it's an abort]). But there seems to be no
guarantee that the compactor will not remove this version.
- // TODO: IGNITE-24574 Introduce a mechanism to save the chosen catalog
version from being compacted too early.
@Nullable HybridTimestamp commitTimestamp = request.commitTimestamp();
HybridTimestamp commandTimestamp = commitTimestamp != null ?
commitTimestamp : beginTimestamp(request.txId());
- commandTimestamp = HybridTimestamp.max(commandTimestamp,
clockService.current());
- return
reliableCatalogVersions.reliableCatalogVersionFor(commandTimestamp)
+ return
reliableCatalogVersions.safeReliableCatalogVersionFor(commandTimestamp)
.thenCompose(catalogVersion ->
applyWriteIntentSwitchCommandToGroup(request, catalogVersion))
.thenApply(res -> result);
}
diff --git
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
index 5cfa5214784..f5350c11670 100644
---
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
+++
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
@@ -315,12 +315,15 @@ public class PartitionReplicaListenerTest extends
IgniteAbstractTest {
private final Function<Command, CompletableFuture<?>>
defaultMockRaftFutureClosure = cmd -> {
if (cmd instanceof WriteIntentSwitchCommand) {
- UUID txId = ((WriteIntentSwitchCommand) cmd).txId();
+ WriteIntentSwitchCommand switchCommand =
(WriteIntentSwitchCommand) cmd;
+ UUID txId = switchCommand.txId();
Set<RowId> rows = pendingRows.remove(txId);
- HybridTimestamp commitTimestamp = ((WriteIntentSwitchCommand)
cmd).commitTimestamp();
- assertNotNull(commitTimestamp);
+ HybridTimestamp commitTimestamp = switchCommand.commitTimestamp();
+ if (switchCommand.commit()) {
+ assertNotNull(commitTimestamp);
+ }
if (rows != null) {
for (RowId row : rows) {
@@ -1713,6 +1716,39 @@ public class PartitionReplicaListenerTest extends
IgniteAbstractTest {
upsert(tx1, br1);
}
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ void writeIntentSwitchForCompactedCatalogTimestampWorks(boolean commit) {
+ int earliestVersion = 999;
+
+ UUID txId = newTxId();
+ HybridTimestamp beginTs = beginTimestamp(txId);
+ HybridTimestamp commitTs = clock.now();
+
+ HybridTimestamp reliableCatalogVersionTs = commit ? commitTs : beginTs;
+
when(catalogService.activeCatalogVersion(reliableCatalogVersionTs.longValue())).thenThrow(new
CatalogNotFoundException("Oops"));
+
when(catalogService.earliestCatalogVersion()).thenReturn(earliestVersion);
+
+ CompletableFuture<ReplicaResult> invokeFuture =
partitionReplicaListener.invoke(
+ TX_MESSAGES_FACTORY.writeIntentSwitchReplicaRequest()
+ .groupId(tablePartitionIdMessage(grpId))
+ .tableIds(Set.of(grpId.tableId()))
+ .txId(txId)
+ .commit(commit)
+ .commitTimestamp(commit ? commitTs : null)
+ .build(),
+ localNode.id()
+ );
+
+ assertThat(invokeFuture, willCompleteSuccessfully());
+ assertThat(invokeFuture.join().applyResult().replicationFuture(),
willCompleteSuccessfully());
+
+ verify(mockRaftClient).run(commandCaptor.capture());
+ WriteIntentSwitchCommand command = (WriteIntentSwitchCommand)
commandCaptor.getValue();
+
+ assertThat(command.requiredCatalogVersion(), is(earliestVersion));
+ }
+
/**
* Puts several records into the storage, optionally leaving them as write
intents, alternately deleting and upserting the same row
* within the same RW transaction, then checking read correctness via read
only request.