This is an automated email from the ASF dual-hosted git repository.

vpyatkov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 6ada47270d IGNITE-18756 Awaiting for data nodes updating in 
distribution zone manager (#1729)
6ada47270d is described below

commit 6ada47270d21795ba583d6be415bf91f5c64e319
Author: Sergey Uttsel <[email protected]>
AuthorDate: Mon Apr 17 11:28:09 2023 +0300

    IGNITE-18756 Awaiting for data nodes updating in distribution zone manager 
(#1729)
---
 .../management/raft/TestClusterStateStorage.java   |   7 +-
 .../apache/ignite/internal/util/IgniteUtils.java   |  12 +
 .../distributionzones/DistributionZoneManager.java | 463 ++++++++++++++--
 .../distributionzones/DistributionZonesUtil.java   |  64 ++-
 .../DistributionZoneWasRemovedException.java       |  49 ++
 .../DistributionZoneAwaitDataNodesTest.java        | 582 +++++++++++++++++++++
 ...ibutionZoneManagerConfigurationChangesTest.java |  15 +-
 ...butionZoneManagerLogicalTopologyEventsTest.java |  21 +-
 .../DistributionZoneManagerScaleUpTest.java        |  33 +-
 .../DistributionZoneManagerWatchListenerTest.java  |  37 +-
 .../ItRaftCommandLeftInLogUntilRestartTest.java    |   3 +-
 .../internal/table/distributed/TableManager.java   |   4 +-
 .../TableManagerDistributionZonesTest.java         |  11 +-
 13 files changed, 1198 insertions(+), 103 deletions(-)

diff --git 
a/modules/cluster-management/src/testFixtures/java/org/apache/ignite/internal/cluster/management/raft/TestClusterStateStorage.java
 
b/modules/cluster-management/src/testFixtures/java/org/apache/ignite/internal/cluster/management/raft/TestClusterStateStorage.java
index 736715a34a..6ab873c5a7 100644
--- 
a/modules/cluster-management/src/testFixtures/java/org/apache/ignite/internal/cluster/management/raft/TestClusterStateStorage.java
+++ 
b/modules/cluster-management/src/testFixtures/java/org/apache/ignite/internal/cluster/management/raft/TestClusterStateStorage.java
@@ -19,13 +19,13 @@ package org.apache.ignite.internal.cluster.management.raft;
 
 import static java.util.stream.Collectors.collectingAndThen;
 import static java.util.stream.Collectors.toList;
+import static org.apache.ignite.internal.util.IgniteUtils.startsWith;
 
 import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
@@ -130,11 +130,6 @@ public class TestClusterStateStorage implements 
ClusterStateStorage {
         }
     }
 
-    private static boolean startsWith(byte[] key, byte[] prefix) {
-        return key.length >= prefix.length
-                && Arrays.equals(key, 0, prefix.length, prefix, 0, 
prefix.length);
-    }
-
     @Override
     public CompletableFuture<Void> snapshot(Path snapshotPath) {
         List<byte[]> keys;
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java 
b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
index ea16c07854..e93e9908e4 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
@@ -1055,4 +1055,16 @@ public class IgniteUtils {
                     }
                 });
     }
+
+    /**
+     * Utility method to check if one byte array starts with a specified 
sequence of bytes.
+     *
+     * @param key The array to check.
+     * @param prefix The prefix bytes to test for.
+     * @return {@code true} if the key starts with the bytes from the prefix.
+     */
+    public static boolean startsWith(byte[] key, byte[] prefix) {
+        return key.length >= prefix.length
+                && Arrays.equals(key, 0, prefix.length, prefix, 0, 
prefix.length);
+    }
 }
diff --git 
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java
 
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java
index 9732870967..00cc8e6b92 100644
--- 
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java
+++ 
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java
@@ -17,6 +17,8 @@
 
 package org.apache.ignite.internal.distributionzones;
 
+import static java.util.Collections.emptySet;
+import static java.util.concurrent.CompletableFuture.allOf;
 import static java.util.concurrent.CompletableFuture.completedFuture;
 import static java.util.concurrent.CompletableFuture.failedFuture;
 import static java.util.stream.Collectors.toList;
@@ -24,6 +26,8 @@ import static java.util.stream.Collectors.toSet;
 import static 
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.deleteDataNodesAndUpdateTriggerKeys;
 import static 
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.extractChangeTriggerRevision;
 import static 
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.extractDataNodes;
+import static 
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.extractZoneId;
+import static 
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.getZoneById;
 import static 
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.toDataNodesMap;
 import static 
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.triggerKeyConditionForZonesChanges;
 import static 
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.triggerScaleUpScaleDownKeysCondition;
@@ -32,8 +36,10 @@ import static 
org.apache.ignite.internal.distributionzones.DistributionZonesUtil
 import static 
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.updateDataNodesAndTriggerKeys;
 import static 
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.updateLogicalTopologyAndVersion;
 import static 
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zoneDataNodesKey;
+import static 
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zoneLogicalTopologyPrefix;
 import static 
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zoneScaleDownChangeTriggerKey;
 import static 
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zoneScaleUpChangeTriggerKey;
+import static 
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zonesDataNodesPrefix;
 import static 
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zonesLogicalTopologyKey;
 import static 
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zonesLogicalTopologyVersionKey;
 import static org.apache.ignite.internal.metastorage.dsl.Conditions.notExists;
@@ -45,10 +51,10 @@ import static 
org.apache.ignite.internal.util.ByteUtils.fromBytes;
 import static org.apache.ignite.internal.util.ByteUtils.toBytes;
 import static org.apache.ignite.internal.util.IgniteUtils.inBusyLock;
 import static 
org.apache.ignite.internal.util.IgniteUtils.shutdownAndAwaitTermination;
+import static org.apache.ignite.internal.util.IgniteUtils.startsWith;
 import static org.apache.ignite.lang.ErrorGroups.Common.NODE_STOPPING_ERR;
 
 import java.util.Arrays;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -82,15 +88,18 @@ import 
org.apache.ignite.internal.cluster.management.topology.api.LogicalTopolog
 import 
org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologySnapshot;
 import 
org.apache.ignite.internal.distributionzones.configuration.DistributionZoneChange;
 import 
org.apache.ignite.internal.distributionzones.configuration.DistributionZoneConfiguration;
+import 
org.apache.ignite.internal.distributionzones.configuration.DistributionZoneConfigurationSchema;
 import 
org.apache.ignite.internal.distributionzones.configuration.DistributionZoneView;
 import 
org.apache.ignite.internal.distributionzones.configuration.DistributionZonesConfiguration;
 import 
org.apache.ignite.internal.distributionzones.exception.DistributionZoneAlreadyExistsException;
 import 
org.apache.ignite.internal.distributionzones.exception.DistributionZoneBindTableException;
 import 
org.apache.ignite.internal.distributionzones.exception.DistributionZoneNotFoundException;
+import 
org.apache.ignite.internal.distributionzones.exception.DistributionZoneWasRemovedException;
 import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.Loggers;
 import org.apache.ignite.internal.manager.IgniteComponent;
 import org.apache.ignite.internal.metastorage.Entry;
+import org.apache.ignite.internal.metastorage.EntryEvent;
 import org.apache.ignite.internal.metastorage.MetaStorageManager;
 import org.apache.ignite.internal.metastorage.WatchEvent;
 import org.apache.ignite.internal.metastorage.WatchListener;
@@ -106,8 +115,11 @@ import 
org.apache.ignite.internal.schema.configuration.TablesConfiguration;
 import org.apache.ignite.internal.thread.NamedThreadFactory;
 import org.apache.ignite.internal.util.ByteUtils;
 import org.apache.ignite.internal.util.IgniteSpinBusyLock;
+import org.apache.ignite.internal.util.PendingComparableValuesTracker;
+import org.apache.ignite.internal.vault.VaultEntry;
 import org.apache.ignite.internal.vault.VaultManager;
 import org.apache.ignite.lang.ByteArray;
+import org.apache.ignite.lang.IgniteBiTuple;
 import org.apache.ignite.lang.IgniteException;
 import org.apache.ignite.lang.IgniteInternalException;
 import org.apache.ignite.lang.NodeStoppingException;
@@ -138,6 +150,12 @@ public class DistributionZoneManager implements 
IgniteComponent {
     /** Default number of zone partitions. */
     public static final int DEFAULT_PARTITION_COUNT = 25;
 
+    /**
+     * Value for the distribution zones' timers which means that data nodes 
changing for distribution zone
+     * will be started without waiting.
+     */
+    public static final int IMMEDIATE_TIMER_VALUE = 0;
+
     /** Default infinite value for the distribution zones' timers. */
     public static final int INFINITE_TIMER_VALUE = Integer.MAX_VALUE;
 
@@ -174,6 +192,15 @@ public class DistributionZoneManager implements 
IgniteComponent {
      */
     private final Map<Integer, ZoneState> zonesState;
 
+    /** The tracker for the last topology version which was observed by 
distribution zone manager. */
+    private final PendingComparableValuesTracker<Long> topVerTracker;
+
+    /** The last meta storage revision on which scale up timer was started. */
+    private volatile long lastScaleUpRevision;
+
+    /** The last meta storage revision on which scale down timer was started. 
*/
+    private volatile long lastScaleDownRevision;
+
     /** Listener for a topology events. */
     private final LogicalTopologyEventListener topologyEventListener = new 
LogicalTopologyEventListener() {
         @Override
@@ -198,8 +225,11 @@ public class DistributionZoneManager implements 
IgniteComponent {
      */
     private volatile Set<String> logicalTopology;
 
-    /** Watch listener. Needed to unregister it on {@link 
DistributionZoneManager#stop()}. */
-    private final WatchListener watchListener;
+    /** Watch listener for logical topology keys. */
+    private final WatchListener topologyWatchListener;
+
+    /** Watch listener for data nodes keys. */
+    private final WatchListener dataNodesWatchListener;
 
     /**
      * Creates a new distribution zone manager.
@@ -224,17 +254,21 @@ public class DistributionZoneManager implements 
IgniteComponent {
         this.logicalTopologyService = logicalTopologyService;
         this.vaultMgr = vaultMgr;
 
-        this.watchListener = createMetastorageListener();
+        this.topologyWatchListener = createMetastorageTopologyListener();
+
+        this.dataNodesWatchListener = createMetastorageDataNodesListener();
 
         zonesState = new ConcurrentHashMap<>();
 
-        logicalTopology = Collections.emptySet();
+        logicalTopology = emptySet();
 
         executor = new ScheduledThreadPoolExecutor(
                 Math.min(Runtime.getRuntime().availableProcessors() * 3, 20),
                 new 
NamedThreadFactory(NamedThreadFactory.threadPrefix(nodeName, 
DISTRIBUTION_ZONE_MANAGER_POOL_NAME), LOG),
                 new ThreadPoolExecutor.DiscardPolicy()
         );
+
+        topVerTracker = new PendingComparableValuesTracker<>(0L);
     }
 
     @TestOnly
@@ -511,6 +545,148 @@ public class DistributionZoneManager implements 
IgniteComponent {
         }
     }
 
+    /**
+     * The method for obtaining the data nodes of the specified zone.
+     * If {@link 
DistributionZoneConfigurationSchema#dataNodesAutoAdjustScaleUp} and
+     * {@link 
DistributionZoneConfigurationSchema#dataNodesAutoAdjustScaleDown} are immediate 
then it waits that the data nodes
+     * are up-to-date for the passed topology version.
+     *
+     * <p>If the values of auto adjust scale up and auto adjust scale down are 
zero, then on the cluster topology changes
+     * the data nodes for the zone should be updated immediately. Therefore, 
this method must return the data nodes which is calculated
+     * based on the topology with passed or greater version. Since the date 
nodes value is updated asynchronously, this method waits for
+     * the date nodes to be updated with new nodes in the topology if the 
value of auto adjust scale up is 0, and also waits for
+     * the date nodes to be updated with nodes that have left the topology if 
the value of auto adjust scale down is 0.
+     * After the zone manager has observed the logical topology change and the 
data nodes value is updated according to cluster topology,
+     * then this method completes the returned future with the current value 
of data nodes.
+     *
+     * <p>If the value of auto adjust scale up is greater than zero, then it 
is not necessary to wait for the data nodes update triggered
+     * by new nodes in cluster topology. Similarly if the value of auto adjust 
scale down is greater than zero, then it is not necessary to
+     * wait for the data nodes update triggered by new nodes that have left 
the topology in cluster topology.
+     *
+     * <p>The returned future can be completed with {@link 
DistributionZoneNotFoundException} if zone with the provided {@code zoneId}
+     * cannot be found or {@link DistributionZoneWasRemovedException} in case 
when the distribution zone was removed during
+     * method execution.
+     *
+     * @param zoneId Zone id.
+     * @param topVer Topology version.
+     * @return The data nodes future which will be completed with data nodes 
for the zoneId or with exception.
+     */
+    public CompletableFuture<Set<String>> topologyVersionedDataNodes(int 
zoneId, long topVer) {
+        CompletableFuture<IgniteBiTuple<Boolean, Boolean>> timerValuesFut = 
awaitTopologyVersion(topVer)
+                .thenCompose(ignored -> getImmediateTimers(zoneId));
+
+        return allOf(
+                timerValuesFut.thenCompose(timerValues -> 
scaleUpAwaiting(zoneId, timerValues.get1())),
+                timerValuesFut.thenCompose(timerValues -> 
scaleDownAwaiting(zoneId, timerValues.get2()))
+        ).thenCompose(ignored -> getDataNodesFuture(zoneId));
+    }
+
+    /**
+     * Waits for observing passed topology version or greater version in 
{@link DistributionZoneManager#topologyWatchListener}.
+     *
+     * @param topVer Topology version.
+     * @return Future for chaining.
+     */
+    private CompletableFuture<Void> awaitTopologyVersion(long topVer) {
+        return inBusyLock(busyLock, () -> topVerTracker.waitFor(topVer));
+    }
+
+    /**
+     * Transforms {@link 
DistributionZoneConfigurationSchema#dataNodesAutoAdjustScaleUp}
+     * and {@link 
DistributionZoneConfigurationSchema#dataNodesAutoAdjustScaleDown} values to 
boolean values.
+     * True if it equals to zero and false if it greater than zero. Zero means 
that data nodes changing must be started immediately.
+     *
+     * <p>The returned future can be completed with {@link 
DistributionZoneNotFoundException}
+     * in case when the distribution zone was removed.
+     *
+     * @param zoneId Zone id.
+     * @return Future with the boolean values for immediate auto adjust scale 
up and immediate auto adjust scale down.
+     */
+    private CompletableFuture<IgniteBiTuple<Boolean, Boolean>> 
getImmediateTimers(int zoneId) {
+        return inBusyLock(busyLock, () -> {
+            DistributionZoneConfiguration zoneCfg = 
getZoneById(zonesConfiguration, zoneId);
+
+            return completedFuture(new IgniteBiTuple<>(
+                    zoneCfg.dataNodesAutoAdjustScaleUp().value() == 
IMMEDIATE_TIMER_VALUE,
+                    zoneCfg.dataNodesAutoAdjustScaleDown().value() == 
IMMEDIATE_TIMER_VALUE
+            ));
+        });
+    }
+
+    /**
+     * If the {@link 
DistributionZoneConfigurationSchema#dataNodesAutoAdjustScaleUp} equals to 0 
then waits for the zone manager processes
+     * the data nodes update triggered by started nodes with passed or greater 
revision.
+     * Else does nothing.
+     *
+     * <p>The returned future can be completed with {@link 
DistributionZoneWasRemovedException}
+     * in case when the distribution zone was removed during method execution.
+     *
+     * @param zoneId Zone id.
+     * @param immediateScaleUp True in case of immediate scale up.
+     * @return Future for chaining.
+     */
+    private CompletableFuture<Void> scaleUpAwaiting(int zoneId, boolean 
immediateScaleUp) {
+        return inBusyLock(busyLock, () -> {
+            if (immediateScaleUp) {
+                ZoneState zoneState = zonesState.get(zoneId);
+
+                if (zoneState != null) {
+                    return 
zoneState.scaleUpRevisionTracker().waitFor(lastScaleUpRevision);
+                } else {
+                    return failedFuture(new 
DistributionZoneWasRemovedException(zoneId));
+                }
+            } else {
+                return completedFuture(null);
+            }
+        });
+    }
+
+    /**
+     * If the {@link 
DistributionZoneConfigurationSchema#dataNodesAutoAdjustScaleDown} equals to 0 
then waits for the zone manager processes
+     * the data nodes update triggered by stopped nodes with passed or greater 
revision.
+     * Else does nothing.
+     *
+     * <p>The returned future can be completed with {@link 
DistributionZoneWasRemovedException}
+     * in case when the distribution zone was removed during method execution.
+     *
+     * @param zoneId Zone id.
+     * @param immediateScaleDown True in case of immediate scale down.
+     * @return Future for chaining.
+     */
+    private CompletableFuture<Void> scaleDownAwaiting(int zoneId, boolean 
immediateScaleDown) {
+        return inBusyLock(busyLock, () -> {
+            if (immediateScaleDown) {
+                ZoneState zoneState = zonesState.get(zoneId);
+
+                if (zoneState != null) {
+                    return 
zoneState.scaleDownRevisionTracker().waitFor(lastScaleDownRevision);
+                } else {
+                    return failedFuture(new 
DistributionZoneWasRemovedException(zoneId));
+                }
+            } else {
+                return completedFuture(null);
+            }
+        });
+    }
+
+    /**
+     * Returns the future with data nodes of the specified zone.
+     *
+     * @param zoneId Zone id.
+     * @return Future.
+     */
+    private CompletableFuture<Set<String>> getDataNodesFuture(int zoneId) {
+        return inBusyLock(busyLock, () -> {
+            ZoneState zoneState = zonesState.get(zoneId);
+
+            if (zoneState != null) {
+                return completedFuture(zonesState.get(zoneId).nodes());
+            } else {
+                return failedFuture(new 
DistributionZoneWasRemovedException(zoneId));
+            }
+        });
+    }
+
     /** {@inheritDoc} */
     @Override
     public void start() {
@@ -540,7 +716,8 @@ public class DistributionZoneManager implements 
IgniteComponent {
 
             logicalTopologyService.addEventListener(topologyEventListener);
 
-            metaStorageManager.registerExactWatch(zonesLogicalTopologyKey(), 
watchListener);
+            
metaStorageManager.registerPrefixWatch(zoneLogicalTopologyPrefix(), 
topologyWatchListener);
+            metaStorageManager.registerPrefixWatch(zonesDataNodesPrefix(), 
dataNodesWatchListener);
 
             initDataNodesFromVaultManager();
 
@@ -584,6 +761,12 @@ public class DistributionZoneManager implements 
IgniteComponent {
                 zoneState.stopScaleUp();
             }
 
+            //Only wait for a scale up revision if the auto adjust scale up 
has a zero value.
+            //So if the value of the auto adjust scale up has become non-zero, 
then need to complete all futures.
+            if (newScaleUp > 0) {
+                zoneState.scaleUpRevisionTracker().update(lastScaleUpRevision);
+            }
+
             return completedFuture(null);
         };
     }
@@ -622,6 +805,12 @@ public class DistributionZoneManager implements 
IgniteComponent {
                 zoneState.stopScaleDown();
             }
 
+            //Only wait for a scale down revision if the auto adjust scale 
down has a zero value.
+            //So if the value of the auto adjust scale down has become 
non-zero, then need to complete all futures.
+            if (newScaleDown > 0) {
+                
zoneState.scaleDownRevisionTracker().update(lastScaleDownRevision);
+            }
+
             return completedFuture(null);
         };
     }
@@ -637,7 +826,16 @@ public class DistributionZoneManager implements 
IgniteComponent {
 
         logicalTopologyService.removeEventListener(topologyEventListener);
 
-        metaStorageManager.unregisterWatch(watchListener);
+        metaStorageManager.unregisterWatch(topologyWatchListener);
+        metaStorageManager.unregisterWatch(dataNodesWatchListener);
+
+        //Need to update trackers with max possible value to complete all 
futures that are waiting for trackers.
+        topVerTracker.update(Long.MAX_VALUE);
+
+        zonesState.values().forEach(zoneState -> {
+            zoneState.scaleUpRevisionTracker().update(Long.MAX_VALUE);
+            zoneState.scaleDownRevisionTracker().update(Long.MAX_VALUE);
+        });
 
         shutdownAndAwaitTermination(executor, 10, TimeUnit.SECONDS);
     }
@@ -664,7 +862,10 @@ public class DistributionZoneManager implements 
IgniteComponent {
 
             removeTriggerKeysAndDataNodes(zoneId, ctx.storageRevision());
 
-            zonesState.remove(zoneId);
+            ZoneState zoneState = zonesState.remove(zoneId);
+
+            zoneState.scaleUpRevisionTracker.update(Long.MAX_VALUE);
+            zoneState.scaleDownRevisionTracker.update(Long.MAX_VALUE);
 
             return completedFuture(null);
         }
@@ -904,45 +1105,62 @@ public class DistributionZoneManager implements 
IgniteComponent {
         }
 
         try {
-            vaultMgr.get(zonesLogicalTopologyKey())
-                    .thenAccept(vaultEntry -> {
-                        if (!busyLock.enterBusy()) {
-                            throw new 
IgniteInternalException(NODE_STOPPING_ERR, new NodeStoppingException());
-                        }
+            long appliedRevision = metaStorageManager.appliedRevision();
 
-                        try {
-                            long appliedRevision = 
metaStorageManager.appliedRevision();
+            lastScaleUpRevision = appliedRevision;
 
-                            if (vaultEntry != null && vaultEntry.value() != 
null) {
-                                logicalTopology = 
fromBytes(vaultEntry.value());
+            lastScaleDownRevision = appliedRevision;
 
-                                // init keys and data nodes for default zone
-                                saveDataNodesAndUpdateTriggerKeysInMetaStorage(
-                                        DEFAULT_ZONE_ID,
-                                        appliedRevision,
-                                        logicalTopology
-                                );
+            VaultEntry topVerEntry = 
vaultMgr.get(zonesLogicalTopologyVersionKey()).join();
 
-                                
zonesConfiguration.distributionZones().value().forEach(zone -> {
-                                    int zoneId = zone.zoneId();
+            if (topVerEntry != null && topVerEntry.value() != null) {
+                topVerTracker.update(bytesToLong(topVerEntry.value()));
+            }
 
-                                    
saveDataNodesAndUpdateTriggerKeysInMetaStorage(
-                                            zoneId,
-                                            appliedRevision,
-                                            logicalTopology
-                                    );
-                                });
-                            }
-                        } finally {
-                            busyLock.leaveBusy();
-                        }
-                    });
+            VaultEntry topologyEntry = 
vaultMgr.get(zonesLogicalTopologyKey()).join();
+
+            if (topologyEntry != null && topologyEntry.value() != null) {
+                logicalTopology = fromBytes(topologyEntry.value());
+
+                // init keys and data nodes for default zone
+                saveDataNodesAndUpdateTriggerKeysInMetaStorage(
+                        DEFAULT_ZONE_ID,
+                        appliedRevision,
+                        logicalTopology
+                );
+
+                zonesConfiguration.distributionZones().value().forEach(zone -> 
{
+                    int zoneId = zone.zoneId();
+
+                    saveDataNodesAndUpdateTriggerKeysInMetaStorage(
+                            zoneId,
+                            appliedRevision,
+                            logicalTopology
+                    );
+                });
+            }
+
+            zonesState.values().forEach(zoneState -> {
+                zoneState.scaleUpRevisionTracker().update(lastScaleUpRevision);
+
+                
zoneState.scaleDownRevisionTracker().update(lastScaleDownRevision);
+
+                zoneState.nodes(logicalTopology);
+            });
+
+            assert topologyEntry == null || topologyEntry.value() == null || 
logicalTopology.equals(fromBytes(topologyEntry.value()))
+                    : "Initial value of logical topology was changed after 
initialization from the vault manager.";
         } finally {
             busyLock.leaveBusy();
         }
     }
 
-    private WatchListener createMetastorageListener() {
+    /**
+     * Creates watch listener which listens logical topology and logical 
topology version.
+     *
+     * @return Watch listener.
+     */
+    private WatchListener createMetastorageTopologyListener() {
         return new WatchListener() {
             @Override
             public CompletableFuture<Void> onUpdate(WatchEvent evt) {
@@ -951,24 +1169,55 @@ public class DistributionZoneManager implements 
IgniteComponent {
                 }
 
                 try {
-                    assert evt.single() : "Expected an event with one entry 
but was an event with several entries with keys: "
+                    assert evt.entryEvents().size() == 2 :
+                            "Expected an event with logical topology and 
logical topology version entries but was events with keys: "
                             + evt.entryEvents().stream().map(entry -> 
entry.newEntry() == null ? "null" : entry.newEntry().key())
                             .collect(toList());
 
-                    Entry newEntry = evt.entryEvent().newEntry();
+                    long topVer = 0;
+
+                    byte[] newLogicalTopologyBytes = null;
+
+                    Set<String> newLogicalTopology = null;
+
+                    long revision = 0;
+
+                    for (EntryEvent event : evt.entryEvents()) {
+                        Entry e = event.newEntry();
+
+                        if (Arrays.equals(e.key(), 
zonesLogicalTopologyVersionKey().bytes())) {
+                            topVer = bytesToLong(e.value());
 
-                    long revision = newEntry.revision();
+                            revision = e.revision();
+                        } else if (Arrays.equals(e.key(), 
zonesLogicalTopologyKey().bytes())) {
+                            newLogicalTopologyBytes = e.value();
 
-                    byte[] newLogicalTopologyBytes = newEntry.value();
+                            newLogicalTopology = 
fromBytes(newLogicalTopologyBytes);
+                        }
+                    }
+
+                    assert newLogicalTopology != null : "The event doesn't 
contain logical topology";
+                    assert revision > 0 : "The event doesn't contain logical 
topology version";
 
-                    Set<String> newLogicalTopology = 
fromBytes(newLogicalTopologyBytes);
+                    Set<String> newLogicalTopology0 = newLogicalTopology;
 
                     Set<String> removedNodes =
-                            logicalTopology.stream().filter(node -> 
!newLogicalTopology.contains(node)).collect(toSet());
+                            logicalTopology.stream().filter(node -> 
!newLogicalTopology0.contains(node)).collect(toSet());
 
                     Set<String> addedNodes =
                             newLogicalTopology.stream().filter(node -> 
!logicalTopology.contains(node)).collect(toSet());
 
+                    //Firstly update lastScaleUpRevision and 
lastScaleDownRevision then update topVerTracker to ensure thread-safety.
+                    if (!addedNodes.isEmpty()) {
+                        lastScaleUpRevision = revision;
+                    }
+
+                    if (!removedNodes.isEmpty()) {
+                        lastScaleDownRevision = revision;
+                    }
+
+                    topVerTracker.update(topVer);
+
                     logicalTopology = newLogicalTopology;
 
                     NamedConfigurationTree<DistributionZoneConfiguration, 
DistributionZoneView, DistributionZoneChange> zones =
@@ -997,6 +1246,86 @@ public class DistributionZoneManager implements 
IgniteComponent {
         };
     }
 
+    /**
+     * Creates watch listener which listens data nodes, scale up revision and 
scale down revision.
+     *
+     * @return Watch listener.
+     */
+    private WatchListener createMetastorageDataNodesListener() {
+        return new WatchListener() {
+            @Override
+            public CompletableFuture<Void> onUpdate(WatchEvent evt) {
+                if (!busyLock.enterBusy()) {
+                    return failedFuture(new NodeStoppingException());
+                }
+
+                try {
+                    int zoneId = 0;
+
+                    Set<String> newDataNodes = null;
+
+                    long scaleUpRevision = 0;
+
+                    long scaleDownRevision = 0;
+
+                    for (EntryEvent event : evt.entryEvents()) {
+                        Entry e = event.newEntry();
+
+                        if (startsWith(e.key(), zoneDataNodesKey().bytes())) {
+                            zoneId = extractZoneId(e.key());
+
+                            byte[] dataNodesBytes = e.value();
+
+                            if (dataNodesBytes != null) {
+                                newDataNodes = 
DistributionZonesUtil.dataNodes(fromBytes(dataNodesBytes));
+                            } else {
+                                newDataNodes = emptySet();
+                            }
+                        } else if (startsWith(e.key(), 
zoneScaleUpChangeTriggerKey().bytes())) {
+                            if (e.value() != null) {
+                                scaleUpRevision = bytesToLong(e.value());
+                            }
+                        } else if (startsWith(e.key(), 
zoneScaleDownChangeTriggerKey().bytes())) {
+                            if (e.value() != null) {
+                                scaleDownRevision = bytesToLong(e.value());
+                            }
+                        }
+                    }
+
+                    ZoneState zoneState = zonesState.get(zoneId);
+
+                    if (zoneState == null) {
+                        //The zone has been dropped so no need to update 
zoneState.
+                        return completedFuture(null);
+                    }
+
+                    assert newDataNodes != null : "Data nodes was not 
initialized.";
+
+                    zoneState.nodes(newDataNodes);
+
+                    //Associates scale up meta storage revision and data nodes.
+                    if (scaleUpRevision > 0) {
+                        
zoneState.scaleUpRevisionTracker.update(scaleUpRevision);
+                    }
+
+                    //Associates scale down meta storage revision and data 
nodes.
+                    if (scaleDownRevision > 0) {
+                        
zoneState.scaleDownRevisionTracker.update(scaleDownRevision);
+                    }
+                } finally {
+                    busyLock.leaveBusy();
+                }
+
+                return completedFuture(null);
+            }
+
+            @Override
+            public void onError(Throwable e) {
+                LOG.warn("Unable to process data nodes event", e);
+            }
+        };
+    }
+
     /**
      * Schedules scale up and scale down timers.
      *
@@ -1292,6 +1621,15 @@ public class DistributionZoneManager implements 
IgniteComponent {
         /** Executor for scheduling tasks for scale up and scale down 
processes. */
         private final ScheduledExecutorService executor;
 
+        /** Data nodes. */
+        private volatile Set<String> nodes;
+
+        /** The tracker for scale up meta storage revision of current data 
nodes value. */
+        private final PendingComparableValuesTracker<Long> 
scaleUpRevisionTracker;
+
+        /** The tracker for scale down meta storage revision of current data 
nodes value. */
+        private final PendingComparableValuesTracker<Long> 
scaleDownRevisionTracker;
+
         /**
          * Constructor.
          *
@@ -1300,6 +1638,9 @@ public class DistributionZoneManager implements 
IgniteComponent {
         ZoneState(ScheduledExecutorService executor) {
             this.executor = executor;
             topologyAugmentationMap = new ConcurrentSkipListMap<>();
+            nodes = emptySet();
+            scaleUpRevisionTracker = new PendingComparableValuesTracker<>(0L);
+            scaleDownRevisionTracker = new 
PendingComparableValuesTracker<>(0L);
         }
 
         /**
@@ -1456,6 +1797,42 @@ public class DistributionZoneManager implements 
IgniteComponent {
                     .map(Map.Entry::getKey);
         }
 
+        /**
+         * Get data nodes.
+         *
+         * @return Data nodes.
+         */
+        private Set<String> nodes() {
+            return nodes;
+        }
+
+        /**
+         * Set data nodes.
+         *
+         * @param nodes Data nodes.
+         */
+        private void nodes(Set<String> nodes) {
+            this.nodes = nodes;
+        }
+
+        /**
+         * The tracker for scale up meta storage revision of current data 
nodes value.
+         *
+         * @return The tracker.
+         */
+        private PendingComparableValuesTracker<Long> scaleUpRevisionTracker() {
+            return scaleUpRevisionTracker;
+        }
+
+        /**
+         * The tracker for scale down meta storage revision of current data 
nodes value.
+         *
+         * @return The tracker.
+         */
+        private PendingComparableValuesTracker<Long> 
scaleDownRevisionTracker() {
+            return scaleDownRevisionTracker;
+        }
+
         @TestOnly
         synchronized ScheduledFuture<?> scaleUpTask() {
             return scaleUpTask;
diff --git 
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZonesUtil.java
 
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZonesUtil.java
index 089f9c29a6..ef1a11a7d4 100644
--- 
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZonesUtil.java
+++ 
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZonesUtil.java
@@ -54,20 +54,28 @@ import org.jetbrains.annotations.Nullable;
  * Util class for Distribution Zones flow.
  */
 public class DistributionZonesUtil {
-    /** Key prefix for zone's data nodes. */
+    /** Key prefix for zone's data nodes and trigger keys. */
     private static final String DISTRIBUTION_ZONE_DATA_NODES_PREFIX = 
"distributionZone.dataNodes.";
 
+    /** Key prefix for zone's data nodes. */
+    private static final String DISTRIBUTION_ZONE_DATA_NODES_VALUE_PREFIX = 
DISTRIBUTION_ZONE_DATA_NODES_PREFIX + "value.";
+
     /** Key prefix for zone's scale up change trigger key. */
-    private static final String 
DISTRIBUTION_ZONE_SCALE_UP_CHANGE_TRIGGER_PREFIX = 
"distributionZone.scaleUp.change.trigger.";
+    private static final String 
DISTRIBUTION_ZONE_SCALE_UP_CHANGE_TRIGGER_PREFIX =
+            DISTRIBUTION_ZONE_DATA_NODES_PREFIX + "scaleUpChangeTrigger.";
 
     /** Key prefix for zone's scale down change trigger key. */
-    private static final String 
DISTRIBUTION_ZONE_SCALE_DOWN_CHANGE_TRIGGER_PREFIX = 
"distributionZone.scaleDown.change.trigger.";
+    private static final String 
DISTRIBUTION_ZONE_SCALE_DOWN_CHANGE_TRIGGER_PREFIX =
+            DISTRIBUTION_ZONE_DATA_NODES_PREFIX + "scaleDownChangeTrigger.";
+
+    /** Key prefix for zones' logical topology nodes and logical topology 
version. */
+    private static final String DISTRIBUTION_ZONES_LOGICAL_TOPOLOGY_PREFIX = 
"distributionZones.logicalTopology.";
 
     /** Key prefix for zones' logical topology nodes. */
-    private static final String DISTRIBUTION_ZONES_LOGICAL_TOPOLOGY = 
"distributionZones.logicalTopology";
+    private static final String DISTRIBUTION_ZONES_LOGICAL_TOPOLOGY = 
DISTRIBUTION_ZONES_LOGICAL_TOPOLOGY_PREFIX + "nodes";
 
     /** Key prefix for zones' logical topology version. */
-    private static final String DISTRIBUTION_ZONES_LOGICAL_TOPOLOGY_VERSION = 
"distributionZones.logicalTopologyVersion";
+    private static final String DISTRIBUTION_ZONES_LOGICAL_TOPOLOGY_VERSION = 
DISTRIBUTION_ZONES_LOGICAL_TOPOLOGY_PREFIX + "version";
 
     /** Key prefix, needed for processing the event about zone's update was 
triggered only once. */
     private static final String DISTRIBUTION_ZONES_CHANGE_TRIGGER_KEY_PREFIX = 
"distributionZones.change.trigger.";
@@ -85,23 +93,36 @@ public class DistributionZonesUtil {
      */
     private static final long INITIAL_TRIGGER_REVISION_VALUE = 0;
 
+    /** ByteArray representation of {@link 
DistributionZonesUtil#DISTRIBUTION_ZONE_DATA_NODES_PREFIX}. */
+    private static final ByteArray DISTRIBUTION_ZONES_DATA_NODES_KEY =
+            new ByteArray(DISTRIBUTION_ZONE_DATA_NODES_PREFIX);
+
     /**
-     * ByteArray representation of {@link 
DistributionZonesUtil#DISTRIBUTION_ZONE_DATA_NODES_PREFIX}.
+     * ByteArray representation of {@link 
DistributionZonesUtil#DISTRIBUTION_ZONE_DATA_NODES_VALUE_PREFIX}.
      *
      * @param zoneId Zone id.
      * @return ByteArray representation.
      */
     public static ByteArray zoneDataNodesKey(int zoneId) {
-        return new ByteArray(DISTRIBUTION_ZONE_DATA_NODES_PREFIX + zoneId);
+        return new ByteArray(DISTRIBUTION_ZONE_DATA_NODES_VALUE_PREFIX + 
zoneId);
+    }
+
+    /**
+     * ByteArray representation of {@link 
DistributionZonesUtil#DISTRIBUTION_ZONE_DATA_NODES_VALUE_PREFIX}.
+     *
+     * @return ByteArray representation.
+     */
+    public static ByteArray zoneDataNodesKey() {
+        return new ByteArray(DISTRIBUTION_ZONE_DATA_NODES_VALUE_PREFIX);
     }
 
     /**
-     * ByteArray representation of {@link 
DistributionZonesUtil#DISTRIBUTION_ZONE_DATA_NODES_PREFIX}.
+     * ByteArray representation of {@link 
DistributionZonesUtil#DISTRIBUTION_ZONES_LOGICAL_TOPOLOGY_PREFIX}.
      *
      * @return ByteArray representation.
      */
-    public static ByteArray zoneDataNodesPrefix() {
-        return new ByteArray(DISTRIBUTION_ZONE_DATA_NODES_PREFIX);
+    public static ByteArray zoneLogicalTopologyPrefix() {
+        return new ByteArray(DISTRIBUTION_ZONES_LOGICAL_TOPOLOGY_PREFIX);
     }
 
     /**
@@ -113,7 +134,7 @@ public class DistributionZonesUtil {
     public static int extractZoneId(byte[] key) {
         var strKey = new String(key, StandardCharsets.UTF_8);
 
-        return 
Integer.parseInt(strKey.substring(DISTRIBUTION_ZONE_DATA_NODES_PREFIX.length()));
+        return 
Integer.parseInt(strKey.substring(DISTRIBUTION_ZONE_DATA_NODES_VALUE_PREFIX.length()));
     }
 
     /**
@@ -132,6 +153,13 @@ public class DistributionZonesUtil {
         return new ByteArray(DISTRIBUTION_ZONE_SCALE_UP_CHANGE_TRIGGER_PREFIX 
+ zoneId);
     }
 
+    /**
+     * The key prefix needed for processing an event about zone's data node 
propagation on scale up.
+     */
+    public static ByteArray zoneScaleUpChangeTriggerKey() {
+        return new ByteArray(DISTRIBUTION_ZONE_SCALE_UP_CHANGE_TRIGGER_PREFIX);
+    }
+
     /**
      * The key needed for processing an event about zone's data node 
propagation on scale down.
      * With this key we can be sure that event was triggered only once.
@@ -140,6 +168,13 @@ public class DistributionZonesUtil {
         return new 
ByteArray(DISTRIBUTION_ZONE_SCALE_DOWN_CHANGE_TRIGGER_PREFIX + zoneId);
     }
 
+    /**
+     * The key prefix needed for processing an event about zone's data node 
propagation on scale down.
+     */
+    public static ByteArray zoneScaleDownChangeTriggerKey() {
+        return new 
ByteArray(DISTRIBUTION_ZONE_SCALE_DOWN_CHANGE_TRIGGER_PREFIX);
+    }
+
     /**
      * The key that represents logical topology nodes, needed for distribution 
zones. It is needed to store them in the metastore
      * to serialize data nodes changes triggered by topology changes and 
changes of distribution zones configurations.
@@ -156,6 +191,13 @@ public class DistributionZonesUtil {
         return DISTRIBUTION_ZONES_LOGICAL_TOPOLOGY_VERSION_KEY;
     }
 
+    /**
+     * The key prefix needed for processing an event about zone's data nodes.
+     */
+    static ByteArray zonesDataNodesPrefix() {
+        return DISTRIBUTION_ZONES_DATA_NODES_KEY;
+    }
+
     /**
      * Condition for updating {@link 
DistributionZonesUtil#zoneScaleUpChangeTriggerKey(int)} key.
      * Update only if the revision of the event is newer than value in that 
trigger key.
diff --git 
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/exception/DistributionZoneWasRemovedException.java
 
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/exception/DistributionZoneWasRemovedException.java
new file mode 100644
index 0000000000..5bb0063a37
--- /dev/null
+++ 
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/exception/DistributionZoneWasRemovedException.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.distributionzones.exception;
+
+import static 
org.apache.ignite.lang.ErrorGroups.DistributionZones.ZONE_NOT_FOUND_ERR;
+
+import java.util.UUID;
+import org.apache.ignite.lang.IgniteInternalException;
+
+/**
+ * Exception is thrown when appropriate distribution zone was removed.
+ */
+public class DistributionZoneWasRemovedException extends 
IgniteInternalException {
+    /**
+     * The constructor.
+     *
+     * @param zoneId Zone id.
+     */
+    public DistributionZoneWasRemovedException(int zoneId) {
+        super(ZONE_NOT_FOUND_ERR, "Distribution zone is not found [zoneId=" + 
zoneId + ']', null);
+    }
+
+    /**
+     * The constructor is used for creating an exception instance that is 
thrown from a remote server.
+     *
+     * @param traceId Trace id.
+     * @param code Error code.
+     * @param message Error message.
+     * @param cause Cause exception.
+     */
+    public DistributionZoneWasRemovedException(UUID traceId, int code, String 
message, Throwable cause) {
+        super(traceId, code, message, cause);
+    }
+}
diff --git 
a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneAwaitDataNodesTest.java
 
b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneAwaitDataNodesTest.java
new file mode 100644
index 0000000000..1ad692df73
--- /dev/null
+++ 
b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneAwaitDataNodesTest.java
@@ -0,0 +1,582 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.distributionzones;
+
+import static java.util.Collections.emptySet;
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static 
org.apache.ignite.internal.distributionzones.DistributionZoneManager.DEFAULT_ZONE_ID;
+import static 
org.apache.ignite.internal.distributionzones.DistributionZoneManager.DEFAULT_ZONE_NAME;
+import static 
org.apache.ignite.internal.distributionzones.DistributionZoneManager.IMMEDIATE_TIMER_VALUE;
+import static 
org.apache.ignite.internal.distributionzones.DistributionZoneManager.INFINITE_TIMER_VALUE;
+import static 
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zonesLogicalTopologyKey;
+import static 
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zonesLogicalTopologyVersionKey;
+import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.assertThrowsWithCause;
+import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
+import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static org.apache.ignite.internal.util.ByteUtils.longToBytes;
+import static org.apache.ignite.internal.util.ByteUtils.toBytes;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import 
org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager;
+import org.apache.ignite.internal.cluster.management.raft.ClusterStateStorage;
+import 
org.apache.ignite.internal.cluster.management.raft.TestClusterStateStorage;
+import org.apache.ignite.internal.cluster.management.topology.LogicalTopology;
+import 
org.apache.ignite.internal.cluster.management.topology.LogicalTopologyImpl;
+import 
org.apache.ignite.internal.cluster.management.topology.LogicalTopologyServiceImpl;
+import org.apache.ignite.internal.cluster.management.topology.api.LogicalNode;
+import 
org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologySnapshot;
+import org.apache.ignite.internal.configuration.ConfigurationManager;
+import 
org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
+import 
org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
+import 
org.apache.ignite.internal.distributionzones.configuration.DistributionZoneConfigurationSchema;
+import 
org.apache.ignite.internal.distributionzones.configuration.DistributionZonesConfiguration;
+import 
org.apache.ignite.internal.distributionzones.exception.DistributionZoneWasRemovedException;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.manager.IgniteComponent;
+import org.apache.ignite.internal.metastorage.MetaStorageManager;
+import org.apache.ignite.internal.metastorage.WatchListener;
+import org.apache.ignite.internal.metastorage.dsl.Conditions;
+import org.apache.ignite.internal.metastorage.dsl.Operations;
+import 
org.apache.ignite.internal.metastorage.impl.StandaloneMetaStorageManager;
+import 
org.apache.ignite.internal.metastorage.server.SimpleInMemoryKeyValueStorage;
+import org.apache.ignite.internal.schema.configuration.TablesConfiguration;
+import org.apache.ignite.internal.testframework.IgniteAbstractTest;
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.internal.vault.VaultManager;
+import org.apache.ignite.internal.vault.inmemory.InMemoryVaultService;
+import org.apache.ignite.lang.ByteArray;
+import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.network.NetworkAddress;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+/**
+ * Tests awaiting data nodes algorithm in distribution zone manager in case 
when
+ * {@link DistributionZoneConfigurationSchema#dataNodesAutoAdjustScaleUp}
+ * or {@link DistributionZoneConfigurationSchema#dataNodesAutoAdjustScaleDown} 
are immediate.
+ */
+@ExtendWith(ConfigurationExtension.class)
+public class DistributionZoneAwaitDataNodesTest extends IgniteAbstractTest {
+    private static final IgniteLogger LOG = 
Loggers.forClass(DistributionZoneAwaitDataNodesTest.class);
+
+    private MetaStorageManager metaStorageManager;
+
+    private DistributionZoneManager distributionZoneManager;
+
+    private LogicalTopology logicalTopology;
+
+    private ClusterStateStorage clusterStateStorage;
+
+    private ConfigurationManager clusterCfgMgr;
+
+    private ClusterManagementGroupManager cmgManager;
+
+    private VaultManager vaultManager;
+
+    @InjectConfiguration
+    private TablesConfiguration tablesConfiguration;
+
+    @InjectConfiguration
+    private DistributionZonesConfiguration zonesConfiguration;
+
+    private WatchListener topologyWatchListener;
+
+    private WatchListener dataNodesWatchListener;
+
+    private SimpleInMemoryKeyValueStorage keyValueStorage;
+
+    private final List<IgniteComponent> components = new ArrayList<>();
+
+    @BeforeEach
+    void setUp() throws Exception {
+        vaultManager = new VaultManager(new InMemoryVaultService());
+
+        assertThat(vaultManager.put(zonesLogicalTopologyKey(), null), 
willCompleteSuccessfully());
+        assertThat(vaultManager.put(zonesLogicalTopologyVersionKey(), 
longToBytes(0)), willCompleteSuccessfully());
+
+        components.add(vaultManager);
+
+        keyValueStorage = spy(new SimpleInMemoryKeyValueStorage("test"));
+
+        metaStorageManager = StandaloneMetaStorageManager.create(vaultManager, 
keyValueStorage);
+
+        components.add(metaStorageManager);
+
+        cmgManager = mock(ClusterManagementGroupManager.class);
+
+        clusterStateStorage = new TestClusterStateStorage();
+
+        components.add(clusterStateStorage);
+
+        logicalTopology = new LogicalTopologyImpl(clusterStateStorage);
+
+        distributionZoneManager = new DistributionZoneManager(
+                zonesConfiguration,
+                tablesConfiguration,
+                metaStorageManager,
+                new LogicalTopologyServiceImpl(logicalTopology, cmgManager),
+                vaultManager,
+                "test"
+        );
+
+        mockCmgLocalNodes();
+
+        // Not adding 'distributionZoneManager' on purpose, it's started 
manually.
+        components.forEach(IgniteComponent::start);
+
+        metaStorageManager.deployWatches();
+    }
+
+    @AfterEach
+    public void tearDown() throws Exception {
+        components.add(distributionZoneManager);
+
+        Collections.reverse(components);
+
+        IgniteUtils.closeAll(components.stream().map(c -> c::beforeNodeStop));
+        IgniteUtils.closeAll(components.stream().map(c -> c::stop));
+    }
+
+    /**
+     * This test invokes {@link 
DistributionZoneManager#topologyVersionedDataNodes(int, long)} with default and 
non-default zone id
+     * and different logical topology versions.
+     * Simulates new logical topology with new nodes and with removed nodes. 
Check that data nodes futures are completed in right order.
+     */
+    @Disabled("https://issues.apache.org/jira/browse/IGNITE-19288";)
+    @Test
+    void testSeveralScaleUpAndSeveralScaleDownThenScaleUpAndScaleDown() throws 
Exception {
+        startZoneManager(0);
+
+        distributionZoneManager.createZone(
+                        new 
DistributionZoneConfigurationParameters.Builder("zone0")
+                                
.dataNodesAutoAdjustScaleUp(IMMEDIATE_TIMER_VALUE)
+                                
.dataNodesAutoAdjustScaleDown(IMMEDIATE_TIMER_VALUE)
+                                .build()
+                )
+                .get(3, SECONDS);
+        distributionZoneManager.createZone(
+                        new 
DistributionZoneConfigurationParameters.Builder("zone1")
+                                
.dataNodesAutoAdjustScaleUp(IMMEDIATE_TIMER_VALUE)
+                                
.dataNodesAutoAdjustScaleDown(IMMEDIATE_TIMER_VALUE)
+                                .build()
+                )
+                .get(3, SECONDS);
+
+        int zoneId0 = distributionZoneManager.getZoneId("zone0");
+        int zoneId1 = distributionZoneManager.getZoneId("zone1");
+
+        LOG.info("Topology with added nodes.");
+
+        CompletableFuture<Set<String>> dataNodesUpFut0 = 
distributionZoneManager.topologyVersionedDataNodes(DEFAULT_ZONE_ID, 1);
+        CompletableFuture<Set<String>> dataNodesUpFut1 = 
distributionZoneManager.topologyVersionedDataNodes(DEFAULT_ZONE_ID, 1);
+        CompletableFuture<Set<String>> dataNodesUpFut2 = 
distributionZoneManager.topologyVersionedDataNodes(DEFAULT_ZONE_ID, 2);
+        CompletableFuture<Set<String>> dataNodesUpFut3 = 
distributionZoneManager.topologyVersionedDataNodes(DEFAULT_ZONE_ID, 11);
+        CompletableFuture<Set<String>> dataNodesUpFut4 = 
distributionZoneManager.topologyVersionedDataNodes(zoneId0, 1);
+        CompletableFuture<Set<String>> dataNodesUpFut5 = 
distributionZoneManager.topologyVersionedDataNodes(zoneId0, 2);
+        CompletableFuture<Set<String>> dataNodesUpFut6 = 
distributionZoneManager.topologyVersionedDataNodes(zoneId1, 1);
+        CompletableFuture<Set<String>> dataNodesUpFut7 = 
distributionZoneManager.topologyVersionedDataNodes(zoneId1, 2);
+
+        int topVer0 = 2;
+
+        Set<String> threeNodes = Set.of("node0", "node1", "node2");
+
+        setLogicalTopologyInMetaStorage(threeNodes, topVer0);
+
+        assertEquals(threeNodes, dataNodesUpFut0.get(3, SECONDS));
+        assertEquals(threeNodes, dataNodesUpFut1.get(3, SECONDS));
+        assertEquals(threeNodes, dataNodesUpFut2.get(3, SECONDS));
+
+        assertEquals(threeNodes, dataNodesUpFut4.get(3, SECONDS));
+        assertEquals(threeNodes, dataNodesUpFut5.get(3, SECONDS));
+        assertEquals(threeNodes, dataNodesUpFut6.get(3, SECONDS));
+        assertEquals(threeNodes, dataNodesUpFut7.get(3, SECONDS));
+        assertFalse(dataNodesUpFut3.isDone());
+
+
+        LOG.info("Topology with removed nodes.");
+
+        CompletableFuture<Set<String>> dataNodesDownFut0 = 
distributionZoneManager.topologyVersionedDataNodes(DEFAULT_ZONE_ID, 4);
+        CompletableFuture<Set<String>> dataNodesDownFut1 = 
distributionZoneManager.topologyVersionedDataNodes(DEFAULT_ZONE_ID, 4);
+        CompletableFuture<Set<String>> dataNodesDownFut2 = 
distributionZoneManager.topologyVersionedDataNodes(DEFAULT_ZONE_ID, 5);
+        CompletableFuture<Set<String>> dataNodesDownFut3 = 
distributionZoneManager.topologyVersionedDataNodes(DEFAULT_ZONE_ID, 6);
+        CompletableFuture<Set<String>> dataNodesDownFut4 = 
distributionZoneManager.topologyVersionedDataNodes(zoneId0, 4);
+        CompletableFuture<Set<String>> dataNodesDownFut5 = 
distributionZoneManager.topologyVersionedDataNodes(zoneId0, 5);
+        CompletableFuture<Set<String>> dataNodesDownFut6 = 
distributionZoneManager.topologyVersionedDataNodes(zoneId1, 4);
+        CompletableFuture<Set<String>> dataNodesDownFut7 = 
distributionZoneManager.topologyVersionedDataNodes(zoneId1, 5);
+
+        int topVer1 = 5;
+
+        Set<String> twoNodes = Set.of("node0", "node1");
+
+        setLogicalTopologyInMetaStorage(twoNodes, topVer1);
+
+        assertEquals(twoNodes, dataNodesDownFut0.get(3, SECONDS));
+        assertEquals(twoNodes, dataNodesDownFut1.get(3, SECONDS));
+        assertEquals(twoNodes, dataNodesDownFut2.get(3, SECONDS));
+        assertEquals(twoNodes, dataNodesDownFut4.get(3, SECONDS));
+        assertEquals(twoNodes, dataNodesDownFut5.get(3, SECONDS));
+        assertEquals(twoNodes, dataNodesDownFut6.get(3, SECONDS));
+        assertEquals(twoNodes, dataNodesDownFut7.get(3, SECONDS));
+        assertFalse(dataNodesDownFut3.isDone());
+
+        int topVer2 = 20;
+
+        LOG.info("Topology with added and removed nodes.");
+
+        Set<String> dataNodes = Set.of("node0", "node2");
+
+        setLogicalTopologyInMetaStorage(dataNodes, topVer2);
+
+        assertEquals(dataNodes, dataNodesUpFut3.get(3, SECONDS));
+        assertEquals(dataNodes, dataNodesDownFut3.get(3, SECONDS));
+    }
+
+    /**
+     * Test checks that data nodes futures are completed on topology with 
added nodes.
+     */
+    @Test
+    void testScaleUpAndThenScaleDown() throws Exception {
+        startZoneManager(0);
+
+        CompletableFuture<Set<String>> dataNodesFut = 
distributionZoneManager.topologyVersionedDataNodes(DEFAULT_ZONE_ID, 5);
+
+        assertFalse(dataNodesFut.isDone());
+
+        long topVer = 100;
+
+        Set<String> dataNodes0 = Set.of("node0", "node1");
+
+        setLogicalTopologyInMetaStorage(dataNodes0, topVer);
+
+        assertFalse(dataNodesFut.isDone());
+
+        assertEquals(dataNodes0, dataNodesFut.get(3, SECONDS));
+
+        dataNodesFut = 
distributionZoneManager.topologyVersionedDataNodes(DEFAULT_ZONE_ID, 106);
+
+        Set<String> dataNodes1 = Set.of("node0");
+
+        setLogicalTopologyInMetaStorage(dataNodes1, topVer + 100);
+
+        assertFalse(dataNodesFut.isDone());
+
+        assertEquals(dataNodes1, dataNodesFut.get(3, SECONDS));
+    }
+
+    /**
+     * Test checks that data nodes futures are completed on topology with 
added and removed nodes for the zone with
+     * dataNodesAutoAdjustScaleUp is immediate and 
dataNodesAutoAdjustScaleDown is non-zero.
+     */
+    @Test
+    void testAwaitingScaleUpOnly() throws Exception {
+        startZoneManager(0);
+
+        distributionZoneManager.alterZone(DEFAULT_ZONE_NAME, new 
DistributionZoneConfigurationParameters.Builder(DEFAULT_ZONE_NAME)
+                        
.dataNodesAutoAdjustScaleUp(INFINITE_TIMER_VALUE).dataNodesAutoAdjustScaleDown(INFINITE_TIMER_VALUE).build())
+                .get(3, SECONDS);
+
+        distributionZoneManager.createZone(
+                        new 
DistributionZoneConfigurationParameters.Builder("zone1")
+                                
.dataNodesAutoAdjustScaleUp(IMMEDIATE_TIMER_VALUE)
+                                
.dataNodesAutoAdjustScaleDown(INFINITE_TIMER_VALUE)
+                                .build()
+                )
+                .get(3, SECONDS);
+
+        int zoneId = distributionZoneManager.getZoneId("zone1");
+
+        CompletableFuture<Set<String>> dataNodesFut = 
distributionZoneManager.topologyVersionedDataNodes(zoneId, 1);
+
+        Set<String> nodes0 = Set.of("node0", "node1");
+
+        setLogicalTopologyInMetaStorage(nodes0, 1);
+
+        assertEquals(nodes0, dataNodesFut.get(3, SECONDS));
+
+        dataNodesFut = 
distributionZoneManager.topologyVersionedDataNodes(zoneId, 2);
+
+        assertFalse(dataNodesFut.isDone());
+
+        setLogicalTopologyInMetaStorage(Set.of("node0"), 2);
+
+        assertEquals(nodes0, dataNodesFut.get(3, SECONDS));
+    }
+
+    /**
+     * Test checks that data nodes futures are completed on topology with 
added and removed nodes for the zone with
+     * dataNodesAutoAdjustScaleUp is non-zero and dataNodesAutoAdjustScaleDown 
is immediate. And checks that other zones
+     * non-zero timers doesn't affect.
+     */
+    @Test
+    void testAwaitingScaleDownOnly() throws Exception {
+        startZoneManager(0);
+
+        distributionZoneManager.alterZone(DEFAULT_ZONE_NAME, new 
DistributionZoneConfigurationParameters.Builder(DEFAULT_ZONE_NAME)
+                        
.dataNodesAutoAdjustScaleUp(INFINITE_TIMER_VALUE).dataNodesAutoAdjustScaleDown(INFINITE_TIMER_VALUE).build())
+                .get(3, SECONDS);
+
+        distributionZoneManager.createZone(
+                        new 
DistributionZoneConfigurationParameters.Builder("zone0")
+                                
.dataNodesAutoAdjustScaleUp(INFINITE_TIMER_VALUE)
+                                
.dataNodesAutoAdjustScaleDown(INFINITE_TIMER_VALUE)
+                                .build()
+                )
+                .get(3, SECONDS);
+
+        distributionZoneManager.createZone(
+                        new 
DistributionZoneConfigurationParameters.Builder("zone1")
+                                
.dataNodesAutoAdjustScaleUp(IMMEDIATE_TIMER_VALUE)
+                                
.dataNodesAutoAdjustScaleDown(IMMEDIATE_TIMER_VALUE)
+                                .build()
+                )
+                .get(3, SECONDS);
+
+        distributionZoneManager.createZone(
+                        new 
DistributionZoneConfigurationParameters.Builder("zone2")
+                                
.dataNodesAutoAdjustScaleUp(INFINITE_TIMER_VALUE)
+                                
.dataNodesAutoAdjustScaleDown(INFINITE_TIMER_VALUE)
+                                .build()
+                )
+                .get(3, SECONDS);
+
+        int zoneId0 = distributionZoneManager.getZoneId("zone0");
+        int zoneId1 = distributionZoneManager.getZoneId("zone1");
+        int zoneId2 = distributionZoneManager.getZoneId("zone2");
+
+        CompletableFuture<Set<String>> dataNodesFut = 
distributionZoneManager.topologyVersionedDataNodes(zoneId1, 1);
+
+        Set<String> nodes0 = Set.of("node0", "node1");
+
+        setLogicalTopologyInMetaStorage(nodes0, 1);
+
+        dataNodesFut.get(3, SECONDS);
+
+        CompletableFuture<Set<String>> dataNodesFut1Zone0 = 
distributionZoneManager.topologyVersionedDataNodes(zoneId0, 2);
+        CompletableFuture<Set<String>> dataNodesFut1 = 
distributionZoneManager.topologyVersionedDataNodes(zoneId1, 2);
+        CompletableFuture<Set<String>> dataNodesFut1Zone2 = 
distributionZoneManager.topologyVersionedDataNodes(zoneId2, 2);
+
+        assertFalse(dataNodesFut1Zone0.isDone());
+        assertFalse(dataNodesFut1.isDone());
+        assertFalse(dataNodesFut1Zone2.isDone());
+
+        Set<String> nodes1 = Set.of("node0");
+
+        distributionZoneManager.alterZone("zone1", new 
DistributionZoneConfigurationParameters.Builder("zone1")
+                        
.dataNodesAutoAdjustScaleUp(INFINITE_TIMER_VALUE).dataNodesAutoAdjustScaleDown(IMMEDIATE_TIMER_VALUE).build())
+                .get(3, SECONDS);
+
+        System.out.println("setLogicalTopologyInMetaStorage_2");
+        setLogicalTopologyInMetaStorage(nodes1, 2);
+
+        assertEquals(nodes1, dataNodesFut1.get(3, SECONDS));
+
+        CompletableFuture<Set<String>> dataNodesFut2 = 
distributionZoneManager.topologyVersionedDataNodes(zoneId1, 3);
+
+        Set<String> nodes2 = Set.of("node0", "node1");
+
+        assertFalse(dataNodesFut2.isDone());
+
+        setLogicalTopologyInMetaStorage(nodes2, 3);
+
+        assertEquals(nodes1, dataNodesFut2.get(3, SECONDS));
+    }
+
+    /**
+     * Test checks that data nodes futures are completed immediately for the 
zone with
+     * dataNodesAutoAdjustScaleUp is non-zero and dataNodesAutoAdjustScaleDown 
is non-zero.
+     */
+    @Test
+    void testWithOutAwaiting() throws Exception {
+        startZoneManager(0);
+
+        distributionZoneManager.alterZone(DEFAULT_ZONE_NAME, new 
DistributionZoneConfigurationParameters.Builder(DEFAULT_ZONE_NAME)
+                        
.dataNodesAutoAdjustScaleUp(INFINITE_TIMER_VALUE).dataNodesAutoAdjustScaleDown(INFINITE_TIMER_VALUE).build())
+                .get(3, SECONDS);
+
+        distributionZoneManager.createZone(
+                        new 
DistributionZoneConfigurationParameters.Builder("zone1")
+                                
.dataNodesAutoAdjustScaleUp(INFINITE_TIMER_VALUE)
+                                
.dataNodesAutoAdjustScaleDown(INFINITE_TIMER_VALUE)
+                                .build()
+                )
+                .get(3, SECONDS);
+
+        int zoneId = distributionZoneManager.getZoneId("zone1");
+
+        CompletableFuture<Set<String>> dataNodesFut = 
distributionZoneManager.topologyVersionedDataNodes(zoneId, 1);
+
+        assertFalse(dataNodesFut.isDone());
+
+        Set<String> nodes0 = Set.of("node0", "node1");
+
+        setLogicalTopologyInMetaStorage(nodes0, 1);
+
+        assertEquals(emptySet(), dataNodesFut.get(3, SECONDS));
+    }
+
+    /**
+     * Test checks that data nodes futures are completed exceptionally if the 
zone was removed while
+     * data nodes awaiting.
+     */
+    @Disabled("https://issues.apache.org/jira/browse/IGNITE-19255";)
+    @Test
+    void testRemoveZoneWhileAwaitingDataNodes() throws Exception {
+        startZoneManager(0);
+
+        distributionZoneManager.createZone(
+                        new 
DistributionZoneConfigurationParameters.Builder("zone0")
+                                
.dataNodesAutoAdjustScaleUp(IMMEDIATE_TIMER_VALUE)
+                                
.dataNodesAutoAdjustScaleDown(IMMEDIATE_TIMER_VALUE)
+                                .build()
+                )
+                .get(3, SECONDS);
+
+        int zoneId = distributionZoneManager.getZoneId("zone0");
+
+        CompletableFuture<Set<String>> dataNodesFut0 = 
distributionZoneManager.topologyVersionedDataNodes(zoneId, 5);
+
+        setLogicalTopologyInMetaStorage(Set.of("node0", "node1"), 100);
+
+        assertFalse(dataNodesFut0.isDone());
+
+        assertEquals(Set.of("node0", "node1"), dataNodesFut0.get(3, SECONDS));
+
+        CompletableFuture<Set<String>> dataNodesFut1 = 
distributionZoneManager.topologyVersionedDataNodes(zoneId, 106);
+
+        setLogicalTopologyInMetaStorage(Set.of("node0", "node2"), 200);
+
+        assertFalse(dataNodesFut1.isDone());
+
+        distributionZoneManager.dropZone("zone0").get();
+
+        assertThrowsWithCause(() -> dataNodesFut1.get(3, SECONDS), 
DistributionZoneWasRemovedException.class);
+    }
+
+    /**
+     * Test checks that data nodes futures are completed with old data nodes 
if dataNodesAutoAdjustScaleUp
+     * and dataNodesAutoAdjustScaleDown timer increased to non-zero value.
+     */
+    @Disabled("https://issues.apache.org/jira/browse/IGNITE-19255";)
+    @Test
+    void testScaleUpScaleDownAreChangedWhileAwaitingDataNodes() throws 
Exception {
+        startZoneManager(0);
+
+        Set<String> nodes0 = Set.of("node0", "node1");
+
+        setLogicalTopologyInMetaStorage(nodes0, 1);
+
+        CompletableFuture<Set<String>> dataNodesFut = 
distributionZoneManager.topologyVersionedDataNodes(DEFAULT_ZONE_ID, 1);
+
+        assertEquals(nodes0, dataNodesFut.get(3, SECONDS));
+
+        Set<String> nodes1 = Set.of("node0", "node2");
+
+        dataNodesFut = 
distributionZoneManager.topologyVersionedDataNodes(DEFAULT_ZONE_ID, 2);
+
+        setLogicalTopologyInMetaStorage(nodes1, 2);
+
+        assertFalse(dataNodesFut.isDone());
+
+        //need to create new zone to fix assert invariant which is broken in 
this test environment.
+        distributionZoneManager.createZone(new 
DistributionZoneConfigurationParameters.Builder("zone0")
+                        
.dataNodesAutoAdjustScaleUp(1000).dataNodesAutoAdjustScaleDown(1000).build())
+                .get(3, SECONDS);
+
+        assertFalse(dataNodesFut.isDone());
+
+        distributionZoneManager.alterZone(DEFAULT_ZONE_NAME, new 
DistributionZoneConfigurationParameters.Builder(DEFAULT_ZONE_NAME)
+                        
.dataNodesAutoAdjustScaleUp(1000).dataNodesAutoAdjustScaleDown(1000).build())
+                .get(3, SECONDS);
+
+        assertEquals(nodes0, dataNodesFut.get(3, SECONDS));
+    }
+
+    /**
+     * Test checks that data nodes are initialized on zone manager start.
+     */
+    @Test
+    void testInitializedDataNodesOnZoneManagerStart() throws Exception {
+        Set<String> dataNodes = Set.of("node0", "node1");
+
+        Map<ByteArray, byte[]> valEntries = new HashMap<>();
+
+        valEntries.put(zonesLogicalTopologyKey(), toBytes(dataNodes));
+        valEntries.put(zonesLogicalTopologyVersionKey(), longToBytes(3));
+
+        vaultManager.putAll(valEntries);
+
+        Collection<LogicalNode> nodes = new ArrayList<>();
+
+        nodes.add(new LogicalNode(new ClusterNode("node0", "node0", new 
NetworkAddress("local", 1))));
+        nodes.add(new LogicalNode(new ClusterNode("node1", "node1", new 
NetworkAddress("local", 1))));
+
+        when(cmgManager.logicalTopology()).thenReturn(completedFuture(new 
LogicalTopologySnapshot(3, nodes)));
+
+        startZoneManager(10);
+
+        assertEquals(dataNodes, 
distributionZoneManager.topologyVersionedDataNodes(DEFAULT_ZONE_ID, 2)
+                .get(3, SECONDS));
+    }
+
+    private void startZoneManager(long revision) throws Exception {
+        vaultManager.put(new ByteArray("applied_revision"), 
longToBytes(revision)).get();
+
+        distributionZoneManager.start();
+
+        distributionZoneManager.alterZone(
+                        DEFAULT_ZONE_NAME, new 
DistributionZoneConfigurationParameters.Builder(DEFAULT_ZONE_NAME)
+                                
.dataNodesAutoAdjustScaleUp(IMMEDIATE_TIMER_VALUE)
+                                
.dataNodesAutoAdjustScaleDown(IMMEDIATE_TIMER_VALUE).build())
+                .get(3, SECONDS);
+    }
+
+    private void setLogicalTopologyInMetaStorage(Set<String> nodes, long 
topVer) {
+        CompletableFuture<Boolean> invokeFuture = metaStorageManager.invoke(
+                Conditions.exists(zonesLogicalTopologyKey()),
+                List.of(
+                        Operations.put(zonesLogicalTopologyKey(), 
toBytes(nodes)),
+                        Operations.put(zonesLogicalTopologyVersionKey(), 
longToBytes(topVer))
+                ),
+                List.of(Operations.noop())
+        );
+
+        assertThat(invokeFuture, willBe(true));
+    }
+
+    private void mockCmgLocalNodes() {
+        
when(cmgManager.logicalTopology()).thenReturn(completedFuture(logicalTopology.getLogicalTopology()));
+    }
+}
diff --git 
a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerConfigurationChangesTest.java
 
b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerConfigurationChangesTest.java
index 8251652d32..de4f6f8f62 100644
--- 
a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerConfigurationChangesTest.java
+++ 
b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerConfigurationChangesTest.java
@@ -23,6 +23,7 @@ import static 
org.apache.ignite.internal.distributionzones.DistributionZonesUtil
 import static 
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zoneScaleUpChangeTriggerKey;
 import static 
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zonesChangeTriggerKey;
 import static 
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zonesLogicalTopologyKey;
+import static 
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zonesLogicalTopologyVersionKey;
 import static 
org.apache.ignite.internal.distributionzones.util.DistributionZonesTestUtil.assertDataNodesForZone;
 import static 
org.apache.ignite.internal.distributionzones.util.DistributionZonesTestUtil.assertZoneScaleUpChangeTriggerKey;
 import static 
org.apache.ignite.internal.distributionzones.util.DistributionZonesTestUtil.assertZonesChangeTriggerKey;
@@ -35,8 +36,6 @@ import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.clearInvocations;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.timeout;
-import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 import java.util.List;
@@ -108,13 +107,19 @@ public class 
DistributionZoneManagerConfigurationChangesTest extends IgniteAbstr
         when(vaultMgr.get(zonesLogicalTopologyKey()))
                 .thenReturn(completedFuture(new 
VaultEntry(zonesLogicalTopologyKey(), toBytes(nodes))));
 
+        when(vaultMgr.get(zonesLogicalTopologyVersionKey()))
+                .thenReturn(completedFuture(new 
VaultEntry(zonesLogicalTopologyVersionKey(), longToBytes(0))));
+
         LogicalTopologyService logicalTopologyService = 
mock(LogicalTopologyService.class);
-        
when(logicalTopologyService.logicalTopologyOnLeader()).thenReturn(completedFuture(new
 LogicalTopologySnapshot(1, Set.of())));
+
+        
when(logicalTopologyService.logicalTopologyOnLeader()).thenReturn(completedFuture(new
 LogicalTopologySnapshot(0, Set.of())));
 
         keyValueStorage = spy(new SimpleInMemoryKeyValueStorage("test"));
 
         metaStorageManager = StandaloneMetaStorageManager.create(vaultMgr, 
keyValueStorage);
 
+        metaStorageManager.put(zonesLogicalTopologyVersionKey(), 
longToBytes(0));
+
         distributionZoneManager = new DistributionZoneManager(
                 zonesConfiguration,
                 tablesConfiguration,
@@ -187,8 +192,6 @@ public class 
DistributionZoneManagerConfigurationChangesTest extends IgniteAbstr
 
         distributionZoneManager.createZone(new 
DistributionZoneConfigurationParameters.Builder(ZONE_NAME).build()).get();
 
-        verify(keyValueStorage, timeout(1000).times(1)).invoke(any());
-
         assertZonesChangeTriggerKey(100, 1, keyValueStorage);
 
         assertDataNodesForZone(1, null, keyValueStorage);
@@ -204,8 +207,6 @@ public class 
DistributionZoneManagerConfigurationChangesTest extends IgniteAbstr
 
         distributionZoneManager.dropZone(ZONE_NAME).get();
 
-        verify(keyValueStorage, timeout(1000).times(2)).invoke(any());
-
         assertDataNodesForZone(1, nodes, keyValueStorage);
     }
 }
diff --git 
a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerLogicalTopologyEventsTest.java
 
b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerLogicalTopologyEventsTest.java
index fb71dd9769..88db11eb49 100644
--- 
a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerLogicalTopologyEventsTest.java
+++ 
b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerLogicalTopologyEventsTest.java
@@ -20,18 +20,18 @@ package org.apache.ignite.internal.distributionzones;
 import static java.util.concurrent.CompletableFuture.completedFuture;
 import static 
org.apache.ignite.configuration.annotation.ConfigurationType.DISTRIBUTED;
 import static 
org.apache.ignite.internal.cluster.management.topology.LogicalTopologyImpl.LOGICAL_TOPOLOGY_KEY;
+import static 
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zonesLogicalTopologyKey;
 import static 
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zonesLogicalTopologyVersionKey;
 import static 
org.apache.ignite.internal.distributionzones.util.DistributionZonesTestUtil.assertLogicalTopology;
 import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
+import static org.apache.ignite.internal.util.ByteUtils.longToBytes;
+import static org.apache.ignite.internal.util.ByteUtils.toBytes;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.Mockito.after;
 import static org.mockito.Mockito.lenient;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.timeout;
-import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 import java.io.Serializable;
@@ -74,6 +74,7 @@ import 
org.apache.ignite.internal.schema.configuration.TableConfiguration;
 import org.apache.ignite.internal.schema.configuration.TableView;
 import org.apache.ignite.internal.schema.configuration.TablesConfiguration;
 import org.apache.ignite.internal.util.ByteUtils;
+import org.apache.ignite.internal.vault.VaultEntry;
 import org.apache.ignite.internal.vault.VaultManager;
 import org.apache.ignite.lang.ByteArray;
 import org.apache.ignite.lang.IgniteInternalException;
@@ -140,6 +141,12 @@ public class 
DistributionZoneManagerLogicalTopologyEventsTest {
 
         when(value.namedListKeys()).thenReturn(new ArrayList<>());
 
+        when(vaultMgr.get(zonesLogicalTopologyVersionKey()))
+                .thenReturn(completedFuture(new 
VaultEntry(zonesLogicalTopologyVersionKey(), longToBytes(0))));
+
+        when(vaultMgr.get(zonesLogicalTopologyKey()))
+                .thenReturn(completedFuture(new 
VaultEntry(zonesLogicalTopologyKey(), toBytes(Set.of()))));
+
         distributionZoneManager = new DistributionZoneManager(
                 zonesConfiguration,
                 tablesConfiguration,
@@ -293,8 +300,6 @@ public class 
DistributionZoneManagerLogicalTopologyEventsTest {
 
         distributionZoneManager1.start();
 
-        verify(keyValueStorage, timeout(1000).times(1)).invoke(any());
-
         assertLogicalTopVer(1L);
 
         assertLogicalTopology(clusterNodes, keyValueStorage);
@@ -312,8 +317,6 @@ public class 
DistributionZoneManagerLogicalTopologyEventsTest {
 
         distributionZoneManager1.start();
 
-        verify(keyValueStorage, timeout(1000).times(1)).invoke(any());
-
         assertLogicalTopVer(2L);
 
         assertLogicalTopology(clusterNodes, keyValueStorage);
@@ -331,8 +334,6 @@ public class 
DistributionZoneManagerLogicalTopologyEventsTest {
 
         distributionZoneManager1.start();
 
-        verify(keyValueStorage, after(500).never()).invoke(any());
-
         assertLogicalTopVer(2L);
 
         assertLogicalTopology(null, keyValueStorage);
@@ -350,8 +351,6 @@ public class 
DistributionZoneManagerLogicalTopologyEventsTest {
 
         distributionZoneManager1.start();
 
-        verify(keyValueStorage, after(500).never()).invoke(any());
-
         assertLogicalTopVer(3L);
 
         assertLogicalTopology(null, keyValueStorage);
diff --git 
a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerScaleUpTest.java
 
b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerScaleUpTest.java
index 241654ec71..eeb6dad3a0 100644
--- 
a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerScaleUpTest.java
+++ 
b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerScaleUpTest.java
@@ -23,9 +23,11 @@ import static 
org.apache.ignite.internal.distributionzones.DistributionZoneManag
 import static 
org.apache.ignite.internal.distributionzones.DistributionZoneManager.DEFAULT_ZONE_NAME;
 import static 
org.apache.ignite.internal.distributionzones.DistributionZoneManager.INFINITE_TIMER_VALUE;
 import static 
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zoneDataNodesKey;
+import static 
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zoneLogicalTopologyPrefix;
 import static 
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zoneScaleDownChangeTriggerKey;
 import static 
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zoneScaleUpChangeTriggerKey;
 import static 
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zonesLogicalTopologyKey;
+import static 
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zonesLogicalTopologyVersionKey;
 import static 
org.apache.ignite.internal.distributionzones.util.DistributionZonesTestUtil.assertDataNodesForZone;
 import static 
org.apache.ignite.internal.distributionzones.util.DistributionZonesTestUtil.assertLogicalTopology;
 import static 
org.apache.ignite.internal.distributionzones.util.DistributionZonesTestUtil.assertZoneScaleDownChangeTriggerKey;
@@ -47,6 +49,7 @@ import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.when;
 
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.Set;
@@ -91,6 +94,7 @@ import 
org.apache.ignite.internal.schema.configuration.TablesConfiguration;
 import org.apache.ignite.internal.util.ByteUtils;
 import org.apache.ignite.internal.vault.VaultEntry;
 import org.apache.ignite.internal.vault.VaultManager;
+import org.apache.ignite.lang.ByteArray;
 import org.apache.ignite.network.ClusterNode;
 import org.apache.ignite.network.NetworkAddress;
 import org.jetbrains.annotations.Nullable;
@@ -126,7 +130,7 @@ public class DistributionZoneManagerScaleUpTest {
 
     private MetaStorageManager metaStorageManager;
 
-    private WatchListener watchListener;
+    private WatchListener topologyWatchListener;
 
     private DistributionZonesConfiguration zonesConfiguration;
 
@@ -187,13 +191,23 @@ public class DistributionZoneManagerScaleUpTest {
         mockVaultAppliedRevision(1);
 
         
when(vaultMgr.get(zonesLogicalTopologyKey())).thenReturn(completedFuture(new 
VaultEntry(zonesLogicalTopologyKey(), null)));
+
+        when(vaultMgr.get(zonesLogicalTopologyVersionKey()))
+                .thenReturn(completedFuture(new 
VaultEntry(zonesLogicalTopologyVersionKey(), longToBytes(0))));
+
         when(vaultMgr.put(any(), any())).thenReturn(completedFuture(null));
 
         doAnswer(invocation -> {
-            watchListener = invocation.getArgument(1);
+            ByteArray key = invocation.getArgument(0);
+
+            WatchListener watchListener = invocation.getArgument(1);
+
+            if (Arrays.equals(key.bytes(), 
zoneLogicalTopologyPrefix().bytes())) {
+                topologyWatchListener = watchListener;
+            }
 
             return null;
-        }).when(metaStorageManager).registerExactWatch(any(), any());
+        }).when(metaStorageManager).registerPrefixWatch(any(), any());
 
         AtomicLong raftIndex = new AtomicLong();
 
@@ -1577,15 +1591,18 @@ public class DistributionZoneManagerScaleUpTest {
     }
 
     private void watchListenerOnUpdate(Set<String> nodes, long rev) {
-        byte[] newLogicalTopology = toBytes(nodes);
+        byte[] newTopology = toBytes(nodes);
+        byte[] newTopVer = longToBytes(1L);
 
-        Entry newEntry = new EntryImpl(zonesLogicalTopologyKey().bytes(), 
newLogicalTopology, rev, 1);
+        Entry topology = new EntryImpl(zonesLogicalTopologyKey().bytes(), 
newTopology, rev, 1);
+        Entry topVer = new EntryImpl(zonesLogicalTopologyVersionKey().bytes(), 
newTopVer, rev, 1);
 
-        EntryEvent entryEvent = new EntryEvent(null, newEntry);
+        EntryEvent topologyEvent = new EntryEvent(null, topology);
+        EntryEvent topVerEvent = new EntryEvent(null, topVer);
 
-        WatchEvent evt = new WatchEvent(entryEvent);
+        WatchEvent evt = new WatchEvent(List.of(topologyEvent, topVerEvent), 
rev);
 
-        watchListener.onUpdate(evt);
+        topologyWatchListener.onUpdate(evt);
     }
 
     private void mockCmgLocalNodes() {
diff --git 
a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerWatchListenerTest.java
 
b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerWatchListenerTest.java
index 72b264cb44..07bd6127dc 100644
--- 
a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerWatchListenerTest.java
+++ 
b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerWatchListenerTest.java
@@ -22,9 +22,11 @@ import static 
org.apache.ignite.configuration.annotation.ConfigurationType.DISTR
 import static 
org.apache.ignite.internal.distributionzones.DistributionZoneManager.DEFAULT_ZONE_ID;
 import static 
org.apache.ignite.internal.distributionzones.DistributionZoneManager.DEFAULT_ZONE_NAME;
 import static 
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zoneDataNodesKey;
+import static 
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zoneLogicalTopologyPrefix;
 import static 
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zoneScaleUpChangeTriggerKey;
 import static 
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zonesChangeTriggerKey;
 import static 
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zonesLogicalTopologyKey;
+import static 
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zonesLogicalTopologyVersionKey;
 import static 
org.apache.ignite.internal.distributionzones.util.DistributionZonesTestUtil.assertDataNodesForZone;
 import static 
org.apache.ignite.internal.distributionzones.util.DistributionZonesTestUtil.mockMetaStorageListener;
 import static org.apache.ignite.internal.util.ByteUtils.longToBytes;
@@ -39,6 +41,7 @@ import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.Set;
@@ -71,6 +74,7 @@ import 
org.apache.ignite.internal.schema.configuration.TablesConfiguration;
 import org.apache.ignite.internal.testframework.IgniteAbstractTest;
 import org.apache.ignite.internal.vault.VaultEntry;
 import org.apache.ignite.internal.vault.VaultManager;
+import org.apache.ignite.lang.ByteArray;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
@@ -94,7 +98,7 @@ public class DistributionZoneManagerWatchListenerTest extends 
IgniteAbstractTest
 
     private MetaStorageManager metaStorageManager;
 
-    private WatchListener watchListener;
+    private WatchListener topologyWatchListener;
 
     private DistributionZonesConfiguration zonesConfiguration;
 
@@ -153,13 +157,23 @@ public class DistributionZoneManagerWatchListenerTest 
extends IgniteAbstractTest
         mockVaultAppliedRevision(1);
 
         
when(vaultMgr.get(zonesLogicalTopologyKey())).thenReturn(completedFuture(new 
VaultEntry(zonesLogicalTopologyKey(), null)));
+
+        when(vaultMgr.get(zonesLogicalTopologyVersionKey()))
+                .thenReturn(completedFuture(new 
VaultEntry(zonesLogicalTopologyVersionKey(), longToBytes(0))));
+
         when(vaultMgr.put(any(), any())).thenReturn(completedFuture(null));
 
         doAnswer(invocation -> {
-            watchListener = invocation.getArgument(1);
+            ByteArray key = invocation.getArgument(0);
+
+            WatchListener watchListener = invocation.getArgument(1);
+
+            if (Arrays.equals(key.bytes(), 
zoneLogicalTopologyPrefix().bytes())) {
+                topologyWatchListener = watchListener;
+            }
 
             return null;
-        }).when(metaStorageManager).registerExactWatch(any(), any());
+        }).when(metaStorageManager).registerPrefixWatch(any(), any());
 
         AtomicLong raftIndex = new AtomicLong();
 
@@ -193,7 +207,9 @@ public class DistributionZoneManagerWatchListenerTest 
extends IgniteAbstractTest
 
         distributionZoneManager.alterZone(
                 DEFAULT_ZONE_NAME,
-                new 
DistributionZoneConfigurationParameters.Builder(DEFAULT_ZONE_NAME).dataNodesAutoAdjustScaleUp(0).build()
+                new 
DistributionZoneConfigurationParameters.Builder(DEFAULT_ZONE_NAME)
+                        .dataNodesAutoAdjustScaleUp(0)
+                        .build()
         ).get();
 
         //first event
@@ -322,15 +338,18 @@ public class DistributionZoneManagerWatchListenerTest 
extends IgniteAbstractTest
     }
 
     private void watchListenerOnUpdate(Set<String> nodes, long rev) {
-        byte[] newLogicalTopology = toBytes(nodes);
+        byte[] newTopology = toBytes(nodes);
+        byte[] newTopVer = longToBytes(1L);
 
-        Entry newEntry = new EntryImpl(zonesLogicalTopologyKey().bytes(), 
newLogicalTopology, rev, 1);
+        Entry topology = new EntryImpl(zonesLogicalTopologyKey().bytes(), 
newTopology, rev, 1);
+        Entry topVer = new EntryImpl(zonesLogicalTopologyVersionKey().bytes(), 
newTopVer, rev, 1);
 
-        EntryEvent entryEvent = new EntryEvent(null, newEntry);
+        EntryEvent topologyEvent = new EntryEvent(null, topology);
+        EntryEvent topVerEvent = new EntryEvent(null, topVer);
 
-        WatchEvent evt = new WatchEvent(entryEvent);
+        WatchEvent evt = new WatchEvent(List.of(topologyEvent, topVerEvent), 
rev);
 
-        watchListener.onUpdate(evt);
+        topologyWatchListener.onUpdate(evt);
     }
 
     private void mockVaultAppliedRevision(long revision) {
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItRaftCommandLeftInLogUntilRestartTest.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItRaftCommandLeftInLogUntilRestartTest.java
index 5dc30b2fbb..21b4f669bc 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItRaftCommandLeftInLogUntilRestartTest.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItRaftCommandLeftInLogUntilRestartTest.java
@@ -189,8 +189,7 @@ public class ItRaftCommandLeftInLogUntilRestartTest extends 
ClusterPerClassInteg
 
             tx.commit();
         } finally {
-            //TODO: IGNITE-18324 Nothing do in the rollback invocation when a 
transaction is committed.
-            //tx.rollback();
+            tx.rollback();
         }
 
         stopNodes();
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index e7cef89955..5ada992a94 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -27,7 +27,7 @@ import static java.util.stream.Collectors.toList;
 import static 
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.dataNodes;
 import static 
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.extractZoneId;
 import static 
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.getZoneById;
-import static 
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zoneDataNodesPrefix;
+import static 
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zoneDataNodesKey;
 import static 
org.apache.ignite.internal.schema.SchemaManager.INITIAL_SCHEMA_VERSION;
 import static org.apache.ignite.internal.util.IgniteUtils.inBusyLock;
 import static 
org.apache.ignite.internal.util.IgniteUtils.shutdownAndAwaitTermination;
@@ -432,7 +432,7 @@ public class TableManager extends Producer<TableEvent, 
TableEventParameters> imp
         
distributionZonesConfiguration.distributionZones().any().replicas().listen(this::onUpdateReplicas);
 
         // TODO: IGNITE-18694 - Recovery for the case when zones watch 
listener processed event but assignments were not updated.
-        metaStorageMgr.registerPrefixWatch(zoneDataNodesPrefix(), 
distributionZonesDataNodesListener);
+        metaStorageMgr.registerExactWatch(zoneDataNodesKey(), 
distributionZonesDataNodesListener);
 
         
metaStorageMgr.registerPrefixWatch(ByteArray.fromString(PENDING_ASSIGNMENTS_PREFIX),
 pendingAssignmentsRebalanceListener);
         
metaStorageMgr.registerPrefixWatch(ByteArray.fromString(STABLE_ASSIGNMENTS_PREFIX),
 stableAssignmentsRebalanceListener);
diff --git 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerDistributionZonesTest.java
 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerDistributionZonesTest.java
index bd95f013a7..4c0eb192d5 100644
--- 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerDistributionZonesTest.java
+++ 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerDistributionZonesTest.java
@@ -40,6 +40,7 @@ import static org.mockito.Mockito.when;
 
 import java.io.Serializable;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -144,14 +145,16 @@ public class TableManagerDistributionZonesTest extends 
IgniteAbstractTest {
         MetaStorageManager metaStorageManager = mock(MetaStorageManager.class);
 
         doAnswer(invocation -> {
-            WatchListener listener = invocation.getArgument(1);
+            ByteArray key = invocation.getArgument(0);
 
-            if (watchListener == null) {
-                watchListener = listener;
+            WatchListener watchListener = invocation.getArgument(1);
+
+            if (Arrays.equals(key.bytes(), zoneDataNodesKey().bytes())) {
+                this.watchListener = watchListener;
             }
 
             return null;
-        }).when(metaStorageManager).registerPrefixWatch(any(), any());
+        }).when(metaStorageManager).registerExactWatch(any(), any());
 
         tablesConfiguration = mock(TablesConfiguration.class);
 


Reply via email to