This is an automated email from the ASF dual-hosted git repository.

tkalkirill pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 02d17e9bac IGNITE-23847 Add metastorage revision update notification 
for operations that do not update metastorage data (#4839)
02d17e9bac is described below

commit 02d17e9bac6f162546182bd222b0f78f599aceed
Author: Kirill Tkalenko <[email protected]>
AuthorDate: Thu Dec 5 18:06:24 2024 +0300

    IGNITE-23847 Add metastorage revision update notification for operations 
that do not update metastorage data (#4839)
---
 .../internal/metastorage/server/Condition.java     |   5 +-
 .../server/UpdateOnlyRevisionEvent.java            |  52 ++++++++
 .../metastorage/server/WatchProcessor.java         |  20 ++-
 .../server/persistence/RocksDbKeyValueStorage.java |  72 +++++++---
 .../server/BasicOperationsKeyValueStorageTest.java |  60 +++++++--
 .../server/TestRevisionUpdateListener.java}        |  39 +++---
 .../server/TestWatchEventHandlingCallback.java}    |  34 ++---
 .../server/UpdateRevisionOperation.java            | 147 +++++++++++++++++++++
 .../server/SimpleInMemoryKeyValueStorage.java      |  25 +++-
 9 files changed, 365 insertions(+), 89 deletions(-)

diff --git 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/Condition.java
 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/Condition.java
index 36066d9a37..4d73cac0d9 100644
--- 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/Condition.java
+++ 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/Condition.java
@@ -17,13 +17,14 @@
 
 package org.apache.ignite.internal.metastorage.server;
 
-import java.util.Collection;
+import java.util.List;
+import org.apache.ignite.internal.metastorage.CommandId;
 import org.apache.ignite.internal.metastorage.Entry;
 
 /**
  * Defines interface for boolean condition which could be applied to an array 
of entries.
  *
- * @see KeyValueStorage#invoke(Condition, Collection, Collection)
+ * @see KeyValueStorage#invoke(Condition, List, List, KeyValueUpdateContext, 
CommandId)
  */
 public interface Condition {
     /**
diff --git 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/UpdateOnlyRevisionEvent.java
 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/UpdateOnlyRevisionEvent.java
new file mode 100644
index 0000000000..3d275d2778
--- /dev/null
+++ 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/UpdateOnlyRevisionEvent.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.metastorage.server;
+
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.metastorage.Entry;
+import org.apache.ignite.internal.tostring.IgniteToStringInclude;
+import org.apache.ignite.internal.tostring.S;
+
+/** Notifies the {@link WatchProcessor} to update the revision without 
updating the {@link Entry entries}. */
+public class UpdateOnlyRevisionEvent implements NotifyWatchProcessorEvent {
+    private final long newRevision;
+
+    @IgniteToStringInclude
+    private final HybridTimestamp timestamp;
+
+    /** Constructor. */
+    public UpdateOnlyRevisionEvent(long newRevision, HybridTimestamp 
timestamp) {
+        this.newRevision = newRevision;
+        this.timestamp = timestamp;
+    }
+
+    @Override
+    public HybridTimestamp timestamp() {
+        return timestamp;
+    }
+
+    @Override
+    public void notify(WatchProcessor watchProcessor) {
+        watchProcessor.updateOnlyRevision(newRevision, timestamp);
+    }
+
+    @Override
+    public String toString() {
+        return S.toString(this);
+    }
+}
diff --git 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/WatchProcessor.java
 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/WatchProcessor.java
index b6e0975cfc..03c02e99b0 100644
--- 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/WatchProcessor.java
+++ 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/WatchProcessor.java
@@ -421,10 +421,20 @@ public class WatchProcessor implements ManuallyCloseable {
 
                     watchEventHandlingCallback.onSafeTimeAdvanced(time);
                 }, watchExecutor)
-                .whenComplete((ignored, e) -> {
-                    if (e != null) {
-                        failureManager.process(new 
FailureContext(CRITICAL_ERROR, e));
-                    }
-                });
+                .whenComplete((ignored, e) -> 
notifyFailureHandlerOnFirstFailureInNotificationChain(e));
+    }
+
+    /**
+     * Updates the metastorage revision in the WatchEvent queue. It should be 
used for those cases when the revision has been updated but
+     * no {@link Entry}s have been updated.
+     *
+     * @param newRevision New metastorage revision.
+     * @param time Metastorage revision update timestamp.
+     */
+    void updateOnlyRevision(long newRevision, HybridTimestamp time) {
+        notificationFuture = notificationFuture
+                .thenComposeAsync(unused -> 
notifyUpdateRevisionListeners(newRevision), watchExecutor)
+                .thenRunAsync(() -> invokeOnRevisionCallback(newRevision, 
time), watchExecutor)
+                .whenComplete((ignored, e) -> 
notifyFailureHandlerOnFirstFailureInNotificationChain(e));
     }
 }
diff --git 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java
 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java
index 4e26de9ae7..bb9dd8dc4b 100644
--- 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java
+++ 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java
@@ -98,6 +98,7 @@ import 
org.apache.ignite.internal.metastorage.server.NotifyWatchProcessorEvent;
 import 
org.apache.ignite.internal.metastorage.server.ReadOperationForCompactionTracker;
 import org.apache.ignite.internal.metastorage.server.Statement;
 import org.apache.ignite.internal.metastorage.server.UpdateEntriesEvent;
+import org.apache.ignite.internal.metastorage.server.UpdateOnlyRevisionEvent;
 import org.apache.ignite.internal.metastorage.server.Value;
 import 
org.apache.ignite.internal.metastorage.server.WatchEventHandlingCallback;
 import org.apache.ignite.internal.raft.IndexWithTerm;
@@ -978,12 +979,15 @@ public class RocksDbKeyValueStorage extends 
AbstractKeyValueStorage {
         }
 
         if (currentRevision != 0) {
-            Set<NotifyWatchProcessorEvent> fromStorage = 
collectNotifyWatchProcessorEventsFromStorage(startRevision, currentRevision);
+            Set<UpdateEntriesEvent> updateEntriesEvents = 
collectUpdateEntriesEventsFromStorage(startRevision, currentRevision);
+            Set<UpdateOnlyRevisionEvent> updateOnlyRevisionEvents = 
collectUpdateRevisionEventsFromStorage(startRevision, currentRevision);
 
             rwLock.writeLock().lock();
 
             try {
-                
notifyWatchProcessorEventsBeforeStartingWatches.addAll(fromStorage);
+                
notifyWatchProcessorEventsBeforeStartingWatches.addAll(updateEntriesEvents);
+                // Adds events for which there were no entries updates but the 
revision was updated.
+                
notifyWatchProcessorEventsBeforeStartingWatches.addAll(updateOnlyRevisionEvents);
 
                 drainNotifyWatchProcessorEventsBeforeStartingWatches();
 
@@ -1144,10 +1148,6 @@ public class RocksDbKeyValueStorage extends 
AbstractKeyValueStorage {
      * Adds modified entries to the watch event queue.
      */
     private void queueWatchEvent() {
-        if (updatedEntries.isEmpty()) {
-            return;
-        }
-
         switch (recoveryStatus.get()) {
             case INITIAL:
                 // Watches haven't been enabled yet, no need to queue any 
events, they will be replayed upon recovery.
@@ -1155,28 +1155,17 @@ public class RocksDbKeyValueStorage extends 
AbstractKeyValueStorage {
 
                 break;
             case IN_PROGRESS:
-                UpdatedEntries copy = updatedEntries.transfer();
-
-                var event = new UpdateEntriesEvent(copy.updatedEntries, 
copy.ts);
-
-                addToNotifyWatchProcessorEventsBeforeStartingWatches(event);
+                
addToNotifyWatchProcessorEventsBeforeStartingWatches(updatedEntries.toNotifyWatchProcessorEvent(rev));
 
                 break;
             default:
-                notifyWatches();
+                
updatedEntries.toNotifyWatchProcessorEvent(rev).notify(watchProcessor);
 
                 break;
         }
     }
 
-    private void notifyWatches() {
-        UpdatedEntries copy = updatedEntries.transfer();
-
-        assert copy.ts != null;
-        watchProcessor.notifyWatches(copy.updatedEntries, copy.ts);
-    }
-
-    private Set<NotifyWatchProcessorEvent> 
collectNotifyWatchProcessorEventsFromStorage(long lowerRevision, long 
upperRevision) {
+    private Set<UpdateEntriesEvent> collectUpdateEntriesEventsFromStorage(long 
lowerRevision, long upperRevision) {
         long minWatchRevision = Math.max(lowerRevision, 
watchProcessor.minWatchRevision().orElse(-1));
 
         if (minWatchRevision > upperRevision) {
@@ -1186,7 +1175,7 @@ public class RocksDbKeyValueStorage extends 
AbstractKeyValueStorage {
         var updatedEntries = new ArrayList<Entry>();
         HybridTimestamp ts = null;
 
-        var events = new TreeSet<NotifyWatchProcessorEvent>();
+        var events = new TreeSet<UpdateEntriesEvent>();
 
         try (
                 var upperBound = new Slice(longToBytes(upperRevision + 1));
@@ -1252,6 +1241,40 @@ public class RocksDbKeyValueStorage extends 
AbstractKeyValueStorage {
         return events;
     }
 
+    private Set<UpdateOnlyRevisionEvent> 
collectUpdateRevisionEventsFromStorage(long lowerRevision, long upperRevision) {
+        var events = new TreeSet<UpdateOnlyRevisionEvent>();
+
+        try (
+                var upperBound = new Slice(longToBytes(upperRevision + 1));
+                var options = new 
ReadOptions().setIterateUpperBound(upperBound);
+                RocksIterator it = revisionToTs.newIterator(options)
+        ) {
+            it.seek(longToBytes(lowerRevision));
+
+            for (; it.isValid(); it.next()) {
+                byte[] rocksKey = it.key();
+                byte[] rocksValue = it.value();
+
+                long revision = bytesToLong(rocksKey);
+                HybridTimestamp time = 
hybridTimestamp(bytesToLong(rocksValue));
+
+                UpdateOnlyRevisionEvent event = new 
UpdateOnlyRevisionEvent(revision, time);
+
+                boolean added = events.add(event);
+
+                assert added : event;
+
+                try {
+                    it.status();
+                } catch (RocksDBException e) {
+                    throw new MetaStorageException(OP_EXECUTION_ERR, e);
+                }
+            }
+        }
+
+        return events;
+    }
+
     @Override
     public HybridTimestamp timestampByRevision(long revision) {
         rwLock.readLock().lock();
@@ -1340,6 +1363,13 @@ public class RocksDbKeyValueStorage extends 
AbstractKeyValueStorage {
 
             return transferredValue;
         }
+
+        NotifyWatchProcessorEvent toNotifyWatchProcessorEvent(long 
newRevision) {
+            UpdatedEntries copy = transfer();
+
+            return copy.updatedEntries.isEmpty() ? new 
UpdateOnlyRevisionEvent(newRevision, copy.ts)
+                    : new UpdateEntriesEvent(copy.updatedEntries, copy.ts);
+        }
     }
 
     @Override
diff --git 
a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/BasicOperationsKeyValueStorageTest.java
 
b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/BasicOperationsKeyValueStorageTest.java
index 898027b10e..aaef591009 100644
--- 
a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/BasicOperationsKeyValueStorageTest.java
+++ 
b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/BasicOperationsKeyValueStorageTest.java
@@ -30,8 +30,10 @@ import static 
org.apache.ignite.internal.metastorage.dsl.Operations.remove;
 import static 
org.apache.ignite.internal.metastorage.server.KeyValueUpdateContext.kvContext;
 import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
 import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willSucceedFast;
 import static 
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
 import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.is;
 import static org.junit.jupiter.api.Assertions.assertArrayEquals;
 import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -79,6 +81,8 @@ import org.apache.ignite.internal.util.Cursor;
 import org.jetbrains.annotations.Nullable;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
 
 /**
  * Tests for key-value storage implementations.
@@ -2195,6 +2199,48 @@ public abstract class BasicOperationsKeyValueStorageTest 
extends AbstractKeyValu
         assertThrows(CompactedException.class, () -> 
storage.revisionByTimestamp(hybridTimestamp(2)));
     }
 
+    @ParameterizedTest
+    @EnumSource(UpdateRevisionOperation.class)
+    void 
testNotifyUpdateRevisionForOperationAfterStartWatches(UpdateRevisionOperation 
updateRevisionOperation) {
+        var revisionUpdateListener = new TestRevisionUpdateListener();
+        storage.registerRevisionUpdateListener(revisionUpdateListener);
+
+        var watchEventHandlingCallback = new TestWatchEventHandlingCallback();
+        storage.startWatches(1, watchEventHandlingCallback);
+
+        storage.put(key(0), keyValue(0, 1), kvContext(hybridTimestamp(10)));
+
+        long revision = storage.revision();
+        long newRevision = revision + 1;
+
+        updateRevisionOperation.execute(storage);
+
+        assertThat(storage.revision(), equalTo(newRevision));
+        assertThat(revisionUpdateListener.get(newRevision), willSucceedFast());
+        assertThat(watchEventHandlingCallback.get(newRevision), 
willSucceedFast());
+    }
+
+    @ParameterizedTest
+    @EnumSource(UpdateRevisionOperation.class)
+    void 
testNotifyUpdateRevisionForOperationBeforeStartWatches(UpdateRevisionOperation 
updateRevisionOperation) {
+        var revisionUpdateListener = new TestRevisionUpdateListener();
+        storage.registerRevisionUpdateListener(revisionUpdateListener);
+
+        storage.put(key(0), keyValue(0, 1), kvContext(hybridTimestamp(10)));
+
+        long revision = storage.revision();
+        long newRevision = revision + 1;
+
+        updateRevisionOperation.execute(storage);
+
+        var watchEventHandlingCallback = new TestWatchEventHandlingCallback();
+        storage.startWatches(1, watchEventHandlingCallback);
+
+        assertThat(storage.revision(), equalTo(newRevision));
+        assertThat(revisionUpdateListener.get(newRevision), willSucceedFast());
+        assertThat(watchEventHandlingCallback.get(newRevision), 
willSucceedFast());
+    }
+
     private CompletableFuture<Void> watchExact(
             byte[] key, long revision, int expectedNumCalls, 
BiConsumer<WatchEvent, Integer> testCondition
     ) {
@@ -2246,17 +2292,7 @@ public abstract class BasicOperationsKeyValueStorageTest 
extends AbstractKeyValu
             }
         });
 
-        storage.startWatches(1, new WatchEventHandlingCallback() {
-            @Override
-            public void onSafeTimeAdvanced(HybridTimestamp newSafeTime) {
-                // No-op.
-            }
-
-            @Override
-            public void onRevisionApplied(long revision) {
-                // No-op.
-            }
-        });
+        storage.startWatches(1, new WatchEventHandlingCallback() {});
 
         return resultFuture;
     }
@@ -2315,7 +2351,7 @@ public abstract class BasicOperationsKeyValueStorageTest 
extends AbstractKeyValu
         assertEquals(List.of(timestamps), 
collectTimestamps(storage.getAll(keys, revUpperBound)));
     }
 
-    private static CommandId createCommandId() {
+    static CommandId createCommandId() {
         return new CommandIdGenerator(UUID::randomUUID).newId();
     }
 }
diff --git 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/Condition.java
 
b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/TestRevisionUpdateListener.java
similarity index 51%
copy from 
modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/Condition.java
copy to 
modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/TestRevisionUpdateListener.java
index 36066d9a37..78e7683b68 100644
--- 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/Condition.java
+++ 
b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/TestRevisionUpdateListener.java
@@ -17,27 +17,24 @@
 
 package org.apache.ignite.internal.metastorage.server;
 
-import java.util.Collection;
-import org.apache.ignite.internal.metastorage.Entry;
+import static 
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
 
-/**
- * Defines interface for boolean condition which could be applied to an array 
of entries.
- *
- * @see KeyValueStorage#invoke(Condition, Collection, Collection)
- */
-public interface Condition {
-    /**
-     * Returns the keys which identifies an entries which condition will be 
applied to.
-     *
-     * @return The keys which identifies an entries which condition will be 
applied to.
-     */
-    byte[][] keys();
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.ignite.internal.metastorage.RevisionUpdateListener;
+
+class TestRevisionUpdateListener implements RevisionUpdateListener {
+    private final Map<Long, CompletableFuture<Void>> futureByRevision = new 
ConcurrentHashMap<>();
+
+    @Override
+    public CompletableFuture<?> onUpdated(long revision) {
+        get(revision).complete(null);
+
+        return nullCompletedFuture();
+    }
 
-    /**
-     * Tests the given entries on condition.
-     *
-     * @param entries Array of entries which will be tested on the condition. 
Can't be {@code null}.
-     * @return {@code True} if the given entries satisfies to the condition, 
otherwise - {@code false}.
-     */
-    boolean test(Entry... entries);
+    CompletableFuture<Void> get(long revision) {
+        return futureByRevision.computeIfAbsent(revision, revision0 -> new 
CompletableFuture<>());
+    }
 }
diff --git 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/Condition.java
 
b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/TestWatchEventHandlingCallback.java
similarity index 51%
copy from 
modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/Condition.java
copy to 
modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/TestWatchEventHandlingCallback.java
index 36066d9a37..9ac22073cd 100644
--- 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/Condition.java
+++ 
b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/TestWatchEventHandlingCallback.java
@@ -17,27 +17,19 @@
 
 package org.apache.ignite.internal.metastorage.server;
 
-import java.util.Collection;
-import org.apache.ignite.internal.metastorage.Entry;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
 
-/**
- * Defines interface for boolean condition which could be applied to an array 
of entries.
- *
- * @see KeyValueStorage#invoke(Condition, Collection, Collection)
- */
-public interface Condition {
-    /**
-     * Returns the keys which identifies an entries which condition will be 
applied to.
-     *
-     * @return The keys which identifies an entries which condition will be 
applied to.
-     */
-    byte[][] keys();
+class TestWatchEventHandlingCallback implements WatchEventHandlingCallback {
+    private final Map<Long, CompletableFuture<Void>> futureByRevision = new 
ConcurrentHashMap<>();
+
+    @Override
+    public void onRevisionApplied(long revision) {
+        get(revision).complete(null);
+    }
 
-    /**
-     * Tests the given entries on condition.
-     *
-     * @param entries Array of entries which will be tested on the condition. 
Can't be {@code null}.
-     * @return {@code True} if the given entries satisfies to the condition, 
otherwise - {@code false}.
-     */
-    boolean test(Entry... entries);
+    CompletableFuture<Void> get(long revision) {
+        return futureByRevision.computeIfAbsent(revision, revision0 -> new 
CompletableFuture<>());
+    }
 }
diff --git 
a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/UpdateRevisionOperation.java
 
b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/UpdateRevisionOperation.java
new file mode 100644
index 0000000000..3b3402e29c
--- /dev/null
+++ 
b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/UpdateRevisionOperation.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.metastorage.server;
+
+import static java.util.stream.Collectors.toList;
+import static org.apache.ignite.internal.hlc.HybridTimestamp.hybridTimestamp;
+import static org.apache.ignite.internal.metastorage.dsl.Operations.noop;
+import static org.apache.ignite.internal.metastorage.dsl.Operations.ops;
+import static 
org.apache.ignite.internal.metastorage.server.AbstractKeyValueStorageTest.key;
+import static 
org.apache.ignite.internal.metastorage.server.BasicOperationsKeyValueStorageTest.createCommandId;
+import static 
org.apache.ignite.internal.metastorage.server.KeyValueUpdateContext.kvContext;
+import static org.apache.ignite.internal.util.ArrayUtils.INT_EMPTY_ARRAY;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+
+import java.util.List;
+import java.util.function.Consumer;
+import java.util.stream.IntStream;
+import org.apache.ignite.internal.lang.ByteArray;
+import org.apache.ignite.internal.metastorage.Entry;
+import org.apache.ignite.internal.metastorage.dsl.Operation;
+import org.apache.ignite.internal.metastorage.dsl.Operations;
+import org.apache.ignite.internal.metastorage.server.ExistenceCondition.Type;
+
+/**
+ * Enumeration for testing storage revision change notification. It is 
expected that there is already an {@link Entry} with key {@code 0}
+ * in the storage.
+ */
+enum UpdateRevisionOperation {
+    // Simple operations.
+    PUT_NEW(storage -> put(storage, 1)),
+    PUT_ALL_NEW(storage -> putAll(storage, 1)),
+    PUT_EXISTING(storage -> put(storage, 0)),
+    PUT_ALL_EXISTING(storage -> putAll(storage, 0)),
+    REMOVE_EXISTING(storage -> remove(storage, 0)),
+    REMOVE_ALL_EXISTING(storage -> removeAll(storage, 0)),
+    REMOVE_NOT_EXISTING(storage -> remove(storage, 1)),
+    REMOVE_ALL_NOT_EXISTING(storage -> removeAll(storage, 1)),
+    PUT_ALL_EMPTY(storage -> putAll(storage, INT_EMPTY_ARRAY)),
+    REMOVE_ALL_EMPTY(storage -> removeAll(storage, INT_EMPTY_ARRAY)),
+    // Invoke operations.
+    INVOKE_PUT_NEW(storage -> invokeSuccessCondition(storage, 
putOperation(1))),
+    INVOKE_IF_PUT_NEW(storage -> invokeIfSuccessCondition(storage, 
putOperation(1))),
+    INVOKE_PUT_EXISTING(storage -> invokeSuccessCondition(storage, 
putOperation(0))),
+    INVOKE_IF_PUT_EXISTING(storage -> invokeIfSuccessCondition(storage, 
putOperation(0))),
+    INVOKE_REMOVE_EXISTING(storage -> invokeSuccessCondition(storage, 
removeOperation(0))),
+    INVOKE_IF_REMOVE_EXISTING(storage -> invokeIfSuccessCondition(storage, 
removeOperation(0))),
+    INVOKE_REMOVE_NOT_EXISTING(storage -> invokeSuccessCondition(storage, 
removeOperation(1))),
+    INVOKE_IF_REMOVE_NOT_EXISTING(storage -> invokeIfSuccessCondition(storage, 
removeOperation(1))),
+    INVOKE_NOOP(storage -> invokeSuccessCondition(storage, noop())),
+    INVOKE_IF_NOOP(storage -> invokeIfSuccessCondition(storage, noop()));
+
+    private final Consumer<KeyValueStorage> function;
+
+    UpdateRevisionOperation(Consumer<KeyValueStorage> function) {
+        this.function = function;
+    }
+
+    void execute(KeyValueStorage storage) {
+        Entry entry = storage.get(key(0));
+
+        assertFalse(entry.empty());
+        assertFalse(entry.tombstone());
+
+        function.accept(storage);
+    }
+
+    private static void put(KeyValueStorage storage, int key) {
+        storage.put(key(key), value(1), randomKvContext());
+    }
+
+    private static void putAll(KeyValueStorage storage, int... keys) {
+        storage.putAll(toListKeyByteArray(keys), toListValueByteArray(keys), 
randomKvContext());
+    }
+
+    private static void remove(KeyValueStorage storage, int key) {
+        storage.remove(key(key), randomKvContext());
+    }
+
+    private static void removeAll(KeyValueStorage storage, int... keys) {
+        storage.removeAll(toListKeyByteArray(keys), randomKvContext());
+    }
+
+    private static void invokeSuccessCondition(KeyValueStorage storage, 
Operation operation) {
+        storage.invoke(
+                new ExistenceCondition(Type.EXISTS, key(0)),
+                List.of(operation),
+                List.of(noop()),
+                randomKvContext(),
+                createCommandId()
+        );
+    }
+
+    private static void invokeIfSuccessCondition(KeyValueStorage storage, 
Operation operation) {
+        storage.invoke(
+                new If(
+                        new ExistenceCondition(Type.EXISTS, key(0)),
+                        new Statement(ops(operation).yield()),
+                        new Statement(ops(noop()).yield())
+                ),
+                randomKvContext(),
+                createCommandId()
+        );
+    }
+
+    private static KeyValueUpdateContext randomKvContext() {
+        return kvContext(hybridTimestamp(100_500));
+    }
+
+    private static byte[] value(int value) {
+        return ByteArray.fromString("value=" + value).bytes();
+    }
+
+    private static Operation putOperation(int key) {
+        return Operations.put(keyByteArray(key), value(1));
+    }
+
+    private static Operation removeOperation(int key) {
+        return Operations.remove(keyByteArray(key));
+    }
+
+    private static ByteArray keyByteArray(int key) {
+        return new ByteArray(key(key));
+    }
+
+    private static List<byte[]> toListKeyByteArray(int... keys) {
+        return 
IntStream.of(keys).mapToObj(AbstractKeyValueStorageTest::key).collect(toList());
+    }
+
+    private static List<byte[]> toListValueByteArray(int... keys) {
+        return 
IntStream.of(keys).mapToObj(UpdateRevisionOperation::value).collect(toList());
+    }
+}
diff --git 
a/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java
 
b/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java
index 4d7e306981..b6e7cfa2bb 100644
--- 
a/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java
+++ 
b/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java
@@ -487,6 +487,14 @@ public class SimpleInMemoryKeyValueStorage extends 
AbstractKeyValueStorage {
 
                     fillNotifyWatchProcessorEventsFromUpdatedEntries();
                 });
+
+        for (long revision = startRevision; revision <= rev; revision++) {
+            HybridTimestamp time = revToTsMap.get(revision);
+
+            assert time != null : revision;
+
+            notifyWatchProcessorEventsBeforeStartingWatches.add(new 
UpdateOnlyRevisionEvent(revision, time));
+        }
     }
 
     private void fillNotifyWatchProcessorEventsFromUpdatedEntries() {
@@ -507,20 +515,24 @@ public class SimpleInMemoryKeyValueStorage extends 
AbstractKeyValueStorage {
     }
 
     private void notifyWatches() {
-        if (!areWatchesStarted() || updatedEntries.isEmpty()) {
+        if (!areWatchesStarted()) {
             updatedEntries.clear();
 
             return;
         }
 
-        long revision = updatedEntries.get(0).revision();
+        long newRevision = rev;
 
-        HybridTimestamp ts = revToTsMap.get(revision);
-        assert ts != null : revision;
+        HybridTimestamp ts = revToTsMap.get(newRevision);
+        assert ts != null : newRevision;
 
-        watchProcessor.notifyWatches(List.copyOf(updatedEntries), ts);
+        if (updatedEntries.isEmpty()) {
+            watchProcessor.updateOnlyRevision(newRevision, ts);
+        } else {
+            watchProcessor.notifyWatches(List.copyOf(updatedEntries), ts);
 
-        updatedEntries.clear();
+            updatedEntries.clear();
+        }
     }
 
     @Override
@@ -731,7 +743,6 @@ public class SimpleInMemoryKeyValueStorage extends 
AbstractKeyValueStorage {
         var updatedEntry = new EntryImpl(key, val.tombstone() ? null : bytes, 
curRev, val.operationTimestamp());
 
         updatedEntries.add(updatedEntry);
-
     }
 
     private void doPutAll(long curRev, List<byte[]> keys, List<byte[]> 
bytesList, KeyValueUpdateContext context) {

Reply via email to