This is an automated email from the ASF dual-hosted git repository.

tkalkirill pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new dbf54b9e76 IGNITE-23419 Advance Safe Time in KeyValueStorage#saveCompactionRevision (#4575)
dbf54b9e76 is described below

commit dbf54b9e76f39dfc3e7621bbfde40c14379f8bb5
Author: Kirill Tkalenko <[email protected]>
AuthorDate: Thu Oct 17 07:27:02 2024 +0300

    IGNITE-23419 Advance Safe Time in KeyValueStorage#saveCompactionRevision (#4575)
---
 .../metastorage/MetaStorageCompactionManager.java  | 22 ---------
 ...tMetaStorageSafeTimePropagationRocksDbTest.java |  2 +-
 ...torageSafeTimePropagationSimpleStorageTest.java |  2 +-
 .../metastorage/impl/MetaStorageManagerImpl.java   |  5 --
 .../metastorage/server/KeyValueStorage.java        |  9 ++--
 .../server/persistence/RocksDbKeyValueStorage.java | 14 +++++-
 .../AbstractCompactionKeyValueStorageTest.java     | 55 +++++++++++++++++++---
 .../RocksDbCompactionKeyValueStorageTest.java      |  2 +-
 .../server/RocksDbKeyValueStorageTest.java         |  2 +-
 ...impleInMemoryCompactionKeyValueStorageTest.java |  2 +-
 .../server/SimpleInMemoryKeyValueStorageTest.java  |  2 +-
 .../server/AbstractKeyValueStorageTest.java        |  2 +
 .../server/SimpleInMemoryKeyValueStorage.java      | 28 +++++++++--
 .../SimpleInMemoryKeyValueStorageSnapshot.java     | 10 +++-
 14 files changed, 107 insertions(+), 50 deletions(-)

diff --git 
a/modules/metastorage-api/src/main/java/org/apache/ignite/internal/metastorage/MetaStorageCompactionManager.java
 
b/modules/metastorage-api/src/main/java/org/apache/ignite/internal/metastorage/MetaStorageCompactionManager.java
index eece4bf9ba..192f7eebdc 100644
--- 
a/modules/metastorage-api/src/main/java/org/apache/ignite/internal/metastorage/MetaStorageCompactionManager.java
+++ 
b/modules/metastorage-api/src/main/java/org/apache/ignite/internal/metastorage/MetaStorageCompactionManager.java
@@ -58,30 +58,10 @@ public interface MetaStorageCompactionManager extends 
IgniteComponent {
      * @throws IgniteInternalException with cause {@link 
NodeStoppingException} if the node is in the process of stopping.
      * @throws MetaStorageException If there is an error during the 
metastorage compaction process.
      * @see #setCompactionRevisionLocally(long)
-     * @see #saveCompactionRevisionLocally(long)
      * @see #getCompactionRevisionLocally()
      */
     void compactLocally(long revision);
 
-    /**
-     * Saves the compaction revision to the metastorage meta locally.
-     *
-     * <p>Method only saves the new compaction revision to the meta of 
metastorage. After invoking this method the metastorage read methods
-     * will <b>not</b> immediately start throwing a {@link CompactedException} 
if they request a revision less than or equal to the new
-     * saved one.</p>
-     *
-     * <p>Last saved compaction revision will be in the metastorage snapshot. 
When restore from a snapshot, compaction revision will be
-     * restored, after which the metastorage read methods will throw exception 
{@link CompactedException}.</p>
-     *
-     * <p>Compaction revision is expected to be less than the current 
metastorage revision.</p>
-     *
-     * @param revision Compaction revision to save.
-     * @throws IgniteInternalException with cause {@link 
NodeStoppingException} if the node is in the process of stopping.
-     * @throws MetaStorageException If there is an error while saving a 
compaction revision.
-     * @see #setCompactionRevisionLocally(long)
-     */
-    void saveCompactionRevisionLocally(long revision);
-
     /**
      * Sets the compaction revision locally, but does not save it, after 
invoking this method the metastorage read methods will throw a
      * {@link CompactedException} if they request a revision less than or 
equal to the new one.
@@ -90,7 +70,6 @@ public interface MetaStorageCompactionManager extends 
IgniteComponent {
      *
      * @param revision Compaction revision.
      * @throws IgniteInternalException with cause {@link 
NodeStoppingException} if the node is in the process of stopping.
-     * @see #saveCompactionRevisionLocally(long)
      */
     void setCompactionRevisionLocally(long revision);
 
@@ -99,7 +78,6 @@ public interface MetaStorageCompactionManager extends 
IgniteComponent {
      *
      * @throws IgniteInternalException with cause {@link 
NodeStoppingException} if the node is in the process of stopping.
      * @see #setCompactionRevisionLocally(long)
-     * @see #saveCompactionRevisionLocally(long)
      */
     long getCompactionRevisionLocally();
 }
diff --git 
a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageSafeTimePropagationRocksDbTest.java
 
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageSafeTimePropagationRocksDbTest.java
index c13fc5bc25..52b4f80510 100644
--- 
a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageSafeTimePropagationRocksDbTest.java
+++ 
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageSafeTimePropagationRocksDbTest.java
@@ -33,6 +33,6 @@ public class ItMetaStorageSafeTimePropagationRocksDbTest 
extends ItMetaStorageSa
 
     @Override
     public KeyValueStorage createStorage() {
-        return new RocksDbKeyValueStorage("test", workDir, new 
NoOpFailureManager());
+        return new RocksDbKeyValueStorage(NODE_NAME, workDir, new 
NoOpFailureManager());
     }
 }
diff --git 
a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageSafeTimePropagationSimpleStorageTest.java
 
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageSafeTimePropagationSimpleStorageTest.java
index d412c6565e..543442237e 100644
--- 
a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageSafeTimePropagationSimpleStorageTest.java
+++ 
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageSafeTimePropagationSimpleStorageTest.java
@@ -24,6 +24,6 @@ import 
org.apache.ignite.internal.metastorage.server.SimpleInMemoryKeyValueStora
 public class ItMetaStorageSafeTimePropagationSimpleStorageTest extends 
ItMetaStorageSafeTimePropagationAbstractTest {
     @Override
     public KeyValueStorage createStorage() {
-        return new SimpleInMemoryKeyValueStorage("test");
+        return new SimpleInMemoryKeyValueStorage(NODE_NAME);
     }
 }
diff --git 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java
 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java
index 19c98a5c42..580dcb1f44 100644
--- 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java
+++ 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java
@@ -1126,11 +1126,6 @@ public class MetaStorageManagerImpl implements 
MetaStorageManager, MetastorageGr
         inBusyLock(busyLock, () -> storage.compact(revision));
     }
 
-    @Override
-    public void saveCompactionRevisionLocally(long revision) {
-        inBusyLock(busyLock, () -> storage.saveCompactionRevision(revision));
-    }
-
     @Override
     public void setCompactionRevisionLocally(long revision) {
         inBusyLock(busyLock, () -> storage.setCompactionRevision(revision));
diff --git 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/KeyValueStorage.java
 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/KeyValueStorage.java
index 51e4cd27c9..3ca0f9f5fa 100644
--- 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/KeyValueStorage.java
+++ 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/KeyValueStorage.java
@@ -420,7 +420,7 @@ public interface KeyValueStorage extends ManuallyCloseable {
      * @throws MetaStorageException If there is an error during the 
metastorage compaction process.
      * @see #stopCompaction()
      * @see #setCompactionRevision(long)
-     * @see #saveCompactionRevision(long)
+     * @see #saveCompactionRevision(long, KeyValueUpdateContext)
      */
     void compact(long revision);
 
@@ -513,10 +513,11 @@ public interface KeyValueStorage extends 
ManuallyCloseable {
      * <p>Compaction revision is expected to be less than the {@link #revision 
current storage revision}.</p>
      *
      * @param revision Compaction revision to save.
+     * @param context Operation's context.
      * @throws MetaStorageException If there is an error while saving a 
compaction revision.
      * @see #setCompactionRevision(long)
      */
-    void saveCompactionRevision(long revision);
+    void saveCompactionRevision(long revision, KeyValueUpdateContext context);
 
     /**
      * Sets the compaction revision, but does not save it, after invoking this 
method the metastorage read methods will throw a
@@ -525,7 +526,7 @@ public interface KeyValueStorage extends ManuallyCloseable {
      * <p>Compaction revision is expected to be less than the {@link #revision 
current storage revision}.</p>
      *
      * @param revision Compaction revision.
-     * @see #saveCompactionRevision(long)
+     * @see #saveCompactionRevision(long, KeyValueUpdateContext)
      */
     void setCompactionRevision(long revision);
 
@@ -533,7 +534,7 @@ public interface KeyValueStorage extends ManuallyCloseable {
      * Returns the compaction revision that was set or restored from a 
snapshot, {@code -1} if not changed.
      *
      * @see #setCompactionRevision(long)
-     * @see #saveCompactionRevision(long)
+     * @see #saveCompactionRevision(long, KeyValueUpdateContext)
      */
     long getCompactionRevision();
 
diff --git 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java
 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java
index 565a5041fa..cb49b034e1 100644
--- 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java
+++ 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java
@@ -590,7 +590,7 @@ public class RocksDbKeyValueStorage extends 
AbstractKeyValueStorage {
         validateNoChecksumConflict(newRev, newChecksum);
         revisionToChecksum.put(batch, revisionBytes, longToBytes(newChecksum));
 
-        data.put(batch, INDEX_AND_TERM_KEY, longsToBytes(0, context.index, 
context.term));
+        addIndexAndTermToWriteBatch(batch, context);
 
         db.write(defaultWriteOptions, batch);
 
@@ -1264,7 +1264,7 @@ public class RocksDbKeyValueStorage extends 
AbstractKeyValueStorage {
     }
 
     @Override
-    public void saveCompactionRevision(long revision) {
+    public void saveCompactionRevision(long revision, KeyValueUpdateContext 
context) {
         assert revision >= 0 : revision;
 
         rwLock.writeLock().lock();
@@ -1274,7 +1274,13 @@ public class RocksDbKeyValueStorage extends 
AbstractKeyValueStorage {
 
             data.put(batch, COMPACTION_REVISION_KEY, longToBytes(revision));
 
+            addIndexAndTermToWriteBatch(batch, context);
+
             db.write(defaultWriteOptions, batch);
+
+            if (recoveryStatus.get() == RecoveryStatus.DONE) {
+                watchProcessor.advanceSafeTime(context.timestamp);
+            }
         } catch (Throwable t) {
             throw new MetaStorageException(COMPACTION_ERR, "Error saving 
compaction revision: " + revision, t);
         } finally {
@@ -1529,4 +1535,8 @@ public class RocksDbKeyValueStorage extends 
AbstractKeyValueStorage {
             }
         };
     }
+
+    private void addIndexAndTermToWriteBatch(WriteBatch batch, 
KeyValueUpdateContext context) throws RocksDBException {
+        data.put(batch, INDEX_AND_TERM_KEY, longsToBytes(0, context.index, 
context.term));
+    }
 }
diff --git 
a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/AbstractCompactionKeyValueStorageTest.java
 
b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/AbstractCompactionKeyValueStorageTest.java
index cadcee46c1..c549271e4b 100644
--- 
a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/AbstractCompactionKeyValueStorageTest.java
+++ 
b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/AbstractCompactionKeyValueStorageTest.java
@@ -26,6 +26,7 @@ import static 
org.apache.ignite.internal.metastorage.dsl.Operations.put;
 import static org.apache.ignite.internal.metastorage.dsl.Operations.remove;
 import static 
org.apache.ignite.internal.metastorage.server.KeyValueUpdateContext.kvContext;
 import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static org.apache.ignite.internal.util.IgniteUtils.closeAllManually;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
 import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -45,9 +46,13 @@ import org.apache.ignite.internal.metastorage.Entry;
 import org.apache.ignite.internal.metastorage.exceptions.CompactedException;
 import org.apache.ignite.internal.metastorage.impl.CommandIdGenerator;
 import org.apache.ignite.internal.metastorage.server.ExistenceCondition.Type;
+import org.apache.ignite.internal.metastorage.server.time.ClusterTimeImpl;
+import org.apache.ignite.internal.raft.IndexWithTerm;
 import org.apache.ignite.internal.testframework.WorkDirectory;
 import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
 import org.apache.ignite.internal.util.Cursor;
+import org.apache.ignite.internal.util.IgniteSpinBusyLock;
+import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
@@ -70,6 +75,8 @@ public abstract class AbstractCompactionKeyValueStorageTest 
extends AbstractKeyV
 
     private final HybridClock clock = new HybridClockImpl();
 
+    private final ClusterTimeImpl clusterTime = new ClusterTimeImpl(NODE_NAME, 
new IgniteSpinBusyLock(), clock);
+
     @Override
     @BeforeEach
     void setUp() {
@@ -109,6 +116,12 @@ public abstract class 
AbstractCompactionKeyValueStorageTest extends AbstractKeyV
         assertEquals(List.of(4, 6/* Tombstone */), collectRevisions(SOME_KEY));
     }
 
+    @Override
+    @AfterEach
+    public void tearDown() throws Exception {
+        closeAllManually(super::tearDown, clusterTime);
+    }
+
     /**
      * Tests {@link KeyValueStorage#compact(long)} for a specific single 
revision, to simplify testing, see examples in the method
      * description. Keys with their revisions are added in {@link #setUp()}.
@@ -255,47 +268,64 @@ public abstract class 
AbstractCompactionKeyValueStorageTest extends AbstractKeyV
     }
 
     @Test
-    void testSaveCompactionRevisionDoesNotChangeRevisionInMemory() {
-        storage.saveCompactionRevision(0);
+    void testSaveCompactionRevision() {
+        HybridTimestamp now0 = clock.now();
+        HybridTimestamp now1 = clock.now();
+
+        startListenUpdateSafeTime();
+
+        storage.saveCompactionRevision(0, new KeyValueUpdateContext(1, 1, 
now0));
         assertEquals(-1, storage.getCompactionRevision());
+        assertEquals(new IndexWithTerm(1, 1), storage.getIndexWithTerm());
+        assertThat(clusterTime.waitFor(now0), willCompleteSuccessfully());
 
-        storage.saveCompactionRevision(1);
+        storage.saveCompactionRevision(1, new KeyValueUpdateContext(2, 2, 
now1));
         assertEquals(-1, storage.getCompactionRevision());
+        assertEquals(new IndexWithTerm(2, 2), storage.getIndexWithTerm());
+        assertThat(clusterTime.waitFor(now1), willCompleteSuccessfully());
     }
 
     @Test
     void testSaveCompactionRevisionAndRestart() throws Exception {
-        storage.saveCompactionRevision(1);
+        storage.saveCompactionRevision(1, new KeyValueUpdateContext(1, 1, 
clock.now()));
 
         restartStorage();
 
+        // No safe time check as it is only saved with revision update.
         assertEquals(1, storage.getCompactionRevision());
+        assertEquals(new IndexWithTerm(1, 1), storage.getIndexWithTerm());
     }
 
     @Test
     void testSaveCompactionRevisionInSnapshot() {
-        storage.saveCompactionRevision(1);
+        storage.saveCompactionRevision(1, new KeyValueUpdateContext(1, 1, 
clock.now()));
 
         Path snapshotDir = workDir.resolve("snapshot");
 
+        // No safe time check as it is only saved with revision update.
         assertThat(storage.snapshot(snapshotDir), willCompleteSuccessfully());
         assertEquals(-1, storage.getCompactionRevision());
+        assertEquals(new IndexWithTerm(1, 1), storage.getIndexWithTerm());
 
+        // No safe time check as it is only saved with revision update.
         storage.restoreSnapshot(snapshotDir);
         assertEquals(1, storage.getCompactionRevision());
+        assertEquals(new IndexWithTerm(1, 1), storage.getIndexWithTerm());
     }
 
     @Test
     void testSaveCompactionRevisionInSnapshotAndRestart() throws Exception {
-        storage.saveCompactionRevision(1);
+        storage.saveCompactionRevision(1, new KeyValueUpdateContext(1, 1, 
clock.now()));
 
         Path snapshotDir = workDir.resolve("snapshot");
         assertThat(storage.snapshot(snapshotDir), willCompleteSuccessfully());
 
         restartStorage();
 
+        // No safe time check as it is only saved with revision update.
         storage.restoreSnapshot(snapshotDir);
         assertEquals(1, storage.getCompactionRevision());
+        assertEquals(new IndexWithTerm(1, 1), storage.getIndexWithTerm());
     }
 
     @Test
@@ -977,6 +1007,19 @@ public abstract class 
AbstractCompactionKeyValueStorageTest extends AbstractKeyV
         );
     }
 
+    private void startListenUpdateSafeTime() {
+        storage.startWatches(storage.revision() + 1, new 
OnRevisionAppliedCallback() {
+            @Override
+            public void onSafeTimeAdvanced(HybridTimestamp newSafeTime) {
+                clusterTime.updateSafeTime(newSafeTime);
+            }
+
+            @Override
+            public void onRevisionApplied(long revision) {
+            }
+        });
+    }
+
     private static String toUtf8String(byte[]... keys) {
         return Arrays.stream(keys)
                 .map(KeyValueStorageUtils::toUtf8String)
diff --git 
a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/RocksDbCompactionKeyValueStorageTest.java
 
b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/RocksDbCompactionKeyValueStorageTest.java
index 84352b7dc8..05d3562850 100644
--- 
a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/RocksDbCompactionKeyValueStorageTest.java
+++ 
b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/RocksDbCompactionKeyValueStorageTest.java
@@ -29,7 +29,7 @@ import org.junit.jupiter.api.Test;
 public class RocksDbCompactionKeyValueStorageTest extends 
AbstractCompactionKeyValueStorageTest {
     @Override
     public KeyValueStorage createStorage() {
-        return new RocksDbKeyValueStorage("test", workDir.resolve("storage"), 
new NoOpFailureManager());
+        return new RocksDbKeyValueStorage(NODE_NAME, 
workDir.resolve("storage"), new NoOpFailureManager());
     }
 
     @Test
diff --git 
a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/RocksDbKeyValueStorageTest.java
 
b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/RocksDbKeyValueStorageTest.java
index 10319f31d8..c8962f37de 100644
--- 
a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/RocksDbKeyValueStorageTest.java
+++ 
b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/RocksDbKeyValueStorageTest.java
@@ -49,7 +49,7 @@ import org.junit.jupiter.api.Test;
 public class RocksDbKeyValueStorageTest extends 
BasicOperationsKeyValueStorageTest {
     @Override
     public KeyValueStorage createStorage() {
-        return new RocksDbKeyValueStorage("test", workDir.resolve("storage"), 
new NoOpFailureManager());
+        return new RocksDbKeyValueStorage(NODE_NAME, 
workDir.resolve("storage"), new NoOpFailureManager());
     }
 
     @Test
diff --git 
a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryCompactionKeyValueStorageTest.java
 
b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryCompactionKeyValueStorageTest.java
index 529a08569f..aa9a4c93be 100644
--- 
a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryCompactionKeyValueStorageTest.java
+++ 
b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryCompactionKeyValueStorageTest.java
@@ -21,7 +21,7 @@ package org.apache.ignite.internal.metastorage.server;
 public class SimpleInMemoryCompactionKeyValueStorageTest extends 
AbstractCompactionKeyValueStorageTest {
     @Override
     public KeyValueStorage createStorage() {
-        return new SimpleInMemoryKeyValueStorage("test");
+        return new SimpleInMemoryKeyValueStorage(NODE_NAME);
     }
 
     @Override
diff --git 
a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorageTest.java
 
b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorageTest.java
index f27b4358e4..b06b2834ad 100644
--- 
a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorageTest.java
+++ 
b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorageTest.java
@@ -23,7 +23,7 @@ package org.apache.ignite.internal.metastorage.server;
 class SimpleInMemoryKeyValueStorageTest extends 
BasicOperationsKeyValueStorageTest {
     @Override
     public KeyValueStorage createStorage() {
-        return new SimpleInMemoryKeyValueStorage("test");
+        return new SimpleInMemoryKeyValueStorage(NODE_NAME);
     }
 
     @Override
diff --git 
a/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/server/AbstractKeyValueStorageTest.java
 
b/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/server/AbstractKeyValueStorageTest.java
index 3f0cd7d5b0..3f362d3f1d 100644
--- 
a/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/server/AbstractKeyValueStorageTest.java
+++ 
b/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/server/AbstractKeyValueStorageTest.java
@@ -28,6 +28,8 @@ import org.junit.jupiter.api.BeforeEach;
  * Abstract test for {@link KeyValueStorage}.
  */
 public abstract class AbstractKeyValueStorageTest extends 
BaseIgniteAbstractTest {
+    protected static final String NODE_NAME = "test";
+
     protected KeyValueStorage storage;
 
     @BeforeEach
diff --git 
a/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java
 
b/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java
index 097fbd61c5..4ec9d8c091 100644
--- 
a/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java
+++ 
b/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java
@@ -104,10 +104,18 @@ public class SimpleInMemoryKeyValueStorage extends 
AbstractKeyValueStorage {
      */
     private final NavigableMap<Long, NavigableMap<byte[], Value>> revsIdx = 
new ConcurrentSkipListMap<>();
 
-    /** Last update index. */
+    /**
+     * Last update index.
+     *
+     * <p>Multi-threaded access is guarded by {@link #rwLock}.</p>
+     */
     private long index;
 
-    /** Last update term. */
+    /**
+     * Last update term.
+     *
+     * <p>Multi-threaded access is guarded by {@link #rwLock}.</p>
+     */
     private long term;
 
     /** Last saved configuration. */
@@ -574,7 +582,9 @@ public class SimpleInMemoryKeyValueStorage extends 
AbstractKeyValueStorage {
                     Map.copyOf(revToTsMap),
                     revsIdxCopy,
                     rev,
-                    savedCompactionRevision
+                    savedCompactionRevision,
+                    term,
+                    index
             );
 
             byte[] snapshotBytes = ByteUtils.toBytes(snapshot);
@@ -620,6 +630,8 @@ public class SimpleInMemoryKeyValueStorage extends 
AbstractKeyValueStorage {
             rev = snapshot.rev;
             compactionRevision = snapshot.savedCompactionRevision;
             savedCompactionRevision = snapshot.savedCompactionRevision;
+            term = snapshot.term;
+            index = snapshot.index;
         } catch (Throwable t) {
             throw new MetaStorageException(RESTORING_STORAGE_ERR, t);
         } finally {
@@ -751,7 +763,7 @@ public class SimpleInMemoryKeyValueStorage extends 
AbstractKeyValueStorage {
     }
 
     @Override
-    public void saveCompactionRevision(long revision) {
+    public void saveCompactionRevision(long revision, KeyValueUpdateContext 
context) {
         assert revision >= 0 : revision;
 
         rwLock.writeLock().lock();
@@ -760,6 +772,14 @@ public class SimpleInMemoryKeyValueStorage extends 
AbstractKeyValueStorage {
             assertCompactionRevisionLessThanCurrent(revision, rev);
 
             savedCompactionRevision = revision;
+
+            setIndexAndTerm(context.index, context.term);
+
+            if (!areWatchesEnabled) {
+                return;
+            }
+
+            watchProcessor.advanceSafeTime(context.timestamp);
         } finally {
             rwLock.writeLock().unlock();
         }
diff --git 
a/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorageSnapshot.java
 
b/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorageSnapshot.java
index 040ca11c24..ffa870ea6b 100644
--- 
a/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorageSnapshot.java
+++ 
b/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorageSnapshot.java
@@ -46,13 +46,19 @@ class SimpleInMemoryKeyValueStorageSnapshot implements 
Serializable {
 
     final long savedCompactionRevision;
 
+    final long term;
+
+    final long index;
+
     SimpleInMemoryKeyValueStorageSnapshot(
             Map<byte[], List<Long>> keysIdx,
             Map<Long, Long> tsToRevMap,
             Map<Long, HybridTimestamp> revToTsMap,
             Map<Long, Map<byte[], ValueSnapshot>> revsIdx,
             long rev,
-            long savedCompactionRevision
+            long savedCompactionRevision,
+            long term,
+            long index
     ) {
         this.keysIdx = keysIdx;
         this.tsToRevMap = tsToRevMap;
@@ -60,5 +66,7 @@ class SimpleInMemoryKeyValueStorageSnapshot implements 
Serializable {
         this.revsIdx = revsIdx;
         this.rev = rev;
         this.savedCompactionRevision = savedCompactionRevision;
+        this.term = term;
+        this.index = index;
     }
 }

Reply via email to