This is an automated email from the ASF dual-hosted git repository.

tkalkirill pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 6936b4c978 IGNITE-19129 Remove parallel Watch processing (#1863)
6936b4c978 is described below

commit 6936b4c978525391ba9928c42c7cf5fa2ddba606
Author: Alexander Polovtcev <[email protected]>
AuthorDate: Thu Mar 30 15:32:12 2023 +0300

    IGNITE-19129 Remove parallel Watch processing (#1863)
---
 .../internal/catalog/CatalogServiceImpl.java       |   8 --
 .../internal/testframework/IgniteTestUtils.java    | 135 +++++++++------------
 .../distributionzones/DistributionZoneManager.java |  11 +-
 ...butionZoneManagerLogicalTopologyEventsTest.java |   2 -
 .../DistributionZoneManagerScaleUpTest.java        |   2 +-
 .../DistributionZoneManagerWatchListenerTest.java  |   2 +-
 .../internal/metastorage/MetaStorageManager.java   |   6 +-
 .../ignite/internal/metastorage/WatchEvent.java    |   6 +-
 .../ignite/internal/metastorage/WatchListener.java |   6 -
 .../impl/ItMetaStorageManagerImplTest.java         |  24 +---
 .../impl/ItMetaStorageMultipleNodesTest.java       |   5 -
 .../metastorage/impl/ItMetaStorageWatchTest.java   |  25 ----
 .../metastorage/impl/MetaStorageManagerImpl.java   |  53 +++++---
 .../server/OnRevisionAppliedCallback.java          |   7 +-
 .../ignite/internal/metastorage/server/Watch.java  |  54 ++-------
 .../metastorage/server/WatchProcessor.java         | 132 +++++++++++++-------
 .../server/AbstractKeyValueStorageTest.java        |  23 +---
 .../server/RocksDbKeyValueStorageTest.java         |  12 +-
 .../metastorage/server/WatchProcessorTest.java     |  65 +++++-----
 .../placementdriver/AssignmentsTracker.java        |   5 -
 .../internal/placementdriver/LeaseTracker.java     |   5 -
 .../ItDistributedConfigurationStorageTest.java     |   2 +-
 .../internal/test/WatchListenerInhibitor.java      |  26 ++--
 .../storage/DistributedConfigurationStorage.java   |  16 +--
 .../DistributedConfigurationCatchUpTest.java       |  11 +-
 .../internal/table/distributed/TableManager.java   |  20 ---
 26 files changed, 275 insertions(+), 388 deletions(-)

diff --git a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogServiceImpl.java b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogServiceImpl.java
index 2d9ce8c768..e3a5472a4e 100644
--- a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogServiceImpl.java
+++ b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogServiceImpl.java
@@ -152,19 +152,11 @@ public class CatalogServiceImpl implements CatalogService, CatalogManager {
      * MetaStorage event listener for catalog metadata updates.
      */
     private static class CatalogEventListener implements WatchListener {
-        /** {@inheritDoc} */
-        @Override
-        public String id() {
-            return "catalog-history-watch";
-        }
-
-        /** {@inheritDoc} */
         @Override
         public CompletableFuture<Void> onUpdate(WatchEvent event) {
             return completedFuture(null);
         }
 
-        /** {@inheritDoc} */
         @Override
         public void onError(Throwable e) {
             LOG.warn("Unable to process catalog update event", e);
diff --git a/modules/core/src/testFixtures/java/org/apache/ignite/internal/testframework/IgniteTestUtils.java b/modules/core/src/testFixtures/java/org/apache/ignite/internal/testframework/IgniteTestUtils.java
index f353230f9a..5d23ff9b3b 100644
--- a/modules/core/src/testFixtures/java/org/apache/ignite/internal/testframework/IgniteTestUtils.java
+++ b/modules/core/src/testFixtures/java/org/apache/ignite/internal/testframework/IgniteTestUtils.java
@@ -51,7 +51,6 @@ import org.apache.ignite.internal.thread.NamedThreadFactory;
 import org.apache.ignite.internal.util.ExceptionUtils;
 import org.apache.ignite.lang.IgniteInternalException;
 import org.apache.ignite.lang.IgniteStringFormatter;
-import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
 import org.junit.jupiter.api.TestInfo;
 
@@ -76,7 +75,7 @@ public final class IgniteTestUtils {
         assert fieldName != null;
 
         try {
-            Class<?> cls = obj instanceof Class ? (Class) obj : obj.getClass();
+            Class<?> cls = obj instanceof Class ? (Class<?>) obj : 
obj.getClass();
 
             Field field = cls.getDeclaredField(fieldName);
 
@@ -93,7 +92,7 @@ public final class IgniteTestUtils {
                 throw new IgniteInternalException("Modification of static 
final field through reflection.");
             }
 
-            if (!field.isAccessible()) {
+            if (!field.canAccess(obj)) {
                 field.setAccessible(true);
             }
 
@@ -112,13 +111,13 @@ public final class IgniteTestUtils {
      * @param val       New field value.
      * @throws IgniteInternalException In case of error.
      */
-    public static void setFieldValue(Object obj, Class cls, String fieldName, 
Object val) throws IgniteInternalException {
+    public static void setFieldValue(Object obj, Class<?> cls, String 
fieldName, Object val) throws IgniteInternalException {
         assert fieldName != null;
 
         try {
             Field field = cls.getDeclaredField(fieldName);
 
-            if (!field.isAccessible()) {
+            if (!field.canAccess(obj)) {
                 field.setAccessible(true);
             }
 
@@ -150,14 +149,14 @@ public final class IgniteTestUtils {
     }
 
     /**
-     * Returns field value.
+     * Finds a field in the given {@code target} object of the {@code 
declaredClass} type.
      *
-     * @param target        target object from which to get field value 
({@code null} for static methods)
+     * @param target        target object from which to get field ({@code 
null} for static methods)
      * @param declaredClass class on which the field is declared
      * @param fieldName     name of the field
-     * @return field value
+     * @return field
      */
-    public static Object getFieldValue(Object target, Class<?> declaredClass, 
String fieldName) {
+    public static Field getField(@Nullable Object target, Class<?> 
declaredClass, String fieldName) {
         Field field;
         try {
             field = declaredClass.getDeclaredField(fieldName);
@@ -165,12 +164,24 @@ public final class IgniteTestUtils {
             throw new IgniteInternalException("Did not find a field", e);
         }
 
-        if (!field.isAccessible()) {
+        if (!field.canAccess(target)) {
             field.setAccessible(true);
         }
 
+        return field;
+    }
+
+    /**
+     * Returns field value.
+     *
+     * @param target        target object from which to get field value 
({@code null} for static methods)
+     * @param declaredClass class on which the field is declared
+     * @param fieldName     name of the field
+     * @return field value
+     */
+    public static Object getFieldValue(@Nullable Object target, Class<?> 
declaredClass, String fieldName) {
         try {
-            return field.get(target);
+            return getField(target, declaredClass, fieldName).get(target);
         } catch (IllegalAccessException e) {
             throw new IgniteInternalException("Cannot get field value", e);
         }
@@ -185,60 +196,30 @@ public final class IgniteTestUtils {
      * @return Field value.
      * @throws IgniteInternalException In case of error.
      */
-    public static <T> T getFieldValue(Object obj, String... fieldNames) {
+    public static <T> T getFieldValue(@Nullable Object obj, String... 
fieldNames) {
         assert obj != null;
         assert fieldNames != null;
         assert fieldNames.length >= 1;
 
-        try {
-            for (String fieldName : fieldNames) {
-                Class<?> cls = obj instanceof Class ? (Class) obj : 
obj.getClass();
-
-                try {
-                    obj = findField(cls, obj, fieldName);
-                } catch (NoSuchFieldException e) {
-                    // Resolve inner class, if not an inner field.
-                    Class<?> innerCls = getInnerClass(cls, fieldName);
-
-                    if (innerCls == null) {
-                        throw new IgniteInternalException("Failed to get 
object field [obj=" + obj
-                                + ", fieldNames=" + 
Arrays.toString(fieldNames) + ']', e);
-                    }
+        for (String fieldName : fieldNames) {
+            Class<?> cls = obj instanceof Class ? (Class<?>) obj : 
obj.getClass();
 
-                    obj = innerCls;
+            try {
+                obj = getFieldValue(obj, cls, fieldName);
+            } catch (IgniteInternalException e) {
+                // Resolve inner class, if not an inner field.
+                Class<?> innerCls = getInnerClass(cls, fieldName);
+
+                if (innerCls == null) {
+                    throw new IgniteInternalException("Failed to get object 
field [obj=" + obj
+                            + ", fieldNames=" + Arrays.toString(fieldNames) + 
']', e);
                 }
-            }
 
-            return (T) obj;
-        } catch (IllegalAccessException e) {
-            throw new IgniteInternalException("Failed to get object field 
[obj=" + obj
-                    + ", fieldNames=" + Arrays.toString(fieldNames) + ']', e);
-        }
-    }
-
-    /**
-     * Get object field value via reflection.
-     *
-     * @param cls Class for searching.
-     * @param obj Target object.
-     * @param fieldName Field name for search.
-     * @return Field from object if it was found.
-     */
-    private static Object findField(
-            Class<?> cls,
-            Object obj,
-            String fieldName
-    ) throws NoSuchFieldException, IllegalAccessException {
-        // Resolve inner field.
-        Field field = cls.getDeclaredField(fieldName);
-
-        boolean accessible = field.isAccessible();
-
-        if (!accessible) {
-            field.setAccessible(true);
+                obj = innerCls;
+            }
         }
 
-        return field.get(obj);
+        return (T) obj;
     }
 
     /**
@@ -266,8 +247,8 @@ public final class IgniteTestUtils {
      * @return Thrown throwable.
      */
     public static Throwable assertThrowsWithCause(
-            @NotNull RunnableX run,
-            @NotNull Class<? extends Throwable> cls
+            RunnableX run,
+            Class<? extends Throwable> cls
     ) {
         return assertThrowsWithCause(run, cls, null);
     }
@@ -281,8 +262,8 @@ public final class IgniteTestUtils {
      * @return Thrown throwable.
      */
     public static Throwable assertThrowsWithCause(
-            @NotNull RunnableX run,
-            @NotNull Class<? extends Throwable> cls,
+            RunnableX run,
+            Class<? extends Throwable> cls,
             @Nullable String msg
     ) {
         try {
@@ -310,8 +291,8 @@ public final class IgniteTestUtils {
      * @return {@code True} if one of the causing exception is an instance of 
passed in classes, {@code false} otherwise.
      */
     public static boolean hasCause(
-            @NotNull Throwable t,
-            @NotNull Class<?> cls,
+            Throwable t,
+            Class<?> cls,
             @Nullable String messageFragment
     ) {
         for (Throwable th = t; th != null; th = th.getCause()) {
@@ -349,10 +330,7 @@ public final class IgniteTestUtils {
      * @param cls Cause classes to check.
      * @return reference to the cause error if found, otherwise returns {@code 
null}.
      */
-    public static <T extends Throwable> T cause(
-            @NotNull Throwable t,
-            @NotNull Class<T> cls
-    ) {
+    public static <T extends Throwable> @Nullable T cause(Throwable t, 
Class<T> cls) {
         return cause(t, cls, null);
     }
 
@@ -367,9 +345,9 @@ public final class IgniteTestUtils {
      * @param msg Message text that should be in cause (if {@code null}, 
message won't be checked).
      * @return reference to the cause error if found, otherwise returns {@code 
null}.
      */
-    public static <T extends Throwable> T cause(
-            @NotNull Throwable t,
-            @NotNull Class<T> cls,
+    public static <T extends Throwable> @Nullable T cause(
+            Throwable t,
+            Class<T> cls,
             @Nullable String msg
     ) {
         for (Throwable th = t; th != null; th = th.getCause()) {
@@ -399,7 +377,7 @@ public final class IgniteTestUtils {
      * @param task Runnable.
      * @return Future with task result.
      */
-    public static CompletableFuture<?> runAsync(final RunnableX task) {
+    public static CompletableFuture<?> runAsync(RunnableX task) {
         return runAsync(task, "async-runnable-runner");
     }
 
@@ -409,7 +387,7 @@ public final class IgniteTestUtils {
      * @param task Runnable.
      * @return Future with task result.
      */
-    public static CompletableFuture<?> runAsync(final RunnableX task, String 
threadName) {
+    public static CompletableFuture<?> runAsync(RunnableX task, String 
threadName) {
         return runAsync(() -> {
             try {
                 task.run();
@@ -427,7 +405,7 @@ public final class IgniteTestUtils {
      * @param task Callable.
      * @return Future with task result.
      */
-    public static <T> CompletableFuture<T> runAsync(final Callable<T> task) {
+    public static <T> CompletableFuture<T> runAsync(Callable<T> task) {
         return runAsync(task, "async-callable-runner");
     }
 
@@ -438,10 +416,10 @@ public final class IgniteTestUtils {
      * @param threadName Thread name.
      * @return Future with task result.
      */
-    public static <T> CompletableFuture<T> runAsync(final Callable<T> task, 
String threadName) {
-        final NamedThreadFactory thrFactory = new 
NamedThreadFactory(threadName, LOG);
+    public static <T> CompletableFuture<T> runAsync(Callable<T> task, String 
threadName) {
+        NamedThreadFactory thrFactory = new NamedThreadFactory(threadName, 
LOG);
 
-        final CompletableFuture<T> fut = new CompletableFuture<T>();
+        CompletableFuture<T> fut = new CompletableFuture<T>();
 
         thrFactory.newThread(() -> {
             try {
@@ -556,7 +534,7 @@ public final class IgniteTestUtils {
      * @param threadName Thread names.
      * @return Future for the run. Future returns execution time in 
milliseconds.
      */
-    public static CompletableFuture<Long> runMultiThreadedAsync(Callable<?> 
call, int threadNum, final String threadName) {
+    public static CompletableFuture<Long> runMultiThreadedAsync(Callable<?> 
call, int threadNum, String threadName) {
         List<Callable<?>> calls = Collections.<Callable<?>>nCopies(threadNum, 
call);
 
         NamedThreadFactory threadFactory = new NamedThreadFactory(threadName, 
LOG);
@@ -724,7 +702,6 @@ public final class IgniteTestUtils {
      * <p>This method erases type of the exception in the thrown clause, so 
checked exception could be thrown without need to wrap it with
      * unchecked one or adding a similar throws clause to the upstream methods.
      */
-    @SuppressWarnings("unchecked")
     public static <E extends Throwable> void sneakyThrow(Throwable e) throws E 
{
         throw (E) e;
     }
@@ -739,7 +716,7 @@ public final class IgniteTestUtils {
      * @return A result of the stage.
      */
     @SuppressWarnings("UnusedReturnValue")
-    public static <T> T await(CompletionStage<T> stage, long timeout, TimeUnit 
unit) {
+    public static <T> @Nullable T await(CompletionStage<T> stage, long 
timeout, TimeUnit unit) {
         try {
             return stage.toCompletableFuture().get(timeout, unit);
         } catch (Throwable e) {
@@ -771,7 +748,7 @@ public final class IgniteTestUtils {
      * @return A result of the stage.
      */
     @SuppressWarnings("UnusedReturnValue")
-    public static <T> T await(CompletionStage<T> stage) {
+    public static <T> @Nullable T await(CompletionStage<T> stage) {
         return await(stage, TIMEOUT_SEC, TimeUnit.SECONDS);
     }
 
diff --git a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java
index 9ad4d43253..08ceb1accb 100644
--- a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java
+++ b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java
@@ -122,8 +122,6 @@ public class DistributionZoneManager implements IgniteComponent {
 
     private static final String DISTRIBUTION_ZONE_MANAGER_POOL_NAME = 
"dst-zones-scheduler";
 
-    private static final String META_STORAGE_WATCH_ID = "dst-zones-watch";
-
     /** Id of the default distribution zone. */
     public static final int DEFAULT_ZONE_ID = 0;
 
@@ -893,12 +891,14 @@ public class DistributionZoneManager implements IgniteComponent {
 
         try {
             vaultMgr.get(zonesLogicalTopologyKey())
-                    
.thenAcceptBoth(metaStorageManager.appliedRevision(META_STORAGE_WATCH_ID), 
(vaultEntry, appliedRevision) -> {
+                    .thenAccept(vaultEntry -> {
                         if (!busyLock.enterBusy()) {
                             throw new 
IgniteInternalException(NODE_STOPPING_ERR, new NodeStoppingException());
                         }
 
                         try {
+                            long appliedRevision = 
metaStorageManager.appliedRevision();
+
                             if (vaultEntry != null && vaultEntry.value() != 
null) {
                                 logicalTopology = 
fromBytes(vaultEntry.value());
 
@@ -931,11 +931,6 @@ public class DistributionZoneManager implements IgniteComponent {
 
     private WatchListener createMetastorageListener() {
         return new WatchListener() {
-            @Override
-            public String id() {
-                return META_STORAGE_WATCH_ID;
-            }
-
             @Override
             public CompletableFuture<Void> onUpdate(WatchEvent evt) {
                 if (!busyLock.enterBusy()) {
diff --git a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerLogicalTopologyEventsTest.java b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerLogicalTopologyEventsTest.java
index 82185aa6bd..7807aaa687 100644
--- a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerLogicalTopologyEventsTest.java
+++ b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerLogicalTopologyEventsTest.java
@@ -108,8 +108,6 @@ public class DistributionZoneManagerLogicalTopologyEventsTest {
 
         MetaStorageManager metaStorageManager = mock(MetaStorageManager.class);
 
-        
when(metaStorageManager.appliedRevision(any())).thenReturn(completedFuture(0L));
-
         cmgManager = mock(ClusterManagementGroupManager.class);
 
         clusterStateStorage = new TestClusterStateStorage();
diff --git a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerScaleUpTest.java b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerScaleUpTest.java
index 0bd9fa3878..827aff1c9b 100644
--- a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerScaleUpTest.java
+++ b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerScaleUpTest.java
@@ -1600,7 +1600,7 @@ public class DistributionZoneManagerScaleUpTest {
     }
 
     private void mockVaultAppliedRevision(long revision) {
-        
when(metaStorageManager.appliedRevision(any())).thenReturn(completedFuture(revision));
+        when(metaStorageManager.appliedRevision()).thenReturn(revision);
     }
 
     private void watchListenerOnUpdate(Set<String> nodes, long rev) {
diff --git a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerWatchListenerTest.java b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerWatchListenerTest.java
index 84f1a807e6..7be638fec1 100644
--- a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerWatchListenerTest.java
+++ b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerWatchListenerTest.java
@@ -333,7 +333,7 @@ public class DistributionZoneManagerWatchListenerTest extends IgniteAbstractTest
     }
 
     private void mockVaultAppliedRevision(long revision) {
-        
when(metaStorageManager.appliedRevision(any())).thenReturn(completedFuture(revision));
+        when(metaStorageManager.appliedRevision()).thenReturn(revision);
     }
 
     private void mockCmgLocalNodes() {
diff --git a/modules/metastorage-api/src/main/java/org/apache/ignite/internal/metastorage/MetaStorageManager.java b/modules/metastorage-api/src/main/java/org/apache/ignite/internal/metastorage/MetaStorageManager.java
index f4fa35bb2c..d49a7d49ef 100644
--- a/modules/metastorage-api/src/main/java/org/apache/ignite/internal/metastorage/MetaStorageManager.java
+++ b/modules/metastorage-api/src/main/java/org/apache/ignite/internal/metastorage/MetaStorageManager.java
@@ -40,11 +40,9 @@ import org.jetbrains.annotations.Nullable;
 public interface MetaStorageManager extends IgniteComponent {
     /**
      * Returns the current <em>applied revision</em> of the Meta Storage, that 
is, the most recent revision of updates that have been
-     * processed by a particular Watch on this node.
-     *
-     * @param watchId ID of the watch that this revision corresponds to.
+     * processed by all Watches on this node.
      */
-    CompletableFuture<Long> appliedRevision(String watchId);
+    long appliedRevision();
 
     /**
      * Retrieves an entry for the given key.
diff --git a/modules/metastorage-api/src/main/java/org/apache/ignite/internal/metastorage/WatchEvent.java b/modules/metastorage-api/src/main/java/org/apache/ignite/internal/metastorage/WatchEvent.java
index 8dc49fcd98..33130cb77f 100644
--- a/modules/metastorage-api/src/main/java/org/apache/ignite/internal/metastorage/WatchEvent.java
+++ b/modules/metastorage-api/src/main/java/org/apache/ignite/internal/metastorage/WatchEvent.java
@@ -39,8 +39,8 @@ public class WatchEvent {
      * @param entryEvts Events for entries corresponding to an update under 
one revision.
      * @param revision Revision of the updated entries.
      */
-    public WatchEvent(List<EntryEvent> entryEvts, long revision) {
-        this.entryEvts = entryEvts;
+    public WatchEvent(Collection<EntryEvent> entryEvts, long revision) {
+        this.entryEvts = List.copyOf(entryEvts);
         this.revision = revision;
     }
 
@@ -77,6 +77,8 @@ public class WatchEvent {
      * @return Entry event.
      */
     public EntryEvent entryEvent() {
+        assert single();
+
         return entryEvts.get(0);
     }
 
diff --git a/modules/metastorage-api/src/main/java/org/apache/ignite/internal/metastorage/WatchListener.java b/modules/metastorage-api/src/main/java/org/apache/ignite/internal/metastorage/WatchListener.java
index 665ac765a0..95696b971f 100644
--- a/modules/metastorage-api/src/main/java/org/apache/ignite/internal/metastorage/WatchListener.java
+++ b/modules/metastorage-api/src/main/java/org/apache/ignite/internal/metastorage/WatchListener.java
@@ -23,12 +23,6 @@ import java.util.concurrent.CompletableFuture;
  * The listener which receives and handles watch updates.
  */
 public interface WatchListener {
-    /**
-     * Returns a unique identifier for this Watch. This identifier should 
never change between node restarts and must uniquely identify
-     * a Watch among all Watches on a local node.
-     */
-    String id();
-
     /**
      * The method will be called on each meta storage update.
      *
diff --git a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageManagerImplTest.java b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageManagerImplTest.java
index c37073a221..8952da63d9 100644
--- a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageManagerImplTest.java
+++ b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageManagerImplTest.java
@@ -27,6 +27,7 @@ import static org.apache.ignite.internal.testframework.matchers.CompletableFutur
 import static org.apache.ignite.utils.ClusterServiceTestUtils.clusterService;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.nullValue;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.Mockito.mock;
@@ -189,7 +190,7 @@ public class ItMetaStorageManagerImplTest extends IgniteAbstractTest {
         assertThat(vaultManager.get(key1), willBe(nullValue()));
         assertThat(vaultManager.get(key2), willBe(nullValue()));
 
-        metaStorageManager.registerExactWatch(key1, new NoOpListener("test1"));
+        metaStorageManager.registerExactWatch(key1, new NoOpListener());
 
         invokeFuture = metaStorageManager.invoke(
                 Conditions.exists(new ByteArray("foo")),
@@ -202,16 +203,15 @@ public class ItMetaStorageManagerImplTest extends IgniteAbstractTest {
 
         assertThat(invokeFuture, willBe(true));
 
-        assertTrue(waitForCondition(() -> 
metaStorageManager.appliedRevision("test1").join() == 2, 10_000));
+        assertTrue(waitForCondition(() -> metaStorageManager.appliedRevision() 
== 2, 10_000));
 
         // Expect that only the watched key is persisted.
         assertThat(vaultManager.get(key1).thenApply(VaultEntry::value), 
willBe(value));
         assertThat(vaultManager.get(key2), willBe(nullValue()));
 
-        metaStorageManager.registerExactWatch(key2, new NoOpListener("test2"));
+        metaStorageManager.registerExactWatch(key2, new NoOpListener());
 
-        assertThat(metaStorageManager.appliedRevision("test1"), willBe(2L));
-        assertThat(metaStorageManager.appliedRevision("test2"), willBe(0L));
+        assertThat(metaStorageManager.appliedRevision(), is(2L));
 
         byte[] newValue = "newValue".getBytes(StandardCharsets.UTF_8);
 
@@ -226,25 +226,13 @@ public class ItMetaStorageManagerImplTest extends IgniteAbstractTest {
 
         assertThat(invokeFuture, willBe(true));
 
-        assertTrue(waitForCondition(() -> 
metaStorageManager.appliedRevision("test1").join() == 3, 10_000));
-        assertTrue(waitForCondition(() -> 
metaStorageManager.appliedRevision("test2").join() == 3, 10_000));
+        assertTrue(waitForCondition(() -> metaStorageManager.appliedRevision() 
== 3, 10_000));
 
         assertThat(vaultManager.get(key1).thenApply(VaultEntry::value), 
willBe(newValue));
         assertThat(vaultManager.get(key2).thenApply(VaultEntry::value), 
willBe(newValue));
     }
 
     private static class NoOpListener implements WatchListener {
-        private final String id;
-
-        NoOpListener(String id) {
-            this.id = id;
-        }
-
-        @Override
-        public String id() {
-            return id;
-        }
-
         @Override
         public CompletableFuture<Void> onUpdate(WatchEvent event) {
             return completedFuture(null);
diff --git a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageMultipleNodesTest.java b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageMultipleNodesTest.java
index 9a90c3d8cf..c3d358dfea 100644
--- a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageMultipleNodesTest.java
+++ b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageMultipleNodesTest.java
@@ -249,11 +249,6 @@ public class ItMetaStorageMultipleNodesTest extends IgniteAbstractTest {
         var awaitFuture = new CompletableFuture<EntryEvent>();
 
         secondNode.metaStorageManager.registerExactWatch(key, new 
WatchListener() {
-            @Override
-            public String id() {
-                return "test";
-            }
-
             @Override
             public CompletableFuture<Void> onUpdate(WatchEvent event) {
                 // Skip the first update event, because it's not guaranteed to 
arrive here (insert may have happened before the watch was
diff --git a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageWatchTest.java b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageWatchTest.java
index e5e758f862..3d3bf01197 100644
--- a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageWatchTest.java
+++ b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageWatchTest.java
@@ -205,11 +205,6 @@ public class ItMetaStorageWatchTest extends IgniteAbstractTest {
     @Test
     void testExactWatch() throws Exception {
         testWatches((node, latch) -> 
node.metaStorageManager.registerExactWatch(new ByteArray("foo"), new 
WatchListener() {
-            @Override
-            public String id() {
-                return "test";
-            }
-
             @Override
             public CompletableFuture<Void> onUpdate(WatchEvent event) {
                 assertThat(event.entryEvent().newEntry().key(), 
is("foo".getBytes(StandardCharsets.UTF_8)));
@@ -230,11 +225,6 @@ public class ItMetaStorageWatchTest extends IgniteAbstractTest {
     @Test
     void testPrefixWatch() throws Exception {
         testWatches((node, latch) -> 
node.metaStorageManager.registerPrefixWatch(new ByteArray("fo"), new 
WatchListener() {
-            @Override
-            public String id() {
-                return "test";
-            }
-
             @Override
             public CompletableFuture<Void> onUpdate(WatchEvent event) {
                 assertThat(event.entryEvent().newEntry().key(), 
is("foo".getBytes(StandardCharsets.UTF_8)));
@@ -259,11 +249,6 @@ public class ItMetaStorageWatchTest extends IgniteAbstractTest {
             var endRange = new ByteArray("foz");
 
             node.metaStorageManager.registerRangeWatch(startRange, endRange, 
new WatchListener() {
-                @Override
-                public String id() {
-                    return "test";
-                }
-
                 @Override
                 public CompletableFuture<Void> onUpdate(WatchEvent event) {
                     assertThat(event.entryEvent().newEntry().key(), 
is("foo".getBytes(StandardCharsets.UTF_8)));
@@ -322,11 +307,6 @@ public class ItMetaStorageWatchTest extends IgniteAbstractTest {
 
         for (Node node : nodes) {
             node.metaStorageManager.registerExactWatch(new ByteArray("foo"), 
new WatchListener() {
-                @Override
-                public String id() {
-                    return "test1";
-                }
-
                 @Override
                 public CompletableFuture<Void> onUpdate(WatchEvent event) {
                     assertThat(event.entryEvent().newEntry().key(), 
is("foo".getBytes(StandardCharsets.UTF_8)));
@@ -344,11 +324,6 @@ public class ItMetaStorageWatchTest extends 
IgniteAbstractTest {
             });
 
             node.metaStorageManager.registerPrefixWatch(new ByteArray("ba"), 
new WatchListener() {
-                @Override
-                public String id() {
-                    return "test2";
-                }
-
                 @Override
                 public CompletableFuture<Void> onUpdate(WatchEvent event) {
                     List<String> keys = event.entryEvents().stream()
diff --git 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java
 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java
index d1fe91c36e..072348445e 100644
--- 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java
+++ 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java
@@ -21,13 +21,17 @@ import static 
java.util.concurrent.CompletableFuture.completedFuture;
 import static org.apache.ignite.internal.util.ByteUtils.bytesToLong;
 import static org.apache.ignite.internal.util.ByteUtils.longToBytes;
 import static org.apache.ignite.internal.util.IgniteUtils.cancelOrConsume;
+import static 
org.apache.ignite.lang.ErrorGroups.MetaStorage.RESTORING_STORAGE_ERR;
 
 import java.util.Collection;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Flow.Publisher;
 import java.util.concurrent.Flow.Subscriber;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
 import 
org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager;
 import 
org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologyService;
@@ -41,6 +45,7 @@ import org.apache.ignite.internal.metastorage.dsl.Condition;
 import org.apache.ignite.internal.metastorage.dsl.Iif;
 import org.apache.ignite.internal.metastorage.dsl.Operation;
 import org.apache.ignite.internal.metastorage.dsl.StatementResult;
+import org.apache.ignite.internal.metastorage.exceptions.MetaStorageException;
 import org.apache.ignite.internal.metastorage.server.KeyValueStorage;
 import 
org.apache.ignite.internal.metastorage.server.raft.MetaStorageLearnerListener;
 import org.apache.ignite.internal.metastorage.server.raft.MetaStorageListener;
@@ -53,6 +58,7 @@ import org.apache.ignite.internal.raft.RaftNodeId;
 import org.apache.ignite.internal.raft.service.RaftGroupService;
 import org.apache.ignite.internal.util.IgniteSpinBusyLock;
 import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.internal.vault.VaultEntry;
 import org.apache.ignite.internal.vault.VaultManager;
 import org.apache.ignite.lang.ByteArray;
 import org.apache.ignite.lang.NodeStoppingException;
@@ -77,7 +83,7 @@ public class MetaStorageManagerImpl implements 
MetaStorageManager {
     /**
      * Special key for the Vault where the applied revision is stored.
      */
-    private static final String APPLIED_REV_PREFIX = "applied_revision_";
+    private static final ByteArray APPLIED_REV_KEY = new 
ByteArray("applied_revision");
 
     private final ClusterService clusterService;
 
@@ -103,6 +109,8 @@ public class MetaStorageManagerImpl implements 
MetaStorageManager {
     /** Prevents double stopping of the component. */
     private final AtomicBoolean isStopped = new AtomicBoolean();
 
+    private volatile long appliedRevision;
+
     /**
      * The constructor.
      *
@@ -176,6 +184,8 @@ public class MetaStorageManagerImpl implements 
MetaStorageManager {
     public void start() {
         storage.start();
 
+        appliedRevision = readRevisionFromVault();
+
         cmgMgr.metaStorageNodes()
                 .thenCompose(metaStorageNodes -> {
                     if (!busyLock.enterBusy()) {
@@ -197,6 +207,16 @@ public class MetaStorageManagerImpl implements 
MetaStorageManager {
                 });
     }
 
+    private long readRevisionFromVault() {
+        try {
+            VaultEntry entry = vaultMgr.get(APPLIED_REV_KEY).get(10, 
TimeUnit.SECONDS);
+
+            return entry == null ? 0L : bytesToLong(entry.value());
+        } catch (InterruptedException | ExecutionException | TimeoutException 
e) {
+            throw new MetaStorageException(RESTORING_STORAGE_ERR, e);
+        }
+    }
+
     @Override
     public void stop() throws Exception {
         if (!isStopped.compareAndSet(false, true)) {
@@ -214,32 +234,23 @@ public class MetaStorageManagerImpl implements 
MetaStorageManager {
     }
 
     @Override
-    public CompletableFuture<Long> appliedRevision(String watchId) {
-        return vaultMgr.get(appliedRevisionKey(watchId))
-                .thenApply(appliedRevision -> appliedRevision == null ? 0L : 
bytesToLong(appliedRevision.value()));
-    }
-
-    private long appliedRevision(WatchListener lsnr) {
-        return appliedRevision(lsnr.id()).join();
-    }
-
-    private static ByteArray appliedRevisionKey(String watchId) {
-        return ByteArray.fromString(APPLIED_REV_PREFIX + watchId);
+    public long appliedRevision() {
+        return appliedRevision;
     }
 
     @Override
     public void registerPrefixWatch(ByteArray key, WatchListener listener) {
-        storage.watchPrefix(key.bytes(), appliedRevision(listener) + 1, 
listener);
+        storage.watchPrefix(key.bytes(), appliedRevision() + 1, listener);
     }
 
     @Override
     public void registerExactWatch(ByteArray key, WatchListener listener) {
-        storage.watchExact(key.bytes(), appliedRevision(listener) + 1, 
listener);
+        storage.watchExact(key.bytes(), appliedRevision() + 1, listener);
     }
 
     @Override
     public void registerRangeWatch(ByteArray keyFrom, @Nullable ByteArray 
keyTo, WatchListener listener) {
-        storage.watchRange(keyFrom.bytes(), keyTo == null ? null : 
keyTo.bytes(), appliedRevision(listener) + 1, listener);
+        storage.watchRange(keyFrom.bytes(), keyTo == null ? null : 
keyTo.bytes(), appliedRevision() + 1, listener);
     }
 
     @Override
@@ -570,7 +581,7 @@ public class MetaStorageManagerImpl implements 
MetaStorageManager {
     /**
      * Saves processed Meta Storage revision and corresponding entries to the 
Vault.
      */
-    private CompletableFuture<Void> saveUpdatedEntriesToVault(String watchId, 
WatchEvent watchEvent) {
+    private CompletableFuture<Void> saveUpdatedEntriesToVault(WatchEvent 
watchEvent) {
         if (!busyLock.enterBusy()) {
             LOG.info("Skipping applying MetaStorage revision because the node 
is stopping");
 
@@ -578,17 +589,21 @@ public class MetaStorageManagerImpl implements 
MetaStorageManager {
         }
 
         try {
+            CompletableFuture<Void> saveToVaultFuture;
+
             if (watchEvent.entryEvents().isEmpty()) {
-                return vaultMgr.put(appliedRevisionKey(watchId), 
longToBytes(watchEvent.revision()));
+                saveToVaultFuture = vaultMgr.put(APPLIED_REV_KEY, 
longToBytes(watchEvent.revision()));
             } else {
                 Map<ByteArray, byte[]> batch = 
IgniteUtils.newHashMap(watchEvent.entryEvents().size() + 1);
 
-                batch.put(appliedRevisionKey(watchId), 
longToBytes(watchEvent.revision()));
+                batch.put(APPLIED_REV_KEY, longToBytes(watchEvent.revision()));
 
                 watchEvent.entryEvents().forEach(e -> batch.put(new 
ByteArray(e.newEntry().key()), e.newEntry().value()));
 
-                return vaultMgr.putAll(batch);
+                saveToVaultFuture = vaultMgr.putAll(batch);
             }
+
+            return saveToVaultFuture.thenRun(() -> appliedRevision = 
watchEvent.revision());
         } finally {
             busyLock.leaveBusy();
         }
diff --git 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/OnRevisionAppliedCallback.java
 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/OnRevisionAppliedCallback.java
index 155ffb5b69..0ce5b39072 100644
--- 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/OnRevisionAppliedCallback.java
+++ 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/OnRevisionAppliedCallback.java
@@ -26,11 +26,10 @@ import org.apache.ignite.internal.metastorage.WatchEvent;
 @FunctionalInterface
 public interface OnRevisionAppliedCallback {
     /**
-     * Notifies of completion of processing of Meta Storage watches for a 
particular Watch and revision.
+     * Notifies of completion of processing of Meta Storage watches for a 
particular revision.
      *
-     * @param watchId ID of the Watch that has finished processing the 
revision.
-     * @param watchEvent Event with modified Meta Storage entries processed by 
the Watch.
+     * @param watchEvent Event with modified Meta Storage entries processed by at 
least one Watch.
      * @return Future that represents the state of the execution of the 
callback.
      */
-    CompletableFuture<Void> onRevisionApplied(String watchId, WatchEvent 
watchEvent);
+    CompletableFuture<Void> onRevisionApplied(WatchEvent watchEvent);
 }
diff --git 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/Watch.java
 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/Watch.java
index 42c35ec9e3..33da407c6c 100644
--- 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/Watch.java
+++ 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/Watch.java
@@ -26,8 +26,8 @@ import org.apache.ignite.internal.metastorage.WatchListener;
  * Subscription on updates of Meta Storage entries corresponding to a subset 
of keys, starting from a given revision number.
  */
 public class Watch {
-    /** Current revision. */
-    private volatile long targetRevision;
+    /** Minimum revision of entries that this Watch must be notified of. */
+    private final long startRevision;
 
     /** Key predicate. */
     private final Predicate<byte[]> predicate;
@@ -45,7 +45,7 @@ public class Watch {
     public Watch(long startRevision, WatchListener listener, Predicate<byte[]> 
predicate) {
         this.predicate = predicate;
         this.listener = listener;
-        this.targetRevision = startRevision;
+        this.startRevision = startRevision;
     }
 
     /**
@@ -54,8 +54,8 @@ public class Watch {
      * @param key Meta Storage key.
      * @param revision Revision corresponding to the given {@code key}.
      */
-    public boolean matches(byte[] key, long revision) {
-        return revision >= targetRevision && predicate.test(key);
+    boolean matches(byte[] key, long revision) {
+        return revision >= startRevision && predicate.test(key);
     }
 
     /**
@@ -63,9 +63,7 @@ public class Watch {
      *
      * @see WatchListener#onUpdate
      */
-    public CompletableFuture<Void> onUpdate(WatchEvent event) {
-        targetRevision = event.revision() + 1;
-
+    CompletableFuture<Void> onUpdate(WatchEvent event) {
         return listener.onUpdate(event);
     }
 
@@ -74,56 +72,28 @@ public class Watch {
      *
      * @see WatchListener#onRevisionUpdated
      */
-    public CompletableFuture<Void> onRevisionUpdated(long revision) {
-        targetRevision = revision + 1;
-
+    CompletableFuture<Void> onRevisionUpdated(long revision) {
         return listener.onRevisionUpdated(revision);
     }
 
     /**
      * Callback that gets called if an error has occurred during the event 
processing.
      */
-    public void onError(Throwable e) {
+    void onError(Throwable e) {
         listener.onError(e);
     }
 
-    /**
-     * Returns the ID of the Watch.
-     */
-    public String id() {
-        return listener.id();
-    }
-
     /**
      * Returns the event listener.
      */
-    public WatchListener listener() {
+    WatchListener listener() {
         return listener;
     }
 
     /**
-     * Returns the current Meta Storage revision this Watch is listening to.
+     * Returns the minimum Meta Storage revision this Watch is listening to.
      */
-    public long targetRevision() {
-        return targetRevision;
-    }
-
-    @Override
-    public boolean equals(Object o) {
-        if (this == o) {
-            return true;
-        }
-        if (o == null || getClass() != o.getClass()) {
-            return false;
-        }
-
-        Watch watch = (Watch) o;
-
-        return id().equals(watch.id());
-    }
-
-    @Override
-    public int hashCode() {
-        return id().hashCode();
+    long startRevision() {
+        return startRevision;
     }
 }
diff --git 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/WatchProcessor.java
 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/WatchProcessor.java
index 6189c12cee..aac4330e29 100644
--- 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/WatchProcessor.java
+++ 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/WatchProcessor.java
@@ -17,15 +17,17 @@
 
 package org.apache.ignite.internal.metastorage.server;
 
+import static java.util.concurrent.CompletableFuture.allOf;
 import static java.util.concurrent.CompletableFuture.completedFuture;
-import static java.util.concurrent.CompletableFuture.failedFuture;
+import static java.util.concurrent.CompletableFuture.supplyAsync;
 
 import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.List;
 import java.util.OptionalLong;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
@@ -43,7 +45,8 @@ import org.apache.ignite.internal.util.IgniteUtils;
  * Class for storing and notifying Meta Storage Watches.
  *
  * <p>Every Meta Storage update is processed by each registered Watch in 
parallel, however notifications for a single Watch are
- * linearised (Watches are always notified of one event at a time and in 
increasing order of revisions).
+ * linearised (Watches are always notified of one event at a time and in 
increasing order of revisions). It is also guaranteed that
+ * Watches will not get notified of a new revision until all Watches have 
finished processing a previous revision.
  */
 public class WatchProcessor implements ManuallyCloseable {
     /** Reads an entry from the storage using a given key and revision. */
@@ -55,7 +58,15 @@ public class WatchProcessor implements ManuallyCloseable {
     private static final IgniteLogger LOG = 
Loggers.forClass(WatchProcessor.class);
 
     /** Map that contains Watches and corresponding Watch notification process 
(represented as a CompletableFuture). */
-    private final ConcurrentMap<Watch, CompletableFuture<Void>> watches = new 
ConcurrentHashMap<>();
+    private final List<Watch> watches = new CopyOnWriteArrayList<>();
+
+    /**
+     * Future that represents the process of notifying registered Watches 
about a Meta Storage revision.
+     *
+     * <p>Since Watches are notified concurrently, this future is used to 
guarantee that no Watches get notified of a new revision,
+     * until all Watches have finished processing the previous revision.
+     */
+    private volatile CompletableFuture<Void> notificationFuture = 
completedFuture(null);
 
     private final EntryReader entryReader;
 
@@ -78,22 +89,20 @@ public class WatchProcessor implements ManuallyCloseable {
 
     /** Adds a watch. */
     public void addWatch(Watch watch) {
-        assert !watches.containsKey(watch) : "Watch with id \"" + watch.id() + 
"\" already exists";
-
-        watches.put(watch, completedFuture(null));
+        watches.add(watch);
     }
 
     /** Removes a watch (identified by its listener). */
     public void removeWatch(WatchListener listener) {
-        watches.keySet().removeIf(watch -> watch.listener() == listener);
+        watches.removeIf(watch -> watch.listener() == listener);
     }
 
     /**
      * Returns the minimal target revision of all registered watches.
      */
     public OptionalLong minWatchRevision() {
-        return watches.keySet().stream()
-                .mapToLong(Watch::targetRevision)
+        return watches.stream()
+                .mapToLong(Watch::startRevision)
                 .min();
     }
 
@@ -110,61 +119,96 @@ public class WatchProcessor implements ManuallyCloseable {
      * Notifies registered watch about an update event.
      */
     public void notifyWatches(List<Entry> updatedEntries) {
-        watches.replaceAll((watch, watchOperation) ->
-                watchOperation.thenComposeAsync(v -> notifyWatch(watch, 
updatedEntries), watchExecutor));
+        notificationFuture = notificationFuture
+                .thenComposeAsync(v -> {
+                    // Revision must be the same for all entries.
+                    long newRevision = updatedEntries.get(0).revision();
+
+                    // Notify all watches in parallel, then aggregate the 
entries that they have processed.
+                    CompletableFuture<List<EntryEvent>>[] notificationFutures 
= watches.stream()
+                            .map(watch -> notifyWatch(watch, updatedEntries, 
newRevision))
+                            .toArray(CompletableFuture[]::new);
+
+                    return allOf(notificationFutures)
+                            .thenComposeAsync(ignored -> 
invokeOnRevisionCallback(notificationFutures, newRevision), watchExecutor);
+                }, watchExecutor);
     }
 
-    private CompletableFuture<Void> notifyWatch(Watch watch, List<Entry> 
updatedEntries) {
-        // Revision must be the same for all entries.
-        long newRevision = updatedEntries.get(0).revision();
+    private CompletableFuture<List<EntryEvent>> notifyWatch(Watch watch, 
List<Entry> updatedEntries, long revision) {
+        CompletableFuture<List<EntryEvent>> eventFuture = supplyAsync(() -> {
+            List<EntryEvent> entryEvents = List.of();
 
-        List<EntryEvent> entryEvents = List.of();
+            for (Entry newEntry : updatedEntries) {
+                byte[] newKey = newEntry.key();
 
-        for (Entry newEntry : updatedEntries) {
-            byte[] newKey = newEntry.key();
+                assert newEntry.revision() == revision;
 
-            assert newEntry.revision() == newRevision;
+                if (watch.matches(newKey, revision)) {
+                    Entry oldEntry = entryReader.get(newKey, revision - 1);
 
-            if (watch.matches(newKey, newRevision)) {
-                Entry oldEntry = entryReader.get(newKey, newRevision - 1);
+                    if (entryEvents.isEmpty()) {
+                        entryEvents = new ArrayList<>();
+                    }
 
-                if (entryEvents.isEmpty()) {
-                    entryEvents = new ArrayList<>();
+                    entryEvents.add(new EntryEvent(oldEntry, newEntry));
                 }
-
-                entryEvents.add(new EntryEvent(oldEntry, newEntry));
             }
-        }
 
-        var event = new WatchEvent(entryEvents, newRevision);
+            return entryEvents;
+        }, watchExecutor);
 
-        CompletableFuture<Void> eventNotificationFuture;
+        return eventFuture
+                .thenCompose(entryEvents -> {
+                    CompletableFuture<Void> eventNotificationFuture = 
entryEvents.isEmpty()
+                            ? watch.onRevisionUpdated(revision)
+                            : watch.onUpdate(new WatchEvent(entryEvents, 
revision));
 
-        try {
-            eventNotificationFuture = entryEvents.isEmpty()
-                    ? watch.onRevisionUpdated(newRevision)
-                    : watch.onUpdate(event);
-        } catch (Throwable e) {
-            eventNotificationFuture = failedFuture(e);
-        }
-
-        return eventNotificationFuture
+                    return eventNotificationFuture.thenApply(v -> entryEvents);
+                })
                 .whenComplete((v, e) -> {
                     if (e != null) {
+                        if (e instanceof CompletionException) {
+                            e = e.getCause();
+                        }
+
                         // TODO: IGNITE-14693 Implement Meta storage exception 
handling logic.
-                        LOG.error("Error occurred when processing a watch 
event {}, watch {} is going to be stopped", e, event, watch.id());
+                        LOG.error("Error occurred when processing a watch 
event", e);
 
                         watch.onError(e);
-
-                        watches.remove(watch);
                     }
-                })
-                .thenCompose(v -> 
revisionCallback.onRevisionApplied(watch.id(), event));
+                });
+    }
+
+    private CompletableFuture<Void> 
invokeOnRevisionCallback(CompletableFuture<List<EntryEvent>>[] 
notificationFutures, long revision) {
+        try {
+            // Only notify about entries that have been accepted by at least 
one Watch.
+            var acceptedEntries = new HashSet<EntryEvent>();
+
+            for (CompletableFuture<List<EntryEvent>> future : 
notificationFutures) {
+                // This method must only be invoked when all passed futures 
have been completed.
+                assert future.isDone();
+
+                acceptedEntries.addAll(future.join());
+            }
+
+            var event = new WatchEvent(acceptedEntries, revision);
+
+            return revisionCallback.onRevisionApplied(event)
+                    .whenComplete((ignored, e) -> {
+                        if (e != null) {
+                            LOG.error("Error occurred when notifying watches", 
e);
+                        }
+                    });
+        } catch (Throwable e) {
+            LOG.error("Error occurred when notifying watches", e);
+
+            throw e;
+        }
     }
 
     @Override
     public void close() {
-        watches.values().forEach(f -> f.cancel(true));
+        notificationFuture.cancel(true);
 
         IgniteUtils.shutdownAndAwaitTermination(watchExecutor, 10, 
TimeUnit.SECONDS);
     }
diff --git 
a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/AbstractKeyValueStorageTest.java
 
b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/AbstractKeyValueStorageTest.java
index 4cd848247b..12a156eea9 100644
--- 
a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/AbstractKeyValueStorageTest.java
+++ 
b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/AbstractKeyValueStorageTest.java
@@ -36,8 +36,8 @@ import static org.junit.jupiter.api.Assertions.fail;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.timeout;
-import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
@@ -2087,16 +2087,11 @@ public abstract class AbstractKeyValueStorageTest {
 
         long appliedRevision = storage.revision();
 
-        storage.startWatches((revision, updatedEntries) -> 
completedFuture(null));
+        storage.startWatches(event -> completedFuture(null));
 
         CompletableFuture<byte[]> fut = new CompletableFuture<>();
 
         storage.watchExact(key(0), appliedRevision + 1, new WatchListener() {
-            @Override
-            public String id() {
-                return "test";
-            }
-
             @Override
             public CompletableFuture<Void> onUpdate(WatchEvent event) {
                 fut.complete(event.entryEvent().newEntry().value());
@@ -2415,13 +2410,10 @@ public abstract class AbstractKeyValueStorageTest {
         WatchListener mockListener2 = mock(WatchListener.class);
         WatchListener mockListener3 = mock(WatchListener.class);
 
-        when(mockListener1.id()).thenReturn("test1");
         when(mockListener1.onUpdate(any())).thenReturn(completedFuture(null));
 
-        when(mockListener2.id()).thenReturn("test2");
         when(mockListener2.onUpdate(any())).thenReturn(completedFuture(null));
 
-        when(mockListener3.id()).thenReturn("test3");
         when(mockListener3.onUpdate(any())).thenReturn(completedFuture(null));
 
         var exception = new IllegalStateException();
@@ -2434,7 +2426,7 @@ public abstract class AbstractKeyValueStorageTest {
 
         OnRevisionAppliedCallback mockCallback = 
mock(OnRevisionAppliedCallback.class);
 
-        when(mockCallback.onRevisionApplied(any(), 
any())).thenReturn(completedFuture(null));
+        
when(mockCallback.onRevisionApplied(any())).thenReturn(completedFuture(null));
 
         storage.startWatches(mockCallback);
 
@@ -2446,7 +2438,7 @@ public abstract class AbstractKeyValueStorageTest {
 
         verify(mockListener3, timeout(10_000)).onUpdate(any());
 
-        verify(mockCallback, times(2)).onRevisionApplied(any(), any());
+        verify(mockCallback, never()).onRevisionApplied(any());
     }
 
     private static void fill(KeyValueStorage storage, int keySuffix, int num) {
@@ -2489,11 +2481,6 @@ public abstract class AbstractKeyValueStorageTest {
         var resultFuture = new CompletableFuture<Void>();
 
         watchMethod.accept(new WatchListener() {
-            @Override
-            public String id() {
-                return "test";
-            }
-
             @Override
             public CompletableFuture<Void> onUpdate(WatchEvent event) {
                 try {
@@ -2519,7 +2506,7 @@ public abstract class AbstractKeyValueStorageTest {
             }
         });
 
-        storage.startWatches((revision, updatedEntries) -> 
completedFuture(null));
+        storage.startWatches(event -> completedFuture(null));
 
         return resultFuture;
     }
diff --git 
a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/RocksDbKeyValueStorageTest.java
 
b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/RocksDbKeyValueStorageTest.java
index 51cc7dac6d..bb421f2438 100644
--- 
a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/RocksDbKeyValueStorageTest.java
+++ 
b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/RocksDbKeyValueStorageTest.java
@@ -70,11 +70,6 @@ public class RocksDbKeyValueStorageTest extends 
AbstractKeyValueStorageTest {
         var latch = new CountDownLatch(2);
 
         storage.watchExact("foo".getBytes(UTF_8), 1, new WatchListener() {
-            @Override
-            public String id() {
-                return "test1";
-            }
-
             @Override
             public CompletableFuture<Void> onUpdate(WatchEvent event) {
                 assertThat(event.entryEvent().newEntry().value(), 
is("bar".getBytes(UTF_8)));
@@ -91,11 +86,6 @@ public class RocksDbKeyValueStorageTest extends 
AbstractKeyValueStorageTest {
         });
 
         storage.watchExact("baz".getBytes(UTF_8), 1, new WatchListener() {
-            @Override
-            public String id() {
-                return "test2";
-            }
-
             @Override
             public CompletableFuture<Void> onUpdate(WatchEvent event) {
                 assertThat(event.entryEvent().newEntry().value(), 
is("quux".getBytes(UTF_8)));
@@ -111,7 +101,7 @@ public class RocksDbKeyValueStorageTest extends 
AbstractKeyValueStorageTest {
             }
         });
 
-        storage.startWatches((revision, updatedEntries) -> 
CompletableFuture.completedFuture(null));
+        storage.startWatches(event -> CompletableFuture.completedFuture(null));
 
         storage.restoreSnapshot(snapshotPath);
 
diff --git 
a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/WatchProcessorTest.java
 
b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/WatchProcessorTest.java
index 9f43d28e53..1e64ecd803 100644
--- 
a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/WatchProcessorTest.java
+++ 
b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/WatchProcessorTest.java
@@ -19,9 +19,11 @@ package org.apache.ignite.internal.metastorage.server;
 
 import static java.nio.charset.StandardCharsets.UTF_8;
 import static java.util.concurrent.CompletableFuture.completedFuture;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.is;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyLong;
-import static org.mockito.Mockito.clearInvocations;
 import static org.mockito.Mockito.inOrder;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
@@ -31,7 +33,6 @@ import static org.mockito.Mockito.when;
 
 import java.util.Arrays;
 import java.util.List;
-import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import org.apache.ignite.internal.metastorage.Entry;
 import org.apache.ignite.internal.metastorage.EntryEvent;
@@ -41,6 +42,7 @@ import org.apache.ignite.internal.metastorage.impl.EntryImpl;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
+import org.mockito.ArgumentCaptor;
 import org.mockito.InOrder;
 
 /**
@@ -53,7 +55,7 @@ public class WatchProcessorTest {
 
     @BeforeEach
     void setUp() {
-        when(revisionCallback.onRevisionApplied(any(), 
any())).thenReturn(completedFuture(null));
+        
when(revisionCallback.onRevisionApplied(any())).thenReturn(completedFuture(null));
 
         watchProcessor.setRevisionCallback(revisionCallback);
     }
@@ -79,14 +81,23 @@ public class WatchProcessorTest {
 
         watchProcessor.notifyWatches(List.of(entry1, entry2));
 
-        verify(listener1, timeout(1_000)).onUpdate(new WatchEvent(new 
EntryEvent(oldEntry(entry1), entry1)));
-        verify(listener2, timeout(1_000)).onUpdate(new WatchEvent(new 
EntryEvent(oldEntry(entry2), entry2)));
+        var entryEvent1 = new EntryEvent(oldEntry(entry1), entry1);
+        var entryEvent2 = new EntryEvent(oldEntry(entry2), entry2);
+
+        verify(listener1, timeout(1_000)).onUpdate(new 
WatchEvent(entryEvent1));
+        verify(listener2, timeout(1_000)).onUpdate(new 
WatchEvent(entryEvent2));
 
         verify(listener1, never()).onRevisionUpdated(anyLong());
         verify(listener2, never()).onRevisionUpdated(anyLong());
 
-        // Revision callback should be called for every listener update.
-        verify(revisionCallback, 
timeout(1_000).times(2)).onRevisionApplied(any(), any());
+        var watchEventCaptor = ArgumentCaptor.forClass(WatchEvent.class);
+
+        verify(revisionCallback, 
timeout(1_000)).onRevisionApplied(watchEventCaptor.capture());
+
+        WatchEvent event = watchEventCaptor.getValue();
+
+        assertThat(event.entryEvents(), containsInAnyOrder(entryEvent1, 
entryEvent2));
+        assertThat(event.revision(), is(1L));
     }
 
     /**
@@ -105,20 +116,25 @@ public class WatchProcessorTest {
 
         watchProcessor.notifyWatches(List.of(entry1));
 
-        verify(listener1, timeout(1_000)).onUpdate(new WatchEvent(new EntryEvent(oldEntry(entry1), entry1)));
+        var event = new WatchEvent(new EntryEvent(oldEntry(entry1), entry1));
+
+        verify(listener1, timeout(1_000)).onUpdate(event);
         verify(listener2, timeout(1_000)).onRevisionUpdated(1);
 
+        verify(revisionCallback, timeout(1_000)).onRevisionApplied(event);
+
         watchProcessor.notifyWatches(List.of(entry2));
 
+        event = new WatchEvent(new EntryEvent(oldEntry(entry2), entry2));
+
         verify(listener1, timeout(1_000)).onRevisionUpdated(2);
-        verify(listener2, timeout(1_000)).onUpdate(new WatchEvent(new EntryEvent(oldEntry(entry2), entry2)));
+        verify(listener2, timeout(1_000)).onUpdate(event);
 
-        // Revision callback should be called for every listener update.
-        verify(revisionCallback, timeout(1_000).times(4)).onRevisionApplied(any(), any());
+        verify(revisionCallback, timeout(1_000)).onRevisionApplied(event);
     }
 
     /**
-     * Tests a scenario that, when a watch throws an exception, it gets disabled and other watches continue working.
+     * Tests a scenario that, when a watch throws an exception, watch processing finishes with an error.
      */
     @Test
     void testWatchFailure() {
@@ -126,7 +142,6 @@ public class WatchProcessorTest {
 
         WatchListener listener2 = mock(WatchListener.class);
 
-        when(listener2.id()).thenReturn("error");
         when(listener2.onUpdate(any())).thenThrow(new IllegalStateException());
 
         watchProcessor.addWatch(new Watch(0, listener1, key -> Arrays.equals(key, "foo".getBytes(UTF_8))));
@@ -141,19 +156,7 @@ public class WatchProcessorTest {
         verify(listener2, timeout(1_000)).onUpdate(new WatchEvent(new EntryEvent(oldEntry(entry2), entry2)));
         verify(listener2, timeout(1_000)).onError(any(IllegalStateException.class));
 
-        var entry3 = new EntryImpl("bar".getBytes(UTF_8), null, 2, 0);
-
-        watchProcessor.notifyWatches(List.of(entry3));
-
-        clearInvocations(listener1, listener2, revisionCallback);
-
-        verify(listener1, timeout(1_000)).onRevisionUpdated(2);
-
-        verify(listener2, never()).onUpdate(any());
-        verify(listener2, never()).onError(any());
-        verify(listener2, never()).onRevisionUpdated(anyLong());
-
-        verify(revisionCallback, timeout(1_000)).onRevisionApplied(any(), any());
+        verify(revisionCallback, never()).onRevisionApplied(any());
     }
 
     /**
@@ -166,8 +169,6 @@ public class WatchProcessorTest {
 
         WatchListener listener2 = mock(WatchListener.class);
 
-        when(listener2.id()).thenReturn("blocking");
-
         var blockingFuture = new CompletableFuture<Void>();
 
         when(listener2.onUpdate(any()))
@@ -191,11 +192,13 @@ public class WatchProcessorTest {
 
         watchProcessor.notifyWatches(List.of(entry3, entry4));
 
-        verify(listener1, timeout(1_000)).onUpdate(new WatchEvent(new EntryEvent(oldEntry(entry3), entry3)));
+        verify(listener1, never()).onUpdate(new WatchEvent(new EntryEvent(oldEntry(entry3), entry3)));
         verify(listener2, never()).onUpdate(new WatchEvent(new EntryEvent(oldEntry(entry4), entry4)));
 
         blockingFuture.complete(null);
 
+        verify(listener1, timeout(1_000)).onUpdate(new WatchEvent(new EntryEvent(oldEntry(entry3), entry3)));
+
         InOrder inOrder = inOrder(listener2);
 
         inOrder.verify(listener2, timeout(1_000)).onUpdate(new WatchEvent(new EntryEvent(oldEntry(entry2), entry2)));
@@ -205,10 +208,6 @@ public class WatchProcessorTest {
     private static WatchListener mockListener() {
         var listener = mock(WatchListener.class);
 
-        String id = UUID.randomUUID().toString();
-
-        when(listener.id()).thenReturn(id);
-
         when(listener.onUpdate(any())).thenReturn(completedFuture(null));
         when(listener.onRevisionUpdated(anyLong())).thenReturn(completedFuture(null));
 
diff --git a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/AssignmentsTracker.java b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/AssignmentsTracker.java
index c990e11817..71ce16672b 100644
--- a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/AssignmentsTracker.java
+++ b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/AssignmentsTracker.java
@@ -193,11 +193,6 @@ public class AssignmentsTracker {
      * Meta storage assignments watch.
      */
     private class AssignmentsListener implements WatchListener {
-        @Override
-        public String id() {
-            return STABLE_ASSIGNMENTS_PREFIX + "watch";
-        }
-
         @Override
         public CompletableFuture<Void> onUpdate(WatchEvent event) {
             assert !event.entryEvent().newEntry().empty() : "New assignments are empty";
diff --git a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/LeaseTracker.java b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/LeaseTracker.java
index ac937f3f83..4933f73593 100644
--- a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/LeaseTracker.java
+++ b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/LeaseTracker.java
@@ -124,11 +124,6 @@ public class LeaseTracker {
      * Listen lease holder updates.
      */
     private class UpdateListener implements WatchListener {
-        @Override
-        public String id() {
-            return PLACEMENTDRIVER_PREFIX + "watch";
-        }
-
         @Override
         public CompletableFuture<Void> onUpdate(WatchEvent event) {
             for (EntryEvent entry : event.entryEvents()) {
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItDistributedConfigurationStorageTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItDistributedConfigurationStorageTest.java
index 4db3c7f5d2..3f9eefb6e3 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItDistributedConfigurationStorageTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItDistributedConfigurationStorageTest.java
@@ -203,7 +203,7 @@ public class ItDistributedConfigurationStorageTest {
             assertThat(node.cfgStorage.writeConfigurationRevision(0, 1), willCompleteSuccessfully());
 
             assertTrue(waitForCondition(
-                    () -> node.metaStorageManager.appliedRevision(DistributedConfigurationStorage.WATCH_ID).join() != 0,
+                    () -> node.metaStorageManager.appliedRevision() != 0,
                     3000
             ));
         } finally {
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/test/WatchListenerInhibitor.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/test/WatchListenerInhibitor.java
index d58cd800de..dc3f880ee1 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/test/WatchListenerInhibitor.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/test/WatchListenerInhibitor.java
@@ -17,14 +17,14 @@
 
 package org.apache.ignite.internal.test;
 
+import static org.apache.ignite.internal.testframework.IgniteTestUtils.getField;
 import static org.apache.ignite.internal.testframework.IgniteTestUtils.getFieldValue;
 
-import java.util.Map;
+import java.lang.reflect.Field;
 import java.util.concurrent.CompletableFuture;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.internal.app.IgniteImpl;
 import org.apache.ignite.internal.metastorage.impl.MetaStorageManagerImpl;
-import org.apache.ignite.internal.metastorage.server.Watch;
 import org.apache.ignite.internal.metastorage.server.WatchProcessor;
 import org.apache.ignite.internal.metastorage.server.persistence.RocksDbKeyValueStorage;
 
@@ -32,8 +32,9 @@ import org.apache.ignite.internal.metastorage.server.persistence.RocksDbKeyValue
  * Class for blocking Watch processing on a given Ignite node.
  */
 public class WatchListenerInhibitor {
-    /** "watches" field captured from the {@link RocksDbKeyValueStorage} instance. */
-    private final Map<Watch, CompletableFuture<Void>> watches;
+    private final WatchProcessor watchProcessor;
+
+    private final Field notificationFutureField;
 
     /** Future used to block the watch notification thread. */
     private final CompletableFuture<Void> inhibitFuture = new CompletableFuture<>();
@@ -52,20 +53,25 @@ public class WatchListenerInhibitor {
 
         var watchProcessor = (WatchProcessor) getFieldValue(storage, RocksDbKeyValueStorage.class, "watchProcessor");
 
-        var watches = (Map<Watch, CompletableFuture<Void>>) getFieldValue(watchProcessor, WatchProcessor.class, "watches");
-
-        return new WatchListenerInhibitor(watches);
+        return new WatchListenerInhibitor(watchProcessor);
     }
 
-    private WatchListenerInhibitor(Map<Watch, CompletableFuture<Void>> watches) {
-        this.watches = watches;
+    private WatchListenerInhibitor(WatchProcessor watchProcessor) {
+        this.watchProcessor = watchProcessor;
+        this.notificationFutureField = getField(watchProcessor, WatchProcessor.class, "notificationFuture");
     }
 
     /**
      * Starts inhibiting events.
      */
     public void startInhibit() {
-        watches.replaceAll((watch, watchOperation) -> watchOperation.thenCompose(v -> inhibitFuture));
+        try {
+            CompletableFuture<Void> notificationFuture = (CompletableFuture<Void>) notificationFutureField.get(watchProcessor);
+
+            notificationFutureField.set(watchProcessor, notificationFuture.thenCompose(v -> inhibitFuture));
+        } catch (IllegalAccessException e) {
+            throw new RuntimeException(e);
+        }
     }
 
     /**
diff --git a/modules/runner/src/main/java/org/apache/ignite/internal/configuration/storage/DistributedConfigurationStorage.java b/modules/runner/src/main/java/org/apache/ignite/internal/configuration/storage/DistributedConfigurationStorage.java
index 6961512d8a..f0dd8e74c1 100644
--- a/modules/runner/src/main/java/org/apache/ignite/internal/configuration/storage/DistributedConfigurationStorage.java
+++ b/modules/runner/src/main/java/org/apache/ignite/internal/configuration/storage/DistributedConfigurationStorage.java
@@ -65,8 +65,6 @@ public class DistributedConfigurationStorage implements ConfigurationStorage {
     /** Prefix added to configuration keys to distinguish them in the meta storage. Must end with a dot. */
     private static final String DISTRIBUTED_PREFIX = "dst-cfg.";
 
-    public static final String WATCH_ID = DISTRIBUTED_PREFIX + "watch";
-
     /**
      * Key for CAS-ing configuration keys to meta storage.
      */
@@ -208,9 +206,12 @@ public class DistributedConfigurationStorage implements ConfigurationStorage {
 
     @Override
     public CompletableFuture<Data> readDataOnRecovery() throws StorageException {
-        CompletableFuture<Data> future = metaStorageMgr.appliedRevision(WATCH_ID)
-                .thenCombine(vaultMgr.get(CONFIGURATION_REVISIONS_KEY), DistributedConfigurationStorage::resolveRevision)
-                .thenApplyAsync(this::readDataOnRecovery0, threadPool);
+        CompletableFuture<Data> future = vaultMgr.get(CONFIGURATION_REVISIONS_KEY)
+                .thenApplyAsync(entry -> {
+                    long revision = resolveRevision(metaStorageMgr.appliedRevision(), entry);
+
+                    return readDataOnRecovery0(revision);
+                }, threadPool);
 
         return registerFuture(future);
     }
@@ -311,11 +312,6 @@ public class DistributedConfigurationStorage implements ConfigurationStorage {
         // TODO: registerPrefixWatch could throw OperationTimeoutException and CompactedException and we should
         // TODO: properly handle such cases https://issues.apache.org/jira/browse/IGNITE-14604
         metaStorageMgr.registerPrefixWatch(DST_KEYS_START_RANGE, new WatchListener() {
-            @Override
-            public String id() {
-                return WATCH_ID;
-            }
-
             @Override
             public CompletableFuture<Void> onUpdate(WatchEvent events) {
                 Map<String, Serializable> data = IgniteUtils.newHashMap(events.entryEvents().size() - 1);
diff --git a/modules/runner/src/test/java/org/apache/ignite/internal/configuration/storage/DistributedConfigurationCatchUpTest.java b/modules/runner/src/test/java/org/apache/ignite/internal/configuration/storage/DistributedConfigurationCatchUpTest.java
index 675fc85695..776fa792be 100644
--- a/modules/runner/src/test/java/org/apache/ignite/internal/configuration/storage/DistributedConfigurationCatchUpTest.java
+++ b/modules/runner/src/test/java/org/apache/ignite/internal/configuration/storage/DistributedConfigurationCatchUpTest.java
@@ -87,8 +87,7 @@ public class DistributedConfigurationCatchUpTest {
     }
 
     /**
-     * Tests that distributed configuration storage correctly picks up latest configuration MetaStorage revision
-     * during recovery process.
+     * Tests that distributed configuration storage correctly picks up latest configuration MetaStorage revision during recovery process.
      *
      * @throws Exception If failed.
      */
@@ -100,8 +99,6 @@ public class DistributedConfigurationCatchUpTest {
 
         MetaStorageMockWrapper wrapper = new MetaStorageMockWrapper();
 
-        when(wrapper.mock.appliedRevision(DistributedConfigurationStorage.WATCH_ID)).thenReturn(completedFuture(0L));
-
         DistributedConfigurationStorage storage = storage(wrapper);
 
         try {
@@ -130,7 +127,7 @@ public class DistributedConfigurationCatchUpTest {
         vaultManager.put(MetaStorageMockWrapper.TEST_KEY, new byte[]{4, 1, 2, 3, 4}).get();
 
         // This emulates a change in MetaStorage that is not related to the configuration.
-        when(wrapper.mock.appliedRevision(DistributedConfigurationStorage.WATCH_ID)).thenReturn(completedFuture(2L));
+        when(wrapper.mock.appliedRevision()).thenReturn(2L);
 
         storage = storage(wrapper);
 
@@ -163,8 +160,8 @@ public class DistributedConfigurationCatchUpTest {
         private static final String DISTRIBUTED_PREFIX = "dst-cfg.";
 
         /**
-         * This and previous field are copy-pasted intentionally, so in case if something changes,
-         * this test should fail and be reviewed and re-written.
+         * This and previous field are copy-pasted intentionally, so in case if something changes, this test should fail and be reviewed and
+         * re-written.
          */
         private static final ByteArray MASTER_KEY = new ByteArray(DISTRIBUTED_PREFIX + "$master$key");
 
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index 03136e6def..7848b4931a 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -1837,11 +1837,6 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
      */
     private WatchListener createDistributionZonesDataNodesListener() {
         return new WatchListener() {
-            @Override
-            public String id() {
-                return "dst-zones-data-nodes-watch";
-            }
-
             @Override
             public CompletableFuture<Void> onUpdate(WatchEvent evt) {
                 if (!busyLock.enterBusy()) {
@@ -1921,11 +1916,6 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
      */
     private WatchListener createPendingAssignmentsRebalanceListener() {
         return new WatchListener() {
-            @Override
-            public String id() {
-                return "pending-assignments-watch";
-            }
-
             @Override
             public CompletableFuture<Void> onUpdate(WatchEvent evt) {
                 if (!busyLock.enterBusy()) {
@@ -2156,11 +2146,6 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
      */
     private WatchListener createStableAssignmentsRebalanceListener() {
         return new WatchListener() {
-            @Override
-            public String id() {
-                return "stable-assignments-watch";
-            }
-
             @Override
             public CompletableFuture<Void> onUpdate(WatchEvent evt) {
                 if (!busyLock.enterBusy()) {
@@ -2188,11 +2173,6 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
      */
     private WatchListener createAssignmentsSwitchRebalanceListener() {
         return new WatchListener() {
-            @Override
-            public String id() {
-                return "assignments-switch-watch";
-            }
-
             @Override
             public CompletableFuture<Void> onUpdate(WatchEvent evt) {
                 if (!busyLock.enterBusy()) {

Reply via email to