This is an automated email from the ASF dual-hosted git repository.

apolovtsev pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 571c312ae73 IGNITE-25338 Linearize applied index update in 
OnSnapshotSaveHandler (#5786)
571c312ae73 is described below

commit 571c312ae7370f7c8d7be652be5b31118feec726
Author: Alexander Polovtcev <[email protected]>
AuthorDate: Thu May 8 15:08:55 2025 +0300

    IGNITE-25338 Linearize applied index update in OnSnapshotSaveHandler (#5786)
---
 .../replicator/raft/OnSnapshotSaveHandler.java       | 18 ++++++++----------
 .../raft/ZonePartitionRaftListenerTest.java          |  2 +-
 .../ThreadAssertingTxStatePartitionStorage.java      |  4 ++--
 .../tx/storage/state/TxStatePartitionStorage.java    |  2 +-
 .../rocksdb/TxStateRocksDbPartitionStorage.java      | 20 ++++++++++++++++----
 .../state/AbstractTxStatePartitionStorageTest.java   |  8 +++-----
 .../state/test/TestTxStatePartitionStorage.java      |  5 +----
 7 files changed, 32 insertions(+), 27 deletions(-)

diff --git 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/OnSnapshotSaveHandler.java
 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/OnSnapshotSaveHandler.java
index a7f1458a3cf..4cb9caf74fa 100644
--- 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/OnSnapshotSaveHandler.java
+++ 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/OnSnapshotSaveHandler.java
@@ -58,22 +58,20 @@ public class OnSnapshotSaveHandler {
         long lastAppliedIndex = snapshotInfo.lastAppliedIndex();
         long lastAppliedTerm = snapshotInfo.lastAppliedTerm();
 
-        CompletableFuture<?>[] tableStorageFlushFutures = 
tableProcessors.stream()
-                .map(processor -> {
-                    processor.lastApplied(lastAppliedIndex, lastAppliedTerm);
+        tableProcessors.forEach(processor -> 
processor.lastApplied(lastAppliedIndex, lastAppliedTerm));
+
+        txStatePartitionStorage.lastApplied(lastAppliedIndex, lastAppliedTerm);
 
-                    return processor.flushStorage();
-                })
+        CompletableFuture<?>[] tableStorageFlushFutures = 
tableProcessors.stream()
+                .map(RaftTableProcessor::flushStorage)
                 .toArray(CompletableFuture<?>[]::new);
 
         // Flush the TX state storage last to guarantee that all data is 
flushed before the snapshot is saved.
         return allOf(tableStorageFlushFutures)
                 .thenComposeAsync(v -> {
-                    txStatePartitionStorage.snapshotInfo(
-                            VersionedSerialization.toBytes(snapshotInfo, 
PartitionSnapshotInfoSerializer.INSTANCE),
-                            lastAppliedIndex,
-                            lastAppliedTerm
-                    );
+                    byte[] snapshotInfoBytes = 
VersionedSerialization.toBytes(snapshotInfo, 
PartitionSnapshotInfoSerializer.INSTANCE);
+
+                    txStatePartitionStorage.snapshotInfo(snapshotInfoBytes);
 
                     return txStatePartitionStorage.flush();
                 }, partitionOperationsExecutor);
diff --git 
a/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/ZonePartitionRaftListenerTest.java
 
b/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/ZonePartitionRaftListenerTest.java
index 1a1a68997e0..273a33c487e 100644
--- 
a/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/ZonePartitionRaftListenerTest.java
+++ 
b/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/ZonePartitionRaftListenerTest.java
@@ -189,7 +189,7 @@ class ZonePartitionRaftListenerTest extends 
BaseIgniteAbstractTest {
 
         byte[] snapshotInfoBytes = 
VersionedSerialization.toBytes(snapshotInfo, 
PartitionSnapshotInfoSerializer.INSTANCE);
 
-        verify(txStatePartitionStorage).snapshotInfo(snapshotInfoBytes, 
snapshotInfo.lastAppliedIndex(), snapshotInfo.lastAppliedTerm());
+        verify(txStatePartitionStorage).snapshotInfo(snapshotInfoBytes);
     }
 
     @Test
diff --git 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/storage/state/ThreadAssertingTxStatePartitionStorage.java
 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/storage/state/ThreadAssertingTxStatePartitionStorage.java
index 81fe2aa5dd7..3467c5c5cf5 100644
--- 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/storage/state/ThreadAssertingTxStatePartitionStorage.java
+++ 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/storage/state/ThreadAssertingTxStatePartitionStorage.java
@@ -184,9 +184,9 @@ public class ThreadAssertingTxStatePartitionStorage 
implements TxStatePartitionS
     }
 
     @Override
-    public void snapshotInfo(byte[] snapshotInfo, long index, long term) {
+    public void snapshotInfo(byte[] snapshotInfo) {
         assertThreadAllowsToWrite();
 
-        storage.snapshotInfo(snapshotInfo, index, term);
+        storage.snapshotInfo(snapshotInfo);
     }
 }
diff --git 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/storage/state/TxStatePartitionStorage.java
 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/storage/state/TxStatePartitionStorage.java
index e6a1a9be279..6f62a8e517e 100644
--- 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/storage/state/TxStatePartitionStorage.java
+++ 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/storage/state/TxStatePartitionStorage.java
@@ -248,7 +248,7 @@ public interface TxStatePartitionStorage extends 
ManuallyCloseable {
     /**
      * Updates the current snapshot information.
      */
-    void snapshotInfo(byte[] snapshotInfo, long index, long term);
+    void snapshotInfo(byte[] snapshotInfo);
 
     /**
      * Returns the current snapshot information of {@code null} if it was 
never saved.
diff --git 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/storage/state/rocksdb/TxStateRocksDbPartitionStorage.java
 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/storage/state/rocksdb/TxStateRocksDbPartitionStorage.java
index 7f89c09571f..85a13658cfb 100644
--- 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/storage/state/rocksdb/TxStateRocksDbPartitionStorage.java
+++ 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/storage/state/rocksdb/TxStateRocksDbPartitionStorage.java
@@ -580,12 +580,24 @@ public class TxStateRocksDbPartitionStorage implements 
TxStatePartitionStorage {
     }
 
     @Override
-    public void snapshotInfo(byte[] snapshotInfo, long index, long term) {
-        updateData(writeBatch -> {
-            metaStorage.updateSnapshotInfo(writeBatch, snapshotInfo);
+    public void snapshotInfo(byte[] snapshotInfo) {
+        busy(() -> {
+            throwExceptionIfStorageInProgressOfRebalance();
+
+            try (WriteBatch writeBatch = new WriteBatch()) {
+                metaStorage.updateSnapshotInfo(writeBatch, snapshotInfo);
+
+                sharedStorage.db().write(sharedStorage.writeOptions, 
writeBatch);
+            } catch (RocksDBException e) {
+                throw new TxStateStorageException(
+                        TX_STATE_STORAGE_ERR,
+                        format("Failed to update data in the storage: [{}]", 
createStorageInfo()),
+                        e
+                );
+            }
 
             return null;
-        }, index, term);
+        });
     }
 
     @Override
diff --git 
a/modules/transactions/src/testFixtures/java/org/apache/ignite/internal/tx/storage/state/AbstractTxStatePartitionStorageTest.java
 
b/modules/transactions/src/testFixtures/java/org/apache/ignite/internal/tx/storage/state/AbstractTxStatePartitionStorageTest.java
index e642668fbde..6fb009432e2 100644
--- 
a/modules/transactions/src/testFixtures/java/org/apache/ignite/internal/tx/storage/state/AbstractTxStatePartitionStorageTest.java
+++ 
b/modules/transactions/src/testFixtures/java/org/apache/ignite/internal/tx/storage/state/AbstractTxStatePartitionStorageTest.java
@@ -523,11 +523,9 @@ public abstract class AbstractTxStatePartitionStorageTest 
extends BaseIgniteAbst
         assertThat(storage.lastAppliedIndex(), is(3L));
         assertThat(storage.lastAppliedTerm(), is(50L));
 
-        storage.snapshotInfo(SNAPSHOT_INFO, 4, 60);
+        storage.snapshotInfo(SNAPSHOT_INFO);
 
         assertThat(storage.snapshotInfo(), is(SNAPSHOT_INFO));
-        assertThat(storage.lastAppliedIndex(), is(4L));
-        assertThat(storage.lastAppliedTerm(), is(60L));
     }
 
     @Test
@@ -595,7 +593,7 @@ public abstract class AbstractTxStatePartitionStorageTest 
extends BaseIgniteAbst
         );
         assertThrowsStorageRebalanceException(
                 TX_STATE_STORAGE_REBALANCE_ERR,
-                () -> storage.snapshotInfo(SNAPSHOT_INFO, 1, 2)
+                () -> storage.snapshotInfo(SNAPSHOT_INFO)
         );
         assertThrowsStorageRebalanceException(TX_STATE_STORAGE_REBALANCE_ERR, 
() -> storage.get(UUID.randomUUID()));
         assertThrowsStorageRebalanceException(TX_STATE_STORAGE_REBALANCE_ERR, 
() -> storage.remove(UUID.randomUUID(), 100, 500));
@@ -679,7 +677,7 @@ public abstract class AbstractTxStatePartitionStorageTest 
extends BaseIgniteAbst
 
         storage.committedGroupConfiguration(GROUP_CONFIGURATION, 
lastAppliedIndex + 1, lastAppliedTerm);
         storage.leaseInfo(LEASE_INFO, lastAppliedIndex + 2, lastAppliedTerm);
-        storage.snapshotInfo(SNAPSHOT_INFO, lastAppliedIndex + 3, 
lastAppliedTerm);
+        storage.snapshotInfo(SNAPSHOT_INFO);
     }
 
     protected static void fillStorageDuringRebalance(TxStatePartitionStorage 
storage, List<IgniteBiTuple<UUID, TxMeta>> rows) {
diff --git 
a/modules/transactions/src/testFixtures/java/org/apache/ignite/internal/tx/storage/state/test/TestTxStatePartitionStorage.java
 
b/modules/transactions/src/testFixtures/java/org/apache/ignite/internal/tx/storage/state/test/TestTxStatePartitionStorage.java
index 38b66e08831..5f3f2162289 100644
--- 
a/modules/transactions/src/testFixtures/java/org/apache/ignite/internal/tx/storage/state/test/TestTxStatePartitionStorage.java
+++ 
b/modules/transactions/src/testFixtures/java/org/apache/ignite/internal/tx/storage/state/test/TestTxStatePartitionStorage.java
@@ -317,13 +317,10 @@ public class TestTxStatePartitionStorage implements 
TxStatePartitionStorage {
     }
 
     @Override
-    public void snapshotInfo(byte[] snapshotInfo, long index, long term) {
+    public void snapshotInfo(byte[] snapshotInfo) {
         checkStorageClosedOrInProgressOfRebalance();
 
         this.snapshotInfo = snapshotInfo;
-
-        lastAppliedIndex = index;
-        lastAppliedTerm = term;
     }
 
     @Override

Reply via email to