This is an automated email from the ASF dual-hosted git repository.

sanpwc pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new b882c50079 IGNITE-18115 DistributionZoneManager populated with MetaStorage listeners to logical topology events. (#1426)
b882c50079 is described below

commit b882c5007915ec31f770837ead108fc33bcf8424
Author: Sergey Uttsel <[email protected]>
AuthorDate: Tue Jan 3 11:21:33 2023 +0300

    IGNITE-18115 DistributionZoneManager populated with MetaStorage listeners to logical topology events. (#1426)
---
 modules/distribution-zones/build.gradle            |   1 +
 .../distributionzones/DistributionZoneManager.java | 307 +++++++++++---
 .../distributionzones/DistributionZonesUtil.java   |   4 +-
 .../DistributionZoneNotFoundException.java         |   2 +-
 ...ibutionZoneManagerConfigurationChangesTest.java | 121 +++---
 ...butionZoneManagerLogicalTopologyEventsTest.java |  11 +-
 .../DistributionZoneManagerTest.java               |  71 ++--
 .../DistributionZoneManagerWatchListenerTest.java  | 448 +++++++++++++++++++++
 .../org/apache/ignite/internal/app/IgniteImpl.java |   2 +-
 9 files changed, 792 insertions(+), 175 deletions(-)

diff --git a/modules/distribution-zones/build.gradle b/modules/distribution-zones/build.gradle
index c1e9ae2500..58a4cd5f42 100644
--- a/modules/distribution-zones/build.gradle
+++ b/modules/distribution-zones/build.gradle
@@ -33,6 +33,7 @@ dependencies {
     implementation project(':ignite-metastorage-client')
     implementation project(':ignite-cluster-management')
     implementation project(':ignite-metastorage')
+    implementation project(':ignite-vault')
     implementation project(':ignite-configuration')
 
     implementation libs.jetbrains.annotations
diff --git a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java
index d8f5117835..347d774350 100644
--- a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java
+++ b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java
@@ -18,33 +18,39 @@
 package org.apache.ignite.internal.distributionzones;
 
 import static java.util.concurrent.CompletableFuture.completedFuture;
+import static java.util.stream.Collectors.toList;
 import static 
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.deleteDataNodesKeyAndUpdateTriggerKey;
 import static 
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.triggerKeyCondition;
 import static 
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.updateDataNodesAndTriggerKey;
 import static 
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.updateLogicalTopologyAndVersion;
 import static 
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.updateTriggerKey;
+import static 
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zonesLogicalTopologyKey;
 import static 
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zonesLogicalTopologyVersionKey;
+import static 
org.apache.ignite.internal.metastorage.MetaStorageManager.APPLIED_REV;
 import static 
org.apache.ignite.internal.metastorage.client.Conditions.notExists;
 import static org.apache.ignite.internal.metastorage.client.Conditions.value;
 import static org.apache.ignite.internal.metastorage.client.Operations.ops;
+import static org.apache.ignite.internal.util.ByteUtils.bytesToLong;
+import static org.apache.ignite.internal.util.ByteUtils.toBytes;
 import static org.apache.ignite.lang.ErrorGroups.Common.NODE_STOPPING_ERR;
-import static org.apache.ignite.lang.ErrorGroups.Common.UNEXPECTED_ERR;
 
 import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
 import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.stream.Collectors;
+import org.apache.ignite.configuration.ConfigurationChangeException;
 import org.apache.ignite.configuration.NamedListChange;
 import 
org.apache.ignite.configuration.notifications.ConfigurationNamedListListener;
 import 
org.apache.ignite.configuration.notifications.ConfigurationNotificationEvent;
-import 
org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager;
 import 
org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologyEventListener;
 import 
org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologyService;
 import 
org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologySnapshot;
 import 
org.apache.ignite.internal.distributionzones.configuration.DistributionZoneChange;
+import 
org.apache.ignite.internal.distributionzones.configuration.DistributionZoneConfiguration;
 import 
org.apache.ignite.internal.distributionzones.configuration.DistributionZoneView;
 import 
org.apache.ignite.internal.distributionzones.configuration.DistributionZonesConfiguration;
 import 
org.apache.ignite.internal.distributionzones.exception.DistributionZoneAlreadyExistsException;
@@ -56,14 +62,19 @@ import org.apache.ignite.internal.manager.IgniteComponent;
 import org.apache.ignite.internal.metastorage.MetaStorageManager;
 import org.apache.ignite.internal.metastorage.client.CompoundCondition;
 import org.apache.ignite.internal.metastorage.client.Condition;
+import org.apache.ignite.internal.metastorage.client.Entry;
 import org.apache.ignite.internal.metastorage.client.If;
 import org.apache.ignite.internal.metastorage.client.Update;
+import org.apache.ignite.internal.metastorage.client.WatchEvent;
+import org.apache.ignite.internal.metastorage.client.WatchListener;
 import org.apache.ignite.internal.util.ByteUtils;
 import org.apache.ignite.internal.util.IgniteSpinBusyLock;
+import org.apache.ignite.internal.vault.VaultManager;
 import org.apache.ignite.lang.IgniteException;
 import org.apache.ignite.lang.IgniteInternalException;
 import org.apache.ignite.lang.NodeStoppingException;
 import org.apache.ignite.network.ClusterNode;
+import org.jetbrains.annotations.NotNull;
 
 /**
  * Distribution zones manager.
@@ -78,8 +89,8 @@ public class DistributionZoneManager implements 
IgniteComponent {
     /** Meta Storage manager. */
     private final MetaStorageManager metaStorageManager;
 
-    /* Cluster Management manager. */
-    private final ClusterManagementGroupManager cmgManager;
+    /** Vault manager. */
+    private final VaultManager vaultMgr;
 
     /** Busy lock to stop synchronously. */
     private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
@@ -108,24 +119,34 @@ public class DistributionZoneManager implements 
IgniteComponent {
         }
     };
 
+    /** The logical topology on the last watch event.
+     *  It's enough to mark this field by volatile because we don't update the 
collection after it is assigned to the field.
+     */
+    private volatile Set<String> logicalTopology;
+
+    /** Watch listener id to unregister the watch listener on {@link 
DistributionZoneManager#stop()}. */
+    private volatile Long watchListenerId;
+
     /**
      * Creates a new distribution zone manager.
      *
      * @param zonesConfiguration Distribution zones configuration.
      * @param metaStorageManager Meta Storage manager.
-     * @param cmgManager Cluster management group manager.
      * @param logicalTopologyService Logical topology service.
+     * @param vaultMgr Vault manager.
      */
     public DistributionZoneManager(
             DistributionZonesConfiguration zonesConfiguration,
             MetaStorageManager metaStorageManager,
-            ClusterManagementGroupManager cmgManager,
-            LogicalTopologyService logicalTopologyService
+            LogicalTopologyService logicalTopologyService,
+            VaultManager vaultMgr
     ) {
         this.zonesConfiguration = zonesConfiguration;
         this.metaStorageManager = metaStorageManager;
-        this.cmgManager = cmgManager;
         this.logicalTopologyService = logicalTopologyService;
+        this.vaultMgr = vaultMgr;
+
+        logicalTopology = Collections.emptySet();
     }
 
     /**
@@ -133,12 +154,16 @@ public class DistributionZoneManager implements 
IgniteComponent {
      *
      * @param distributionZoneCfg Distribution zone configuration.
      * @return Future representing pending completion of the operation.
+     *      Future can be completed with {@link ConfigurationChangeException} 
if a zone with the given name already exists
+     *      or {@code distributionZoneCfg} is broken.
+     * @throws NullPointerException if {@code distributionZoneCfg} is {@code 
null}.
+     * @throws NodeStoppingException If the node is stopping.
      */
     public CompletableFuture<Void> 
createZone(DistributionZoneConfigurationParameters distributionZoneCfg) {
         Objects.requireNonNull(distributionZoneCfg, "Distribution zone 
configuration is null.");
 
         if (!busyLock.enterBusy()) {
-            throw new IgniteException(new NodeStoppingException());
+            throw new IgniteException(NODE_STOPPING_ERR, new 
NodeStoppingException());
         }
 
         try {
@@ -171,8 +196,6 @@ public class DistributionZoneManager implements 
IgniteComponent {
                     });
                 } catch (IllegalArgumentException e) {
                     throw new 
DistributionZoneAlreadyExistsException(distributionZoneCfg.name(), e);
-                } catch (Exception e) {
-                    throw new IgniteInternalException(UNEXPECTED_ERR, 
distributionZoneCfg.name(), e);
                 }
             }));
         } finally {
@@ -186,13 +209,17 @@ public class DistributionZoneManager implements 
IgniteComponent {
      * @param name Distribution zone name.
      * @param distributionZoneCfg Distribution zone configuration.
      * @return Future representing pending completion of the operation.
+     *      Future can be completed with {@link ConfigurationChangeException} 
if a zone with the given name already exists or
+     *      zone with name for renaming already exists or {@code 
distributionZoneCfg} is broken.
+     * @throws NullPointerException if {@code name} or {@code 
distributionZoneCfg} is {@code null}.
+     * @throws NodeStoppingException If the node is stopping.
      */
     public CompletableFuture<Void> alterZone(String name, 
DistributionZoneConfigurationParameters distributionZoneCfg) {
         Objects.requireNonNull(name, "Distribution zone name is null.");
         Objects.requireNonNull(distributionZoneCfg, "Distribution zone 
configuration is null.");
 
         if (!busyLock.enterBusy()) {
-            throw new IgniteException(new NodeStoppingException());
+            throw new IgniteException(NODE_STOPPING_ERR, new 
NodeStoppingException());
         }
 
         try {
@@ -203,8 +230,6 @@ public class DistributionZoneManager implements 
IgniteComponent {
                     renameChange = zonesListChange.rename(name, 
distributionZoneCfg.name());
                 } catch (IllegalArgumentException e) {
                     throw new DistributionZoneRenameException(name, 
distributionZoneCfg.name(), e);
-                } catch (Exception e) {
-                    throw new IgniteInternalException(UNEXPECTED_ERR, 
distributionZoneCfg.name(), e);
                 }
 
                 try {
@@ -231,8 +256,6 @@ public class DistributionZoneManager implements 
IgniteComponent {
                                     });
                 } catch (IllegalArgumentException e) {
                     throw new 
DistributionZoneNotFoundException(distributionZoneCfg.name(), e);
-                } catch (Exception e) {
-                    throw new IgniteInternalException(UNEXPECTED_ERR, 
distributionZoneCfg.name(), e);
                 }
             }));
         } finally {
@@ -245,12 +268,15 @@ public class DistributionZoneManager implements 
IgniteComponent {
      *
      * @param name Distribution zone name.
      * @return Future representing pending completion of the operation.
+     *      Future can be completed with {@link ConfigurationChangeException} 
if a zone with the given name doesn't exist.
+     * @throws NullPointerException if {@code name}  is {@code null}.
+     * @throws NodeStoppingException If the node is stopping.
      */
     public CompletableFuture<Void> dropZone(String name) {
         Objects.requireNonNull(name, "Distribution zone name is null.");
 
         if (!busyLock.enterBusy()) {
-            throw new IgniteException(new NodeStoppingException());
+            throw new IgniteException(NODE_STOPPING_ERR, new 
NodeStoppingException());
         }
 
         try {
@@ -271,11 +297,21 @@ public class DistributionZoneManager implements 
IgniteComponent {
     /** {@inheritDoc} */
     @Override
     public void start() {
-        zonesConfiguration.distributionZones().listenElements(new 
ZonesConfigurationListener());
+        if (!busyLock.enterBusy()) {
+            throw new IgniteException(NODE_STOPPING_ERR, new 
NodeStoppingException());
+        }
+
+        try {
+            zonesConfiguration.distributionZones().listenElements(new 
ZonesConfigurationListener());
 
-        logicalTopologyService.addEventListener(topologyEventListener);
+            logicalTopologyService.addEventListener(topologyEventListener);
 
-        initMetaStorageKeysOnStart();
+            registerMetaStorageWatchListener()
+                    .thenAccept(ignore -> initDataNodesFromVaultManager())
+                    .thenAccept(ignore -> initMetaStorageKeysOnStart());
+        } finally {
+            busyLock.leaveBusy();
+        }
     }
 
     /** {@inheritDoc} */
@@ -288,6 +324,10 @@ public class DistributionZoneManager implements 
IgniteComponent {
         busyLock.block();
 
         logicalTopologyService.removeEventListener(topologyEventListener);
+
+        if (watchListenerId != null) {
+            metaStorageManager.unregisterWatch(watchListenerId);
+        }
     }
 
     private class ZonesConfigurationListener implements 
ConfigurationNamedListListener<DistributionZoneView> {
@@ -328,30 +368,22 @@ public class DistributionZoneManager implements 
IgniteComponent {
         }
 
         try {
-            Set<ClusterNode> logicalTopology;
-
-            //TODO temporary code, will be removed in 
https://issues.apache.org/jira/browse/IGNITE-18121
-            try {
-                logicalTopology = cmgManager.logicalTopology().get().nodes();
-            } catch (InterruptedException | ExecutionException e) {
-                throw new IgniteInternalException(e);
-            }
-
-            assert !logicalTopology.isEmpty() : "Logical topology cannot be 
empty.";
-
             // Update data nodes for a zone only if the revision of the event 
is newer than value in that trigger key,
             // so we do not react on a stale events
             CompoundCondition triggerKeyCondition = 
triggerKeyCondition(revision);
 
-            Set<String> nodesConsistentIds = 
logicalTopology.stream().map(ClusterNode::name).collect(Collectors.toSet());
+            // logicalTopology can be updated concurrently by the watch 
listener.
+            Set<String> logicalTopology0 = logicalTopology;
 
-            Update dataNodesAndTriggerKeyUpd = 
updateDataNodesAndTriggerKey(zoneId, revision, nodesConsistentIds);
+            byte[] logicalTopologyBytes = toBytes(logicalTopology0);
+
+            Update dataNodesAndTriggerKeyUpd = 
updateDataNodesAndTriggerKey(zoneId, revision, logicalTopologyBytes);
 
             If iif = If.iif(triggerKeyCondition, dataNodesAndTriggerKeyUpd, 
ops().yield(false));
 
             metaStorageManager.invoke(iif).thenAccept(res -> {
                 if (res.getAsBoolean()) {
-                    LOG.debug("Update zones' dataNodes value [zoneId = {}, 
dataNodes = {}", zoneId, nodesConsistentIds);
+                    LOG.debug("Update zones' dataNodes value [zoneId = {}, 
dataNodes = {}", zoneId, logicalTopology0);
                 } else {
                     LOG.debug("Failed to update zones' dataNodes value [zoneId 
= {}]", zoneId);
                 }
@@ -483,47 +515,190 @@ public class DistributionZoneManager implements 
IgniteComponent {
             }
 
             try {
-                long topologyVersionFromCmg = snapshot.version();
+                
metaStorageManager.get(zonesLogicalTopologyVersionKey()).thenAccept(topVerEntry 
-> {
+                    if (!busyLock.enterBusy()) {
+                        throw new IgniteInternalException(NODE_STOPPING_ERR, 
new NodeStoppingException());
+                    }
 
-                byte[] topVerFromMetastorage;
+                    try {
+                        long topologyVersionFromCmg = snapshot.version();
 
-                try {
-                    topVerFromMetastorage = 
metaStorageManager.get(zonesLogicalTopologyVersionKey()).get().value();
-                } catch (InterruptedException | ExecutionException e) {
-                    throw new IgniteInternalException(UNEXPECTED_ERR, e);
-                }
+                        byte[] topVerFromMetaStorage = topVerEntry.value();
 
-                if (topVerFromMetastorage == null || 
ByteUtils.bytesToLong(topVerFromMetastorage) < topologyVersionFromCmg) {
-                    Set<String> topologyFromCmg = 
snapshot.nodes().stream().map(ClusterNode::name).collect(Collectors.toSet());
+                        if (topVerFromMetaStorage == null || 
bytesToLong(topVerFromMetaStorage) < topologyVersionFromCmg) {
+                            Set<String> topologyFromCmg = 
snapshot.nodes().stream().map(ClusterNode::name).collect(Collectors.toSet());
 
-                    Condition topologyVersionCondition = topVerFromMetastorage 
== null ? notExists(zonesLogicalTopologyVersionKey()) :
-                            
value(zonesLogicalTopologyVersionKey()).eq(topVerFromMetastorage);
-
-                    If iff = If.iif(topologyVersionCondition,
-                            updateLogicalTopologyAndVersion(topologyFromCmg, 
topologyVersionFromCmg),
-                            ops().yield(false)
-                    );
+                            Condition topologyVersionCondition = 
topVerFromMetaStorage == null
+                                    ? 
notExists(zonesLogicalTopologyVersionKey()) :
+                                    
value(zonesLogicalTopologyVersionKey()).eq(topVerFromMetaStorage);
 
-                    metaStorageManager.invoke(iff).thenAccept(res -> {
-                        if (res.getAsBoolean()) {
-                            LOG.debug(
-                                    "Distribution zones' logical topology and 
version keys were initialised [topology = {}, version = {}]",
-                                    Arrays.toString(topologyFromCmg.toArray()),
-                                    topologyVersionFromCmg
-                            );
-                        } else {
-                            LOG.debug(
-                                    "Failed to initialize distribution zones' 
logical topology "
-                                            + "and version keys [topology = 
{}, version = {}]",
-                                    Arrays.toString(topologyFromCmg.toArray()),
-                                    topologyVersionFromCmg
+                            If iff = If.iif(topologyVersionCondition,
+                                    
updateLogicalTopologyAndVersion(topologyFromCmg, topologyVersionFromCmg),
+                                    ops().yield(false)
                             );
+
+                            metaStorageManager.invoke(iff).thenAccept(res -> {
+                                if (res.getAsBoolean()) {
+                                    LOG.debug(
+                                            "Distribution zones' logical 
topology and version keys were initialised "
+                                                    + "[topology = {}, version 
= {}]",
+                                            
Arrays.toString(topologyFromCmg.toArray()),
+                                            topologyVersionFromCmg
+                                    );
+                                } else {
+                                    LOG.debug(
+                                            "Failed to initialize distribution 
zones' logical topology "
+                                                    + "and version keys 
[topology = {}, version = {}]",
+                                            
Arrays.toString(topologyFromCmg.toArray()),
+                                            topologyVersionFromCmg
+                                    );
+                                }
+                            });
                         }
-                    });
-                }
+                    } finally {
+                        busyLock.leaveBusy();
+                    }
+                });
+
             } finally {
                 busyLock.leaveBusy();
             }
         });
     }
+
+    /**
+     * Initialises data nodes of distribution zones in meta storage
+     * from {@link DistributionZonesUtil#zonesLogicalTopologyKey()} in vault.
+     */
+    private void initDataNodesFromVaultManager() {
+        vaultMgr.get(APPLIED_REV)
+                .thenApply(appliedRevision -> appliedRevision == null ? 0L : 
bytesToLong(appliedRevision.value()))
+                .thenAccept(vaultAppliedRevision -> {
+                    if (!busyLock.enterBusy()) {
+                        throw new IgniteInternalException(NODE_STOPPING_ERR, 
new NodeStoppingException());
+                    }
+
+                    try {
+                        vaultMgr.get(zonesLogicalTopologyKey())
+                                .thenAccept(vaultEntry -> {
+                                    if (!busyLock.enterBusy()) {
+                                        throw new 
IgniteInternalException(NODE_STOPPING_ERR, new NodeStoppingException());
+                                    }
+
+                                    try {
+                                        if (vaultEntry != null && 
vaultEntry.value() != null) {
+                                            logicalTopology = 
ByteUtils.fromBytes(vaultEntry.value());
+
+                                            
zonesConfiguration.distributionZones().value().namedListKeys()
+                                                    .forEach(zoneName -> {
+                                                        int zoneId = 
zonesConfiguration.distributionZones().get(zoneName).zoneId().value();
+
+                                                        
saveDataNodesToMetaStorage(zoneId, vaultEntry.value(), vaultAppliedRevision);
+                                                    });
+                                        }
+                                    } finally {
+                                        busyLock.leaveBusy();
+                                    }
+                                });
+                    } finally {
+                        busyLock.leaveBusy();
+                    }
+                });
+    }
+
+    /**
+     * Registers {@link WatchListener} which updates data nodes of 
distribution zones on logical topology changing event.
+     *
+     * @return Future representing pending completion of the operation.
+     */
+    private CompletableFuture<?> registerMetaStorageWatchListener() {
+        return metaStorageManager.registerWatch(zonesLogicalTopologyKey(), new 
WatchListener() {
+                    @Override
+                    public boolean onUpdate(@NotNull WatchEvent evt) {
+                        if (!busyLock.enterBusy()) {
+                            throw new 
IgniteInternalException(NODE_STOPPING_ERR, new NodeStoppingException());
+                        }
+
+                        try {
+                            assert evt.single();
+
+                            Entry newEntry = evt.entryEvent().newEntry();
+
+                            Set<String> newLogicalTopology = 
ByteUtils.fromBytes(newEntry.value());
+
+                            List<String> removedNodes =
+                                    logicalTopology.stream().filter(node -> 
!newLogicalTopology.contains(node)).collect(toList());
+
+                            List<String> addedNodes =
+                                    newLogicalTopology.stream().filter(node -> 
!logicalTopology.contains(node)).collect(toList());
+
+                            logicalTopology = newLogicalTopology;
+
+                            
zonesConfiguration.distributionZones().value().namedListKeys()
+                                    .forEach(zoneName -> {
+                                        DistributionZoneConfiguration zoneCfg 
= zonesConfiguration.distributionZones().get(zoneName);
+
+                                        int autoAdjust = 
zoneCfg.dataNodesAutoAdjust().value();
+                                        int autoAdjustScaleDown = 
zoneCfg.dataNodesAutoAdjustScaleDown().value();
+                                        int autoAdjustScaleUp = 
zoneCfg.dataNodesAutoAdjustScaleUp().value();
+
+                                        Integer zoneId = 
zoneCfg.zoneId().value();
+
+                                        if ((!addedNodes.isEmpty() || 
!removedNodes.isEmpty()) && autoAdjust != Integer.MAX_VALUE) {
+                                            //TODO: IGNITE-18134 Create 
scheduler with dataNodesAutoAdjust timer.
+                                            saveDataNodesToMetaStorage(
+                                                    zoneId, newEntry.value(), 
newEntry.revision()
+                                            );
+                                        } else {
+                                            if (!addedNodes.isEmpty() && 
autoAdjustScaleUp != Integer.MAX_VALUE) {
+                                                //TODO: IGNITE-18121 Create 
scale up scheduler with dataNodesAutoAdjustScaleUp timer.
+                                                saveDataNodesToMetaStorage(
+                                                        zoneId, 
newEntry.value(), newEntry.revision()
+                                                );
+                                            }
+
+                                            if (!removedNodes.isEmpty() && 
autoAdjustScaleDown != Integer.MAX_VALUE) {
+                                                //TODO: IGNITE-18132 Create 
scale down scheduler with dataNodesAutoAdjustScaleDown timer.
+                                                saveDataNodesToMetaStorage(
+                                                        zoneId, 
newEntry.value(), newEntry.revision()
+                                                );
+                                            }
+                                        }
+                                    });
+
+                            return true;
+                        } finally {
+                            busyLock.leaveBusy();
+                        }
+                    }
+
+                    @Override
+                    public void onError(@NotNull Throwable e) {
+                        LOG.warn("Unable to process logical topology event", 
e);
+                    }
+                })
+                .thenAccept(id -> watchListenerId = id);
+    }
+
+    /**
+     * Method updates data nodes value for the specified zone,
+     * also sets {@code revision} to the {@link 
DistributionZonesUtil#zonesChangeTriggerKey()} if it passes the condition.
+     *
+     * @param zoneId Unique id of a zone
+     * @param dataNodes Data nodes of a zone
+     * @param revision Revision of an event that has triggered this method.
+     */
+    private void saveDataNodesToMetaStorage(int zoneId, byte[] dataNodes, long 
revision) {
+        Update dataNodesAndTriggerKeyUpd = 
updateDataNodesAndTriggerKey(zoneId, revision, dataNodes);
+
+        var iif = If.iif(triggerKeyCondition(revision), 
dataNodesAndTriggerKeyUpd, ops().yield(false));
+
+        metaStorageManager.invoke(iif).thenAccept(res -> {
+            if (res.getAsBoolean()) {
+                LOG.debug("Delete zones' dataNodes key [zoneId = {}", zoneId);
+            } else {
+                LOG.debug("Failed to delete zones' dataNodes key [zoneId = 
{}]", zoneId);
+            }
+        });
+    }
 }
diff --git a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZonesUtil.java b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZonesUtil.java
index a76902a1c5..c620b8d8f1 100644
--- a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZonesUtil.java
+++ b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZonesUtil.java
@@ -103,9 +103,9 @@ class DistributionZonesUtil {
      * @param logicalTopology Logical topology.
      * @return Update command for the meta storage.
      */
-    static Update updateDataNodesAndTriggerKey(int zoneId, long revision, 
Set<String> logicalTopology) {
+    static Update updateDataNodesAndTriggerKey(int zoneId, long revision, 
byte[] logicalTopology) {
         return ops(
-                put(zoneDataNodesKey(zoneId), 
ByteUtils.toBytes(logicalTopology)),
+                put(zoneDataNodesKey(zoneId), logicalTopology),
                 put(zonesChangeTriggerKey(), ByteUtils.longToBytes(revision))
         ).yield(true);
     }
diff --git a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/exception/DistributionZoneNotFoundException.java b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/exception/DistributionZoneNotFoundException.java
index a12853fbe9..e921e6a8a6 100644
--- a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/exception/DistributionZoneNotFoundException.java
+++ b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/exception/DistributionZoneNotFoundException.java
@@ -33,7 +33,7 @@ public class DistributionZoneNotFoundException extends 
IgniteInternalException {
      * @param zoneName Zone name.
      */
     public DistributionZoneNotFoundException(String zoneName) {
-        this("Distribution zone is not found [zoneName=" + zoneName + ']', 
null);
+        this(zoneName, null);
     }
 
     /**
diff --git a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerConfigurationChangesTest.java b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerConfigurationChangesTest.java
index e8325efc94..df97816310 100644
--- a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerConfigurationChangesTest.java
+++ b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerConfigurationChangesTest.java
@@ -21,8 +21,12 @@ import static 
java.util.concurrent.CompletableFuture.completedFuture;
 import static 
org.apache.ignite.configuration.annotation.ConfigurationType.DISTRIBUTED;
 import static 
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zoneDataNodesKey;
 import static 
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zonesChangeTriggerKey;
+import static 
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zonesLogicalTopologyKey;
 import static 
org.apache.ignite.internal.metastorage.client.MetaStorageServiceImpl.toIfInfo;
 import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
+import static org.apache.ignite.internal.util.ByteUtils.longToBytes;
+import static org.apache.ignite.internal.util.ByteUtils.toBytes;
+import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.doNothing;
@@ -40,8 +44,6 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.atomic.AtomicLong;
-import java.util.stream.Collectors;
-import 
org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager;
 import 
org.apache.ignite.internal.cluster.management.topology.LogicalTopologyServiceImpl;
 import 
org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologySnapshot;
 import org.apache.ignite.internal.configuration.ConfigurationManager;
@@ -61,8 +63,10 @@ import 
org.apache.ignite.internal.raft.service.CommandClosure;
 import org.apache.ignite.internal.raft.service.RaftGroupService;
 import org.apache.ignite.internal.testframework.IgniteAbstractTest;
 import org.apache.ignite.internal.util.ByteUtils;
+import org.apache.ignite.internal.vault.VaultEntry;
+import org.apache.ignite.internal.vault.VaultManager;
+import org.apache.ignite.lang.ByteArray;
 import org.apache.ignite.lang.IgniteInternalException;
-import org.apache.ignite.network.ClusterNode;
 import org.jetbrains.annotations.Nullable;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
@@ -77,8 +81,7 @@ public class DistributionZoneManagerConfigurationChangesTest 
extends IgniteAbstr
 
     private static final String NEW_ZONE_NAME = "zone2";
 
-    @Mock
-    private ClusterManagementGroupManager cmgManager;
+    private static final Set<String> nodes = Set.of("name1");
 
     private DistributionZoneManager distributionZoneManager;
 
@@ -89,6 +92,8 @@ public class DistributionZoneManagerConfigurationChangesTest 
extends IgniteAbstr
     @Mock
     private LogicalTopologyServiceImpl logicalTopologyService;
 
+    private VaultManager vaultMgr;
+
     @BeforeEach
     public void setUp() {
         clusterCfgMgr = new ConfigurationManager(
@@ -104,15 +109,19 @@ public class 
DistributionZoneManagerConfigurationChangesTest extends IgniteAbstr
 
         MetaStorageManager metaStorageManager = mock(MetaStorageManager.class);
 
-        cmgManager = mock(ClusterManagementGroupManager.class);
+        when(metaStorageManager.registerWatch(any(ByteArray.class), 
any())).thenReturn(completedFuture(null));
 
         logicalTopologyService = mock(LogicalTopologyServiceImpl.class);
 
+        vaultMgr = mock(VaultManager.class);
+
+        when(vaultMgr.get(any())).thenReturn(completedFuture(null));
+
         distributionZoneManager = new DistributionZoneManager(
                 zonesConfiguration,
                 metaStorageManager,
-                cmgManager,
-                logicalTopologyService
+                logicalTopologyService,
+                vaultMgr
         );
 
         clusterCfgMgr.start();
@@ -121,6 +130,8 @@ public class 
DistributionZoneManagerConfigurationChangesTest extends IgniteAbstr
 
         
when(logicalTopologyService.logicalTopologyOnLeader()).thenReturn(completedFuture(new
 LogicalTopologySnapshot(1, Set.of())));
 
+        mockVaultZonesLogicalTopologyKey(nodes);
+
         distributionZoneManager.start();
 
         AtomicLong raftIndex = new AtomicLong();
@@ -196,34 +207,23 @@ public class 
DistributionZoneManagerConfigurationChangesTest extends IgniteAbstr
 
     @Test
     void testDataNodesPropagationAfterZoneCreation() throws Exception {
-        Set<ClusterNode> clusterNodes = Set.of(new ClusterNode("1", "name1", 
null));
-
-        mockCmgLocalNodes(clusterNodes);
+        assertDataNodesForZone(1, null);
 
         distributionZoneManager.createZone(new 
DistributionZoneConfigurationParameters.Builder(ZONE_NAME).build()).get();
 
-        assertDataNodesForZone(1, clusterNodes);
+        assertDataNodesForZone(1, nodes);
 
         assertZonesChangeTriggerKey(1);
     }
 
     @Test
     void testTriggerKeyPropagationAfterZoneUpdate() throws Exception {
-        Set<ClusterNode> clusterNodes = Set.of(new ClusterNode("1", "name1", 
null));
-
-        LogicalTopologySnapshot logicalTopologySnapshot = 
mockCmgLocalNodes(clusterNodes);
+        
assertNull(keyValueStorage.get(zonesChangeTriggerKey().bytes()).value());
 
         distributionZoneManager.createZone(new 
DistributionZoneConfigurationParameters.Builder(ZONE_NAME).build()).get();
 
         assertZonesChangeTriggerKey(1);
 
-        var clusterNodes2 = Set.of(
-                new ClusterNode("1", "name1", null),
-                new ClusterNode("2", "name2", null)
-        );
-
-        when(logicalTopologySnapshot.nodes()).thenReturn(clusterNodes2);
-
         distributionZoneManager.alterZone(
                 ZONE_NAME,
                 new 
DistributionZoneConfigurationParameters.Builder(ZONE_NAME).dataNodesAutoAdjust(100).build()
@@ -231,29 +231,23 @@ public class 
DistributionZoneManagerConfigurationChangesTest extends IgniteAbstr
 
         assertZonesChangeTriggerKey(2);
 
-        assertDataNodesForZone(1, clusterNodes);
+        assertDataNodesForZone(1, nodes);
     }
 
     @Test
     void testZoneDeleteRemovesMetaStorageKey() throws Exception {
-        Set<ClusterNode> clusterNodes = Set.of(new ClusterNode("1", "name1", 
null));
-
-        mockCmgLocalNodes(clusterNodes);
-
-        distributionZoneManager.createZone(new 
DistributionZoneConfigurationParameters.Builder(ZONE_NAME).build());
+        distributionZoneManager.createZone(new 
DistributionZoneConfigurationParameters.Builder(ZONE_NAME).build()).get();
 
-        assertDataNodesForZone(1, clusterNodes);
+        assertDataNodesForZone(1, nodes);
 
-        distributionZoneManager.dropZone(ZONE_NAME);
+        distributionZoneManager.dropZone(ZONE_NAME).get();
 
         assertTrue(waitForCondition(() -> 
keyValueStorage.get(zoneDataNodesKey(1).bytes()).value() == null, 5000));
     }
 
     @Test
     void testSeveralZoneCreationsUpdatesTriggerKey() throws Exception {
-        Set<ClusterNode> clusterNodes = Set.of(new ClusterNode("1", "name1", 
null));
-
-        mockCmgLocalNodes(clusterNodes);
+        
assertNull(keyValueStorage.get(zonesChangeTriggerKey().bytes()).value());
 
         distributionZoneManager.createZone(new 
DistributionZoneConfigurationParameters.Builder(ZONE_NAME).build()).get();
 
@@ -264,9 +258,7 @@ public class 
DistributionZoneManagerConfigurationChangesTest extends IgniteAbstr
 
     @Test
     void testSeveralZoneUpdatesUpdatesTriggerKey() throws Exception {
-        Set<ClusterNode> clusterNodes = Set.of(new ClusterNode("1", "name1", 
null));
-
-        mockCmgLocalNodes(clusterNodes);
+        
assertNull(keyValueStorage.get(zonesChangeTriggerKey().bytes()).value());
 
         distributionZoneManager.createZone(new 
DistributionZoneConfigurationParameters.Builder(ZONE_NAME).build()).get();
 
@@ -285,11 +277,7 @@ public class 
DistributionZoneManagerConfigurationChangesTest extends IgniteAbstr
 
     @Test
     void testDataNodesNotPropagatedAfterZoneCreation() throws Exception {
-        Set<ClusterNode> clusterNodes = Set.of(new ClusterNode("1", "name1", 
null));
-
-        mockCmgLocalNodes(clusterNodes);
-
-        keyValueStorage.put(zonesChangeTriggerKey().bytes(), 
ByteUtils.longToBytes(100));
+        keyValueStorage.put(zonesChangeTriggerKey().bytes(), longToBytes(100));
 
         distributionZoneManager.createZone(new 
DistributionZoneConfigurationParameters.Builder(ZONE_NAME).build()).get();
 
@@ -302,22 +290,11 @@ public class 
DistributionZoneManagerConfigurationChangesTest extends IgniteAbstr
 
     @Test
     void testTriggerKeyNotPropagatedAfterZoneUpdate() throws Exception {
-        Set<ClusterNode> clusterNodes = Set.of(new ClusterNode("1", "name1", 
null));
-
-        LogicalTopologySnapshot logicalTopologySnapshot = 
mockCmgLocalNodes(clusterNodes);
-
         distributionZoneManager.createZone(new 
DistributionZoneConfigurationParameters.Builder(ZONE_NAME).build()).get();
 
-        assertDataNodesForZone(1, clusterNodes);
-
-        var clusterNodes2 = Set.of(
-                new ClusterNode("1", "name1", null),
-                new ClusterNode("2", "name2", null)
-        );
-
-        when(logicalTopologySnapshot.nodes()).thenReturn(clusterNodes2);
+        assertDataNodesForZone(1, nodes);
 
-        keyValueStorage.put(zonesChangeTriggerKey().bytes(), 
ByteUtils.longToBytes(100));
+        keyValueStorage.put(zonesChangeTriggerKey().bytes(), longToBytes(100));
 
         distributionZoneManager.alterZone(
                 ZONE_NAME,
@@ -328,44 +305,36 @@ public class 
DistributionZoneManagerConfigurationChangesTest extends IgniteAbstr
 
         assertZonesChangeTriggerKey(100);
 
-        assertDataNodesForZone(1, clusterNodes);
+        assertDataNodesForZone(1, nodes);
     }
 
     @Test
     void testZoneDeleteDoNotRemoveMetaStorageKey() throws Exception {
-        Set<ClusterNode> clusterNodes = Set.of(new ClusterNode("1", "name1", 
null));
-
-        mockCmgLocalNodes(clusterNodes);
-
-        distributionZoneManager.createZone(new 
DistributionZoneConfigurationParameters.Builder(ZONE_NAME).build());
+        distributionZoneManager.createZone(new 
DistributionZoneConfigurationParameters.Builder(ZONE_NAME).build()).get();
 
-        assertDataNodesForZone(1, clusterNodes);
+        assertDataNodesForZone(1, nodes);
 
-        keyValueStorage.put(zonesChangeTriggerKey().bytes(), 
ByteUtils.longToBytes(100));
+        keyValueStorage.put(zonesChangeTriggerKey().bytes(), longToBytes(100));
 
-        distributionZoneManager.dropZone(ZONE_NAME);
+        distributionZoneManager.dropZone(ZONE_NAME).get();
 
         verify(keyValueStorage, timeout(1000).times(2)).invoke(any());
 
-        assertDataNodesForZone(1, clusterNodes);
+        assertDataNodesForZone(1, nodes);
     }
 
-    private LogicalTopologySnapshot mockCmgLocalNodes(Set<ClusterNode> 
clusterNodes) {
-        LogicalTopologySnapshot logicalTopologySnapshot = 
mock(LogicalTopologySnapshot.class);
-
-        
when(cmgManager.logicalTopology()).thenReturn(completedFuture(logicalTopologySnapshot));
-
-        when(logicalTopologySnapshot.nodes()).thenReturn(clusterNodes);
+    private void mockVaultZonesLogicalTopologyKey(Set<String> nodes) {
+        byte[] newLogicalTopology = toBytes(nodes);
 
-        return logicalTopologySnapshot;
+        when(vaultMgr.get(zonesLogicalTopologyKey()))
+                .thenReturn(completedFuture(new 
VaultEntry(zonesLogicalTopologyKey(), newLogicalTopology)));
     }
 
-    private void assertDataNodesForZone(int zoneId, @Nullable Set<ClusterNode> 
clusterNodes) throws InterruptedException {
-        byte[] nodes = clusterNodes == null
-                ? null
-                : 
ByteUtils.toBytes(clusterNodes.stream().map(ClusterNode::name).collect(Collectors.toSet()));
+    private void assertDataNodesForZone(int zoneId, @Nullable Set<String> 
clusterNodes) throws InterruptedException {
+        byte[] nodes = clusterNodes == null ? null : toBytes(clusterNodes);
 
-        assertTrue(waitForCondition(() -> 
Arrays.equals(keyValueStorage.get(zoneDataNodesKey(zoneId).bytes()).value(), 
nodes), 1000));
+        assertTrue(waitForCondition(() -> 
Arrays.equals(keyValueStorage.get(zoneDataNodesKey(zoneId).bytes()).value(), 
nodes),
+                1000));
     }
 
     private void assertZonesChangeTriggerKey(int revision) throws 
InterruptedException {
diff --git 
a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerLogicalTopologyEventsTest.java
 
b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerLogicalTopologyEventsTest.java
index cc7272217a..e479dea073 100644
--- 
a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerLogicalTopologyEventsTest.java
+++ 
b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerLogicalTopologyEventsTest.java
@@ -70,6 +70,7 @@ import org.apache.ignite.internal.raft.WriteCommand;
 import org.apache.ignite.internal.raft.service.CommandClosure;
 import org.apache.ignite.internal.raft.service.RaftGroupService;
 import org.apache.ignite.internal.util.ByteUtils;
+import org.apache.ignite.internal.vault.VaultManager;
 import org.apache.ignite.lang.ByteArray;
 import org.apache.ignite.lang.IgniteInternalException;
 import org.apache.ignite.network.ClusterNode;
@@ -118,11 +119,17 @@ public class 
DistributionZoneManagerLogicalTopologyEventsTest {
 
         LogicalTopologyServiceImpl logicalTopologyService = new 
LogicalTopologyServiceImpl(topology, cmgManager);
 
+        VaultManager vaultMgr = mock(VaultManager.class);
+
+        when(vaultMgr.get(any())).thenReturn(completedFuture(null));
+
+        when(metaStorageManager.registerWatch(any(ByteArray.class), 
any())).then(invocation -> completedFuture(null));
+
         distributionZoneManager = new DistributionZoneManager(
                 zonesConfiguration,
                 metaStorageManager,
-                cmgManager,
-                logicalTopologyService
+                logicalTopologyService,
+                vaultMgr
         );
 
         clusterCfgMgr.start();
diff --git 
a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerTest.java
 
b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerTest.java
index 9db029085d..6e5e94f715 100644
--- 
a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerTest.java
+++ 
b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerTest.java
@@ -26,12 +26,13 @@ import static org.mockito.Mockito.mock;
 
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 import org.apache.ignite.configuration.ConfigurationChangeException;
-import 
org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager;
 import 
org.apache.ignite.internal.cluster.management.topology.LogicalTopologyServiceImpl;
 import org.apache.ignite.internal.configuration.ConfigurationRegistry;
 import 
org.apache.ignite.internal.configuration.storage.TestConfigurationStorage;
+import 
org.apache.ignite.internal.distributionzones.DistributionZoneConfigurationParameters.Builder;
 import 
org.apache.ignite.internal.distributionzones.configuration.DistributionZoneConfiguration;
 import 
org.apache.ignite.internal.distributionzones.configuration.DistributionZonesConfiguration;
 import 
org.apache.ignite.internal.distributionzones.exception.DistributionZoneAlreadyExistsException;
@@ -39,6 +40,7 @@ import 
org.apache.ignite.internal.distributionzones.exception.DistributionZoneNo
 import 
org.apache.ignite.internal.distributionzones.exception.DistributionZoneRenameException;
 import org.apache.ignite.internal.metastorage.MetaStorageManager;
 import org.apache.ignite.internal.testframework.IgniteAbstractTest;
+import org.apache.ignite.internal.vault.VaultManager;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
@@ -71,8 +73,8 @@ class DistributionZoneManagerTest extends IgniteAbstractTest {
         distributionZoneManager = new DistributionZoneManager(
                 zonesConfiguration,
                 mock(MetaStorageManager.class),
-                mock(ClusterManagementGroupManager.class),
-                mock(LogicalTopologyServiceImpl.class)
+                mock(LogicalTopologyServiceImpl.class),
+                mock(VaultManager.class)
         );
     }
 
@@ -156,10 +158,14 @@ class DistributionZoneManagerTest extends 
IgniteAbstractTest {
                 new 
DistributionZoneConfigurationParameters.Builder(ZONE_NAME).dataNodesAutoAdjust(100).build()
         ).get(5, TimeUnit.SECONDS);
 
+        CompletableFuture<Void> fut;
+
+        fut = distributionZoneManager.createZone(
+                new Builder(ZONE_NAME).dataNodesAutoAdjust(100).build()
+        );
+
         try {
-            distributionZoneManager.createZone(
-                    new 
DistributionZoneConfigurationParameters.Builder(ZONE_NAME).dataNodesAutoAdjust(100).build()
-            ).get(5, TimeUnit.SECONDS);
+            fut.get(5, TimeUnit.SECONDS);
         } catch (Exception e0) {
             e = e0;
         }
@@ -172,8 +178,10 @@ class DistributionZoneManagerTest extends 
IgniteAbstractTest {
     public void testDropZoneIfNotExists() {
         Exception e = null;
 
+        CompletableFuture<Void> fut = 
distributionZoneManager.dropZone(ZONE_NAME);
+
         try {
-            distributionZoneManager.dropZone(ZONE_NAME).get(5, 
TimeUnit.SECONDS);
+            fut.get(5, TimeUnit.SECONDS);
         } catch (Exception e0) {
             e = e0;
         }
@@ -294,9 +302,12 @@ class DistributionZoneManagerTest extends 
IgniteAbstractTest {
     public void testAlterZoneRename1() {
         Exception e = null;
 
+        CompletableFuture<Void> fut = distributionZoneManager
+                .alterZone(ZONE_NAME, new 
DistributionZoneConfigurationParameters.Builder(NEW_ZONE_NAME)
+                .dataNodesAutoAdjust(100).build());
+
         try {
-            distributionZoneManager.alterZone(ZONE_NAME, new 
DistributionZoneConfigurationParameters.Builder(NEW_ZONE_NAME)
-                    .dataNodesAutoAdjust(100).build()).get(5, 
TimeUnit.SECONDS);
+            fut.get(5, TimeUnit.SECONDS);
         } catch (Exception e0) {
             e = e0;
         }
@@ -315,9 +326,11 @@ class DistributionZoneManagerTest extends 
IgniteAbstractTest {
         distributionZoneManager.createZone(new 
DistributionZoneConfigurationParameters.Builder(NEW_ZONE_NAME)
                 .dataNodesAutoAdjust(100).build()).get(5, TimeUnit.SECONDS);
 
+        CompletableFuture<Void> fut = 
distributionZoneManager.alterZone(ZONE_NAME, new Builder(NEW_ZONE_NAME)
+                .dataNodesAutoAdjust(100).build());
+
         try {
-            distributionZoneManager.alterZone(ZONE_NAME, new 
DistributionZoneConfigurationParameters.Builder(NEW_ZONE_NAME)
-                    .dataNodesAutoAdjust(100).build()).get(5, 
TimeUnit.SECONDS);
+            fut.get(5, TimeUnit.SECONDS);
         } catch (Exception e0) {
             e = e0;
         }
@@ -330,9 +343,11 @@ class DistributionZoneManagerTest extends 
IgniteAbstractTest {
     public void testAlterZoneIfExists() {
         Exception e = null;
 
+        CompletableFuture<Void> fut = 
distributionZoneManager.alterZone(ZONE_NAME, new Builder(ZONE_NAME)
+                .dataNodesAutoAdjust(100).build());
+
         try {
-            distributionZoneManager.alterZone(ZONE_NAME, new 
DistributionZoneConfigurationParameters.Builder(ZONE_NAME)
-                    .dataNodesAutoAdjust(100).build()).get(5, 
TimeUnit.SECONDS);
+            fut.get(5, TimeUnit.SECONDS);
         } catch (Exception e0) {
             e = e0;
         }
@@ -345,9 +360,11 @@ class DistributionZoneManagerTest extends 
IgniteAbstractTest {
     public void testCreateZoneWithWrongAutoAdjust() {
         Exception e = null;
 
+        CompletableFuture<Void> fut = distributionZoneManager.createZone(new 
Builder(ZONE_NAME)
+                .dataNodesAutoAdjust(-10).build());
+
         try {
-            distributionZoneManager.createZone(new 
DistributionZoneConfigurationParameters.Builder(ZONE_NAME)
-                    .dataNodesAutoAdjust(-10).build()).get(5, 
TimeUnit.SECONDS);
+            fut.get(5, TimeUnit.SECONDS);
         } catch (Exception e0) {
             e = e0;
         }
@@ -360,9 +377,11 @@ class DistributionZoneManagerTest extends 
IgniteAbstractTest {
     public void testCreateZoneWithWrongSeparatedAutoAdjust1() {
         Exception e = null;
 
+        CompletableFuture<Void> fut = distributionZoneManager.createZone(new 
Builder(ZONE_NAME)
+                
.dataNodesAutoAdjustScaleUp(-100).dataNodesAutoAdjustScaleDown(1).build());
+
         try {
-            distributionZoneManager.createZone(new 
DistributionZoneConfigurationParameters.Builder(ZONE_NAME)
-                    
.dataNodesAutoAdjustScaleUp(-100).dataNodesAutoAdjustScaleDown(1).build()).get(5,
 TimeUnit.SECONDS);
+            fut.get(5, TimeUnit.SECONDS);
         } catch (Exception e0) {
             e = e0;
         }
@@ -375,9 +394,11 @@ class DistributionZoneManagerTest extends 
IgniteAbstractTest {
     public void testCreateZoneWithWrongSeparatedAutoAdjust2() {
         Exception e = null;
 
+        CompletableFuture<Void> fut = distributionZoneManager.createZone(new 
Builder(ZONE_NAME)
+                
.dataNodesAutoAdjustScaleUp(1).dataNodesAutoAdjustScaleDown(-100).build());
+
         try {
-            distributionZoneManager.createZone(new 
DistributionZoneConfigurationParameters.Builder(ZONE_NAME)
-                    
.dataNodesAutoAdjustScaleUp(1).dataNodesAutoAdjustScaleDown(-100).build()).get(5,
 TimeUnit.SECONDS);
+            fut.get(5, TimeUnit.SECONDS);
         } catch (Exception e0) {
             e = e0;
         }
@@ -391,7 +412,7 @@ class DistributionZoneManagerTest extends 
IgniteAbstractTest {
         Exception e = null;
 
         try {
-            distributionZoneManager.createZone(null).get(5, TimeUnit.SECONDS);
+            distributionZoneManager.createZone(null);
         } catch (Exception e0) {
             e = e0;
         }
@@ -406,8 +427,7 @@ class DistributionZoneManagerTest extends 
IgniteAbstractTest {
         Exception e = null;
 
         try {
-            distributionZoneManager.alterZone(null, new 
DistributionZoneConfigurationParameters.Builder(ZONE_NAME).build())
-                    .get(5, TimeUnit.SECONDS);
+            distributionZoneManager.alterZone(null, new 
Builder(ZONE_NAME).build());
         } catch (Exception e0) {
             e = e0;
         }
@@ -422,13 +442,11 @@ class DistributionZoneManagerTest extends 
IgniteAbstractTest {
         Exception e = null;
 
         try {
-            distributionZoneManager.alterZone(ZONE_NAME, null)
-                    .get(5, TimeUnit.SECONDS);
+            distributionZoneManager.alterZone(ZONE_NAME, null);
         } catch (Exception e0) {
             e = e0;
         }
 
-        assertTrue(e != null);
         assertTrue(e instanceof NullPointerException, e.toString());
         assertEquals("Distribution zone configuration is null.", 
e.getMessage(), e.toString());
     }
@@ -438,8 +456,7 @@ class DistributionZoneManagerTest extends 
IgniteAbstractTest {
         Exception e = null;
 
         try {
-            distributionZoneManager.dropZone(null)
-                    .get(5, TimeUnit.SECONDS);
+            distributionZoneManager.dropZone(null);
         } catch (Exception e0) {
             e = e0;
         }
diff --git 
a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerWatchListenerTest.java
 
b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerWatchListenerTest.java
new file mode 100644
index 0000000000..3e518b6ced
--- /dev/null
+++ 
b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerWatchListenerTest.java
@@ -0,0 +1,448 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.distributionzones;
+
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static 
org.apache.ignite.configuration.annotation.ConfigurationType.DISTRIBUTED;
+import static 
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zoneDataNodesKey;
+import static 
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zonesChangeTriggerKey;
+import static 
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zonesLogicalTopologyKey;
+import static 
org.apache.ignite.internal.metastorage.MetaStorageManager.APPLIED_REV;
+import static 
org.apache.ignite.internal.metastorage.client.MetaStorageServiceImpl.toIfInfo;
+import static org.apache.ignite.internal.util.ByteUtils.fromBytes;
+import static org.apache.ignite.internal.util.ByteUtils.longToBytes;
+import static org.apache.ignite.internal.util.ByteUtils.toBytes;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.after;
+import static org.mockito.Mockito.lenient;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.timeout;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.ignite.configuration.ConfigurationValue;
+import org.apache.ignite.configuration.NamedConfigurationTree;
+import org.apache.ignite.configuration.NamedListView;
+import 
org.apache.ignite.internal.cluster.management.topology.LogicalTopologyServiceImpl;
+import 
org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologySnapshot;
+import org.apache.ignite.internal.configuration.ConfigurationManager;
+import 
org.apache.ignite.internal.configuration.storage.TestConfigurationStorage;
+import 
org.apache.ignite.internal.distributionzones.configuration.DistributionZoneChange;
+import 
org.apache.ignite.internal.distributionzones.configuration.DistributionZoneConfiguration;
+import 
org.apache.ignite.internal.distributionzones.configuration.DistributionZoneView;
+import 
org.apache.ignite.internal.distributionzones.configuration.DistributionZonesConfiguration;
+import org.apache.ignite.internal.metastorage.MetaStorageManager;
+import org.apache.ignite.internal.metastorage.client.EntryEvent;
+import org.apache.ignite.internal.metastorage.client.EntryImpl;
+import org.apache.ignite.internal.metastorage.client.If;
+import org.apache.ignite.internal.metastorage.client.StatementResult;
+import org.apache.ignite.internal.metastorage.client.WatchEvent;
+import org.apache.ignite.internal.metastorage.client.WatchListener;
+import org.apache.ignite.internal.metastorage.common.StatementResultInfo;
+import 
org.apache.ignite.internal.metastorage.common.command.MetaStorageCommandsFactory;
+import 
org.apache.ignite.internal.metastorage.common.command.MultiInvokeCommand;
+import org.apache.ignite.internal.metastorage.server.Entry;
+import 
org.apache.ignite.internal.metastorage.server.SimpleInMemoryKeyValueStorage;
+import org.apache.ignite.internal.metastorage.server.raft.MetaStorageListener;
+import org.apache.ignite.internal.raft.Command;
+import org.apache.ignite.internal.raft.WriteCommand;
+import org.apache.ignite.internal.raft.service.CommandClosure;
+import org.apache.ignite.internal.raft.service.RaftGroupService;
+import org.apache.ignite.internal.testframework.IgniteAbstractTest;
+import org.apache.ignite.internal.vault.VaultEntry;
+import org.apache.ignite.internal.vault.VaultManager;
+import org.apache.ignite.lang.ByteArray;
+import org.apache.ignite.lang.IgniteInternalException;
+import org.jetbrains.annotations.Nullable;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Tests distribution zones logical topology changes and reaction to those 
changes.
+ */
+public class DistributionZoneManagerWatchListenerTest extends 
IgniteAbstractTest {
+    private static final String ZONE_NAME_1 = "zone1";
+
+    private VaultManager vaultMgr;
+
+    private DistributionZoneManager distributionZoneManager;
+
+    private SimpleInMemoryKeyValueStorage keyValueStorage;
+
+    private ConfigurationManager clusterCfgMgr;
+
+    private WatchListener watchListener;
+
+    private DistributionZonesConfiguration zonesConfiguration;
+
+    private MetaStorageManager metaStorageManager;
+
+    /**
+     * Prepares the fixture shared by all tests.
+     *
+     * <p>The zones configuration, meta storage manager, logical topology service and vault are Mockito mocks. The listener passed to
+     * {@code metaStorageManager.registerWatch} is captured into {@code watchListener}, and {@code metaStorageManager.invoke} is routed
+     * through a {@link MetaStorageListener} backed by a spied {@link SimpleInMemoryKeyValueStorage}, so tests can inspect the storage.
+     */
+    @BeforeEach
+    public void setUp() {
+        clusterCfgMgr = new ConfigurationManager(
+                List.of(DistributionZonesConfiguration.KEY),
+                Map.of(),
+                new TestConfigurationStorage(DISTRIBUTED),
+                List.of(),
+                List.of()
+        );
+
+        zonesConfiguration = mock(DistributionZonesConfiguration.class);
+        metaStorageManager = mock(MetaStorageManager.class);
+
+        // Logical topology reported by the leader: version 1, no nodes.
+        LogicalTopologySnapshot emptyTopology = mock(LogicalTopologySnapshot.class);
+        when(emptyTopology.version()).thenReturn(1L);
+        when(emptyTopology.nodes()).thenReturn(Collections.emptySet());
+
+        LogicalTopologyServiceImpl logicalTopologyService = mock(LogicalTopologyServiceImpl.class);
+        when(logicalTopologyService.logicalTopologyOnLeader()).thenReturn(completedFuture(emptyTopology));
+
+        vaultMgr = mock(VaultManager.class);
+
+        distributionZoneManager = new DistributionZoneManager(
+                zonesConfiguration,
+                metaStorageManager,
+                logicalTopologyService,
+                vaultMgr
+        );
+
+        clusterCfgMgr.start();
+
+        mockVaultAppliedRevision(1);
+
+        // By default the vault entry for the logical topology key carries no value.
+        VaultEntry noTopology = new VaultEntry(zonesLogicalTopologyKey(), null);
+        when(vaultMgr.get(zonesLogicalTopologyKey())).thenReturn(completedFuture(noTopology));
+
+        // Remember the listener handed to registerWatch so that tests can fire watch events manually.
+        when(metaStorageManager.registerWatch(any(ByteArray.class), any())).then(invocation -> {
+            watchListener = invocation.getArgument(1);
+
+            return completedFuture(null);
+        });
+
+        mockEmptyZonesList();
+
+        keyValueStorage = spy(new SimpleInMemoryKeyValueStorage());
+
+        MetaStorageListener metaStorageListener = new MetaStorageListener(keyValueStorage);
+
+        RaftGroupService metaStorageService = mock(RaftGroupService.class);
+
+        AtomicLong raftIndex = new AtomicLong();
+
+        // Delegate directly to listener.
+        lenient().doAnswer(invocation -> {
+            Command rawCommand = invocation.getArgument(0);
+
+            long assignedIndex = raftIndex.incrementAndGet();
+
+            CompletableFuture<Serializable> resultFuture = new CompletableFuture<>();
+
+            CommandClosure<WriteCommand> closure = new CommandClosure<>() {
+                /** {@inheritDoc} */
+                @Override
+                public long index() {
+                    return assignedIndex;
+                }
+
+                /** {@inheritDoc} */
+                @Override
+                public WriteCommand command() {
+                    return (WriteCommand) rawCommand;
+                }
+
+                /** {@inheritDoc} */
+                @Override
+                public void result(@Nullable Serializable outcome) {
+                    if (!(outcome instanceof Throwable)) {
+                        resultFuture.complete(outcome);
+
+                        return;
+                    }
+
+                    resultFuture.completeExceptionally((Throwable) outcome);
+                }
+            };
+
+            try {
+                metaStorageListener.onWrite(List.of(closure).iterator());
+            } catch (Throwable e) {
+                resultFuture.completeExceptionally(new IgniteInternalException(e));
+            }
+
+            return resultFuture;
+        }).when(metaStorageService).run(any());
+
+        MetaStorageCommandsFactory commandsFactory = new MetaStorageCommandsFactory();
+
+        // Turn every invoke on the manager mock into a multi-invoke command executed by the raft service mock above.
+        lenient().doAnswer(invocation -> {
+            If condition = invocation.getArgument(0);
+
+            MultiInvokeCommand invokeCommand = commandsFactory.multiInvokeCommand()
+                    .iif(toIfInfo(condition, commandsFactory))
+                    .build();
+
+            return metaStorageService.run(invokeCommand)
+                    .thenApply(info -> new StatementResult(((StatementResultInfo) info).result()));
+        }).when(metaStorageManager).invoke(any());
+    }
+
+    /**
+     * Stops the vault, the distribution zone manager and the configuration manager, then closes the key-value storage.
+     *
+     * @throws Exception If any of the components fails to stop or close.
+     */
+    @AfterEach
+    public void tearDown() throws Exception {
+        // The components are released in this fixed order after every test.
+        vaultMgr.stop();
+        distributionZoneManager.stop();
+        clusterCfgMgr.stop();
+        keyValueStorage.close();
+    }
+
+    /**
+     * Fires three consecutive logical topology watch events and checks that after each of them the storage was invoked once more
+     * and the data nodes of zone 1 match the topology carried by the event.
+     */
+    @Test
+    void testDataNodesUpdatedOnWatchListenerEvent() {
+        mockVaultZonesLogicalTopologyKey(Set.of());
+
+        distributionZoneManager.start();
+
+        //TODO: Add second distribution zone, when distributionZones.change.trigger per zone will be created.
+        mockZones(mockZoneWithAutoAdjust());
+
+        // Topologies delivered by the first, second and third event respectively.
+        List<Set<String>> topologies = List.of(
+                Set.of("node1", "node2"),
+                Set.of("node1", "node3"),
+                Collections.emptySet()
+        );
+
+        // The event number doubles as the event revision and as the expected total number of storage invocations.
+        for (int eventNo = 1; eventNo <= topologies.size(); eventNo++) {
+            Set<String> nodes = topologies.get(eventNo - 1);
+
+            watchListenerOnUpdate(nodes, eventNo);
+
+            verify(keyValueStorage, timeout(1000).times(eventNo)).invoke(any());
+
+            checkDataNodesOfZone(1, nodes);
+        }
+    }
+
+    /**
+     * Asserts the data nodes stored in the key-value storage under the data nodes key of the given zone.
+     *
+     * @param zoneId Id of the zone whose data nodes key is read.
+     * @param nodes Expected data nodes, or {@code null} if the entry is expected to have no value.
+     */
+    private void checkDataNodesOfZone(int zoneId, Set<String> nodes) {
+        Entry entry = keyValueStorage.get(zoneDataNodesKey(zoneId).bytes());
+
+        if (nodes == null) {
+            assertNull(entry.value());
+
+            return;
+        }
+
+        Set<String> actualDataNodes = fromBytes(entry.value());
+
+        // Set equality is equivalent to the containsAll + size check, but reports both sets when the assertion fails.
+        assertEquals(nodes, actualDataNodes);
+    }
+
+    /**
+     * Delivers a watch event whose revision equals the value already stored under the zones change trigger key and checks that
+     * the data nodes key of zone 1 still has no value after the single storage invocation.
+     */
+    @Test
+    void testStaleWatchEvent() {
+        mockVaultZonesLogicalTopologyKey(Set.of());
+
+        distributionZoneManager.start();
+
+        mockZones(mockZoneWithAutoAdjust());
+
+        mockVaultAppliedRevision(1);
+
+        // Pre-populate the trigger key with the same revision the event is going to carry.
+        long triggerRevision = 100;
+
+        keyValueStorage.put(zonesChangeTriggerKey().bytes(), longToBytes(triggerRevision));
+
+        watchListenerOnUpdate(Set.of("node1", "node2"), triggerRevision);
+
+        verify(keyValueStorage, timeout(1000).times(1)).invoke(any());
+
+        checkDataNodesOfZone(1, null);
+    }
+
+    /**
+     * Starts the manager when the applied revision in the vault equals the value already stored under the zones change trigger key
+     * and checks that, after one invoke on the meta storage manager, the data nodes key of zone 1 still has no value.
+     */
+    @Test
+    void testStaleVaultRevisionOnZoneManagerStart() {
+        mockZones(mockZoneWithAutoAdjust());
+
+        long triggerRevision = 100;
+
+        keyValueStorage.put(zonesChangeTriggerKey().bytes(), longToBytes(triggerRevision));
+
+        // The vault reports a two-node topology at the very same revision.
+        mockVaultAppliedRevision(triggerRevision);
+
+        mockVaultZonesLogicalTopologyKey(Set.of("node1", "node2"));
+
+        distributionZoneManager.start();
+
+        verify(metaStorageManager, timeout(1000).times(1)).invoke(any());
+
+        checkDataNodesOfZone(1, null);
+    }
+
+    /**
+     * Starts the manager with a two-node logical topology in the vault and checks that, after one storage invocation,
+     * the data nodes of zone 1 equal that topology.
+     */
+    @Test
+    void testDataNodesUpdatedOnZoneManagerStart() {
+        mockZones(mockZoneWithAutoAdjust());
+
+        mockVaultAppliedRevision(2);
+
+        Set<String> vaultTopology = Set.of("node1", "node2");
+
+        mockVaultZonesLogicalTopologyKey(vaultTopology);
+
+        distributionZoneManager.start();
+
+        verify(keyValueStorage, timeout(1000).times(1)).invoke(any());
+
+        checkDataNodesOfZone(1, vaultTopology);
+    }
+
+    /**
+     * Starts the manager when the vault entry for the logical topology key has no value and checks that the storage is never
+     * invoked and the data nodes key of zone 1 has no value.
+     */
+    @Test
+    void testLogicalTopologyIsNullOnZoneManagerStart1() {
+        mockZones(mockZoneWithAutoAdjust());
+
+        mockVaultAppliedRevision(2);
+
+        VaultEntry emptyTopologyEntry = new VaultEntry(zonesLogicalTopologyKey(), null);
+
+        when(vaultMgr.get(zonesLogicalTopologyKey())).thenReturn(completedFuture(emptyTopologyEntry));
+
+        distributionZoneManager.start();
+
+        // Wait the whole 500 ms window to make sure no invocation happens.
+        verify(keyValueStorage, after(500).never()).invoke(any());
+
+        checkDataNodesOfZone(1, null);
+    }
+
+    /**
+     * Stubs the zones configuration mock so that its named list of distribution zones reports no keys.
+     */
+    private void mockEmptyZonesList() {
+        NamedConfigurationTree<DistributionZoneConfiguration, DistributionZoneView, DistributionZoneChange> zonesTree =
+                mock(NamedConfigurationTree.class);
+        NamedListView<DistributionZoneView> zonesView = mock(NamedListView.class);
+
+        when(zonesConfiguration.distributionZones()).thenReturn(zonesTree);
+        when(zonesTree.value()).thenReturn(zonesView);
+
+        // Stubbed through the configuration chain, which resolves to zonesView thanks to the two stubs above.
+        when(zonesConfiguration.distributionZones().value().namedListKeys()).thenReturn(Collections.emptyList());
+    }
+
+    /**
+     * Stubs the zones configuration mock so that it exposes exactly the given zones: every zone is returned by name from the
+     * named configuration tree, and the names are reported as the named list keys.
+     *
+     * @param zones Zone configurations to expose.
+     */
+    private void mockZones(DistributionZoneConfiguration... zones) {
+        NamedConfigurationTree<DistributionZoneConfiguration, DistributionZoneView, DistributionZoneChange> zonesTree =
+                mock(NamedConfigurationTree.class);
+        NamedListView<DistributionZoneView> zonesView = mock(NamedListView.class);
+
+        when(zonesConfiguration.distributionZones()).thenReturn(zonesTree);
+        when(zonesTree.value()).thenReturn(zonesView);
+
+        List<String> zoneNames = new ArrayList<>();
+
+        for (DistributionZoneConfiguration zone : zones) {
+            String zoneName = zone.name().value();
+
+            zoneNames.add(zoneName);
+
+            when(zonesTree.get(zoneName)).thenReturn(zone);
+        }
+
+        when(zonesView.namedListKeys()).thenReturn(zoneNames);
+    }
+
+    /**
+     * Creates a distribution zone configuration mock whose configuration values return the given arguments.
+     *
+     * @param zoneId Value returned by {@code zoneId().value()}.
+     * @param name Value returned by {@code name().value()}.
+     * @param dataNodesAutoAdjustTime Value returned by {@code dataNodesAutoAdjust().value()}.
+     * @param dataNodesAutoAdjustScaleUpTime Value returned by {@code dataNodesAutoAdjustScaleUp().value()}.
+     * @param dataNodesAutoAdjustScaleDownTime Value returned by {@code dataNodesAutoAdjustScaleDown().value()}.
+     * @return Mocked zone configuration.
+     */
+    private DistributionZoneConfiguration mockZone(
+            Integer zoneId,
+            String name,
+            Integer dataNodesAutoAdjustTime,
+            Integer dataNodesAutoAdjustScaleUpTime,
+            Integer dataNodesAutoAdjustScaleDownTime
+    ) {
+        DistributionZoneConfiguration zoneCfg = mock(DistributionZoneConfiguration.class);
+
+        ConfigurationValue<String> nameCfg = mock(ConfigurationValue.class);
+        when(nameCfg.value()).thenReturn(name);
+        when(zoneCfg.name()).thenReturn(nameCfg);
+
+        ConfigurationValue<Integer> zoneIdCfg = mock(ConfigurationValue.class);
+        when(zoneIdCfg.value()).thenReturn(zoneId);
+        when(zoneCfg.zoneId()).thenReturn(zoneIdCfg);
+
+        ConfigurationValue<Integer> autoAdjustCfg = mock(ConfigurationValue.class);
+        when(autoAdjustCfg.value()).thenReturn(dataNodesAutoAdjustTime);
+        when(zoneCfg.dataNodesAutoAdjust()).thenReturn(autoAdjustCfg);
+
+        ConfigurationValue<Integer> scaleUpCfg = mock(ConfigurationValue.class);
+        when(scaleUpCfg.value()).thenReturn(dataNodesAutoAdjustScaleUpTime);
+        when(zoneCfg.dataNodesAutoAdjustScaleUp()).thenReturn(scaleUpCfg);
+
+        ConfigurationValue<Integer> scaleDownCfg = mock(ConfigurationValue.class);
+        when(scaleDownCfg.value()).thenReturn(dataNodesAutoAdjustScaleDownTime);
+        when(zoneCfg.dataNodesAutoAdjustScaleDown()).thenReturn(scaleDownCfg);
+
+        return zoneCfg;
+    }
+
+    /**
+     * Creates the mock of the zone with id 1 and name {@code ZONE_NAME_1} that all tests in this class use.
+     *
+     * @return Mocked zone configuration.
+     */
+    private DistributionZoneConfiguration mockZoneWithAutoAdjust() {
+        return mockZone(
+                1,                  // zoneId
+                ZONE_NAME_1,        // name
+                100,                // dataNodesAutoAdjustTime
+                Integer.MAX_VALUE,  // dataNodesAutoAdjustScaleUpTime
+                Integer.MAX_VALUE   // dataNodesAutoAdjustScaleDownTime
+        );
+    }
+
+    /**
+     * Makes the vault mock return the serialized form of the given nodes for the zones logical topology key.
+     *
+     * @param nodes Logical topology nodes to serialize into the vault entry.
+     */
+    private void mockVaultZonesLogicalTopologyKey(Set<String> nodes) {
+        VaultEntry topologyEntry = new VaultEntry(zonesLogicalTopologyKey(), toBytes(nodes));
+
+        when(vaultMgr.get(zonesLogicalTopologyKey())).thenReturn(completedFuture(topologyEntry));
+    }
+
+    /**
+     * Passes to the captured watch listener an update event whose new entry holds the serialized nodes under the zones
+     * logical topology key; the old entry of the event is {@code null}.
+     *
+     * @param nodes Logical topology nodes carried by the new entry.
+     * @param rev Revision of the new entry.
+     */
+    private void watchListenerOnUpdate(Set<String> nodes, long rev) {
+        org.apache.ignite.internal.metastorage.client.Entry updatedEntry =
+                new EntryImpl(zonesLogicalTopologyKey(), toBytes(nodes), rev, 1);
+
+        EntryEvent topologyUpdate = new EntryEvent(null, updatedEntry);
+
+        watchListener.onUpdate(new WatchEvent(topologyUpdate));
+    }
+
+    /**
+     * Makes the vault mock return the given revision, encoded as bytes, for the {@code APPLIED_REV} key.
+     *
+     * @param revision Revision to report.
+     */
+    private void mockVaultAppliedRevision(long revision) {
+        VaultEntry appliedRevisionEntry = new VaultEntry(APPLIED_REV, longToBytes(revision));
+
+        when(vaultMgr.get(APPLIED_REV)).thenReturn(completedFuture(appliedRevisionEntry));
+    }
+}
diff --git 
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java 
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
index aa2930acb5..6d5a560e48 100644
--- 
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
+++ 
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
@@ -352,7 +352,7 @@ public class IgniteImpl implements Ignite {
         DistributionZonesConfiguration zonesConfiguration = 
clusterCfgMgr.configurationRegistry()
                 .getConfiguration(DistributionZonesConfiguration.KEY);
 
-        distributionZoneManager = new 
DistributionZoneManager(zonesConfiguration, metaStorageMgr, cmgMgr, 
logicalTopologyService);
+        distributionZoneManager = new 
DistributionZoneManager(zonesConfiguration, metaStorageMgr, 
logicalTopologyService, vaultMgr);
 
         restComponent = createRestComponent(name);
 

Reply via email to