This is an automated email from the ASF dual-hosted git repository.

rpuch pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 19add5bd95d IGNITE-24574 Implement required catalog version selection for WriteIntentSwitch requests handling (#5323)
19add5bd95d is described below

commit 19add5bd95d26a8c5f80429d4216551626c373d0
Author: Roman Puchkovskiy <[email protected]>
AuthorDate: Mon Mar 3 11:23:51 2025 +0400

    IGNITE-24574 Implement required catalog version selection for WriteIntentSwitch requests handling (#5323)
---
 .../replicator/ReliableCatalogVersions.java        | 23 +++++++++++-
 .../handlers/WriteIntentSwitchRequestHandler.java  |  6 +---
 .../replicator/PartitionReplicaListener.java       | 11 ++----
 .../replication/PartitionReplicaListenerTest.java  | 42 ++++++++++++++++++++--
 4 files changed, 64 insertions(+), 18 deletions(-)

diff --git 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ReliableCatalogVersions.java
 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ReliableCatalogVersions.java
index 45c691eebd6..744fd36d4dd 100644
--- 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ReliableCatalogVersions.java
+++ 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ReliableCatalogVersions.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.internal.partition.replicator;
 
 import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.catalog.CatalogNotFoundException;
 import org.apache.ignite.internal.catalog.CatalogService;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.schema.SchemaSyncService;
@@ -36,7 +37,8 @@ public class ReliableCatalogVersions {
     }
 
     /**
-     * Returns Catalog version corresponding to the given timestamp.
+     * Returns Catalog version corresponding to the given timestamp. Must only 
be called if there is a guarantee that
+     * the corresponding version was not yet compacted.
      *
      * <p>This should only be used when the startup procedure is complete as 
it relies on the catalog to be started.
      *
@@ -46,4 +48,23 @@ public class ReliableCatalogVersions {
         return schemaSyncService.waitForMetadataCompleteness(ts)
                 .thenApply(unused -> 
catalogService.activeCatalogVersion(ts.longValue()));
     }
+
+    /**
+     * Returns a future with the Catalog version corresponding to the given timestamp, or a 
later one. A later version will be returned
+     * if the exactly matching version is already compacted out.
+     *
+     * <p>This should only be used when the startup procedure is complete as 
it relies on the catalog to be started.
+     *
+     * @param ts Timestamp to resolve the Catalog version for; metadata completeness for it is awaited first.
+     */
+    public CompletableFuture<Integer> 
safeReliableCatalogVersionFor(HybridTimestamp ts) {
+        return schemaSyncService.waitForMetadataCompleteness(ts)
+                .thenApply(unused -> {
+                    try {
+                        return 
catalogService.activeCatalogVersion(ts.longValue());
+                    } catch (CatalogNotFoundException e) { // Version for ts is compacted out: use the earliest one still present.
+                        return catalogService.earliestCatalogVersion();
+                    }
+                });
+    }
 }
diff --git 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/handlers/WriteIntentSwitchRequestHandler.java
 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/handlers/WriteIntentSwitchRequestHandler.java
index 062b8eb1c93..e649d88ab49 100644
--- 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/handlers/WriteIntentSwitchRequestHandler.java
+++ 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/handlers/WriteIntentSwitchRequestHandler.java
@@ -117,12 +117,8 @@ public class WriteIntentSwitchRequestHandler {
                 .map(tableId -> 
invokeTableWriteIntentSwitchReplicaRequest(tableId, request, 
clockService.current(), senderId))
                 .collect(toList());
 
-        // We choose current() to try to avoid compaction of the chosen 
version (and we make sure it's not below commitTs [if it's a commit]
-        // or txBeginTs [if it's an abort]). But there seems to be no 
guarantee that the compactor will not remove this version.
-        // TODO: IGNITE-24574 Introduce a mechanism to save the chosen catalog 
version from being compacted too early.
         @Nullable HybridTimestamp commitTimestamp = request.commitTimestamp();
         HybridTimestamp commandTimestamp = commitTimestamp != null ? 
commitTimestamp : beginTimestamp(request.txId());
-        HybridTimestamp finalCommandTimestamp = 
HybridTimestamp.max(commandTimestamp, clockService.current());
 
         return allOf(futures)
                 .thenCompose(unused -> {
@@ -136,7 +132,7 @@ public class WriteIntentSwitchRequestHandler {
                         return completedFuture(new 
ReplicaResult(writeIntentSwitchReplicationInfoFor(request), null));
                     }
 
-                    return 
reliableCatalogVersions.reliableCatalogVersionFor(finalCommandTimestamp)
+                    return 
reliableCatalogVersions.safeReliableCatalogVersionFor(commandTimestamp)
                             .thenApply(catalogVersion -> {
                                 
CompletableFuture<WriteIntentSwitchReplicatedInfo> commandReplicatedFuture =
                                         applyCommandToGroup(request, 
catalogVersion)
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
index 23bcec259b0..81752d2123e 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
@@ -1684,19 +1684,12 @@ public class PartitionReplicaListener implements 
ReplicaListener {
 
         WriteIntentSwitchReplicatedInfo result = 
writeIntentSwitchReplicatedInfoFor(request);
 
-        if (enabledColocation()) {
-            // We don't need to apply Raft command as zone replication 
listener will do it for us.
-            return completedFuture(result);
-        }
+        assert !enabledColocation() : request;
 
-        // We choose current() to try to avoid compaction of the chosen 
version (and we make sure it's not below commitTs [if it's a commit]
-        // or txBeginTs [if it's an abort]). But there seems to be no 
guarantee that the compactor will not remove this version.
-        // TODO: IGNITE-24574 Introduce a mechanism to save the chosen catalog 
version from being compacted too early.
         @Nullable HybridTimestamp commitTimestamp = request.commitTimestamp();
         HybridTimestamp commandTimestamp = commitTimestamp != null ? 
commitTimestamp : beginTimestamp(request.txId());
-        commandTimestamp = HybridTimestamp.max(commandTimestamp, 
clockService.current());
 
-        return 
reliableCatalogVersions.reliableCatalogVersionFor(commandTimestamp)
+        return 
reliableCatalogVersions.safeReliableCatalogVersionFor(commandTimestamp)
                 .thenCompose(catalogVersion -> 
applyWriteIntentSwitchCommandToGroup(request, catalogVersion))
                 .thenApply(res -> result);
     }
diff --git 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
index 5cfa5214784..f5350c11670 100644
--- 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
+++ 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
@@ -315,12 +315,15 @@ public class PartitionReplicaListenerTest extends 
IgniteAbstractTest {
 
     private final Function<Command, CompletableFuture<?>> 
defaultMockRaftFutureClosure = cmd -> {
         if (cmd instanceof WriteIntentSwitchCommand) {
-            UUID txId = ((WriteIntentSwitchCommand) cmd).txId();
+            WriteIntentSwitchCommand switchCommand = 
(WriteIntentSwitchCommand) cmd;
+            UUID txId = switchCommand.txId();
 
             Set<RowId> rows = pendingRows.remove(txId);
 
-            HybridTimestamp commitTimestamp = ((WriteIntentSwitchCommand) 
cmd).commitTimestamp();
-            assertNotNull(commitTimestamp);
+            HybridTimestamp commitTimestamp = switchCommand.commitTimestamp();
+            if (switchCommand.commit()) {
+                assertNotNull(commitTimestamp);
+            }
 
             if (rows != null) {
                 for (RowId row : rows) {
@@ -1713,6 +1716,39 @@ public class PartitionReplicaListenerTest extends 
IgniteAbstractTest {
         upsert(tx1, br1);
     }
 
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    void writeIntentSwitchForCompactedCatalogTimestampWorks(boolean commit) {
+        int earliestVersion = 999; // Arbitrary marker the stubbed catalog reports as its earliest version.
+
+        UUID txId = newTxId();
+        HybridTimestamp beginTs = beginTimestamp(txId);
+        HybridTimestamp commitTs = clock.now();
+        // Command timestamp used by the listener: commitTs for a commit, tx begin timestamp for an abort.
+        HybridTimestamp reliableCatalogVersionTs = commit ? commitTs : beginTs;
+        
when(catalogService.activeCatalogVersion(reliableCatalogVersionTs.longValue())).thenThrow(new
 CatalogNotFoundException("Oops")); // Simulate the exact version being compacted out.
+        
when(catalogService.earliestCatalogVersion()).thenReturn(earliestVersion);
+
+        CompletableFuture<ReplicaResult> invokeFuture = 
partitionReplicaListener.invoke(
+                    TX_MESSAGES_FACTORY.writeIntentSwitchReplicaRequest()
+                            .groupId(tablePartitionIdMessage(grpId))
+                            .tableIds(Set.of(grpId.tableId()))
+                            .txId(txId)
+                            .commit(commit)
+                            .commitTimestamp(commit ? commitTs : null)
+                            .build(),
+                    localNode.id()
+            );
+
+        assertThat(invokeFuture, willCompleteSuccessfully());
+        assertThat(invokeFuture.join().applyResult().replicationFuture(), 
willCompleteSuccessfully());
+
+        verify(mockRaftClient).run(commandCaptor.capture());
+        WriteIntentSwitchCommand command = (WriteIntentSwitchCommand) 
commandCaptor.getValue();
+        // The command must carry the fallback (earliest) Catalog version.
+        assertThat(command.requiredCatalogVersion(), is(earliestVersion));
+    }
+
     /**
      * Puts several records into the storage, optionally leaving them as write 
intents, alternately deleting and upserting the same row
      * within the same RW transaction, then checking read correctness via read 
only request.

Reply via email to