This is an automated email from the ASF dual-hosted git repository.
tkalkirill pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new dbf54b9e76 IGNITE-23419 Advance Safe Time in
KeyValueStorage#saveCompactionRevision (#4575)
dbf54b9e76 is described below
commit dbf54b9e76f39dfc3e7621bbfde40c14379f8bb5
Author: Kirill Tkalenko <[email protected]>
AuthorDate: Thu Oct 17 07:27:02 2024 +0300
IGNITE-23419 Advance Safe Time in KeyValueStorage#saveCompactionRevision
(#4575)
---
.../metastorage/MetaStorageCompactionManager.java | 22 ---------
...tMetaStorageSafeTimePropagationRocksDbTest.java | 2 +-
...torageSafeTimePropagationSimpleStorageTest.java | 2 +-
.../metastorage/impl/MetaStorageManagerImpl.java | 5 --
.../metastorage/server/KeyValueStorage.java | 9 ++--
.../server/persistence/RocksDbKeyValueStorage.java | 14 +++++-
.../AbstractCompactionKeyValueStorageTest.java | 55 +++++++++++++++++++---
.../RocksDbCompactionKeyValueStorageTest.java | 2 +-
.../server/RocksDbKeyValueStorageTest.java | 2 +-
...impleInMemoryCompactionKeyValueStorageTest.java | 2 +-
.../server/SimpleInMemoryKeyValueStorageTest.java | 2 +-
.../server/AbstractKeyValueStorageTest.java | 2 +
.../server/SimpleInMemoryKeyValueStorage.java | 28 +++++++++--
.../SimpleInMemoryKeyValueStorageSnapshot.java | 10 +++-
14 files changed, 107 insertions(+), 50 deletions(-)
diff --git
a/modules/metastorage-api/src/main/java/org/apache/ignite/internal/metastorage/MetaStorageCompactionManager.java
b/modules/metastorage-api/src/main/java/org/apache/ignite/internal/metastorage/MetaStorageCompactionManager.java
index eece4bf9ba..192f7eebdc 100644
---
a/modules/metastorage-api/src/main/java/org/apache/ignite/internal/metastorage/MetaStorageCompactionManager.java
+++
b/modules/metastorage-api/src/main/java/org/apache/ignite/internal/metastorage/MetaStorageCompactionManager.java
@@ -58,30 +58,10 @@ public interface MetaStorageCompactionManager extends
IgniteComponent {
* @throws IgniteInternalException with cause {@link
NodeStoppingException} if the node is in the process of stopping.
* @throws MetaStorageException If there is an error during the
metastorage compaction process.
* @see #setCompactionRevisionLocally(long)
- * @see #saveCompactionRevisionLocally(long)
* @see #getCompactionRevisionLocally()
*/
void compactLocally(long revision);
- /**
- * Saves the compaction revision to the metastorage meta locally.
- *
- * <p>Method only saves the new compaction revision to the meta of
metastorage. After invoking this method the metastorage read methods
- * will <b>not</b> immediately start throwing a {@link CompactedException}
if they request a revision less than or equal to the new
- * saved one.</p>
- *
- * <p>Last saved compaction revision will be in the metastorage snapshot.
When restore from a snapshot, compaction revision will be
- * restored, after which the metastorage read methods will throw exception
{@link CompactedException}.</p>
- *
- * <p>Compaction revision is expected to be less than the current
metastorage revision.</p>
- *
- * @param revision Compaction revision to save.
- * @throws IgniteInternalException with cause {@link
NodeStoppingException} if the node is in the process of stopping.
- * @throws MetaStorageException If there is an error while saving a
compaction revision.
- * @see #setCompactionRevisionLocally(long)
- */
- void saveCompactionRevisionLocally(long revision);
-
/**
* Sets the compaction revision locally, but does not save it, after
invoking this method the metastorage read methods will throw a
* {@link CompactedException} if they request a revision less than or
equal to the new one.
@@ -90,7 +70,6 @@ public interface MetaStorageCompactionManager extends
IgniteComponent {
*
* @param revision Compaction revision.
* @throws IgniteInternalException with cause {@link
NodeStoppingException} if the node is in the process of stopping.
- * @see #saveCompactionRevisionLocally(long)
*/
void setCompactionRevisionLocally(long revision);
@@ -99,7 +78,6 @@ public interface MetaStorageCompactionManager extends
IgniteComponent {
*
* @throws IgniteInternalException with cause {@link
NodeStoppingException} if the node is in the process of stopping.
* @see #setCompactionRevisionLocally(long)
- * @see #saveCompactionRevisionLocally(long)
*/
long getCompactionRevisionLocally();
}
diff --git
a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageSafeTimePropagationRocksDbTest.java
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageSafeTimePropagationRocksDbTest.java
index c13fc5bc25..52b4f80510 100644
---
a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageSafeTimePropagationRocksDbTest.java
+++
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageSafeTimePropagationRocksDbTest.java
@@ -33,6 +33,6 @@ public class ItMetaStorageSafeTimePropagationRocksDbTest
extends ItMetaStorageSa
@Override
public KeyValueStorage createStorage() {
- return new RocksDbKeyValueStorage("test", workDir, new
NoOpFailureManager());
+ return new RocksDbKeyValueStorage(NODE_NAME, workDir, new
NoOpFailureManager());
}
}
diff --git
a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageSafeTimePropagationSimpleStorageTest.java
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageSafeTimePropagationSimpleStorageTest.java
index d412c6565e..543442237e 100644
---
a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageSafeTimePropagationSimpleStorageTest.java
+++
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageSafeTimePropagationSimpleStorageTest.java
@@ -24,6 +24,6 @@ import
org.apache.ignite.internal.metastorage.server.SimpleInMemoryKeyValueStora
public class ItMetaStorageSafeTimePropagationSimpleStorageTest extends
ItMetaStorageSafeTimePropagationAbstractTest {
@Override
public KeyValueStorage createStorage() {
- return new SimpleInMemoryKeyValueStorage("test");
+ return new SimpleInMemoryKeyValueStorage(NODE_NAME);
}
}
diff --git
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java
index 19c98a5c42..580dcb1f44 100644
---
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java
+++
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java
@@ -1126,11 +1126,6 @@ public class MetaStorageManagerImpl implements
MetaStorageManager, MetastorageGr
inBusyLock(busyLock, () -> storage.compact(revision));
}
- @Override
- public void saveCompactionRevisionLocally(long revision) {
- inBusyLock(busyLock, () -> storage.saveCompactionRevision(revision));
- }
-
@Override
public void setCompactionRevisionLocally(long revision) {
inBusyLock(busyLock, () -> storage.setCompactionRevision(revision));
diff --git
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/KeyValueStorage.java
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/KeyValueStorage.java
index 51e4cd27c9..3ca0f9f5fa 100644
---
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/KeyValueStorage.java
+++
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/KeyValueStorage.java
@@ -420,7 +420,7 @@ public interface KeyValueStorage extends ManuallyCloseable {
* @throws MetaStorageException If there is an error during the
metastorage compaction process.
* @see #stopCompaction()
* @see #setCompactionRevision(long)
- * @see #saveCompactionRevision(long)
+ * @see #saveCompactionRevision(long, KeyValueUpdateContext)
*/
void compact(long revision);
@@ -513,10 +513,11 @@ public interface KeyValueStorage extends
ManuallyCloseable {
* <p>Compaction revision is expected to be less than the {@link #revision
current storage revision}.</p>
*
* @param revision Compaction revision to save.
+ * @param context Operation's context.
* @throws MetaStorageException If there is an error while saving a
compaction revision.
* @see #setCompactionRevision(long)
*/
- void saveCompactionRevision(long revision);
+ void saveCompactionRevision(long revision, KeyValueUpdateContext context);
/**
* Sets the compaction revision, but does not save it, after invoking this
method the metastorage read methods will throw a
@@ -525,7 +526,7 @@ public interface KeyValueStorage extends ManuallyCloseable {
* <p>Compaction revision is expected to be less than the {@link #revision
current storage revision}.</p>
*
* @param revision Compaction revision.
- * @see #saveCompactionRevision(long)
+ * @see #saveCompactionRevision(long, KeyValueUpdateContext)
*/
void setCompactionRevision(long revision);
@@ -533,7 +534,7 @@ public interface KeyValueStorage extends ManuallyCloseable {
* Returns the compaction revision that was set or restored from a
snapshot, {@code -1} if not changed.
*
* @see #setCompactionRevision(long)
- * @see #saveCompactionRevision(long)
+ * @see #saveCompactionRevision(long, KeyValueUpdateContext)
*/
long getCompactionRevision();
diff --git
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java
index 565a5041fa..cb49b034e1 100644
---
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java
+++
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java
@@ -590,7 +590,7 @@ public class RocksDbKeyValueStorage extends
AbstractKeyValueStorage {
validateNoChecksumConflict(newRev, newChecksum);
revisionToChecksum.put(batch, revisionBytes, longToBytes(newChecksum));
- data.put(batch, INDEX_AND_TERM_KEY, longsToBytes(0, context.index,
context.term));
+ addIndexAndTermToWriteBatch(batch, context);
db.write(defaultWriteOptions, batch);
@@ -1264,7 +1264,7 @@ public class RocksDbKeyValueStorage extends
AbstractKeyValueStorage {
}
@Override
- public void saveCompactionRevision(long revision) {
+ public void saveCompactionRevision(long revision, KeyValueUpdateContext
context) {
assert revision >= 0 : revision;
rwLock.writeLock().lock();
@@ -1274,7 +1274,13 @@ public class RocksDbKeyValueStorage extends
AbstractKeyValueStorage {
data.put(batch, COMPACTION_REVISION_KEY, longToBytes(revision));
+ addIndexAndTermToWriteBatch(batch, context);
+
db.write(defaultWriteOptions, batch);
+
+ if (recoveryStatus.get() == RecoveryStatus.DONE) {
+ watchProcessor.advanceSafeTime(context.timestamp);
+ }
} catch (Throwable t) {
throw new MetaStorageException(COMPACTION_ERR, "Error saving
compaction revision: " + revision, t);
} finally {
@@ -1529,4 +1535,8 @@ public class RocksDbKeyValueStorage extends
AbstractKeyValueStorage {
}
};
}
+
+ private void addIndexAndTermToWriteBatch(WriteBatch batch,
KeyValueUpdateContext context) throws RocksDBException {
+ data.put(batch, INDEX_AND_TERM_KEY, longsToBytes(0, context.index,
context.term));
+ }
}
diff --git
a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/AbstractCompactionKeyValueStorageTest.java
b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/AbstractCompactionKeyValueStorageTest.java
index cadcee46c1..c549271e4b 100644
---
a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/AbstractCompactionKeyValueStorageTest.java
+++
b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/AbstractCompactionKeyValueStorageTest.java
@@ -26,6 +26,7 @@ import static
org.apache.ignite.internal.metastorage.dsl.Operations.put;
import static org.apache.ignite.internal.metastorage.dsl.Operations.remove;
import static
org.apache.ignite.internal.metastorage.server.KeyValueUpdateContext.kvContext;
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static org.apache.ignite.internal.util.IgniteUtils.closeAllManually;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -45,9 +46,13 @@ import org.apache.ignite.internal.metastorage.Entry;
import org.apache.ignite.internal.metastorage.exceptions.CompactedException;
import org.apache.ignite.internal.metastorage.impl.CommandIdGenerator;
import org.apache.ignite.internal.metastorage.server.ExistenceCondition.Type;
+import org.apache.ignite.internal.metastorage.server.time.ClusterTimeImpl;
+import org.apache.ignite.internal.raft.IndexWithTerm;
import org.apache.ignite.internal.testframework.WorkDirectory;
import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
import org.apache.ignite.internal.util.Cursor;
+import org.apache.ignite.internal.util.IgniteSpinBusyLock;
+import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
@@ -70,6 +75,8 @@ public abstract class AbstractCompactionKeyValueStorageTest
extends AbstractKeyV
private final HybridClock clock = new HybridClockImpl();
+ private final ClusterTimeImpl clusterTime = new ClusterTimeImpl(NODE_NAME,
new IgniteSpinBusyLock(), clock);
+
@Override
@BeforeEach
void setUp() {
@@ -109,6 +116,12 @@ public abstract class
AbstractCompactionKeyValueStorageTest extends AbstractKeyV
assertEquals(List.of(4, 6/* Tombstone */), collectRevisions(SOME_KEY));
}
+ @Override
+ @AfterEach
+ public void tearDown() throws Exception {
+ closeAllManually(super::tearDown, clusterTime);
+ }
+
/**
* Tests {@link KeyValueStorage#compact(long)} for a specific single
revision, to simplify testing, see examples in the method
* description. Keys with their revisions are added in {@link #setUp()}.
@@ -255,47 +268,64 @@ public abstract class
AbstractCompactionKeyValueStorageTest extends AbstractKeyV
}
@Test
- void testSaveCompactionRevisionDoesNotChangeRevisionInMemory() {
- storage.saveCompactionRevision(0);
+ void testSaveCompactionRevision() {
+ HybridTimestamp now0 = clock.now();
+ HybridTimestamp now1 = clock.now();
+
+ startListenUpdateSafeTime();
+
+ storage.saveCompactionRevision(0, new KeyValueUpdateContext(1, 1,
now0));
assertEquals(-1, storage.getCompactionRevision());
+ assertEquals(new IndexWithTerm(1, 1), storage.getIndexWithTerm());
+ assertThat(clusterTime.waitFor(now0), willCompleteSuccessfully());
- storage.saveCompactionRevision(1);
+ storage.saveCompactionRevision(1, new KeyValueUpdateContext(2, 2,
now1));
assertEquals(-1, storage.getCompactionRevision());
+ assertEquals(new IndexWithTerm(2, 2), storage.getIndexWithTerm());
+ assertThat(clusterTime.waitFor(now1), willCompleteSuccessfully());
}
@Test
void testSaveCompactionRevisionAndRestart() throws Exception {
- storage.saveCompactionRevision(1);
+ storage.saveCompactionRevision(1, new KeyValueUpdateContext(1, 1,
clock.now()));
restartStorage();
+ // No safe time check as it is only saved with revision update.
assertEquals(1, storage.getCompactionRevision());
+ assertEquals(new IndexWithTerm(1, 1), storage.getIndexWithTerm());
}
@Test
void testSaveCompactionRevisionInSnapshot() {
- storage.saveCompactionRevision(1);
+ storage.saveCompactionRevision(1, new KeyValueUpdateContext(1, 1,
clock.now()));
Path snapshotDir = workDir.resolve("snapshot");
+ // No safe time check as it is only saved with revision update.
assertThat(storage.snapshot(snapshotDir), willCompleteSuccessfully());
assertEquals(-1, storage.getCompactionRevision());
+ assertEquals(new IndexWithTerm(1, 1), storage.getIndexWithTerm());
+ // No safe time check as it is only saved with revision update.
storage.restoreSnapshot(snapshotDir);
assertEquals(1, storage.getCompactionRevision());
+ assertEquals(new IndexWithTerm(1, 1), storage.getIndexWithTerm());
}
@Test
void testSaveCompactionRevisionInSnapshotAndRestart() throws Exception {
- storage.saveCompactionRevision(1);
+ storage.saveCompactionRevision(1, new KeyValueUpdateContext(1, 1,
clock.now()));
Path snapshotDir = workDir.resolve("snapshot");
assertThat(storage.snapshot(snapshotDir), willCompleteSuccessfully());
restartStorage();
+ // No safe time check as it is only saved with revision update.
storage.restoreSnapshot(snapshotDir);
assertEquals(1, storage.getCompactionRevision());
+ assertEquals(new IndexWithTerm(1, 1), storage.getIndexWithTerm());
}
@Test
@@ -977,6 +1007,19 @@ public abstract class
AbstractCompactionKeyValueStorageTest extends AbstractKeyV
);
}
+ private void startListenUpdateSafeTime() {
+ storage.startWatches(storage.revision() + 1, new
OnRevisionAppliedCallback() {
+ @Override
+ public void onSafeTimeAdvanced(HybridTimestamp newSafeTime) {
+ clusterTime.updateSafeTime(newSafeTime);
+ }
+
+ @Override
+ public void onRevisionApplied(long revision) {
+ }
+ });
+ }
+
private static String toUtf8String(byte[]... keys) {
return Arrays.stream(keys)
.map(KeyValueStorageUtils::toUtf8String)
diff --git
a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/RocksDbCompactionKeyValueStorageTest.java
b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/RocksDbCompactionKeyValueStorageTest.java
index 84352b7dc8..05d3562850 100644
---
a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/RocksDbCompactionKeyValueStorageTest.java
+++
b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/RocksDbCompactionKeyValueStorageTest.java
@@ -29,7 +29,7 @@ import org.junit.jupiter.api.Test;
public class RocksDbCompactionKeyValueStorageTest extends
AbstractCompactionKeyValueStorageTest {
@Override
public KeyValueStorage createStorage() {
- return new RocksDbKeyValueStorage("test", workDir.resolve("storage"),
new NoOpFailureManager());
+ return new RocksDbKeyValueStorage(NODE_NAME,
workDir.resolve("storage"), new NoOpFailureManager());
}
@Test
diff --git
a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/RocksDbKeyValueStorageTest.java
b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/RocksDbKeyValueStorageTest.java
index 10319f31d8..c8962f37de 100644
---
a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/RocksDbKeyValueStorageTest.java
+++
b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/RocksDbKeyValueStorageTest.java
@@ -49,7 +49,7 @@ import org.junit.jupiter.api.Test;
public class RocksDbKeyValueStorageTest extends
BasicOperationsKeyValueStorageTest {
@Override
public KeyValueStorage createStorage() {
- return new RocksDbKeyValueStorage("test", workDir.resolve("storage"),
new NoOpFailureManager());
+ return new RocksDbKeyValueStorage(NODE_NAME,
workDir.resolve("storage"), new NoOpFailureManager());
}
@Test
diff --git
a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryCompactionKeyValueStorageTest.java
b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryCompactionKeyValueStorageTest.java
index 529a08569f..aa9a4c93be 100644
---
a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryCompactionKeyValueStorageTest.java
+++
b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryCompactionKeyValueStorageTest.java
@@ -21,7 +21,7 @@ package org.apache.ignite.internal.metastorage.server;
public class SimpleInMemoryCompactionKeyValueStorageTest extends
AbstractCompactionKeyValueStorageTest {
@Override
public KeyValueStorage createStorage() {
- return new SimpleInMemoryKeyValueStorage("test");
+ return new SimpleInMemoryKeyValueStorage(NODE_NAME);
}
@Override
diff --git
a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorageTest.java
b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorageTest.java
index f27b4358e4..b06b2834ad 100644
---
a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorageTest.java
+++
b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorageTest.java
@@ -23,7 +23,7 @@ package org.apache.ignite.internal.metastorage.server;
class SimpleInMemoryKeyValueStorageTest extends
BasicOperationsKeyValueStorageTest {
@Override
public KeyValueStorage createStorage() {
- return new SimpleInMemoryKeyValueStorage("test");
+ return new SimpleInMemoryKeyValueStorage(NODE_NAME);
}
@Override
diff --git
a/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/server/AbstractKeyValueStorageTest.java
b/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/server/AbstractKeyValueStorageTest.java
index 3f0cd7d5b0..3f362d3f1d 100644
---
a/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/server/AbstractKeyValueStorageTest.java
+++
b/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/server/AbstractKeyValueStorageTest.java
@@ -28,6 +28,8 @@ import org.junit.jupiter.api.BeforeEach;
* Abstract test for {@link KeyValueStorage}.
*/
public abstract class AbstractKeyValueStorageTest extends
BaseIgniteAbstractTest {
+ protected static final String NODE_NAME = "test";
+
protected KeyValueStorage storage;
@BeforeEach
diff --git
a/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java
b/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java
index 097fbd61c5..4ec9d8c091 100644
---
a/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java
+++
b/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java
@@ -104,10 +104,18 @@ public class SimpleInMemoryKeyValueStorage extends
AbstractKeyValueStorage {
*/
private final NavigableMap<Long, NavigableMap<byte[], Value>> revsIdx =
new ConcurrentSkipListMap<>();
- /** Last update index. */
+ /**
+ * Last update index.
+ *
+ * <p>Multi-threaded access is guarded by {@link #rwLock}.</p>
+ */
private long index;
- /** Last update term. */
+ /**
+ * Last update term.
+ *
+ * <p>Multi-threaded access is guarded by {@link #rwLock}.</p>
+ */
private long term;
/** Last saved configuration. */
@@ -574,7 +582,9 @@ public class SimpleInMemoryKeyValueStorage extends
AbstractKeyValueStorage {
Map.copyOf(revToTsMap),
revsIdxCopy,
rev,
- savedCompactionRevision
+ savedCompactionRevision,
+ term,
+ index
);
byte[] snapshotBytes = ByteUtils.toBytes(snapshot);
@@ -620,6 +630,8 @@ public class SimpleInMemoryKeyValueStorage extends
AbstractKeyValueStorage {
rev = snapshot.rev;
compactionRevision = snapshot.savedCompactionRevision;
savedCompactionRevision = snapshot.savedCompactionRevision;
+ term = snapshot.term;
+ index = snapshot.index;
} catch (Throwable t) {
throw new MetaStorageException(RESTORING_STORAGE_ERR, t);
} finally {
@@ -751,7 +763,7 @@ public class SimpleInMemoryKeyValueStorage extends
AbstractKeyValueStorage {
}
@Override
- public void saveCompactionRevision(long revision) {
+ public void saveCompactionRevision(long revision, KeyValueUpdateContext
context) {
assert revision >= 0 : revision;
rwLock.writeLock().lock();
@@ -760,6 +772,14 @@ public class SimpleInMemoryKeyValueStorage extends
AbstractKeyValueStorage {
assertCompactionRevisionLessThanCurrent(revision, rev);
savedCompactionRevision = revision;
+
+ setIndexAndTerm(context.index, context.term);
+
+ if (!areWatchesEnabled) {
+ return;
+ }
+
+ watchProcessor.advanceSafeTime(context.timestamp);
} finally {
rwLock.writeLock().unlock();
}
diff --git
a/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorageSnapshot.java
b/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorageSnapshot.java
index 040ca11c24..ffa870ea6b 100644
---
a/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorageSnapshot.java
+++
b/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorageSnapshot.java
@@ -46,13 +46,19 @@ class SimpleInMemoryKeyValueStorageSnapshot implements
Serializable {
final long savedCompactionRevision;
+ final long term;
+
+ final long index;
+
SimpleInMemoryKeyValueStorageSnapshot(
Map<byte[], List<Long>> keysIdx,
Map<Long, Long> tsToRevMap,
Map<Long, HybridTimestamp> revToTsMap,
Map<Long, Map<byte[], ValueSnapshot>> revsIdx,
long rev,
- long savedCompactionRevision
+ long savedCompactionRevision,
+ long term,
+ long index
) {
this.keysIdx = keysIdx;
this.tsToRevMap = tsToRevMap;
@@ -60,5 +66,7 @@ class SimpleInMemoryKeyValueStorageSnapshot implements
Serializable {
this.revsIdx = revsIdx;
this.rev = rev;
this.savedCompactionRevision = savedCompactionRevision;
+ this.term = term;
+ this.index = index;
}
}