This is an automated email from the ASF dual-hosted git repository.
apolovtsev pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 571c312ae73 IGNITE-25338 Linearize applied index update in
OnSnapshotSaveHandler (#5786)
571c312ae73 is described below
commit 571c312ae7370f7c8d7be652be5b31118feec726
Author: Alexander Polovtcev <[email protected]>
AuthorDate: Thu May 8 15:08:55 2025 +0300
IGNITE-25338 Linearize applied index update in OnSnapshotSaveHandler (#5786)
---
.../replicator/raft/OnSnapshotSaveHandler.java | 18 ++++++++----------
.../raft/ZonePartitionRaftListenerTest.java | 2 +-
.../ThreadAssertingTxStatePartitionStorage.java | 4 ++--
.../tx/storage/state/TxStatePartitionStorage.java | 2 +-
.../rocksdb/TxStateRocksDbPartitionStorage.java | 20 ++++++++++++++++----
.../state/AbstractTxStatePartitionStorageTest.java | 8 +++-----
.../state/test/TestTxStatePartitionStorage.java | 5 +----
7 files changed, 32 insertions(+), 27 deletions(-)
diff --git
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/OnSnapshotSaveHandler.java
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/OnSnapshotSaveHandler.java
index a7f1458a3cf..4cb9caf74fa 100644
---
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/OnSnapshotSaveHandler.java
+++
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/OnSnapshotSaveHandler.java
@@ -58,22 +58,20 @@ public class OnSnapshotSaveHandler {
long lastAppliedIndex = snapshotInfo.lastAppliedIndex();
long lastAppliedTerm = snapshotInfo.lastAppliedTerm();
- CompletableFuture<?>[] tableStorageFlushFutures =
tableProcessors.stream()
- .map(processor -> {
- processor.lastApplied(lastAppliedIndex, lastAppliedTerm);
+ tableProcessors.forEach(processor ->
processor.lastApplied(lastAppliedIndex, lastAppliedTerm));
+
+ txStatePartitionStorage.lastApplied(lastAppliedIndex, lastAppliedTerm);
- return processor.flushStorage();
- })
+ CompletableFuture<?>[] tableStorageFlushFutures =
tableProcessors.stream()
+ .map(RaftTableProcessor::flushStorage)
.toArray(CompletableFuture<?>[]::new);
// Flush the TX state storage last to guarantee that all data is
flushed before the snapshot is saved.
return allOf(tableStorageFlushFutures)
.thenComposeAsync(v -> {
- txStatePartitionStorage.snapshotInfo(
- VersionedSerialization.toBytes(snapshotInfo,
PartitionSnapshotInfoSerializer.INSTANCE),
- lastAppliedIndex,
- lastAppliedTerm
- );
+ byte[] snapshotInfoBytes =
VersionedSerialization.toBytes(snapshotInfo,
PartitionSnapshotInfoSerializer.INSTANCE);
+
+ txStatePartitionStorage.snapshotInfo(snapshotInfoBytes);
return txStatePartitionStorage.flush();
}, partitionOperationsExecutor);
diff --git
a/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/ZonePartitionRaftListenerTest.java
b/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/ZonePartitionRaftListenerTest.java
index 1a1a68997e0..273a33c487e 100644
---
a/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/ZonePartitionRaftListenerTest.java
+++
b/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/ZonePartitionRaftListenerTest.java
@@ -189,7 +189,7 @@ class ZonePartitionRaftListenerTest extends
BaseIgniteAbstractTest {
byte[] snapshotInfoBytes =
VersionedSerialization.toBytes(snapshotInfo,
PartitionSnapshotInfoSerializer.INSTANCE);
- verify(txStatePartitionStorage).snapshotInfo(snapshotInfoBytes,
snapshotInfo.lastAppliedIndex(), snapshotInfo.lastAppliedTerm());
+ verify(txStatePartitionStorage).snapshotInfo(snapshotInfoBytes);
}
@Test
diff --git
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/storage/state/ThreadAssertingTxStatePartitionStorage.java
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/storage/state/ThreadAssertingTxStatePartitionStorage.java
index 81fe2aa5dd7..3467c5c5cf5 100644
---
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/storage/state/ThreadAssertingTxStatePartitionStorage.java
+++
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/storage/state/ThreadAssertingTxStatePartitionStorage.java
@@ -184,9 +184,9 @@ public class ThreadAssertingTxStatePartitionStorage
implements TxStatePartitionS
}
@Override
- public void snapshotInfo(byte[] snapshotInfo, long index, long term) {
+ public void snapshotInfo(byte[] snapshotInfo) {
assertThreadAllowsToWrite();
- storage.snapshotInfo(snapshotInfo, index, term);
+ storage.snapshotInfo(snapshotInfo);
}
}
diff --git
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/storage/state/TxStatePartitionStorage.java
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/storage/state/TxStatePartitionStorage.java
index e6a1a9be279..6f62a8e517e 100644
---
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/storage/state/TxStatePartitionStorage.java
+++
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/storage/state/TxStatePartitionStorage.java
@@ -248,7 +248,7 @@ public interface TxStatePartitionStorage extends
ManuallyCloseable {
/**
* Updates the current snapshot information.
*/
- void snapshotInfo(byte[] snapshotInfo, long index, long term);
+ void snapshotInfo(byte[] snapshotInfo);
/**
* Returns the current snapshot information or {@code null} if it was
never saved.
diff --git
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/storage/state/rocksdb/TxStateRocksDbPartitionStorage.java
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/storage/state/rocksdb/TxStateRocksDbPartitionStorage.java
index 7f89c09571f..85a13658cfb 100644
---
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/storage/state/rocksdb/TxStateRocksDbPartitionStorage.java
+++
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/storage/state/rocksdb/TxStateRocksDbPartitionStorage.java
@@ -580,12 +580,24 @@ public class TxStateRocksDbPartitionStorage implements
TxStatePartitionStorage {
}
@Override
- public void snapshotInfo(byte[] snapshotInfo, long index, long term) {
- updateData(writeBatch -> {
- metaStorage.updateSnapshotInfo(writeBatch, snapshotInfo);
+ public void snapshotInfo(byte[] snapshotInfo) {
+ busy(() -> {
+ throwExceptionIfStorageInProgressOfRebalance();
+
+ try (WriteBatch writeBatch = new WriteBatch()) {
+ metaStorage.updateSnapshotInfo(writeBatch, snapshotInfo);
+
+ sharedStorage.db().write(sharedStorage.writeOptions,
writeBatch);
+ } catch (RocksDBException e) {
+ throw new TxStateStorageException(
+ TX_STATE_STORAGE_ERR,
+ format("Failed to update data in the storage: [{}]",
createStorageInfo()),
+ e
+ );
+ }
return null;
- }, index, term);
+ });
}
@Override
diff --git
a/modules/transactions/src/testFixtures/java/org/apache/ignite/internal/tx/storage/state/AbstractTxStatePartitionStorageTest.java
b/modules/transactions/src/testFixtures/java/org/apache/ignite/internal/tx/storage/state/AbstractTxStatePartitionStorageTest.java
index e642668fbde..6fb009432e2 100644
---
a/modules/transactions/src/testFixtures/java/org/apache/ignite/internal/tx/storage/state/AbstractTxStatePartitionStorageTest.java
+++
b/modules/transactions/src/testFixtures/java/org/apache/ignite/internal/tx/storage/state/AbstractTxStatePartitionStorageTest.java
@@ -523,11 +523,9 @@ public abstract class AbstractTxStatePartitionStorageTest
extends BaseIgniteAbst
assertThat(storage.lastAppliedIndex(), is(3L));
assertThat(storage.lastAppliedTerm(), is(50L));
- storage.snapshotInfo(SNAPSHOT_INFO, 4, 60);
+ storage.snapshotInfo(SNAPSHOT_INFO);
assertThat(storage.snapshotInfo(), is(SNAPSHOT_INFO));
- assertThat(storage.lastAppliedIndex(), is(4L));
- assertThat(storage.lastAppliedTerm(), is(60L));
}
@Test
@@ -595,7 +593,7 @@ public abstract class AbstractTxStatePartitionStorageTest
extends BaseIgniteAbst
);
assertThrowsStorageRebalanceException(
TX_STATE_STORAGE_REBALANCE_ERR,
- () -> storage.snapshotInfo(SNAPSHOT_INFO, 1, 2)
+ () -> storage.snapshotInfo(SNAPSHOT_INFO)
);
assertThrowsStorageRebalanceException(TX_STATE_STORAGE_REBALANCE_ERR,
() -> storage.get(UUID.randomUUID()));
assertThrowsStorageRebalanceException(TX_STATE_STORAGE_REBALANCE_ERR,
() -> storage.remove(UUID.randomUUID(), 100, 500));
@@ -679,7 +677,7 @@ public abstract class AbstractTxStatePartitionStorageTest
extends BaseIgniteAbst
storage.committedGroupConfiguration(GROUP_CONFIGURATION,
lastAppliedIndex + 1, lastAppliedTerm);
storage.leaseInfo(LEASE_INFO, lastAppliedIndex + 2, lastAppliedTerm);
- storage.snapshotInfo(SNAPSHOT_INFO, lastAppliedIndex + 3,
lastAppliedTerm);
+ storage.snapshotInfo(SNAPSHOT_INFO);
}
protected static void fillStorageDuringRebalance(TxStatePartitionStorage
storage, List<IgniteBiTuple<UUID, TxMeta>> rows) {
diff --git
a/modules/transactions/src/testFixtures/java/org/apache/ignite/internal/tx/storage/state/test/TestTxStatePartitionStorage.java
b/modules/transactions/src/testFixtures/java/org/apache/ignite/internal/tx/storage/state/test/TestTxStatePartitionStorage.java
index 38b66e08831..5f3f2162289 100644
---
a/modules/transactions/src/testFixtures/java/org/apache/ignite/internal/tx/storage/state/test/TestTxStatePartitionStorage.java
+++
b/modules/transactions/src/testFixtures/java/org/apache/ignite/internal/tx/storage/state/test/TestTxStatePartitionStorage.java
@@ -317,13 +317,10 @@ public class TestTxStatePartitionStorage implements
TxStatePartitionStorage {
}
@Override
- public void snapshotInfo(byte[] snapshotInfo, long index, long term) {
+ public void snapshotInfo(byte[] snapshotInfo) {
checkStorageClosedOrInProgressOfRebalance();
this.snapshotInfo = snapshotInfo;
-
- lastAppliedIndex = index;
- lastAppliedTerm = term;
}
@Override