This is an automated email from the ASF dual-hosted git repository.

sdanilov pushed a commit to branch ignite-19532
in repository https://gitbox.apache.org/repos/asf/ignite-3.git

commit 0afca2f44ffce8fa3b1392974c1d7fde6feca51f
Author: Semyon Danilov <[email protected]>
AuthorDate: Tue May 23 15:34:18 2023 +0400

    IGNITE-19532 Happens-before for safe time propagation
---
 .../ignite/internal/hlc/HybridTimestamp.java       |   7 +-
 .../impl/ItMetaStorageMultipleNodesTest.java       |  34 ++++---
 .../metastorage/impl/MetaStorageManagerImpl.java   |  11 ++-
 .../server/OnRevisionAppliedCallback.java          |   3 +-
 .../metastorage/server/WatchProcessor.java         |  16 +++-
 .../server/persistence/RocksDbKeyValueStorage.java | 103 ++++++++++++++++++---
 .../persistence/StorageColumnFamilyType.java       |   4 +-
 .../server/raft/MetaStorageWriteHandler.java       |   8 +-
 .../server/BasicOperationsKeyValueStorageTest.java |   8 +-
 .../server/RocksDbKeyValueStorageTest.java         |   2 +-
 .../metastorage/server/WatchProcessorTest.java     |  23 ++---
 .../server/SimpleInMemoryKeyValueStorage.java      |  17 ++--
 12 files changed, 171 insertions(+), 65 deletions(-)

diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/hlc/HybridTimestamp.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/hlc/HybridTimestamp.java
index 5cdcebdcd5..cfa133ed50 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/hlc/HybridTimestamp.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/hlc/HybridTimestamp.java
@@ -82,7 +82,12 @@ public final class HybridTimestamp implements 
Comparable<HybridTimestamp>, Seria
         }
     }
 
-    private HybridTimestamp(long time) {
+    /**
+     * The constructor.
+     *
+     * @param time Long time value.
+     */
+    public HybridTimestamp(long time) {
         this.time = time;
 
         // Negative time breaks comparison, we don't allow overflow of the 
physical time.
diff --git 
a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageMultipleNodesTest.java
 
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageMultipleNodesTest.java
index a817026ba3..68043ee720 100644
--- 
a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageMultipleNodesTest.java
+++ 
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageMultipleNodesTest.java
@@ -37,6 +37,7 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Stream;
 import 
org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager;
@@ -72,9 +73,7 @@ import org.apache.ignite.lang.ByteArray;
 import org.apache.ignite.lang.NodeStoppingException;
 import org.apache.ignite.network.ClusterNode;
 import org.apache.ignite.network.ClusterService;
-import org.apache.ignite.network.DefaultMessagingService;
 import org.apache.ignite.network.NetworkAddress;
-import org.apache.ignite.network.NetworkMessage;
 import org.apache.ignite.network.StaticNodeFinder;
 import org.apache.ignite.raft.jraft.RaftGroupService;
 import org.apache.ignite.utils.ClusterServiceTestUtils;
@@ -193,16 +192,6 @@ public class ItMetaStorageMultipleNodesTest extends 
IgniteAbstractTest {
                     .thenCompose(service -> 
service.refreshMembers(false).thenApply(v -> service.learners()))
                     .thenApply(learners -> 
learners.stream().map(Peer::consistentId).collect(toSet()));
         }
-
-        void startDroppingMessagesTo(Node recipient, Class<? extends 
NetworkMessage> msgType) {
-            ((DefaultMessagingService) clusterService.messagingService())
-                    .dropMessages((recipientConsistentId, message) ->
-                            recipient.name().equals(recipientConsistentId) && 
msgType.isInstance(message));
-        }
-
-        void stopDroppingMessages() {
-            ((DefaultMessagingService) 
clusterService.messagingService()).stopDroppingMessages();
-        }
     }
 
     private final List<Node> nodes = new ArrayList<>();
@@ -364,18 +353,37 @@ public class ItMetaStorageMultipleNodesTest extends 
IgniteAbstractTest {
 
         assertThat(allOf(firstNode.cmgManager.onJoinReady(), 
secondNode.cmgManager.onJoinReady()), willCompleteSuccessfully());
 
+        CompletableFuture<Void> f = new CompletableFuture<>();
+        CountDownLatch l = new CountDownLatch(1);
+
+        
secondNode.metaStorageManager.registerExactWatch(ByteArray.fromString("test-key"),
 new WatchListener() {
+            @Override
+            public CompletableFuture<Void> onUpdate(WatchEvent event) {
+                l.countDown();
+                return f;
+            }
+
+            @Override
+            public void onError(Throwable e) {
+                // No-op.
+            }
+        });
+
         // Try putting data from both nodes, because any of them can be a 
leader.
         assertThat(
                 
firstNode.metaStorageManager.put(ByteArray.fromString("test-key"), new 
byte[]{0, 1, 2, 3}),
                 willCompleteSuccessfully()
         );
 
+        assertTrue(l.await(1, TimeUnit.SECONDS));
+        f.complete(null);
+
         assertTrue(waitForCondition(() -> {
             HybridTimestamp sf1 = firstNodeTime.currentSafeTime();
             HybridTimestamp sf2 = secondNodeTime.currentSafeTime();
 
             return sf1.equals(sf2);
-        }, TimeUnit.SECONDS.toMillis(1)));
+        }, 100));
 
         assertThat(
                 
secondNode.metaStorageManager.put(ByteArray.fromString("test-key-2"), new 
byte[]{0, 1, 2, 3}),
diff --git 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java
 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java
index 7bc432a27c..5bf665416d 100644
--- 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java
+++ 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java
@@ -36,6 +36,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import 
org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager;
 import 
org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologyService;
 import org.apache.ignite.internal.hlc.HybridClock;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.Loggers;
 import org.apache.ignite.internal.metastorage.Entry;
@@ -150,7 +151,7 @@ public class MetaStorageManagerImpl implements 
MetaStorageManager {
         CompletableFuture<RaftGroupService> raftServiceFuture;
 
         try {
-            RaftNodeDisruptorConfiguration ownFsmCallerExecutorDisruptorConfig 
= new RaftNodeDisruptorConfiguration("metastorage", 1);
+            var ownFsmCallerExecutorDisruptorConfig = new 
RaftNodeDisruptorConfiguration("metastorage", 1);
 
             // We need to configure the replication protocol differently 
whether this node is a synchronous or asynchronous replica.
             if (metaStorageNodes.contains(thisNodeName)) {
@@ -283,7 +284,7 @@ public class MetaStorageManagerImpl implements 
MetaStorageManager {
 
         try {
             // Meta Storage contract states that all updated entries under a 
particular revision must be stored in the Vault.
-            storage.startWatches(this::saveUpdatedEntriesToVault);
+            storage.startWatches(this::onRevisionApplied);
         } finally {
             busyLock.leaveBusy();
         }
@@ -599,13 +600,17 @@ public class MetaStorageManagerImpl implements 
MetaStorageManager {
     /**
      * Saves processed Meta Storage revision and corresponding entries to the 
Vault.
      */
-    private CompletableFuture<Void> saveUpdatedEntriesToVault(WatchEvent 
watchEvent) {
+    private CompletableFuture<Void> onRevisionApplied(WatchEvent watchEvent, 
HybridTimestamp time) {
+        assert time != null;
+
         if (!busyLock.enterBusy()) {
             LOG.info("Skipping applying MetaStorage revision because the node 
is stopping");
 
             return completedFuture(null);
         }
 
+        clusterTime.updateSafeTime(time);
+
         try {
             CompletableFuture<Void> saveToVaultFuture;
 
diff --git 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/OnRevisionAppliedCallback.java
 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/OnRevisionAppliedCallback.java
index 0ce5b39072..d3e20e328b 100644
--- 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/OnRevisionAppliedCallback.java
+++ 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/OnRevisionAppliedCallback.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.internal.metastorage.server;
 
 import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.metastorage.WatchEvent;
 
 /**
@@ -31,5 +32,5 @@ public interface OnRevisionAppliedCallback {
      * @param watchEvent Event with modified Meta Storage entries processed at 
least one Watch.
      * @return Future that represents the state of the execution of the 
callback.
      */
-    CompletableFuture<Void> onRevisionApplied(WatchEvent watchEvent);
+    CompletableFuture<Void> onRevisionApplied(WatchEvent watchEvent, 
HybridTimestamp time);
 }
diff --git 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/WatchProcessor.java
 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/WatchProcessor.java
index aac4330e29..234a14dd08 100644
--- 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/WatchProcessor.java
+++ 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/WatchProcessor.java
@@ -32,6 +32,7 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import org.apache.ignite.internal.close.ManuallyCloseable;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.Loggers;
 import org.apache.ignite.internal.metastorage.Entry;
@@ -118,7 +119,10 @@ public class WatchProcessor implements ManuallyCloseable {
     /**
      * Notifies registered watch about an update event.
      */
-    public void notifyWatches(List<Entry> updatedEntries) {
+    @SuppressWarnings("unchecked")
+    public void notifyWatches(List<Entry> updatedEntries, HybridTimestamp 
time) {
+        assert time != null;
+
         notificationFuture = notificationFuture
                 .thenComposeAsync(v -> {
                     // Revision must be the same for all entries.
@@ -130,7 +134,7 @@ public class WatchProcessor implements ManuallyCloseable {
                             .toArray(CompletableFuture[]::new);
 
                     return allOf(notificationFutures)
-                            .thenComposeAsync(ignored -> 
invokeOnRevisionCallback(notificationFutures, newRevision), watchExecutor);
+                            .thenComposeAsync(ignored -> 
invokeOnRevisionCallback(notificationFutures, newRevision, time), 
watchExecutor);
                 }, watchExecutor);
     }
 
@@ -179,7 +183,11 @@ public class WatchProcessor implements ManuallyCloseable {
                 });
     }
 
-    private CompletableFuture<Void> 
invokeOnRevisionCallback(CompletableFuture<List<EntryEvent>>[] 
notificationFutures, long revision) {
+    private CompletableFuture<Void> invokeOnRevisionCallback(
+            CompletableFuture<List<EntryEvent>>[] notificationFutures,
+            long revision,
+            HybridTimestamp time
+    ) {
         try {
             // Only notify about entries that have been accepted by at least 
one Watch.
             var acceptedEntries = new HashSet<EntryEvent>();
@@ -193,7 +201,7 @@ public class WatchProcessor implements ManuallyCloseable {
 
             var event = new WatchEvent(acceptedEntries, revision);
 
-            return revisionCallback.onRevisionApplied(event)
+            return revisionCallback.onRevisionApplied(event, time)
                     .whenComplete((ignored, e) -> {
                         if (e != null) {
                             LOG.error("Error occurred when notifying watches", 
e);
diff --git 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java
 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java
index 0d439a0b24..cc667908e3 100644
--- 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java
+++ 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java
@@ -30,6 +30,7 @@ import static 
org.apache.ignite.internal.metastorage.server.persistence.RocksSto
 import static 
org.apache.ignite.internal.metastorage.server.persistence.RocksStorageUtils.valueToBytes;
 import static 
org.apache.ignite.internal.metastorage.server.persistence.StorageColumnFamilyType.DATA;
 import static 
org.apache.ignite.internal.metastorage.server.persistence.StorageColumnFamilyType.INDEX;
+import static 
org.apache.ignite.internal.metastorage.server.persistence.StorageColumnFamilyType.REVISION_TO_TS;
 import static 
org.apache.ignite.internal.metastorage.server.persistence.StorageColumnFamilyType.TS_TO_REVISION;
 import static org.apache.ignite.internal.rocksdb.RocksUtils.incrementPrefix;
 import static 
org.apache.ignite.internal.rocksdb.snapshot.ColumnFamilyRange.fullRange;
@@ -47,6 +48,7 @@ import java.util.Collection;
 import java.util.Comparator;
 import java.util.List;
 import java.util.NoSuchElementException;
+import java.util.Objects;
 import java.util.TreeSet;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
@@ -156,6 +158,9 @@ public class RocksDbKeyValueStorage implements 
KeyValueStorage {
     /** Timestamp to revision mapping column family. */
     private volatile ColumnFamily tsToRevision;
 
+    /** Revision to timestamp mapping column family. */
+    private volatile ColumnFamily revisionToTs;
+
     /** Snapshot manager. */
     private volatile RocksSnapshotManager snapshotManager;
 
@@ -197,14 +202,14 @@ public class RocksDbKeyValueStorage implements 
KeyValueStorage {
      * <p>Multi-threaded access is guarded by {@link #rwLock}.
      */
     @Nullable
-    private List<List<Entry>> eventCache;
+    private List<UpdatedEntries> eventCache;
 
     /**
      * Current list of updated entries.
      *
      * <p>Since this list gets read and updated only on writes (under a write 
lock), no extra synchronisation is needed.
      */
-    private final List<Entry> updatedEntries = new ArrayList<>();
+    private final UpdatedEntries updatedEntries = new UpdatedEntries();
 
     /**
      * Constructor.
@@ -242,10 +247,14 @@ public class RocksDbKeyValueStorage implements 
KeyValueStorage {
         Options tsToRevOptions = new Options().setCreateIfMissing(true);
         ColumnFamilyOptions tsToRevFamilyOptions = new 
ColumnFamilyOptions(tsToRevOptions);
 
+        Options revToTsOptions = new Options().setCreateIfMissing(true);
+        ColumnFamilyOptions revToTsFamilyOptions = new 
ColumnFamilyOptions(revToTsOptions);
+
         return List.of(
                 new ColumnFamilyDescriptor(DATA.nameAsBytes(), 
dataFamilyOptions),
                 new ColumnFamilyDescriptor(INDEX.nameAsBytes(), 
indexFamilyOptions),
-                new ColumnFamilyDescriptor(TS_TO_REVISION.nameAsBytes(), 
tsToRevFamilyOptions)
+                new ColumnFamilyDescriptor(TS_TO_REVISION.nameAsBytes(), 
tsToRevFamilyOptions),
+                new ColumnFamilyDescriptor(REVISION_TO_TS.nameAsBytes(), 
revToTsFamilyOptions)
         );
     }
 
@@ -254,7 +263,7 @@ public class RocksDbKeyValueStorage implements 
KeyValueStorage {
 
         List<ColumnFamilyDescriptor> descriptors = cfDescriptors();
 
-        assert descriptors.size() == 3;
+        assert descriptors.size() == 4;
 
         var handles = new ArrayList<ColumnFamilyHandle>(descriptors.size());
 
@@ -270,8 +279,10 @@ public class RocksDbKeyValueStorage implements 
KeyValueStorage {
 
         tsToRevision = ColumnFamily.wrap(db, handles.get(2));
 
+        revisionToTs = ColumnFamily.wrap(db, handles.get(3));
+
         snapshotManager = new RocksSnapshotManager(db,
-                List.of(fullRange(data), fullRange(index), 
fullRange(tsToRevision)),
+                List.of(fullRange(data), fullRange(index), 
fullRange(tsToRevision), fullRange(revisionToTs)),
                 snapshotExecutor
         );
     }
@@ -413,7 +424,10 @@ public class RocksDbKeyValueStorage implements 
KeyValueStorage {
             data.put(batch, REVISION_KEY, revisionBytes);
 
             if (ts != null) {
-                tsToRevision.put(batch, hybridTsToArray(ts), revisionBytes);
+                byte[] tsBytes = hybridTsToArray(ts);
+
+                tsToRevision.put(batch, tsBytes, revisionBytes);
+                revisionToTs.put(batch, revisionBytes, tsBytes);
             }
 
             db.write(opts, batch);
@@ -422,6 +436,8 @@ public class RocksDbKeyValueStorage implements 
KeyValueStorage {
             updCntr = newCntr;
         }
 
+        updatedEntries.ts = ts;
+
         queueWatchEvent();
     }
 
@@ -1329,7 +1345,7 @@ public class RocksDbKeyValueStorage implements 
KeyValueStorage {
                     eventCache = new ArrayList<>();
                 }
 
-                eventCache.add(List.copyOf(updatedEntries));
+                eventCache.add(updatedEntries.copy());
 
                 updatedEntries.clear();
 
@@ -1343,7 +1359,11 @@ public class RocksDbKeyValueStorage implements 
KeyValueStorage {
     }
 
     private void notifyWatches() {
-        watchProcessor.notifyWatches(List.copyOf(updatedEntries));
+        UpdatedEntries copy = updatedEntries.copy();
+
+        assert copy.ts != null;
+
+        watchProcessor.notifyWatches(copy.updatedEntries, copy.ts);
 
         updatedEntries.clear();
     }
@@ -1359,6 +1379,7 @@ public class RocksDbKeyValueStorage implements 
KeyValueStorage {
         }
 
         var updatedEntries = new ArrayList<Entry>();
+        HybridTimestamp ts = null;
 
         try (
                 var upperBound = new Slice(longToBytes(upperRevision + 1));
@@ -1379,7 +1400,9 @@ public class RocksDbKeyValueStorage implements 
KeyValueStorage {
                     if (!updatedEntries.isEmpty()) {
                         var updatedEntriesCopy = List.copyOf(updatedEntries);
 
-                        watchProcessor.notifyWatches(updatedEntriesCopy);
+                        assert ts != null;
+
+                        watchProcessor.notifyWatches(updatedEntriesCopy, ts);
 
                         updatedEntries.clear();
                     }
@@ -1387,6 +1410,10 @@ public class RocksDbKeyValueStorage implements 
KeyValueStorage {
                     lastSeenRevision = revision;
                 }
 
+                if (ts == null) {
+                    ts = timestampByRevision(revision);
+                }
+
                 updatedEntries.add(entry(rocksKeyToBytes(rocksKey), revision, 
bytesToValue(rocksValue)));
             }
 
@@ -1394,13 +1421,27 @@ public class RocksDbKeyValueStorage implements 
KeyValueStorage {
 
             // Notify about the events left after finishing the cycle above.
             if (!updatedEntries.isEmpty()) {
-                watchProcessor.notifyWatches(updatedEntries);
+                assert ts != null;
+
+                watchProcessor.notifyWatches(updatedEntries, ts);
             }
         }
 
         finishReplay();
     }
 
+    private HybridTimestamp timestampByRevision(long revision) {
+        try {
+            byte[] tsBytes = revisionToTs.get(longToBytes(revision));
+
+            assert tsBytes != null;
+
+            return new HybridTimestamp(bytesToLong(tsBytes));
+        } catch (RocksDBException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
     private void finishReplay() {
         // Take the lock to drain the event cache and prevent new events from 
being cached. Since event notification is asynchronous,
         // this lock shouldn't be held for long.
@@ -1408,7 +1449,11 @@ public class RocksDbKeyValueStorage implements 
KeyValueStorage {
 
         try {
             if (eventCache != null) {
-                eventCache.forEach(watchProcessor::notifyWatches);
+                eventCache.forEach(entries -> {
+                    assert entries.ts != null;
+
+                    watchProcessor.notifyWatches(entries.updatedEntries, 
entries.ts);
+                });
 
                 eventCache = null;
             }
@@ -1423,4 +1468,40 @@ public class RocksDbKeyValueStorage implements 
KeyValueStorage {
     public Path getDbPath() {
         return dbPath;
     }
+
+    private static class UpdatedEntries {
+        private final List<Entry> updatedEntries;
+
+        @Nullable
+        private HybridTimestamp ts;
+
+        public UpdatedEntries() {
+            this.updatedEntries = new ArrayList<>();
+        }
+
+        private UpdatedEntries(List<Entry> updatedEntries, HybridTimestamp ts) 
{
+            this.updatedEntries = updatedEntries;
+            this.ts = Objects.requireNonNull(ts);
+        }
+
+        public boolean isEmpty() {
+            return updatedEntries.isEmpty();
+        }
+
+        public boolean add(Entry entry) {
+            return updatedEntries.add(entry);
+        }
+
+        public void clear() {
+            updatedEntries.clear();
+
+            ts = null;
+        }
+
+        public UpdatedEntries copy() {
+            assert ts != null;
+
+            return new UpdatedEntries(new ArrayList<>(updatedEntries), ts);
+        }
+    }
 }
diff --git 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/StorageColumnFamilyType.java
 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/StorageColumnFamilyType.java
index 958b73fb88..d1d92b99c9 100644
--- 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/StorageColumnFamilyType.java
+++ 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/StorageColumnFamilyType.java
@@ -31,7 +31,9 @@ enum StorageColumnFamilyType {
     INDEX("INDEX".getBytes(StandardCharsets.UTF_8)),
 
     /** Column family for the timestamp to revision mapping. */
-    TS_TO_REVISION("TSTOREV".getBytes(StandardCharsets.UTF_8));
+    TS_TO_REVISION("TSTOREV".getBytes(StandardCharsets.UTF_8)),
+
+    REVISION_TO_TS("REVTOTTS".getBytes(StandardCharsets.UTF_8));
 
     /** Byte representation of the column family's name. */
     private final byte[] nameAsBytes;
diff --git 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageWriteHandler.java
 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageWriteHandler.java
index ca80ffc46a..6354ece930 100644
--- 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageWriteHandler.java
+++ 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageWriteHandler.java
@@ -88,13 +88,9 @@ class MetaStorageWriteHandler {
                 safeTime = cmdWithTime.safeTime();
 
                 handleWriteWithTime(clo, cmdWithTime, safeTime);
-
-                // Every MetaStorageWriteCommand holds safe time that we 
should set as the cluster time.
-                clusterTime.updateSafeTime(safeTime);
             } else if (command instanceof SyncTimeCommand) {
-                clusterTime.updateSafeTime(((SyncTimeCommand) 
command).safeTime());
-
-                clo.result(null);
+                // TODO: IGNITE-19199 WatchProcessor must be notified of the 
new safe time.
+                throw new 
UnsupportedOperationException("https://issues.apache.org/jira/browse/IGNITE-19199";);
             } else {
                 assert false : "Command was not found [cmd=" + command + ']';
             }
diff --git 
a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/BasicOperationsKeyValueStorageTest.java
 
b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/BasicOperationsKeyValueStorageTest.java
index 27033f9685..fd2318d8ff 100644
--- 
a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/BasicOperationsKeyValueStorageTest.java
+++ 
b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/BasicOperationsKeyValueStorageTest.java
@@ -1887,7 +1887,7 @@ public abstract class BasicOperationsKeyValueStorageTest 
extends AbstractKeyValu
 
         long appliedRevision = storage.revision();
 
-        storage.startWatches(event -> completedFuture(null));
+        storage.startWatches((event, ts) -> completedFuture(null));
 
         CompletableFuture<byte[]> fut = new CompletableFuture<>();
 
@@ -2226,7 +2226,7 @@ public abstract class BasicOperationsKeyValueStorageTest 
extends AbstractKeyValu
 
         OnRevisionAppliedCallback mockCallback = 
mock(OnRevisionAppliedCallback.class);
 
-        
when(mockCallback.onRevisionApplied(any())).thenReturn(completedFuture(null));
+        when(mockCallback.onRevisionApplied(any(), 
any())).thenReturn(completedFuture(null));
 
         storage.startWatches(mockCallback);
 
@@ -2238,7 +2238,7 @@ public abstract class BasicOperationsKeyValueStorageTest 
extends AbstractKeyValu
 
         verify(mockListener3, timeout(10_000)).onUpdate(any());
 
-        verify(mockCallback, never()).onRevisionApplied(any());
+        verify(mockCallback, never()).onRevisionApplied(any(), any());
     }
 
     @Test
@@ -2423,7 +2423,7 @@ public abstract class BasicOperationsKeyValueStorageTest 
extends AbstractKeyValu
             }
         });
 
-        storage.startWatches(event -> completedFuture(null));
+        storage.startWatches((event, ts) -> completedFuture(null));
 
         return resultFuture;
     }
diff --git 
a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/RocksDbKeyValueStorageTest.java
 
b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/RocksDbKeyValueStorageTest.java
index 31baeb0f31..ff02241d07 100644
--- 
a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/RocksDbKeyValueStorageTest.java
+++ 
b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/RocksDbKeyValueStorageTest.java
@@ -102,7 +102,7 @@ public class RocksDbKeyValueStorageTest extends 
BasicOperationsKeyValueStorageTe
             }
         });
 
-        storage.startWatches(event -> CompletableFuture.completedFuture(null));
+        storage.startWatches((event, ts) -> 
CompletableFuture.completedFuture(null));
 
         storage.restoreSnapshot(snapshotPath);
 
diff --git 
a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/WatchProcessorTest.java
 
b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/WatchProcessorTest.java
index 1e64ecd803..346da4712a 100644
--- 
a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/WatchProcessorTest.java
+++ 
b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/WatchProcessorTest.java
@@ -34,6 +34,7 @@ import static org.mockito.Mockito.when;
 import java.util.Arrays;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.metastorage.Entry;
 import org.apache.ignite.internal.metastorage.EntryEvent;
 import org.apache.ignite.internal.metastorage.WatchEvent;
@@ -55,7 +56,7 @@ public class WatchProcessorTest {
 
     @BeforeEach
     void setUp() {
-        
when(revisionCallback.onRevisionApplied(any())).thenReturn(completedFuture(null));
+        when(revisionCallback.onRevisionApplied(any(), 
any())).thenReturn(completedFuture(null));
 
         watchProcessor.setRevisionCallback(revisionCallback);
     }
@@ -79,7 +80,7 @@ public class WatchProcessorTest {
         var entry1 = new EntryImpl("foo".getBytes(UTF_8), null, 1, 0);
         var entry2 = new EntryImpl("bar".getBytes(UTF_8), null, 1, 0);
 
-        watchProcessor.notifyWatches(List.of(entry1, entry2));
+        watchProcessor.notifyWatches(List.of(entry1, entry2), 
HybridTimestamp.MAX_VALUE);
 
         var entryEvent1 = new EntryEvent(oldEntry(entry1), entry1);
         var entryEvent2 = new EntryEvent(oldEntry(entry2), entry2);
@@ -92,7 +93,7 @@ public class WatchProcessorTest {
 
         var watchEventCaptor = ArgumentCaptor.forClass(WatchEvent.class);
 
-        verify(revisionCallback, 
timeout(1_000)).onRevisionApplied(watchEventCaptor.capture());
+        verify(revisionCallback, 
timeout(1_000)).onRevisionApplied(watchEventCaptor.capture(), any());
 
         WatchEvent event = watchEventCaptor.getValue();
 
@@ -114,23 +115,23 @@ public class WatchProcessorTest {
         var entry1 = new EntryImpl("foo".getBytes(UTF_8), null, 1, 0);
         var entry2 = new EntryImpl("bar".getBytes(UTF_8), null, 2, 0);
 
-        watchProcessor.notifyWatches(List.of(entry1));
+        watchProcessor.notifyWatches(List.of(entry1), 
HybridTimestamp.MAX_VALUE);
 
         var event = new WatchEvent(new EntryEvent(oldEntry(entry1), entry1));
 
         verify(listener1, timeout(1_000)).onUpdate(event);
         verify(listener2, timeout(1_000)).onRevisionUpdated(1);
 
-        verify(revisionCallback, timeout(1_000)).onRevisionApplied(event);
+        verify(revisionCallback, timeout(1_000)).onRevisionApplied(event, 
any());
 
-        watchProcessor.notifyWatches(List.of(entry2));
+        watchProcessor.notifyWatches(List.of(entry2), 
HybridTimestamp.MAX_VALUE);
 
         event = new WatchEvent(new EntryEvent(oldEntry(entry2), entry2));
 
         verify(listener1, timeout(1_000)).onRevisionUpdated(2);
         verify(listener2, timeout(1_000)).onUpdate(event);
 
-        verify(revisionCallback, timeout(1_000)).onRevisionApplied(event);
+        verify(revisionCallback, timeout(1_000)).onRevisionApplied(event, 
any());
     }
 
     /**
@@ -150,13 +151,13 @@ public class WatchProcessorTest {
         var entry1 = new EntryImpl("foo".getBytes(UTF_8), null, 1, 0);
         var entry2 = new EntryImpl("bar".getBytes(UTF_8), null, 1, 0);
 
-        watchProcessor.notifyWatches(List.of(entry1, entry2));
+        watchProcessor.notifyWatches(List.of(entry1, entry2), 
HybridTimestamp.MAX_VALUE);
 
         verify(listener1, timeout(1_000)).onUpdate(new WatchEvent(new 
EntryEvent(oldEntry(entry1), entry1)));
         verify(listener2, timeout(1_000)).onUpdate(new WatchEvent(new 
EntryEvent(oldEntry(entry2), entry2)));
         verify(listener2, 
timeout(1_000)).onError(any(IllegalStateException.class));
 
-        verify(revisionCallback, never()).onRevisionApplied(any());
+        verify(revisionCallback, never()).onRevisionApplied(any(), any());
     }
 
     /**
@@ -182,7 +183,7 @@ public class WatchProcessorTest {
         var entry1 = new EntryImpl("foo".getBytes(UTF_8), null, 1, 0);
         var entry2 = new EntryImpl("bar".getBytes(UTF_8), null, 1, 0);
 
-        watchProcessor.notifyWatches(List.of(entry1, entry2));
+        watchProcessor.notifyWatches(List.of(entry1, entry2), 
HybridTimestamp.MAX_VALUE);
 
         verify(listener1, timeout(1_000)).onUpdate(new WatchEvent(new 
EntryEvent(oldEntry(entry1), entry1)));
         verify(listener2, timeout(1_000)).onUpdate(new WatchEvent(new 
EntryEvent(oldEntry(entry2), entry2)));
@@ -190,7 +191,7 @@ public class WatchProcessorTest {
         var entry3 = new EntryImpl("foo".getBytes(UTF_8), null, 2, 0);
         var entry4 = new EntryImpl("bar".getBytes(UTF_8), null, 2, 0);
 
-        watchProcessor.notifyWatches(List.of(entry3, entry4));
+        watchProcessor.notifyWatches(List.of(entry3, entry4), 
HybridTimestamp.MAX_VALUE);
 
         verify(listener1, never()).onUpdate(new WatchEvent(new 
EntryEvent(oldEntry(entry3), entry3)));
         verify(listener2, never()).onUpdate(new WatchEvent(new 
EntryEvent(oldEntry(entry4), entry4)));
diff --git 
a/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java
 
b/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java
index 2096e9ab4e..6f2c404191 100644
--- 
a/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java
+++ 
b/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java
@@ -64,6 +64,9 @@ public class SimpleInMemoryKeyValueStorage implements 
KeyValueStorage {
     /** Timestamp to revision mapping. */
     private final NavigableMap<Long, Long> tsToRevMap = new TreeMap<>();
 
+    /** Revision to timestamp mapping. */
+    private final NavigableMap<Long, Long> revToTsMap = new TreeMap<>();
+
     /** Revisions index. Value contains all entries which were modified under 
particular revision. */
     private NavigableMap<Long, NavigableMap<byte[], Value>> revsIdx = new 
TreeMap<>();
 
@@ -120,6 +123,7 @@ public class SimpleInMemoryKeyValueStorage implements 
KeyValueStorage {
         rev = newRevision;
 
         tsToRevMap.put(ts.longValue(), rev);
+        revToTsMap.put(rev, ts.longValue());
 
         notifyWatches();
     }
@@ -468,7 +472,10 @@ public class SimpleInMemoryKeyValueStorage implements 
KeyValueStorage {
             return;
         }
 
-        watchProcessor.notifyWatches(List.copyOf(updatedEntries));
+        Long tsLong = revToTsMap.get(updatedEntries.get(0).revision());
+        assert tsLong != null;
+
+        watchProcessor.notifyWatches(List.copyOf(updatedEntries), new 
HybridTimestamp(tsLong));
 
         updatedEntries.clear();
     }
@@ -748,12 +755,4 @@ public class SimpleInMemoryKeyValueStorage implements 
KeyValueStorage {
     private static long lastRevision(List<Long> revs) {
         return revs.get(revs.size() - 1);
     }
-
-    private static List<Long> listOf(long val) {
-        List<Long> res = new ArrayList<>();
-
-        res.add(val);
-
-        return res;
-    }
 }

Reply via email to