This is an automated email from the ASF dual-hosted git repository.

apolovtsev pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 0ef8a3c5da IGNITE-20541 Improve empty event handling in Watch Processor (#2652)
0ef8a3c5da is described below

commit 0ef8a3c5dae9f8aec768297c431aa548ab56f1ad
Author: Alexander Polovtcev <alex.polovt...@gmail.com>
AuthorDate: Thu Oct 5 09:58:55 2023 +0300

    IGNITE-20541 Improve empty event handling in Watch Processor (#2652)
---
 .../ignite/internal/logger/IgniteLogger.java       |  35 ++++----
 .../metastorage/server/WatchAndEvents.java         |   2 +
 .../metastorage/server/WatchProcessor.java         | 100 ++++++++++++---------
 .../metastorage/server/WatchProcessorTest.java     |  63 +++++++++----
 .../impl/StandaloneMetaStorageManager.java         |  11 ++-
 5 files changed, 129 insertions(+), 82 deletions(-)

diff --git a/modules/core/src/main/java/org/apache/ignite/internal/logger/IgniteLogger.java b/modules/core/src/main/java/org/apache/ignite/internal/logger/IgniteLogger.java
index dbfa894796..ec5557e92c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/logger/IgniteLogger.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/logger/IgniteLogger.java
@@ -21,6 +21,7 @@ import java.lang.System.Logger.Level;
 import java.util.Objects;
 import java.util.function.Supplier;
 import org.apache.ignite.internal.lang.IgniteStringFormatter;
+import org.jetbrains.annotations.Nullable;
 
 /**
  * Ignite logger wraps system logger for more convenient access.
@@ -55,7 +56,7 @@ public class IgniteLogger {
      * @param th     The {@code Throwable} associated with log message; can be 
{@code null}.
      * @param params The list of arguments to be substituted in place of 
formatting anchors.
      */
-    public void info(String msg, Throwable th, Object... params) {
+    public void info(String msg, @Nullable Throwable th, Object... params) {
         logInternal(Level.INFO, msg, th, params);
     }
 
@@ -65,7 +66,7 @@ public class IgniteLogger {
      * @param msgSupplier A supplier function that produces a message.
      * @param th          The {@code Throwable} associated with log message; 
can be {@code null}.
      */
-    public void info(Supplier<String> msgSupplier, Throwable th) {
+    public void info(Supplier<String> msgSupplier, @Nullable Throwable th) {
         logInternalExceptional(Level.INFO, msgSupplier, th);
     }
 
@@ -75,7 +76,7 @@ public class IgniteLogger {
      * @param msg The message pattern which will be passed to the {@link 
System.Logger}.
      * @param th  The {@code Throwable} associated with the log message.
      */
-    public void info(String msg, Throwable th) {
+    public void info(String msg, @Nullable Throwable th) {
         delegate.log(Level.INFO, msg, th);
     }
 
@@ -96,7 +97,7 @@ public class IgniteLogger {
      * @param th     The {@code Throwable} associated with log message; can be 
{@code null}.
      * @param params The list of arguments to be substituted in place of 
formatting anchors.
      */
-    public void debug(String msg, Throwable th, Object... params) {
+    public void debug(String msg, @Nullable Throwable th, Object... params) {
         logInternal(Level.DEBUG, msg, th, params);
     }
 
@@ -106,7 +107,7 @@ public class IgniteLogger {
      * @param msgSupplier A supplier function that produces a message.
      * @param th          The {@code Throwable} associated with log message; 
can be {@code null}.
      */
-    public void debug(Supplier<String> msgSupplier, Throwable th) {
+    public void debug(Supplier<String> msgSupplier, @Nullable Throwable th) {
         logInternalExceptional(Level.DEBUG, msgSupplier, th);
     }
 
@@ -116,7 +117,7 @@ public class IgniteLogger {
      * @param msg The message pattern which will be passed to the {@link 
System.Logger}.
      * @param th  The {@code Throwable} associated with the log message;
      */
-    public void debug(String msg, Throwable th) {
+    public void debug(String msg, @Nullable Throwable th) {
         delegate.log(Level.DEBUG, msg, th);
     }
 
@@ -137,7 +138,7 @@ public class IgniteLogger {
      * @param th     The {@code Throwable} associated with log message; can be 
{@code null}.
      * @param params The list of arguments to be substituted in place of 
formatting anchors.
      */
-    public void warn(String msg, Throwable th, Object... params) {
+    public void warn(String msg, @Nullable Throwable th, Object... params) {
         logInternal(Level.WARNING, msg, th, params);
     }
 
@@ -147,7 +148,7 @@ public class IgniteLogger {
      * @param msgSupplier A supplier function that produces a message.
      * @param th          The {@code Throwable} associated with log message; 
can be {@code null}.
      */
-    public void warn(Supplier<String> msgSupplier, Throwable th) {
+    public void warn(Supplier<String> msgSupplier, @Nullable Throwable th) {
         logInternalExceptional(Level.WARNING, msgSupplier, th);
     }
 
@@ -157,7 +158,7 @@ public class IgniteLogger {
      * @param msg The message pattern which will be passed to the {@link 
System.Logger}.
      * @param th  The {@code Throwable} associated with the log message.
      */
-    public void warn(String msg, Throwable th) {
+    public void warn(String msg, @Nullable Throwable th) {
         delegate.log(Level.WARNING, msg, th);
     }
 
@@ -178,7 +179,7 @@ public class IgniteLogger {
      * @param th     The {@code Throwable} associated with log message; can be 
{@code null}.
      * @param params The list of arguments to be substituted in place of 
formatting anchors.
      */
-    public void error(String msg, Throwable th, Object... params) {
+    public void error(String msg, @Nullable Throwable th, Object... params) {
         logInternal(Level.ERROR, msg, th, params);
     }
 
@@ -188,7 +189,7 @@ public class IgniteLogger {
      * @param msgSupplier A supplier function that produces a message.
      * @param th          The {@code Throwable} associated with log message; 
can be {@code null}.
      */
-    public void error(Supplier<String> msgSupplier, Throwable th) {
+    public void error(Supplier<String> msgSupplier, @Nullable Throwable th) {
         logInternalExceptional(Level.ERROR, msgSupplier, th);
     }
 
@@ -198,7 +199,7 @@ public class IgniteLogger {
      * @param msg The message pattern which will be passed to the {@link 
System.Logger}.
      * @param th  The {@code Throwable} associated with the log message.
      */
-    public void error(String msg, Throwable th) {
+    public void error(String msg, @Nullable Throwable th) {
         delegate.log(Level.ERROR, msg, th);
     }
 
@@ -219,7 +220,7 @@ public class IgniteLogger {
      * @param th     The {@code Throwable} associated with log message; can be 
{@code null}.
      * @param params The list of arguments to be substituted in place of 
formatting anchors.
      */
-    public void trace(String msg, Throwable th, Object... params) {
+    public void trace(String msg, @Nullable Throwable th, Object... params) {
         logInternal(Level.TRACE, msg, th, params);
     }
 
@@ -229,7 +230,7 @@ public class IgniteLogger {
      * @param msgSupplier A supplier function that produces a message.
      * @param th          The {@code Throwable} associated with log message; 
can be {@code null}.
      */
-    public void trace(Supplier<String> msgSupplier, Throwable th) {
+    public void trace(Supplier<String> msgSupplier, @Nullable Throwable th) {
         logInternalExceptional(Level.TRACE, msgSupplier, th);
     }
 
@@ -239,7 +240,7 @@ public class IgniteLogger {
      * @param msg The message pattern which will be passed to the {@link 
System.Logger}.
      * @param th  A {@code Throwable} associated with the log message.
      */
-    public void trace(String msg, Throwable th) {
+    public void trace(String msg, @Nullable Throwable th) {
         delegate.log(Level.TRACE, msg, th);
     }
 
@@ -252,7 +253,7 @@ public class IgniteLogger {
      * @param params An optional list of parameters to the message (may be 
none).
      * @throws NullPointerException If {@code level} is {@code null}.
      */
-    private void logInternal(Level level, String msg, Throwable th, Object... 
params) {
+    private void logInternal(Level level, String msg, @Nullable Throwable th, 
Object... params) {
         Objects.requireNonNull(level);
 
         if (!delegate.isLoggable(level)) {
@@ -274,7 +275,7 @@ public class IgniteLogger {
      * @param th          The {@code Throwable} associated with log message; 
can be {@code null}.
      * @throws NullPointerException If {@code level} is {@code null}, or 
{@code msgSupplier} is {@code null}.
      */
-    private void logInternalExceptional(Level level, Supplier<String> 
msgSupplier, Throwable th) {
+    private void logInternalExceptional(Level level, Supplier<String> 
msgSupplier, @Nullable Throwable th) {
         Objects.requireNonNull(level);
         Objects.requireNonNull(msgSupplier);
 
diff --git a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/WatchAndEvents.java b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/WatchAndEvents.java
index e23ffb2d3c..a03533c8ff 100644
--- a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/WatchAndEvents.java
+++ b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/WatchAndEvents.java
@@ -26,6 +26,8 @@ class WatchAndEvents {
     final List<EntryEvent> events;
 
     WatchAndEvents(Watch watch, List<EntryEvent> events) {
+        assert !events.isEmpty();
+
         this.watch = watch;
         this.events = events;
     }
diff --git a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/WatchProcessor.java b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/WatchProcessor.java
index 5a58278f48..95f0568b83 100644
--- a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/WatchProcessor.java
+++ b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/WatchProcessor.java
@@ -48,8 +48,8 @@ import org.apache.ignite.internal.util.IgniteUtils;
  * Class for storing and notifying Meta Storage Watches.
  *
  * <p>Every Meta Storage update is processed by each registered Watch in 
parallel, however notifications for a single Watch are
- * linearised (Watches are always notified of one event at a time and in 
increasing order of revisions). It is also guaranteed that
- * Watches will not get notified of a new revision until all Watches have 
finished processing a previous revision.
+ * linearised (Watches are always notified of one event at a time and in 
increasing order of revisions). It is also guaranteed that Watches
+ * will not get notified of a new revision until all Watches have finished 
processing a previous revision.
  */
 public class WatchProcessor implements ManuallyCloseable {
     /** Reads an entry from the storage using a given key and revision. */
@@ -122,12 +122,24 @@ public class WatchProcessor implements ManuallyCloseable {
     }
 
     /**
-     * Notifies registered watches about an update event.
+     * Queues the following set of actions that will be executed after the 
previous invocation of this method completes:
+     *
+     * <ol>
+     *     <li>Notifies all registered watches about the changed entries;</li>
+     *     <li>Notifies all registered revision listeners about the new 
revision;</li>
+     *     <li>After all above notifications are processed, notifies about the 
Safe Time update.</li>
+     * </ol>
+     *
+     * <p>This method is not thread-safe and must be performed under an 
exclusive lock in concurrent scenarios.
+     *
+     * @param updatedEntries Entries that were changed during a Meta Storage 
update.
+     * @param time Timestamp of the Meta Storage update.
+     * @return Future that gets completed when all registered watches have 
been notified of the given event.
      */
-    public void notifyWatches(List<Entry> updatedEntries, HybridTimestamp 
time) {
+    public CompletableFuture<Void> notifyWatches(List<Entry> updatedEntries, 
HybridTimestamp time) {
         assert time != null;
 
-        notificationFuture = notificationFuture
+        CompletableFuture<Void> newFuture = notificationFuture
                 .thenComposeAsync(v -> {
                     // Revision must be the same for all entries.
                     long newRevision = updatedEntries.get(0).revision();
@@ -142,14 +154,18 @@ public class WatchProcessor implements ManuallyCloseable {
                                 // Revision update is triggered strictly after 
all watch listeners have been notified.
                                 CompletableFuture<Void> 
notifyUpdateRevisionFuture = notifyUpdateRevisionListeners(newRevision);
 
-                                return allOf(notifyWatchesFuture, 
notifyUpdateRevisionFuture);
-                            }, watchExecutor)
-                            .thenComposeAsync(ignored ->
-                                    
invokeOnRevisionCallback(watchesAndEventsFuture, newRevision, time), 
watchExecutor
-                            );
+                                return allOf(notifyWatchesFuture, 
notifyUpdateRevisionFuture)
+                                        .thenComposeAsync(
+                                                unused -> 
invokeOnRevisionCallback(watchAndEvents, newRevision, time),
+                                                watchExecutor
+                                        );
+                            }, watchExecutor);
                 }, watchExecutor);
-    }
 
+        notificationFuture = newFuture;
+
+        return newFuture;
+    }
 
     private static CompletableFuture<Void> notifyWatches(List<WatchAndEvents> 
watchAndEventsList, long revision, HybridTimestamp time) {
         if (watchAndEventsList.isEmpty()) {
@@ -158,44 +174,46 @@ public class WatchProcessor implements ManuallyCloseable {
 
         CompletableFuture<?>[] notifyWatchFutures = new 
CompletableFuture[watchAndEventsList.size()];
 
-        int i = 0;
+        for (int i = 0; i < watchAndEventsList.size(); i++) {
+            WatchAndEvents watchAndEvents = watchAndEventsList.get(i);
 
-        for (WatchAndEvents watchAndEvents : watchAndEventsList) {
             CompletableFuture<Void> notifyWatchFuture;
 
             try {
-                if (watchAndEvents.events.isEmpty()) {
-                    notifyWatchFuture = completedFuture(null);
-                } else {
-                    notifyWatchFuture = watchAndEvents.watch.onUpdate(new 
WatchEvent(watchAndEvents.events, revision, time))
-                            .whenComplete((v, e) -> {
-                                if (e != null) {
-                                    if (e instanceof CompletionException) {
-                                        e = e.getCause();
-                                    }
-
-                                    // TODO: IGNITE-14693 Implement Meta 
storage exception handling
-                                    LOG.error("Error occurred when processing 
a watch event", e);
-
-                                    watchAndEvents.watch.onError(e);
+                var event = new WatchEvent(watchAndEvents.events, revision, 
time);
+
+                notifyWatchFuture = watchAndEvents.watch.onUpdate(event)
+                        .whenComplete((v, e) -> {
+                            if (e != null) {
+                                if (e instanceof CompletionException) {
+                                    e = e.getCause();
                                 }
-                            });
-                }
+
+                                // TODO: IGNITE-14693 Implement Meta storage 
exception handling
+                                LOG.error("Error occurred when processing a 
watch event", e);
+
+                                watchAndEvents.watch.onError(e);
+                            }
+                        });
             } catch (Throwable throwable) {
                 watchAndEvents.watch.onError(throwable);
 
                 notifyWatchFuture = failedFuture(throwable);
             }
 
-            notifyWatchFutures[i++] = notifyWatchFuture;
+            notifyWatchFutures[i] = notifyWatchFuture;
         }
 
         return allOf(notifyWatchFutures);
     }
 
     private CompletableFuture<List<WatchAndEvents>> 
collectWatchesAndEvents(List<Entry> updatedEntries, long revision) {
+        if (watches.isEmpty()) {
+            return completedFuture(List.of());
+        }
+
         return supplyAsync(() -> {
-            List<WatchAndEvents> watchAndEvents = List.of();
+            var watchAndEvents = new ArrayList<WatchAndEvents>();
 
             for (Watch watch : watches) {
                 List<EntryEvent> events = List.of();
@@ -216,29 +234,21 @@ public class WatchProcessor implements ManuallyCloseable {
                     }
                 }
 
-                if (watchAndEvents.isEmpty()) {
-                    watchAndEvents = new ArrayList<>();
+                if (!events.isEmpty()) {
+                    watchAndEvents.add(new WatchAndEvents(watch, events));
                 }
-
-                watchAndEvents.add(new WatchAndEvents(watch, events));
             }
 
             return watchAndEvents;
         }, watchExecutor);
     }
 
-    private CompletableFuture<Void> invokeOnRevisionCallback(
-            CompletableFuture<List<WatchAndEvents>> watchAndEventsFuture,
-            long revision,
-            HybridTimestamp time
-    ) {
+    private CompletableFuture<Void> 
invokeOnRevisionCallback(List<WatchAndEvents> watchAndEventsList, long 
revision, HybridTimestamp time) {
         try {
             // Only notify about entries that have been accepted by at least 
one Watch.
             var acceptedEntries = new HashSet<EntryEvent>();
 
-            assert watchAndEventsFuture.isDone();
-
-            for (WatchAndEvents watchAndEvents : watchAndEventsFuture.join()) {
+            for (WatchAndEvents watchAndEvents : watchAndEventsList) {
                 acceptedEntries.addAll(watchAndEvents.events);
             }
 
@@ -255,12 +265,14 @@ public class WatchProcessor implements ManuallyCloseable {
         } catch (Throwable e) {
             LOG.error("Error occurred when notifying watches", e);
 
-            throw e;
+            return failedFuture(e);
         }
     }
 
     /**
      * Advances safe time without notifying watches (as there is no new 
revision).
+     *
+     * <p>This method is not thread-safe and must be performed under an 
exclusive lock in concurrent scenarios.
      */
     public void advanceSafeTime(HybridTimestamp time) {
         assert time != null;
diff --git a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/WatchProcessorTest.java b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/WatchProcessorTest.java
index 745e0f2143..3541be44af 100644
--- a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/WatchProcessorTest.java
+++ b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/WatchProcessorTest.java
@@ -19,6 +19,8 @@ package org.apache.ignite.internal.metastorage.server;
 
 import static java.nio.charset.StandardCharsets.UTF_8;
 import static java.util.concurrent.CompletableFuture.completedFuture;
+import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willThrow;
+import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.hamcrest.Matchers.is;
@@ -80,17 +82,19 @@ public class WatchProcessorTest extends 
BaseIgniteAbstractTest {
         var entry1 = new EntryImpl("foo".getBytes(UTF_8), null, 1, 0);
         var entry2 = new EntryImpl("bar".getBytes(UTF_8), null, 1, 0);
 
-        watchProcessor.notifyWatches(List.of(entry1, entry2), 
HybridTimestamp.MAX_VALUE);
+        CompletableFuture<Void> notificationFuture = 
watchProcessor.notifyWatches(List.of(entry1, entry2), 
HybridTimestamp.MAX_VALUE);
+
+        assertThat(notificationFuture, willCompleteSuccessfully());
 
         var entryEvent1 = new EntryEvent(oldEntry(entry1), entry1);
         var entryEvent2 = new EntryEvent(oldEntry(entry2), entry2);
 
-        verify(listener1, timeout(1_000)).onUpdate(new 
WatchEvent(entryEvent1));
-        verify(listener2, timeout(1_000)).onUpdate(new 
WatchEvent(entryEvent2));
+        verify(listener1).onUpdate(new WatchEvent(entryEvent1));
+        verify(listener2).onUpdate(new WatchEvent(entryEvent2));
 
         var watchEventCaptor = ArgumentCaptor.forClass(WatchEvent.class);
 
-        verify(revisionCallback, 
timeout(1_000)).onRevisionApplied(watchEventCaptor.capture());
+        verify(revisionCallback).onRevisionApplied(watchEventCaptor.capture());
 
         WatchEvent event = watchEventCaptor.getValue();
 
@@ -114,23 +118,27 @@ public class WatchProcessorTest extends 
BaseIgniteAbstractTest {
 
         HybridTimestamp ts = new HybridTimestamp(1, 2);
 
-        watchProcessor.notifyWatches(List.of(entry1), ts);
+        CompletableFuture<Void> notificationFuture = 
watchProcessor.notifyWatches(List.of(entry1), ts);
+
+        assertThat(notificationFuture, willCompleteSuccessfully());
 
         var event = new WatchEvent(new EntryEvent(oldEntry(entry1), entry1));
 
-        verify(listener1, timeout(1_000)).onUpdate(event);
+        verify(listener1).onUpdate(event);
 
-        verify(revisionCallback, timeout(1_000)).onRevisionApplied(event);
+        verify(revisionCallback).onRevisionApplied(event);
 
         ts = new HybridTimestamp(2, 3);
 
-        watchProcessor.notifyWatches(List.of(entry2), ts);
+        notificationFuture = watchProcessor.notifyWatches(List.of(entry2), ts);
+
+        assertThat(notificationFuture, willCompleteSuccessfully());
 
         event = new WatchEvent(new EntryEvent(oldEntry(entry2), entry2));
 
-        verify(listener2, timeout(1_000)).onUpdate(event);
+        verify(listener2).onUpdate(event);
 
-        verify(revisionCallback, timeout(1_000)).onRevisionApplied(event);
+        verify(revisionCallback).onRevisionApplied(event);
     }
 
     /**
@@ -150,11 +158,13 @@ public class WatchProcessorTest extends 
BaseIgniteAbstractTest {
         var entry1 = new EntryImpl("foo".getBytes(UTF_8), null, 1, 0);
         var entry2 = new EntryImpl("bar".getBytes(UTF_8), null, 1, 0);
 
-        watchProcessor.notifyWatches(List.of(entry1, entry2), 
HybridTimestamp.MAX_VALUE);
+        CompletableFuture<Void> notificationFuture = 
watchProcessor.notifyWatches(List.of(entry1, entry2), 
HybridTimestamp.MAX_VALUE);
 
-        verify(listener1, timeout(1_000)).onUpdate(new WatchEvent(new 
EntryEvent(oldEntry(entry1), entry1)));
-        verify(listener2, timeout(1_000)).onUpdate(new WatchEvent(new 
EntryEvent(oldEntry(entry2), entry2)));
-        verify(listener2, 
timeout(1_000)).onError(any(IllegalStateException.class));
+        assertThat(notificationFuture, willThrow(IllegalStateException.class));
+
+        verify(listener1).onUpdate(new WatchEvent(new 
EntryEvent(oldEntry(entry1), entry1)));
+        verify(listener2).onUpdate(new WatchEvent(new 
EntryEvent(oldEntry(entry2), entry2)));
+        verify(listener2).onError(any(IllegalStateException.class));
 
         verify(revisionCallback, never()).onRevisionApplied(any());
     }
@@ -190,19 +200,36 @@ public class WatchProcessorTest extends 
BaseIgniteAbstractTest {
         var entry3 = new EntryImpl("foo".getBytes(UTF_8), null, 2, 0);
         var entry4 = new EntryImpl("bar".getBytes(UTF_8), null, 2, 0);
 
-        watchProcessor.notifyWatches(List.of(entry3, entry4), 
HybridTimestamp.MAX_VALUE);
+        CompletableFuture<Void> notificationFuture = 
watchProcessor.notifyWatches(List.of(entry3, entry4), 
HybridTimestamp.MAX_VALUE);
 
         verify(listener1, never()).onUpdate(new WatchEvent(new 
EntryEvent(oldEntry(entry3), entry3)));
         verify(listener2, never()).onUpdate(new WatchEvent(new 
EntryEvent(oldEntry(entry4), entry4)));
 
         blockingFuture.complete(null);
 
-        verify(listener1, timeout(1_000)).onUpdate(new WatchEvent(new 
EntryEvent(oldEntry(entry3), entry3)));
+        assertThat(notificationFuture, willCompleteSuccessfully());
+
+        verify(listener1).onUpdate(new WatchEvent(new 
EntryEvent(oldEntry(entry3), entry3)));
 
         InOrder inOrder = inOrder(listener2);
 
-        inOrder.verify(listener2, timeout(1_000)).onUpdate(new WatchEvent(new 
EntryEvent(oldEntry(entry2), entry2)));
-        inOrder.verify(listener2, timeout(1_000)).onUpdate(new WatchEvent(new 
EntryEvent(oldEntry(entry4), entry4)));
+        inOrder.verify(listener2).onUpdate(new WatchEvent(new 
EntryEvent(oldEntry(entry2), entry2)));
+        inOrder.verify(listener2).onUpdate(new WatchEvent(new 
EntryEvent(oldEntry(entry4), entry4)));
+    }
+
+    @Test
+    void testEmptyEvents() {
+        WatchListener listener = mockListener();
+
+        watchProcessor.addWatch(new Watch(0, listener, key -> 
Arrays.equals(key, "foo".getBytes(UTF_8))));
+
+        var entry = new EntryImpl("bar".getBytes(UTF_8), null, 1, 0);
+
+        CompletableFuture<Void> notificationFuture = 
watchProcessor.notifyWatches(List.of(entry), HybridTimestamp.MAX_VALUE);
+
+        assertThat(notificationFuture, willCompleteSuccessfully());
+
+        verify(listener, never()).onUpdate(any());
     }
 
     private static WatchListener mockListener() {
diff --git a/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/impl/StandaloneMetaStorageManager.java b/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/impl/StandaloneMetaStorageManager.java
index 3c0ec6a05b..084598c3cc 100644
--- a/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/impl/StandaloneMetaStorageManager.java
+++ b/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/impl/StandaloneMetaStorageManager.java
@@ -22,6 +22,7 @@ import static 
java.util.concurrent.CompletableFuture.completedFuture;
 import static org.mockito.Mockito.any;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.withSettings;
 
 import java.io.Serializable;
 import java.util.Set;
@@ -48,6 +49,8 @@ import org.apache.ignite.network.ClusterService;
 import org.jetbrains.annotations.Nullable;
 import org.jetbrains.annotations.TestOnly;
 import org.mockito.ArgumentCaptor;
+import org.mockito.MockSettings;
+import org.mockito.quality.Strictness;
 
 /**
  * MetaStorageManager dummy implementation.
@@ -60,6 +63,8 @@ import org.mockito.ArgumentCaptor;
 public class StandaloneMetaStorageManager extends MetaStorageManagerImpl {
     private static final String TEST_NODE_NAME = "standalone-ms-node";
 
+    private static final MockSettings LENIENT_SETTINGS = 
withSettings().strictness(Strictness.LENIENT);
+
     /**
      * Creates standalone MetaStorage manager for provided VaultManager.
      */
@@ -137,7 +142,7 @@ public class StandaloneMetaStorageManager extends 
MetaStorageManagerImpl {
 
     private static RaftManager mockRaftManager() {
         ArgumentCaptor<RaftGroupListener> listenerCaptor = 
ArgumentCaptor.forClass(RaftGroupListener.class);
-        RaftManager raftManager = mock(RaftManager.class);
+        RaftManager raftManager = mock(RaftManager.class, LENIENT_SETTINGS);
         TopologyAwareRaftGroupService raftGroupService = 
mock(TopologyAwareRaftGroupService.class);
 
         try {
@@ -176,8 +181,8 @@ public class StandaloneMetaStorageManager extends 
MetaStorageManagerImpl {
     }
 
     private static MetaStorageConfiguration mockConfiguration() {
-        MetaStorageConfiguration configuration = 
mock(MetaStorageConfiguration.class);
-        ConfigurationValue<Long> value = mock(ConfigurationValue.class);
+        MetaStorageConfiguration configuration = 
mock(MetaStorageConfiguration.class, LENIENT_SETTINGS);
+        ConfigurationValue<Long> value = mock(ConfigurationValue.class, 
LENIENT_SETTINGS);
 
         when(configuration.idleSyncTimeInterval()).thenReturn(value);
         when(value.value()).thenReturn(1000L);

Reply via email to