This is an automated email from the ASF dual-hosted git repository.

sanpwc pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 54da917864 IGNITE-18121 Scale up scheduler added (#1508)
54da917864 is described below

commit 54da917864cfc896e960f6de85dbf8f01f336388
Author: Mirza Aliev <[email protected]>
AuthorDate: Thu Jan 26 16:16:23 2023 +0400

    IGNITE-18121 Scale up scheduler added (#1508)
---
 .../distributionzones/DistributionZoneManager.java | 552 ++++++++++++---
 .../distributionzones/DistributionZonesUtil.java   | 110 ++-
 ...ibutionZoneManagerConfigurationChangesTest.java | 181 ++---
 ...butionZoneManagerLogicalTopologyEventsTest.java |   3 +-
 .../DistributionZoneManagerScaleUpTest.java        | 745 +++++++++++++++++++++
 .../DistributionZoneManagerTest.java               |   3 +-
 .../DistributionZoneManagerWatchListenerTest.java  | 107 ++-
 .../DistributionZonesSchedulersTest.java           | 141 ++++
 .../org/apache/ignite/internal/app/IgniteImpl.java |   3 +-
 .../DdlCommandHandlerExceptionHandlingTest.java    |   3 +-
 10 files changed, 1623 insertions(+), 225 deletions(-)

diff --git 
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java
 
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java
index 7adb3370a3..0855973f9f 100644
--- 
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java
+++ 
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java
@@ -20,28 +20,45 @@ package org.apache.ignite.internal.distributionzones;
 import static java.util.concurrent.CompletableFuture.completedFuture;
 import static java.util.concurrent.CompletableFuture.failedFuture;
 import static java.util.stream.Collectors.toList;
-import static 
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.deleteDataNodesKeyAndUpdateTriggerKey;
-import static 
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.triggerKeyCondition;
-import static 
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.updateDataNodesAndTriggerKey;
+import static java.util.stream.Collectors.toSet;
+import static 
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.deleteDataNodesAndUpdateTriggerKeys;
+import static 
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.triggerKeyConditionForZonesChanges;
+import static 
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.triggerScaleUpScaleDownKeysCondition;
+import static 
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.updateDataNodesAndScaleUpTriggerKey;
+import static 
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.updateDataNodesAndTriggerKeys;
 import static 
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.updateLogicalTopologyAndVersion;
-import static 
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.updateTriggerKey;
+import static 
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zoneDataNodesKey;
+import static 
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zoneScaleDownChangeTriggerKey;
+import static 
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zoneScaleUpChangeTriggerKey;
 import static 
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zonesLogicalTopologyKey;
 import static 
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zonesLogicalTopologyVersionKey;
 import static org.apache.ignite.internal.metastorage.dsl.Conditions.notExists;
 import static org.apache.ignite.internal.metastorage.dsl.Conditions.value;
 import static org.apache.ignite.internal.metastorage.dsl.Operations.ops;
 import static org.apache.ignite.internal.util.ByteUtils.bytesToLong;
+import static org.apache.ignite.internal.util.ByteUtils.fromBytes;
 import static org.apache.ignite.internal.util.ByteUtils.toBytes;
+import static org.apache.ignite.internal.util.IgniteUtils.inBusyLock;
+import static 
org.apache.ignite.internal.util.IgniteUtils.shutdownAndAwaitTermination;
 import static org.apache.ignite.lang.ErrorGroups.Common.NODE_STOPPING_ERR;
 
 import java.util.Arrays;
 import java.util.Collections;
-import java.util.List;
+import java.util.HashSet;
+import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.stream.Collectors;
+import java.util.function.BiFunction;
+import java.util.function.Function;
 import org.apache.ignite.configuration.ConfigurationChangeException;
 import org.apache.ignite.configuration.NamedConfigurationTree;
 import org.apache.ignite.configuration.NamedListChange;
@@ -69,18 +86,22 @@ import org.apache.ignite.internal.metastorage.WatchListener;
 import org.apache.ignite.internal.metastorage.dsl.CompoundCondition;
 import org.apache.ignite.internal.metastorage.dsl.Condition;
 import org.apache.ignite.internal.metastorage.dsl.If;
+import org.apache.ignite.internal.metastorage.dsl.StatementResult;
 import org.apache.ignite.internal.metastorage.dsl.Update;
 import org.apache.ignite.internal.schema.configuration.TableChange;
 import org.apache.ignite.internal.schema.configuration.TableConfiguration;
 import org.apache.ignite.internal.schema.configuration.TableView;
 import org.apache.ignite.internal.schema.configuration.TablesConfiguration;
+import org.apache.ignite.internal.thread.NamedThreadFactory;
 import org.apache.ignite.internal.util.ByteUtils;
 import org.apache.ignite.internal.util.IgniteSpinBusyLock;
 import org.apache.ignite.internal.vault.VaultManager;
+import org.apache.ignite.lang.ByteArray;
 import org.apache.ignite.lang.IgniteException;
 import org.apache.ignite.lang.IgniteInternalException;
 import org.apache.ignite.lang.NodeStoppingException;
 import org.apache.ignite.network.ClusterNode;
+import org.jetbrains.annotations.TestOnly;
 
 /**
  * Distribution zones manager.
@@ -89,6 +110,8 @@ public class DistributionZoneManager implements 
IgniteComponent {
     /** Name of the default distribution zone. */
     public static final String DEFAULT_ZONE_NAME = "Default";
 
+    private static final String DISTRIBUTION_ZONE_MANAGER_POOL_NAME = 
"dst-zones-scheduler";
+
     /** Id of the default distribution zone. */
     public static final int DEFAULT_ZONE_ID = 0;
 
@@ -116,6 +139,15 @@ public class DistributionZoneManager implements 
IgniteComponent {
     /** Logical topology service to track topology changes. */
     private final LogicalTopologyService logicalTopologyService;
 
+    /** Executor for scheduling tasks for scale up and scale down processes. */
+    private final ScheduledExecutorService executor;
+
+    /**
+     * Map with states for distribution zones. States are needed to track 
nodes that we want to add or remove from the data nodes,
+     * and to schedule and stop scale up and scale down processes.
+     */
+    private final Map<Integer, ZoneState> zonesState;
+
     /** Listener for a topology events. */
     private final LogicalTopologyEventListener topologyEventListener = new 
LogicalTopologyEventListener() {
         @Override
@@ -134,8 +166,9 @@ public class DistributionZoneManager implements 
IgniteComponent {
         }
     };
 
-    /** The logical topology on the last watch event.
-     *  It's enough to mark this field by volatile because we don't update the 
collection after it is assigned to the field.
+    /**
+     * The logical topology on the last watch event.
+     * It's enough to mark this field by volatile because we don't update the 
collection after it is assigned to the field.
      */
     private volatile Set<String> logicalTopology;
 
@@ -156,7 +189,8 @@ public class DistributionZoneManager implements 
IgniteComponent {
             TablesConfiguration tablesConfiguration,
             MetaStorageManager metaStorageManager,
             LogicalTopologyService logicalTopologyService,
-            VaultManager vaultMgr
+            VaultManager vaultMgr,
+            String nodeName
     ) {
         this.zonesConfiguration = zonesConfiguration;
         this.tablesConfiguration = tablesConfiguration;
@@ -166,7 +200,20 @@ public class DistributionZoneManager implements 
IgniteComponent {
 
         this.watchListener = createMetastorageListener();
 
+        zonesState = new ConcurrentHashMap<>();
+
         logicalTopology = Collections.emptySet();
+
+        executor = new ScheduledThreadPoolExecutor(
+                Math.min(Runtime.getRuntime().availableProcessors() * 3, 20),
+                new 
NamedThreadFactory(NamedThreadFactory.threadPrefix(nodeName, 
DISTRIBUTION_ZONE_MANAGER_POOL_NAME), LOG),
+                new ThreadPoolExecutor.DiscardPolicy()
+        );
+    }
+
+    @TestOnly
+    Map<Integer, ZoneState> zonesTimers() {
+        return zonesState;
     }
 
     /**
@@ -435,13 +482,23 @@ public class DistributionZoneManager implements 
IgniteComponent {
 
             
zonesConfiguration.defaultDistributionZone().listen(zonesConfigurationListener);
 
+            // Init timers after restart.
+            zonesState.putIfAbsent(DEFAULT_ZONE_ID, new ZoneState(executor));
+
+            zonesConfiguration.distributionZones().value().namedListKeys()
+                    .forEach(zoneName -> {
+                        int zoneId = 
zonesConfiguration.distributionZones().get(zoneName).zoneId().value();
+
+                        zonesState.putIfAbsent(zoneId, new 
ZoneState(executor));
+                    });
+
             logicalTopologyService.addEventListener(topologyEventListener);
 
             metaStorageManager.registerExactWatch(zonesLogicalTopologyKey(), 
watchListener);
 
             initDataNodesFromVaultManager();
 
-            initMetaStorageKeysOnStart();
+            initLogicalTopologyAndVersionInMetaStorageOnStart();
         } finally {
             busyLock.leaveBusy();
         }
@@ -459,41 +516,77 @@ public class DistributionZoneManager implements 
IgniteComponent {
         logicalTopologyService.removeEventListener(topologyEventListener);
 
         metaStorageManager.unregisterWatch(watchListener);
+
+        shutdownAndAwaitTermination(executor, 10, TimeUnit.SECONDS);
     }
 
     private class ZonesConfigurationListener implements 
ConfigurationNamedListListener<DistributionZoneView> {
         @Override
         public CompletableFuture<?> 
onCreate(ConfigurationNotificationEvent<DistributionZoneView> ctx) {
-            updateMetaStorageOnZoneCreate(ctx.newValue().zoneId(), 
ctx.storageRevision());
+            int zoneId = ctx.newValue().zoneId();
+
+            ZoneState zoneState = new ZoneState(executor);
+
+            zonesState.putIfAbsent(zoneId, zoneState);
+
+            saveDataNodesAndUpdateTriggerKeysInMetaStorage(zoneId, 
ctx.storageRevision(), toBytes(logicalTopology));
 
             return completedFuture(null);
         }
 
         @Override
         public CompletableFuture<?> 
onDelete(ConfigurationNotificationEvent<DistributionZoneView> ctx) {
-            updateMetaStorageOnZoneDelete(ctx.oldValue().zoneId(), 
ctx.storageRevision());
+            int zoneId = ctx.oldValue().zoneId();
+
+            zonesState.get(zoneId).stopTimers();
+
+            removeTriggerKeysAndDataNodes(zoneId, ctx.storageRevision());
+
+            zonesState.remove(zoneId);
 
             return completedFuture(null);
         }
 
         @Override
         public CompletableFuture<?> 
onUpdate(ConfigurationNotificationEvent<DistributionZoneView> ctx) {
-            updateMetaStorageOnZoneUpdate(ctx.storageRevision());
+            int zoneId = ctx.newValue().zoneId();
 
-            //TODO: Also add here rescheduling for the existing timers 
https://issues.apache.org/jira/browse/IGNITE-18121
+            int oldScaleUp;
+
+            // ctx.oldValue() could be null for the default zone on the first 
start.
+            if (ctx.oldValue() == null) {
+                oldScaleUp = Integer.MAX_VALUE;
+            } else {
+                oldScaleUp = ctx.oldValue().dataNodesAutoAdjustScaleUp();
+            }
+
+            int newScaleUp = ctx.newValue().dataNodesAutoAdjustScaleUp();
+
+            if (newScaleUp != Integer.MAX_VALUE && oldScaleUp != newScaleUp) {
+                // It is safe to call zonesState.get(zoneId) in terms of NPE 
because meta storage notifications are single-threaded
+                zonesState.get(zoneId).rescheduleScaleUp(
+                        newScaleUp,
+                        () -> CompletableFuture.supplyAsync(
+                                () -> 
saveDataNodesToMetaStorageOnScaleUp(zoneId, ctx.storageRevision()),
+                                Runnable::run
+                        )
+                );
+            }
 
             return completedFuture(null);
         }
     }
 
     /**
-     * Method updates data nodes value for the specified zone,
-     * also sets {@code revision} to the {@link 
DistributionZonesUtil#zonesChangeTriggerKey()} if it passes the condition.
+     * Method updates data nodes value for the specified zone, also sets 
{@code revision} to the
+     * {@link DistributionZonesUtil#zoneScaleUpChangeTriggerKey(int)}, {@link 
DistributionZonesUtil#zoneScaleDownChangeTriggerKey(int)}
+     * and {@link DistributionZonesUtil#zonesChangeTriggerKey(int)} if it 
passes the condition.
      *
      * @param zoneId Unique id of a zone
      * @param revision Revision of an event that has triggered this method.
+     * @param dataNodes Data nodes.
      */
-    private void updateMetaStorageOnZoneCreate(int zoneId, long revision) {
+    private void saveDataNodesAndUpdateTriggerKeysInMetaStorage(int zoneId, 
long revision, byte[] dataNodes) {
         if (!busyLock.enterBusy()) {
             throw new IgniteInternalException(NODE_STOPPING_ERR, new 
NodeStoppingException());
         }
@@ -501,20 +594,15 @@ public class DistributionZoneManager implements 
IgniteComponent {
         try {
             // Update data nodes for a zone only if the revision of the event 
is newer than value in that trigger key,
             // so we do not react on a stale events
-            CompoundCondition triggerKeyCondition = 
triggerKeyCondition(revision);
-
-            // logicalTopology can be updated concurrently by the watch 
listener.
-            Set<String> logicalTopology0 = logicalTopology;
+            CompoundCondition triggerKeyCondition = 
triggerKeyConditionForZonesChanges(revision, zoneId);
 
-            byte[] logicalTopologyBytes = toBytes(logicalTopology0);
-
-            Update dataNodesAndTriggerKeyUpd = 
updateDataNodesAndTriggerKey(zoneId, revision, logicalTopologyBytes);
+            Update dataNodesAndTriggerKeyUpd = 
updateDataNodesAndTriggerKeys(zoneId, revision, dataNodes);
 
             If iif = If.iif(triggerKeyCondition, dataNodesAndTriggerKeyUpd, 
ops().yield(false));
 
             metaStorageManager.invoke(iif).thenAccept(res -> {
                 if (res.getAsBoolean()) {
-                    LOG.debug("Update zones' dataNodes value [zoneId = {}, 
dataNodes = {}", zoneId, logicalTopology0);
+                    LOG.debug("Update zones' dataNodes value [zoneId = {}, 
dataNodes = {}", zoneId, dataNodes);
                 } else {
                     LOG.debug("Failed to update zones' dataNodes value [zoneId 
= {}]", zoneId);
                 }
@@ -525,52 +613,22 @@ public class DistributionZoneManager implements 
IgniteComponent {
     }
 
     /**
-     * Sets {@code revision} to the {@link 
DistributionZonesUtil#zonesChangeTriggerKey()} if it passes the condition.
-     *
-     * @param revision Revision of an event that has triggered this method.
-     */
-    private void updateMetaStorageOnZoneUpdate(long revision) {
-        if (!busyLock.enterBusy()) {
-            throw new IgniteInternalException(NODE_STOPPING_ERR, new 
NodeStoppingException());
-        }
-
-        try {
-            CompoundCondition triggerKeyCondition = 
triggerKeyCondition(revision);
-
-            Update triggerKeyUpd = updateTriggerKey(revision);
-
-            If iif = If.iif(triggerKeyCondition, triggerKeyUpd, 
ops().yield(false));
-
-            metaStorageManager.invoke(iif).thenAccept(res -> {
-                if (res.getAsBoolean()) {
-                    LOG.debug("Distribution zones' trigger key was updated 
with the revision [revision = {}]", revision);
-                } else {
-                    LOG.debug("Failed to update distribution zones' trigger 
key with the revision [revision = {}]", revision);
-                }
-            });
-        } finally {
-            busyLock.leaveBusy();
-        }
-    }
-
-    /**
-     * Method deletes data nodes value for the specified zone,
-     * also sets {@code revision} to the {@link 
DistributionZonesUtil#zonesChangeTriggerKey()} if it passes the condition.
+     * Method deletes data nodes value for the specified zone.
      *
      * @param zoneId Unique id of a zone
      * @param revision Revision of an event that has triggered this method.
      */
-    private void updateMetaStorageOnZoneDelete(int zoneId, long revision) {
+    private void removeTriggerKeysAndDataNodes(int zoneId, long revision) {
         if (!busyLock.enterBusy()) {
             throw new IgniteInternalException(NODE_STOPPING_ERR, new 
NodeStoppingException());
         }
 
         try {
-            CompoundCondition triggerKeyCondition = 
triggerKeyCondition(revision);
+            CompoundCondition triggerKeyCondition = 
triggerKeyConditionForZonesChanges(revision, zoneId);
 
-            Update dataNodesRemoveUpd = 
deleteDataNodesKeyAndUpdateTriggerKey(zoneId, revision);
+            Update removeKeysUpd = deleteDataNodesAndUpdateTriggerKeys(zoneId, 
revision);
 
-            If iif = If.iif(triggerKeyCondition, dataNodesRemoveUpd, 
ops().yield(false));
+            If iif = If.iif(triggerKeyCondition, removeKeysUpd, 
ops().yield(false));
 
             metaStorageManager.invoke(iif).thenAccept(res -> {
                 if (res.getAsBoolean()) {
@@ -624,7 +682,7 @@ public class DistributionZoneManager implements 
IgniteComponent {
         }
 
         try {
-            Set<String> topologyFromCmg = 
newTopology.nodes().stream().map(ClusterNode::name).collect(Collectors.toSet());
+            Set<String> topologyFromCmg = 
newTopology.nodes().stream().map(ClusterNode::name).collect(toSet());
 
             Condition updateCondition;
 
@@ -665,7 +723,7 @@ public class DistributionZoneManager implements 
IgniteComponent {
      * Initialises {@link DistributionZonesUtil#zonesLogicalTopologyKey()} and
      * {@link DistributionZonesUtil#zonesLogicalTopologyVersionKey()} from 
meta storage on the start of {@link DistributionZoneManager}.
      */
-    private void initMetaStorageKeysOnStart() {
+    private void initLogicalTopologyAndVersionInMetaStorageOnStart() {
         if (!busyLock.enterBusy()) {
             throw new IgniteInternalException(NODE_STOPPING_ERR, new 
NodeStoppingException());
         }
@@ -688,7 +746,7 @@ public class DistributionZoneManager implements 
IgniteComponent {
                             byte[] topVerFromMetaStorage = topVerEntry.value();
 
                             if (topVerFromMetaStorage == null || 
bytesToLong(topVerFromMetaStorage) < topologyVersionFromCmg) {
-                                Set<String> topologyFromCmg = 
snapshot.nodes().stream().map(ClusterNode::name).collect(Collectors.toSet());
+                                Set<String> topologyFromCmg = 
snapshot.nodes().stream().map(ClusterNode::name).collect(toSet());
 
                                 Condition topologyVersionCondition = 
topVerFromMetaStorage == null
                                         ? 
notExists(zonesLogicalTopologyVersionKey()) :
@@ -751,16 +809,25 @@ public class DistributionZoneManager implements 
IgniteComponent {
 
                         try {
                             if (vaultEntry != null && vaultEntry.value() != 
null) {
-                                logicalTopology = 
ByteUtils.fromBytes(vaultEntry.value());
+                                logicalTopology = 
fromBytes(vaultEntry.value());
+
+                                // init keys and data nodes for default zone
+                                saveDataNodesAndUpdateTriggerKeysInMetaStorage(
+                                        DEFAULT_ZONE_ID,
+                                        appliedRevision,
+                                        vaultEntry.value()
+                                );
 
                                 
zonesConfiguration.distributionZones().value().namedListKeys()
                                         .forEach(zoneName -> {
                                             int zoneId = 
zonesConfiguration.distributionZones().get(zoneName).zoneId().value();
 
-                                            saveDataNodesToMetaStorage(zoneId, 
vaultEntry.value(), appliedRevision);
+                                            
saveDataNodesAndUpdateTriggerKeysInMetaStorage(
+                                                    zoneId,
+                                                    appliedRevision,
+                                                    vaultEntry.value()
+                                            );
                                         });
-
-                                saveDataNodesToMetaStorage(DEFAULT_ZONE_ID, 
vaultEntry.value(), appliedRevision);
                             }
                         } finally {
                             busyLock.leaveBusy();
@@ -790,13 +857,13 @@ public class DistributionZoneManager implements 
IgniteComponent {
 
                     byte[] newLogicalTopologyBytes = newEntry.value();
 
-                    Set<String> newLogicalTopology = 
ByteUtils.fromBytes(newLogicalTopologyBytes);
+                    Set<String> newLogicalTopology = 
fromBytes(newLogicalTopologyBytes);
 
-                    List<String> removedNodes =
-                            logicalTopology.stream().filter(node -> 
!newLogicalTopology.contains(node)).collect(toList());
+                    Set<String> removedNodes =
+                            logicalTopology.stream().filter(node -> 
!newLogicalTopology.contains(node)).collect(toSet());
 
-                    List<String> addedNodes =
-                            newLogicalTopology.stream().filter(node -> 
!logicalTopology.contains(node)).collect(toList());
+                    Set<String> addedNodes =
+                            newLogicalTopology.stream().filter(node -> 
!logicalTopology.contains(node)).collect(toSet());
 
                     logicalTopology = newLogicalTopology;
 
@@ -806,12 +873,12 @@ public class DistributionZoneManager implements 
IgniteComponent {
                     for (int i = 0; i < zones.value().size(); i++) {
                         DistributionZoneView zoneView = zones.value().get(i);
 
-                        scheduleTimers(zoneView, addedNodes, removedNodes, 
newLogicalTopologyBytes, revision);
+                        scheduleTimers(zoneView, addedNodes, removedNodes, 
revision);
                     }
 
                     DistributionZoneView defaultZoneView = 
zonesConfiguration.value().defaultDistributionZone();
 
-                    scheduleTimers(defaultZoneView, addedNodes, removedNodes, 
newLogicalTopologyBytes, revision);
+                    scheduleTimers(defaultZoneView, addedNodes, removedNodes, 
revision);
                 } finally {
                     busyLock.leaveBusy();
                 }
@@ -824,11 +891,44 @@ public class DistributionZoneManager implements 
IgniteComponent {
         };
     }
 
+    /**
+     * Schedules scale up and scale down timers.
+     *
+     * @param zoneCfg Zone's configuration.
+     * @param addedNodes Nodes that were added to the topology and should be 
added to the zones' data nodes.
+     * @param removedNodes Nodes that were removed from the topology and should 
be removed from the zones' data nodes.
+     * @param revision Revision that triggered this event.
+     */
     private void scheduleTimers(
             DistributionZoneView zoneCfg,
-            List<String> addedNodes, List<String> removedNodes,
-            byte[] newLogicalTopologyBytes,
+            Set<String> addedNodes,
+            Set<String> removedNodes,
             long revision
+    ) {
+        scheduleTimers(
+                zoneCfg,
+                addedNodes,
+                removedNodes,
+                revision,
+                this::saveDataNodesToMetaStorageOnScaleUp
+        );
+    }
+
+    /**
+     * Schedules scale up and scale down timers. This method is also needed 
for test purposes.
+     *
+     * @param zoneCfg Zone's configuration.
+     * @param addedNodes Nodes that were added to the topology and should 
be added to the zones' data nodes.
+     * @param removedNodes Nodes that were removed from the topology and should 
be removed from the zones' data nodes.
+     * @param revision Revision that triggered this event.
+     * @param saveDataNodes Function that saves nodes to a zone's data nodes.
+     */
+    void scheduleTimers(
+            DistributionZoneView zoneCfg,
+            Set<String> addedNodes,
+            Set<String> removedNodes,
+            long revision,
+            BiFunction<Integer, Long, CompletableFuture<Void>> saveDataNodes
     ) {
         int autoAdjust = zoneCfg.dataNodesAutoAdjust();
         int autoAdjustScaleDown = zoneCfg.dataNodesAutoAdjustScaleDown();
@@ -838,48 +938,28 @@ public class DistributionZoneManager implements 
IgniteComponent {
 
         if ((!addedNodes.isEmpty() || !removedNodes.isEmpty()) && autoAdjust 
!= Integer.MAX_VALUE) {
             //TODO: IGNITE-18134 Create scheduler with dataNodesAutoAdjust 
timer.
-            saveDataNodesToMetaStorage(
-                    zoneId, newLogicalTopologyBytes, revision
-            );
+            throw new UnsupportedOperationException("Data nodes auto adjust is 
not supported.");
         } else {
             if (!addedNodes.isEmpty() && autoAdjustScaleUp != 
Integer.MAX_VALUE) {
                 //TODO: IGNITE-18121 Create scale up scheduler with 
dataNodesAutoAdjustScaleUp timer.
-                saveDataNodesToMetaStorage(
-                        zoneId, newLogicalTopologyBytes, revision
+                zonesState.get(zoneId).addNodesToDataNodes(addedNodes, 
revision);
+
+                zonesState.get(zoneId).rescheduleScaleUp(
+                        autoAdjustScaleUp,
+                        () -> CompletableFuture.supplyAsync(
+                                () -> saveDataNodes.apply(zoneId, revision),
+                                Runnable::run
+                        )
                 );
             }
 
             if (!removedNodes.isEmpty() && autoAdjustScaleDown != 
Integer.MAX_VALUE) {
                 //TODO: IGNITE-18132 Create scale down scheduler with 
dataNodesAutoAdjustScaleDown timer.
-                saveDataNodesToMetaStorage(
-                        zoneId, newLogicalTopologyBytes, revision
-                );
+                throw new UnsupportedOperationException("Data nodes auto 
adjust scale down is not supported.");
             }
         }
     }
 
-    /**
-     * Method updates data nodes value for the specified zone,
-     * also sets {@code revision} to the {@link 
DistributionZonesUtil#zonesChangeTriggerKey()} if it passes the condition.
-     *
-     * @param zoneId Unique id of a zone
-     * @param dataNodes Data nodes of a zone
-     * @param revision Revision of an event that has triggered this method.
-     */
-    private void saveDataNodesToMetaStorage(int zoneId, byte[] dataNodes, long 
revision) {
-        Update dataNodesAndTriggerKeyUpd = 
updateDataNodesAndTriggerKey(zoneId, revision, dataNodes);
-
-        var iif = If.iif(triggerKeyCondition(revision), 
dataNodesAndTriggerKeyUpd, ops().yield(false));
-
-        metaStorageManager.invoke(iif).thenAccept(res -> {
-            if (res.getAsBoolean()) {
-                LOG.debug("Delete zones' dataNodes key [zoneId = {}", zoneId);
-            } else {
-                LOG.debug("Failed to delete zones' dataNodes key [zoneId = 
{}]", zoneId);
-            }
-        });
-    }
-
     /**
      * Unwraps distribution zone exception from {@link 
ConfigurationChangeException} if it is possible.
      *
@@ -906,4 +986,264 @@ public class DistributionZoneManager implements 
IgniteComponent {
 
         return null;
     }
+
    /**
     * Method updates data nodes value for the specified zone after scale up timer timeout and, if the compare-and-set
     * style condition passes, sets {@code revision} to the {@link DistributionZonesUtil#zoneScaleUpChangeTriggerKey(int)}.
     * The condition checks that neither scale up nor scale down trigger keys have been changed concurrently since they
     * were read; on a lost race the method retries itself with the same arguments.
     *
     * @param zoneId Unique id of a zone
     * @param revision Revision of an event that has triggered this method.
     * @return Future that completes when the meta storage update has been attempted (a retry triggered on a lost race
     *         is not awaited by this future).
     */
    CompletableFuture<Void> saveDataNodesToMetaStorageOnScaleUp(int zoneId, long revision) {
        if (!busyLock.enterBusy()) {
            throw new IgniteInternalException(NODE_STOPPING_ERR, new NodeStoppingException());
        }

        try {
            ZoneState zoneState = zonesState.get(zoneId);

            if (zoneState == null) {
                // Zone was deleted
                return completedFuture(null);
            }

            // All three keys are read in one batch so that the subsequent conditional update can detect
            // any concurrent change to either trigger key.
            Set<ByteArray> keysToGetFromMs = Set.of(
                    zoneDataNodesKey(zoneId),
                    zoneScaleUpChangeTriggerKey(zoneId),
                    zoneScaleDownChangeTriggerKey(zoneId)
            );

            return metaStorageManager.getAll(keysToGetFromMs).thenCompose(values -> inBusyLock(busyLock, () -> {
                if (values.containsValue(null)) {
                    // Zone was deleted
                    return completedFuture(null);
                }

                Set<String> dataNodesFromMetaStorage = fromBytes(values.get(zoneDataNodesKey(zoneId)).value());

                long scaleUpTriggerRevision = bytesToLong(values.get(zoneScaleUpChangeTriggerKey(zoneId)).value());

                long scaleDownTriggerRevision = bytesToLong(values.get(zoneScaleDownChangeTriggerKey(zoneId)).value());

                if (revision <= scaleUpTriggerRevision) {
                    // A newer (or the same) scale up event has already been applied; nothing to do.
                    return completedFuture(null);
                }

                Set<String> deltaToAdd = zoneState.nodesToAddToDataNodes(scaleUpTriggerRevision, scaleDownTriggerRevision, revision);

                if (deltaToAdd.isEmpty()) {
                    return completedFuture(null);
                }

                Set<String> newDataNodes = new HashSet<>(dataNodesFromMetaStorage);

                newDataNodes.addAll(deltaToAdd);

                Update dataNodesAndTriggerKeyUpd = updateDataNodesAndScaleUpTriggerKey(zoneId, revision, toBytes(newDataNodes));

                // Conditional update: only apply if both trigger keys still hold the revisions read above
                // (i.e. no concurrent scale up/scale down update slipped in).
                If iif = If.iif(
                        triggerScaleUpScaleDownKeysCondition(scaleUpTriggerRevision, scaleDownTriggerRevision, zoneId),
                        dataNodesAndTriggerKeyUpd,
                        ops().yield(false)
                );

                return metaStorageManager.invoke(iif)
                        .thenApply(StatementResult::getAsBoolean)
                        .thenApply(invokeResult -> inBusyLock(busyLock, () -> {
                            if (invokeResult) {
                                zoneState.cleanUp(Math.min(scaleDownTriggerRevision, revision));
                            } else {
                                LOG.debug("Updating data nodes for a zone has not succeeded [zoneId = {}]", zoneId);

                                // Lost the race: re-read the keys and try again.
                                // NOTE(review): the retry future is returned here but discarded by the
                                // handle() stage below, so callers do not observe the retry's outcome — confirm intended.
                                return saveDataNodesToMetaStorageOnScaleUp(zoneId, revision);
                            }

                            return completedFuture(null);
                        }));
            })).handle((v, e) -> {
                if (e != null) {
                    LOG.warn("Failed to update zones' dataNodes value [zoneId = {}]", e, zoneId);

                    return CompletableFuture.<Void>failedFuture(e);
                }

                return CompletableFuture.<Void>completedFuture(null);
            }).thenCompose(Function.identity());
        } finally {
            busyLock.leaveBusy();
        }
    }
+
+    /**
+     * Class responsible for storing state for a distribution zone.
+     * States are needed to track nodes that we want to add or remove from the 
data nodes,
+     * to schedule and stop scale up and scale down processes.
+     */
+    static class ZoneState {
+        /** Schedule task for a scale up process. */
+        private ScheduledFuture<?> scaleUpTask;
+
+        /** Schedule task for a scale down process. */
+        private ScheduledFuture<?> scaleDownTask;
+
+        /**
+         * Map that stores pairs revision -> {@link Augmentation} for a zone. 
With this map we can track which nodes
+         * should be added or removed in the processes of scale up or scale 
down. Revision helps to track visibility of the events
+         * of adding or removing nodes because any process of scale up or 
scale down has a revision that triggered this process.
+         */
+        private final ConcurrentSkipListMap<Long, Augmentation> 
topologyAugmentationMap;
+
+        /** Executor for scheduling tasks for scale up and scale down 
processes. */
+        private final ScheduledExecutorService executor;
+
+        /**
+         * Constructor.
+         *
+         * @param executor Executor for scheduling tasks for scale up and 
scale down processes.
+         */
+        ZoneState(ScheduledExecutorService executor) {
+            this.executor = executor;
+            topologyAugmentationMap = new ConcurrentSkipListMap<>();
+        }
+
+        /**
+         * Map that stores pairs revision -> {@link Augmentation} for a zone. 
With this map we can track which nodes
+         * should be added or removed in the processes of scale up or scale 
down. Revision helps to track visibility of the events
+         * of adding or removing nodes because any process of scale up or 
scale down has a revision that triggered this process.
+         */
+        ConcurrentSkipListMap<Long, Augmentation> topologyAugmentationMap() {
+            return topologyAugmentationMap;
+        }
+
+        /**
+         * Reschedules existing scale up task, if it is not started yet, or 
schedules new one, if the current task cannot be canceled.
+         *
+         * @param delay Delay to start runnable in seconds.
+         * @param runnable Custom logic to run.
+         */
+        synchronized void rescheduleScaleUp(long delay, Runnable runnable) {
+            if (scaleUpTask != null) {
+                scaleUpTask.cancel(false);
+            }
+
+            scaleUpTask = executor.schedule(runnable, delay, TimeUnit.SECONDS);
+        }
+
+        /**
+         * Reschedules existing scale down task, if it is not started yet, or 
schedules new one, if the current task cannot be canceled.
+         *
+         * @param delay Delay to start runnable in seconds.
+         * @param runnable Custom logic to run.
+         */
+        synchronized void rescheduleScaleDown(long delay, Runnable runnable) {
+            //TODO: IGNITE-18132 Create scale down scheduler with 
dataNodesAutoAdjustScaleDown timer.
+            if (scaleDownTask != null) {
+                scaleDownTask.cancel(false);
+            }
+
+            scaleDownTask = executor.schedule(runnable, delay, 
TimeUnit.SECONDS);
+        }
+
+        /**
+         * Cancels task for scale up and scale down.
+         */
+        synchronized void stopTimers() {
+            if (scaleUpTask != null) {
+                scaleUpTask.cancel(false);
+            }
+
+            if (scaleDownTask != null) {
+                scaleDownTask.cancel(false);
+            }
+        }
+
+        /**
+         * Returns a set of nodes that should be added to zone's data nodes.
+         *
+         * @param scaleUpRevision Last revision of the scale up event.
+         * @param scaleDownRevision Last revision of the scale down event.
+         * @param revision Revision of the event for which this data nodes is 
needed.
+         * @return Set of nodes that should be added to zone's data nodes.
+         */
+        Set<String> nodesToAddToDataNodes(long scaleUpRevision, long 
scaleDownRevision, long revision) {
+            Long toKey = topologyAugmentationMap.floorKey(revision);
+
+            if (toKey == null) {
+                return Set.of();
+            }
+
+            Set<String> nodesToAdd = accumulateNodes(scaleUpRevision, toKey, 
true);
+
+            Set<String> nodesToRemove = accumulateNodes(scaleUpRevision, 
Math.max(toKey, scaleDownRevision), false);
+
+            nodesToAdd.removeAll(nodesToRemove);
+
+            return nodesToAdd;
+        }
+
+        public Set<String> nodesToRemoveFromDataNodes() {
+            //TODO: IGNITE-18132 Create scale down scheduler with 
dataNodesAutoAdjustScaleDown timer.
+            return null;
+        }
+
+        /**
+         * Add nodes to the map where nodes that must be added to the zone's 
data nodes are accumulated.
+         *
+         * @param nodes Nodes to add to zone's data nodes.
+         * @param revision Revision of the event that triggered this addition.
+         */
+        void addNodesToDataNodes(Set<String> nodes, long revision) {
+            topologyAugmentationMap.put(revision, new Augmentation(nodes, 
true));
+        }
+
+        /**
+         * Add nodes to the map where nodes that must be removed from the 
zone's data nodes are accumulated.
+         *
+         * @param nodes Nodes to remove from zone's data nodes.
+         * @param revision Revision of the event that triggered this addition.
+         */
+        void removeNodesFromDataNodes(Set<String> nodes, long revision) {
+            topologyAugmentationMap.put(revision, new Augmentation(nodes, 
true));
+        }
+
+        /**
+         * Accumulate nodes from the {@link ZoneState#topologyAugmentationMap} 
starting from the {@code fromKey} (including)
+         * to the {@code toKey} (including), where flag {@code addition} 
indicates whether we should accumulate nodes that should be
+         * added to data nodes, or removed.
+         *
+         * @param fromKey Starting key (including).
+         * @param toKey Ending key (including).
+         * @param addition Indicates whether we should accumulate nodes that 
should be added to data nodes, or removed.
+         * @return Accumulated nodes.
+         */
+        private Set<String> accumulateNodes(long fromKey, long toKey, boolean 
addition) {
+            return topologyAugmentationMap.subMap(fromKey, true, toKey, 
true).values()
+                    .stream()
+                    .filter(a -> a.addition == addition)
+                    .flatMap(a -> a.nodeNames.stream())
+                    .collect(toSet());
+        }
+
+        private void cleanUp(long toKey) {
+            //TODO: IGNITE-18132 Create scale down scheduler with 
dataNodesAutoAdjustScaleDown timer.
+        }
+    }
+
+    /**
+     * Class stores the info about nodes that should be added or removed from 
the data nodes of a zone.
+     * With flag {@code addition} we can track whether {@code nodeNames} 
should be added or removed.
+     */
+    private static class Augmentation {
+        /** Names of the node. */
+        Set<String> nodeNames;
+
+        /** Flag that indicates whether {@code nodeNames} should be added or 
removed. */
+        boolean addition;
+
+        Augmentation(Set<String> nodeNames, boolean addition) {
+            this.nodeNames = nodeNames;
+            this.addition = addition;
+        }
+    }
 }
diff --git 
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZonesUtil.java
 
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZonesUtil.java
index 3d74a0ea79..335eaa005e 100644
--- 
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZonesUtil.java
+++ 
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZonesUtil.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.internal.distributionzones;
 
+import static org.apache.ignite.internal.metastorage.dsl.CompoundCondition.and;
 import static org.apache.ignite.internal.metastorage.dsl.CompoundCondition.or;
 import static org.apache.ignite.internal.metastorage.dsl.Conditions.notExists;
 import static org.apache.ignite.internal.metastorage.dsl.Conditions.value;
@@ -37,14 +38,20 @@ class DistributionZonesUtil {
     /** Key prefix for zone's data nodes. */
     private static final String DISTRIBUTION_ZONE_DATA_NODES_PREFIX = 
"distributionZone.dataNodes.";
 
+    /** Key prefix for zone's scale up change trigger key. */
+    private static final String 
DISTRIBUTION_ZONE_SCALE_UP_CHANGE_TRIGGER_PREFIX = 
"distributionZone.scaleUp.change.trigger.";
+
+    /** Key prefix for zone's scale down change trigger key. */
+    private static final String 
DISTRIBUTION_ZONE_SCALE_DOWN_CHANGE_TRIGGER_PREFIX = 
"distributionZone.scaleDown.change.trigger.";
+
     /** Key prefix for zones' logical topology nodes. */
     private static final String DISTRIBUTION_ZONES_LOGICAL_TOPOLOGY = 
"distributionZones.logicalTopology";
 
     /** Key prefix for zones' logical topology version. */
     private static final String DISTRIBUTION_ZONES_LOGICAL_TOPOLOGY_VERSION = 
"distributionZones.logicalTopologyVersion";
 
-    /** The key, needed for processing the event about zones' update was 
triggered only once. */
-    private static final ByteArray DISTRIBUTION_ZONES_CHANGE_TRIGGER_KEY = new 
ByteArray("distributionZones.change.trigger");
+    /** Key prefix, needed for processing the event about zone's update was 
triggered only once. */
+    private static final String DISTRIBUTION_ZONES_CHANGE_TRIGGER_KEY_PREFIX = 
"distributionZones.change.trigger.";
 
     /** ByteArray representation of {@link 
DistributionZonesUtil#DISTRIBUTION_ZONES_LOGICAL_TOPOLOGY}. */
     private static final ByteArray DISTRIBUTION_ZONE_LOGICAL_TOPOLOGY_KEY = 
new ByteArray(DISTRIBUTION_ZONES_LOGICAL_TOPOLOGY);
@@ -59,10 +66,27 @@ class DistributionZonesUtil {
     }
 
    /**
     * The key needed for processing an event about zone's creation and deletion.
     * With this key we can be sure that event was triggered only once.
     *
     * @param zoneId Unique id of a zone.
     * @return Key for the zone change trigger revision.
     */
    static ByteArray zonesChangeTriggerKey(int zoneId) {
        return new ByteArray(DISTRIBUTION_ZONES_CHANGE_TRIGGER_KEY_PREFIX + zoneId);
    }
+
    /**
     * The key needed for processing an event about zone's data node propagation on scale up.
     * With this key we can be sure that event was triggered only once.
     *
     * @param zoneId Unique id of a zone.
     * @return Key for the zone scale up change trigger revision.
     */
    static ByteArray zoneScaleUpChangeTriggerKey(int zoneId) {
        return new ByteArray(DISTRIBUTION_ZONE_SCALE_UP_CHANGE_TRIGGER_PREFIX + zoneId);
    }
+
    /**
     * The key needed for processing an event about zone's data node propagation on scale down.
     * With this key we can be sure that event was triggered only once.
     *
     * @param zoneId Unique id of a zone.
     * @return Key for the zone scale down change trigger revision.
     */
    static ByteArray zoneScaleDownChangeTriggerKey(int zoneId) {
        return new ByteArray(DISTRIBUTION_ZONE_SCALE_DOWN_CHANGE_TRIGGER_PREFIX + zoneId);
    }
 
     /**
@@ -82,69 +106,97 @@ class DistributionZonesUtil {
     }
 
    /**
     * Condition for updating {@link DistributionZonesUtil#zonesChangeTriggerKey(int)} key.
     * Update only if the revision of the event is newer than value in that trigger key.
     *
     * @param revision Event revision.
     * @param zoneId Zone id.
     * @return Update condition.
     */
    static CompoundCondition triggerKeyConditionForZonesChanges(long revision, int zoneId) {
        return or(
                notExists(zonesChangeTriggerKey(zoneId)),
                value(zonesChangeTriggerKey(zoneId)).lt(ByteUtils.longToBytes(revision))
        );
    }
 
    /**
     * Condition that guards a compare-and-set style update of zone's data nodes: it passes only if both
     * {@link DistributionZonesUtil#zoneScaleUpChangeTriggerKey(int)} and
     * {@link DistributionZonesUtil#zoneScaleDownChangeTriggerKey(int)} still hold exactly the revisions that were
     * read before preparing the update, i.e. no concurrent scale up or scale down update happened in between.
     *
     * @param scaleUpTriggerRevision Trigger revision of scale up.
     * @param scaleDownTriggerRevision Trigger revision of scale down.
     * @param zoneId Zone id.
     * @return Update condition.
     */
    static CompoundCondition triggerScaleUpScaleDownKeysCondition(long scaleUpTriggerRevision, long scaleDownTriggerRevision, int zoneId) {
        return and(
                value(zoneScaleUpChangeTriggerKey(zoneId)).eq(ByteUtils.longToBytes(scaleUpTriggerRevision)),
                value(zoneScaleDownChangeTriggerKey(zoneId)).eq(ByteUtils.longToBytes(scaleDownTriggerRevision))
        );
    }
+
    /**
     * Updates data nodes value for a zone and sets {@code revision} to {@link DistributionZonesUtil#zoneScaleUpChangeTriggerKey(int)}.
     *
     * @param zoneId Distribution zone id.
     * @param revision Revision of the event.
     * @param nodes Data nodes.
     * @return Update command for the meta storage.
     */
    static Update updateDataNodesAndScaleUpTriggerKey(int zoneId, long revision, byte[] nodes) {
        return ops(
                put(zoneDataNodesKey(zoneId), nodes),
                put(zoneScaleUpChangeTriggerKey(zoneId), ByteUtils.longToBytes(revision))
        ).yield(true);
    }
 
    /**
     * Updates data nodes value for a zone and sets {@code revision} to {@link DistributionZonesUtil#zoneScaleUpChangeTriggerKey(int)},
     * {@link DistributionZonesUtil#zoneScaleDownChangeTriggerKey(int)} and {@link DistributionZonesUtil#zonesChangeTriggerKey(int)}.
     *
     * @param zoneId Distribution zone id.
     * @param revision Revision of the event.
     * @param nodes Data nodes.
     * @return Update command for the meta storage.
     */
    static Update updateDataNodesAndTriggerKeys(int zoneId, long revision, byte[] nodes) {
        return ops(
                put(zoneDataNodesKey(zoneId), nodes),
                put(zoneScaleUpChangeTriggerKey(zoneId), ByteUtils.longToBytes(revision)),
                put(zoneScaleDownChangeTriggerKey(zoneId), ByteUtils.longToBytes(revision)),
                put(zonesChangeTriggerKey(zoneId), ByteUtils.longToBytes(revision))
        ).yield(true);
    }
 
    /**
     * Deletes data nodes, {@link DistributionZonesUtil#zoneScaleUpChangeTriggerKey(int)},
     * {@link DistributionZonesUtil#zoneScaleDownChangeTriggerKey(int)} values for a zone. Also sets {@code revision} to
     * {@link DistributionZonesUtil#zonesChangeTriggerKey(int)}.
     *
     * @param zoneId Distribution zone id.
     * @param revision Revision of the event.
     * @return Update command for the meta storage.
     */
    static Update deleteDataNodesAndUpdateTriggerKeys(int zoneId, long revision) {
        return ops(
                remove(zoneDataNodesKey(zoneId)),
                remove(zoneScaleUpChangeTriggerKey(zoneId)),
                remove(zoneScaleDownChangeTriggerKey(zoneId)),
                put(zonesChangeTriggerKey(zoneId), ByteUtils.longToBytes(revision))
        ).yield(true);
    }
 
    /**
     * Updates logical topology and logical topology version values for zones.
     *
     * @param logicalTopology Logical topology.
     * @param topologyVersion Logical topology version.
     * @return Update command for the meta storage.
     */
    static Update updateLogicalTopologyAndVersion(Set<String> logicalTopology, long topologyVersion) {
        return ops(
                put(zonesLogicalTopologyVersionKey(), ByteUtils.longToBytes(topologyVersion)),
                put(zonesLogicalTopologyKey(), ByteUtils.toBytes(logicalTopology))
        ).yield(true);
    }
 }
diff --git 
a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerConfigurationChangesTest.java
 
b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerConfigurationChangesTest.java
index ba1dea44be..d05c7165a2 100644
--- 
a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerConfigurationChangesTest.java
+++ 
b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerConfigurationChangesTest.java
@@ -19,8 +19,8 @@ package org.apache.ignite.internal.distributionzones;
 
 import static java.util.concurrent.CompletableFuture.completedFuture;
 import static 
org.apache.ignite.configuration.annotation.ConfigurationType.DISTRIBUTED;
-import static 
org.apache.ignite.internal.distributionzones.DistributionZoneManager.DEFAULT_ZONE_NAME;
 import static 
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zoneDataNodesKey;
+import static 
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zoneScaleUpChangeTriggerKey;
 import static 
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zonesChangeTriggerKey;
 import static 
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zonesLogicalTopologyKey;
 import static 
org.apache.ignite.internal.metastorage.impl.MetaStorageServiceImpl.toIfInfo;
@@ -41,10 +41,13 @@ import static org.mockito.Mockito.when;
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
 import org.apache.ignite.configuration.NamedConfigurationTree;
 import org.apache.ignite.configuration.NamedListView;
 import 
org.apache.ignite.internal.cluster.management.topology.LogicalTopologyServiceImpl;
@@ -52,15 +55,21 @@ import 
org.apache.ignite.internal.cluster.management.topology.api.LogicalTopolog
 import org.apache.ignite.internal.configuration.ConfigurationManager;
 import 
org.apache.ignite.internal.configuration.storage.TestConfigurationStorage;
 import 
org.apache.ignite.internal.distributionzones.configuration.DistributionZonesConfiguration;
+import org.apache.ignite.internal.metastorage.Entry;
 import org.apache.ignite.internal.metastorage.MetaStorageManager;
+import org.apache.ignite.internal.metastorage.command.GetAllCommand;
 import 
org.apache.ignite.internal.metastorage.command.MetaStorageCommandsFactory;
 import org.apache.ignite.internal.metastorage.command.MultiInvokeCommand;
+import org.apache.ignite.internal.metastorage.command.MultipleEntryResponse;
+import org.apache.ignite.internal.metastorage.command.SingleEntryResponse;
 import org.apache.ignite.internal.metastorage.command.info.StatementResultInfo;
 import org.apache.ignite.internal.metastorage.dsl.If;
 import org.apache.ignite.internal.metastorage.dsl.StatementResult;
+import org.apache.ignite.internal.metastorage.impl.EntryImpl;
 import 
org.apache.ignite.internal.metastorage.server.SimpleInMemoryKeyValueStorage;
 import org.apache.ignite.internal.metastorage.server.raft.MetaStorageListener;
 import org.apache.ignite.internal.raft.Command;
+import org.apache.ignite.internal.raft.ReadCommand;
 import org.apache.ignite.internal.raft.WriteCommand;
 import org.apache.ignite.internal.raft.service.CommandClosure;
 import org.apache.ignite.internal.raft.service.RaftGroupService;
@@ -72,6 +81,7 @@ import 
org.apache.ignite.internal.testframework.IgniteAbstractTest;
 import org.apache.ignite.internal.util.ByteUtils;
 import org.apache.ignite.internal.vault.VaultEntry;
 import org.apache.ignite.internal.vault.VaultManager;
+import org.apache.ignite.lang.ByteArray;
 import org.apache.ignite.lang.IgniteInternalException;
 import org.jetbrains.annotations.Nullable;
 import org.junit.jupiter.api.AfterEach;
@@ -138,7 +148,8 @@ public class 
DistributionZoneManagerConfigurationChangesTest extends IgniteAbstr
                 tablesConfiguration,
                 metaStorageManager,
                 logicalTopologyService,
-                vaultMgr
+                vaultMgr,
+                "node"
         );
 
         clusterCfgMgr.start();
@@ -200,7 +211,49 @@ public class 
DistributionZoneManagerConfigurationChangesTest extends IgniteAbstr
 
                     return res;
                 }
-        ).when(metaStorageService).run(any());
+        ).when(metaStorageService).run(any(WriteCommand.class));
+
+        lenient().doAnswer(
+                invocationClose -> {
+                    Command cmd = invocationClose.getArgument(0);
+
+                    long commandIndex = raftIndex.incrementAndGet();
+
+                    CompletableFuture<Serializable> res = new 
CompletableFuture<>();
+
+                    CommandClosure<ReadCommand> clo = new CommandClosure<>() {
+                        /** {@inheritDoc} */
+                        @Override
+                        public long index() {
+                            return commandIndex;
+                        }
+
+                        /** {@inheritDoc} */
+                        @Override
+                        public ReadCommand command() {
+                            return (ReadCommand) cmd;
+                        }
+
+                        /** {@inheritDoc} */
+                        @Override
+                        public void result(@Nullable Serializable r) {
+                            if (r instanceof Throwable) {
+                                res.completeExceptionally((Throwable) r);
+                            } else {
+                                res.complete(r);
+                            }
+                        }
+                    };
+
+                    try {
+                        metaStorageListener.onRead(List.of(clo).iterator());
+                    } catch (Throwable e) {
+                        res.completeExceptionally(new 
IgniteInternalException(e));
+                    }
+
+                    return res;
+                }
+        ).when(metaStorageService).run(any(ReadCommand.class));
 
         MetaStorageCommandsFactory commandsFactory = new 
MetaStorageCommandsFactory();
 
@@ -211,6 +264,28 @@ public class 
DistributionZoneManagerConfigurationChangesTest extends IgniteAbstr
 
             return metaStorageService.run(multiInvokeCommand).thenApply(bi -> 
new StatementResult(((StatementResultInfo) bi).result()));
         }).when(metaStorageManager).invoke(any());
+
+        lenient().doAnswer(invocationClose -> {
+            Set<ByteArray> keysSet = invocationClose.getArgument(0);
+
+            GetAllCommand getAllCommand = commandsFactory.getAllCommand().keys(
+                    
keysSet.stream().map(ByteArray::bytes).collect(Collectors.toList())
+            ).revision(0).build();
+
+            return metaStorageService.run(getAllCommand).thenApply(bi -> {
+                MultipleEntryResponse resp = (MultipleEntryResponse) bi;
+
+                Map<ByteArray, Entry> res = new HashMap<>();
+
+                for (SingleEntryResponse e : resp.entries()) {
+                    ByteArray key = new ByteArray(e.key());
+
+                    res.put(key, new EntryImpl(key.bytes(), e.value(), 
e.revision(), e.updateCounter()));
+                }
+
+                return res;
+            });
+        }).when(metaStorageManager).getAll(any());
     }
 
     @AfterEach
@@ -230,36 +305,9 @@ public class 
DistributionZoneManagerConfigurationChangesTest extends IgniteAbstr
 
         assertDataNodesForZone(1, nodes);
 
-        assertZonesChangeTriggerKey(1);
-    }
+        assertZoneScaleUpChangeTriggerKey(1, 1);
 
-    @Test
-    void testTriggerKeyPropagationAfterDefaultZoneUpdate() throws Exception {
-        testTriggerKeyPropagationAfterZoneUpdate(DEFAULT_ZONE_NAME);
-    }
-
-    @Test
-    void testTriggerKeyPropagationAfterNotDefaultZoneUpdate() throws Exception 
{
-        testTriggerKeyPropagationAfterZoneUpdate(ZONE_NAME);
-    }
-
-    void testTriggerKeyPropagationAfterZoneUpdate(String zoneName) throws 
Exception {
-        
assertNull(keyValueStorage.get(zonesChangeTriggerKey().bytes()).value());
-
-        int triggerKey = 0;
-
-        if (!DEFAULT_ZONE_NAME.equals(zoneName)) {
-            distributionZoneManager.createZone(new 
DistributionZoneConfigurationParameters.Builder(zoneName).build()).get();
-
-            assertZonesChangeTriggerKey(++triggerKey);
-        }
-
-        distributionZoneManager.alterZone(
-                zoneName,
-                new 
DistributionZoneConfigurationParameters.Builder(zoneName).dataNodesAutoAdjust(100).build()
-        ).get();
-
-        assertZonesChangeTriggerKey(++triggerKey);
+        assertZonesChangeTriggerKey(1, 1);
     }
 
     @Test
@@ -275,74 +323,39 @@ public class 
DistributionZoneManagerConfigurationChangesTest extends IgniteAbstr
 
     @Test
     void testSeveralZoneCreationsUpdatesTriggerKey() throws Exception {
-        
assertNull(keyValueStorage.get(zonesChangeTriggerKey().bytes()).value());
+        
assertNull(keyValueStorage.get(zoneScaleUpChangeTriggerKey(1).bytes()).value());
+        
assertNull(keyValueStorage.get(zoneScaleUpChangeTriggerKey(2).bytes()).value());
 
         distributionZoneManager.createZone(new 
DistributionZoneConfigurationParameters.Builder(ZONE_NAME).build()).get();
 
         distributionZoneManager.createZone(new 
DistributionZoneConfigurationParameters.Builder(NEW_ZONE_NAME).build()).get();
 
-        assertZonesChangeTriggerKey(2);
-    }
-
-    @Test
-    void testSeveralZoneUpdatesUpdatesTriggerKey() throws Exception {
-        
assertNull(keyValueStorage.get(zonesChangeTriggerKey().bytes()).value());
-
-        distributionZoneManager.createZone(new 
DistributionZoneConfigurationParameters.Builder(ZONE_NAME).build()).get();
-
-        distributionZoneManager.alterZone(
-                ZONE_NAME,
-                new 
DistributionZoneConfigurationParameters.Builder(ZONE_NAME).dataNodesAutoAdjust(100).build()
-        ).get();
-
-        distributionZoneManager.alterZone(
-                ZONE_NAME,
-                new 
DistributionZoneConfigurationParameters.Builder(ZONE_NAME).dataNodesAutoAdjust(1000).build()
-        ).get();
-
-        assertZonesChangeTriggerKey(3);
+        assertZoneScaleUpChangeTriggerKey(1, 1);
+        assertZoneScaleUpChangeTriggerKey(2, 2);
+        assertZonesChangeTriggerKey(1, 1);
+        assertZonesChangeTriggerKey(2, 2);
     }
 
     @Test
     void testDataNodesNotPropagatedAfterZoneCreation() throws Exception {
-        keyValueStorage.put(zonesChangeTriggerKey().bytes(), longToBytes(100));
+        keyValueStorage.put(zonesChangeTriggerKey(1).bytes(), 
longToBytes(100));
 
         distributionZoneManager.createZone(new 
DistributionZoneConfigurationParameters.Builder(ZONE_NAME).build()).get();
 
         verify(keyValueStorage, timeout(1000).times(1)).invoke(any());
 
-        assertZonesChangeTriggerKey(100);
+        assertZonesChangeTriggerKey(100, 1);
 
         assertDataNodesForZone(1, null);
     }
 
-    @Test
-    void testTriggerKeyNotPropagatedAfterZoneUpdate() throws Exception {
-        distributionZoneManager.createZone(new 
DistributionZoneConfigurationParameters.Builder(ZONE_NAME).build()).get();
-
-        assertDataNodesForZone(1, nodes);
-
-        keyValueStorage.put(zonesChangeTriggerKey().bytes(), longToBytes(100));
-
-        distributionZoneManager.alterZone(
-                ZONE_NAME,
-                new 
DistributionZoneConfigurationParameters.Builder(ZONE_NAME).dataNodesAutoAdjust(100).build()
-        ).get();
-
-        verify(keyValueStorage, timeout(1000).times(2)).invoke(any());
-
-        assertZonesChangeTriggerKey(100);
-
-        assertDataNodesForZone(1, nodes);
-    }
-
     @Test
     void testZoneDeleteDoNotRemoveMetaStorageKey() throws Exception {
         distributionZoneManager.createZone(new 
DistributionZoneConfigurationParameters.Builder(ZONE_NAME).build()).get();
 
         assertDataNodesForZone(1, nodes);
 
-        keyValueStorage.put(zonesChangeTriggerKey().bytes(), longToBytes(100));
+        keyValueStorage.put(zonesChangeTriggerKey(1).bytes(), 
longToBytes(100));
 
         distributionZoneManager.dropZone(ZONE_NAME).get();
 
@@ -361,14 +374,22 @@ public class 
DistributionZoneManagerConfigurationChangesTest extends IgniteAbstr
     private void assertDataNodesForZone(int zoneId, @Nullable Set<String> 
clusterNodes) throws InterruptedException {
         byte[] nodes = clusterNodes == null ? null : toBytes(clusterNodes);
 
-        assertTrue(waitForCondition(() -> 
Arrays.equals(keyValueStorage.get(zoneDataNodesKey(zoneId).bytes()).value(), 
nodes),
-                1000));
+        assertTrue(waitForCondition(() -> 
Arrays.equals(keyValueStorage.get(zoneDataNodesKey(zoneId).bytes()).value(), 
nodes), 1000));
+    }
+
+    private void assertZoneScaleUpChangeTriggerKey(int revision, int zoneId) 
throws InterruptedException {
+        assertTrue(
+                waitForCondition(
+                        () -> 
ByteUtils.bytesToLong(keyValueStorage.get(zoneScaleUpChangeTriggerKey(zoneId).bytes()).value())
 == revision,
+                        2000
+                )
+        );
     }
 
-    private void assertZonesChangeTriggerKey(int revision) throws 
InterruptedException {
+    private void assertZonesChangeTriggerKey(int revision, int zoneId) throws 
InterruptedException {
         assertTrue(
                 waitForCondition(
-                        () -> 
ByteUtils.bytesToLong(keyValueStorage.get(zonesChangeTriggerKey().bytes()).value())
 == revision, 1000
+                        () -> 
ByteUtils.bytesToLong(keyValueStorage.get(zonesChangeTriggerKey(zoneId).bytes()).value())
 == revision, 1000
                 )
         );
     }
diff --git 
a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerLogicalTopologyEventsTest.java
 
b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerLogicalTopologyEventsTest.java
index de897b0708..80f90f754f 100644
--- 
a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerLogicalTopologyEventsTest.java
+++ 
b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerLogicalTopologyEventsTest.java
@@ -146,7 +146,8 @@ public class 
DistributionZoneManagerLogicalTopologyEventsTest {
                 tablesConfiguration,
                 metaStorageManager,
                 logicalTopologyService,
-                vaultMgr
+                vaultMgr,
+                "node"
         );
 
         clusterCfgMgr.start();
diff --git 
a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerScaleUpTest.java
 
b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerScaleUpTest.java
new file mode 100644
index 0000000000..e5cd7a8f13
--- /dev/null
+++ 
b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerScaleUpTest.java
@@ -0,0 +1,745 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.distributionzones;
+
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static 
org.apache.ignite.configuration.annotation.ConfigurationType.DISTRIBUTED;
+import static 
org.apache.ignite.internal.distributionzones.DistributionZoneManager.DEFAULT_ZONE_ID;
+import static 
org.apache.ignite.internal.distributionzones.DistributionZoneManager.DEFAULT_ZONE_NAME;
+import static 
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zoneDataNodesKey;
+import static 
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zoneScaleUpChangeTriggerKey;
+import static 
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zonesChangeTriggerKey;
+import static 
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zonesLogicalTopologyKey;
+import static 
org.apache.ignite.internal.metastorage.impl.MetaStorageServiceImpl.toIfInfo;
+import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
+import static org.apache.ignite.internal.util.ByteUtils.toBytes;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.lenient;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
+import org.apache.ignite.configuration.NamedConfigurationTree;
+import org.apache.ignite.configuration.NamedListView;
+import 
org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager;
+import org.apache.ignite.internal.cluster.management.raft.ClusterStateStorage;
+import 
org.apache.ignite.internal.cluster.management.raft.TestClusterStateStorage;
+import org.apache.ignite.internal.cluster.management.topology.LogicalTopology;
+import 
org.apache.ignite.internal.cluster.management.topology.LogicalTopologyImpl;
+import 
org.apache.ignite.internal.cluster.management.topology.LogicalTopologyServiceImpl;
+import 
org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologySnapshot;
+import org.apache.ignite.internal.configuration.ConfigurationManager;
+import 
org.apache.ignite.internal.configuration.storage.TestConfigurationStorage;
+import 
org.apache.ignite.internal.distributionzones.configuration.DistributionZoneChange;
+import 
org.apache.ignite.internal.distributionzones.configuration.DistributionZoneConfiguration;
+import 
org.apache.ignite.internal.distributionzones.configuration.DistributionZoneView;
+import 
org.apache.ignite.internal.distributionzones.configuration.DistributionZonesConfiguration;
+import org.apache.ignite.internal.metastorage.Entry;
+import org.apache.ignite.internal.metastorage.EntryEvent;
+import org.apache.ignite.internal.metastorage.MetaStorageManager;
+import org.apache.ignite.internal.metastorage.WatchEvent;
+import org.apache.ignite.internal.metastorage.WatchListener;
+import org.apache.ignite.internal.metastorage.command.GetAllCommand;
+import org.apache.ignite.internal.metastorage.command.GetCommand;
+import 
org.apache.ignite.internal.metastorage.command.MetaStorageCommandsFactory;
+import org.apache.ignite.internal.metastorage.command.MultiInvokeCommand;
+import org.apache.ignite.internal.metastorage.command.MultipleEntryResponse;
+import org.apache.ignite.internal.metastorage.command.SingleEntryResponse;
+import org.apache.ignite.internal.metastorage.command.info.StatementResultInfo;
+import org.apache.ignite.internal.metastorage.dsl.If;
+import org.apache.ignite.internal.metastorage.dsl.StatementResult;
+import org.apache.ignite.internal.metastorage.impl.EntryImpl;
+import 
org.apache.ignite.internal.metastorage.server.SimpleInMemoryKeyValueStorage;
+import org.apache.ignite.internal.metastorage.server.raft.MetaStorageListener;
+import org.apache.ignite.internal.raft.Command;
+import org.apache.ignite.internal.raft.ReadCommand;
+import org.apache.ignite.internal.raft.WriteCommand;
+import org.apache.ignite.internal.raft.service.CommandClosure;
+import org.apache.ignite.internal.raft.service.RaftGroupService;
+import org.apache.ignite.internal.schema.configuration.TableChange;
+import org.apache.ignite.internal.schema.configuration.TableConfiguration;
+import org.apache.ignite.internal.schema.configuration.TableView;
+import org.apache.ignite.internal.schema.configuration.TablesConfiguration;
+import org.apache.ignite.internal.util.ByteUtils;
+import org.apache.ignite.internal.vault.VaultEntry;
+import org.apache.ignite.internal.vault.VaultManager;
+import org.apache.ignite.lang.ByteArray;
+import org.apache.ignite.lang.IgniteInternalException;
+import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.network.NetworkAddress;
+import org.jetbrains.annotations.Nullable;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mock;
+
+/**
+ * Test scenarios for the distribution zone scale up.
+ */
+public class DistributionZoneManagerScaleUpTest {
+    private static final String ZONE_NAME = "zone1";
+
+    private DistributionZoneManager distributionZoneManager;
+
+    private SimpleInMemoryKeyValueStorage keyValueStorage;
+
+    private ConfigurationManager clusterCfgMgr;
+
+    @Mock
+    private LogicalTopologyServiceImpl logicalTopologyService;
+
+    private LogicalTopology topology;
+
+    private ClusterStateStorage clusterStateStorage;
+
+    private VaultManager vaultMgr;
+
+    private MetaStorageManager metaStorageManager;
+
+    private WatchListener watchListener;
+
+    private DistributionZonesConfiguration zonesConfiguration;
+
+    @Mock
+    private ClusterManagementGroupManager cmgManager;
+
+    @BeforeEach
+    public void setUp() {
+        clusterCfgMgr = new ConfigurationManager(
+                List.of(DistributionZonesConfiguration.KEY),
+                Set.of(),
+                new TestConfigurationStorage(DISTRIBUTED),
+                List.of(),
+                List.of()
+        );
+
+        zonesConfiguration = clusterCfgMgr.configurationRegistry()
+                .getConfiguration(DistributionZonesConfiguration.KEY);
+
+        metaStorageManager = mock(MetaStorageManager.class);
+
+        cmgManager = mock(ClusterManagementGroupManager.class);
+
+        clusterStateStorage = new TestClusterStateStorage();
+
+        topology = new LogicalTopologyImpl(clusterStateStorage);
+
+        LogicalTopologyServiceImpl logicalTopologyService = new 
LogicalTopologyServiceImpl(topology, cmgManager);
+
+        vaultMgr = mock(VaultManager.class);
+
+        TablesConfiguration tablesConfiguration = 
mock(TablesConfiguration.class);
+
+        NamedConfigurationTree<TableConfiguration, TableView, TableChange> 
tables = mock(NamedConfigurationTree.class);
+
+        when(tablesConfiguration.tables()).thenReturn(tables);
+
+        NamedListView<TableView> value = mock(NamedListView.class);
+
+        when(tables.value()).thenReturn(value);
+
+        when(value.namedListKeys()).thenReturn(new ArrayList<>());
+
+        distributionZoneManager = new DistributionZoneManager(
+                zonesConfiguration,
+                tablesConfiguration,
+                metaStorageManager,
+                logicalTopologyService,
+                vaultMgr,
+                "node"
+        );
+
+        clusterCfgMgr.start();
+
+        mockVaultAppliedRevision(1);
+
+        
when(vaultMgr.get(zonesLogicalTopologyKey())).thenReturn(completedFuture(new 
VaultEntry(zonesLogicalTopologyKey(), null)));
+        when(vaultMgr.put(any(), any())).thenReturn(completedFuture(null));
+
+        doAnswer(invocation -> {
+            watchListener = invocation.getArgument(1);
+
+            return null;
+        }).when(metaStorageManager).registerExactWatch(any(), any());
+
+        AtomicLong raftIndex = new AtomicLong();
+
+        keyValueStorage = spy(new SimpleInMemoryKeyValueStorage("test"));
+
+        MetaStorageListener metaStorageListener = new 
MetaStorageListener(keyValueStorage);
+
+        RaftGroupService metaStorageService = mock(RaftGroupService.class);
+
+        // Delegate directly to listener.
+        lenient().doAnswer(
+                invocationClose -> {
+                    Command cmd = invocationClose.getArgument(0);
+
+                    long commandIndex = raftIndex.incrementAndGet();
+
+                    CompletableFuture<Serializable> res = new 
CompletableFuture<>();
+
+                    CommandClosure<WriteCommand> clo = new CommandClosure<>() {
+                        /** {@inheritDoc} */
+                        @Override
+                        public long index() {
+                            return commandIndex;
+                        }
+
+                        /** {@inheritDoc} */
+                        @Override
+                        public WriteCommand command() {
+                            return (WriteCommand) cmd;
+                        }
+
+                        /** {@inheritDoc} */
+                        @Override
+                        public void result(@Nullable Serializable r) {
+                            if (r instanceof Throwable) {
+                                res.completeExceptionally((Throwable) r);
+                            } else {
+                                res.complete(r);
+                            }
+                        }
+                    };
+
+                    try {
+                        metaStorageListener.onWrite(List.of(clo).iterator());
+                    } catch (Throwable e) {
+                        res.completeExceptionally(new 
IgniteInternalException(e));
+                    }
+
+                    return res;
+                }
+        ).when(metaStorageService).run(any(WriteCommand.class));
+
+        lenient().doAnswer(
+                invocationClose -> {
+                    Command cmd = invocationClose.getArgument(0);
+
+                    long commandIndex = raftIndex.incrementAndGet();
+
+                    CompletableFuture<Serializable> res = new 
CompletableFuture<>();
+
+                    CommandClosure<ReadCommand> clo = new CommandClosure<>() {
+                        /** {@inheritDoc} */
+                        @Override
+                        public long index() {
+                            return commandIndex;
+                        }
+
+                        /** {@inheritDoc} */
+                        @Override
+                        public ReadCommand command() {
+                            return (ReadCommand) cmd;
+                        }
+
+                        /** {@inheritDoc} */
+                        @Override
+                        public void result(@Nullable Serializable r) {
+                            if (r instanceof Throwable) {
+                                res.completeExceptionally((Throwable) r);
+                            } else {
+                                res.complete(r);
+                            }
+                        }
+                    };
+
+                    try {
+                        metaStorageListener.onRead(List.of(clo).iterator());
+                    } catch (Throwable e) {
+                        res.completeExceptionally(new 
IgniteInternalException(e));
+                    }
+
+                    return res;
+                }
+        ).when(metaStorageService).run(any(ReadCommand.class));
+
+        MetaStorageCommandsFactory commandsFactory = new 
MetaStorageCommandsFactory();
+
+        lenient().doAnswer(invocationClose -> {
+            If iif = invocationClose.getArgument(0);
+
+            MultiInvokeCommand multiInvokeCommand = 
commandsFactory.multiInvokeCommand().iif(toIfInfo(iif, 
commandsFactory)).build();
+
+            return metaStorageService.run(multiInvokeCommand).thenApply(bi -> 
new StatementResult(((StatementResultInfo) bi).result()));
+        }).when(metaStorageManager).invoke(any());
+
+        lenient().doAnswer(invocationClose -> {
+            Set<ByteArray> keysSet = invocationClose.getArgument(0);
+
+            GetAllCommand getAllCommand = commandsFactory.getAllCommand().keys(
+                    
keysSet.stream().map(ByteArray::bytes).collect(Collectors.toList())
+            ).revision(0).build();
+
+            return metaStorageService.run(getAllCommand).thenApply(bi -> {
+                MultipleEntryResponse resp = (MultipleEntryResponse) bi;
+
+                Map<ByteArray, Entry> res = new HashMap<>();
+
+                for (SingleEntryResponse e : resp.entries()) {
+                    ByteArray key = new ByteArray(e.key());
+
+                    res.put(key, new EntryImpl(key.bytes(), e.value(), 
e.revision(), e.updateCounter()));
+                }
+
+                return res;
+            });
+        }).when(metaStorageManager).getAll(any());
+
+        lenient().doAnswer(invocationClose -> {
+            ByteArray key = invocationClose.getArgument(0);
+
+            GetCommand getCommand = 
commandsFactory.getCommand().key(key.bytes()).build();
+
+            return metaStorageService.run(getCommand).thenApply(bi -> {
+                SingleEntryResponse resp = (SingleEntryResponse) bi;
+
+                return new EntryImpl(resp.key(), resp.value(), 
resp.revision(), resp.updateCounter());
+            });
+        }).when(metaStorageManager).get(any());
+    }
+
+    @AfterEach
+    public void tearDown() throws Exception {
+        distributionZoneManager.stop();
+
+        clusterCfgMgr.stop();
+
+        keyValueStorage.close();
+
+        clusterStateStorage.destroy();
+    }
+
+    @Test
+    void testDataNodesPropagationAfterScaleUpTriggered() throws Exception {
+        ClusterNode node1 = new ClusterNode("1", "name1", new 
NetworkAddress("localhost", 123));
+
+        topology.putNode(node1);
+
+        Set<ClusterNode> clusterNodes = Set.of(node1);
+
+        
mockVaultZonesLogicalTopologyKey(clusterNodes.stream().map(ClusterNode::name).collect(Collectors.toSet()));
+
+        mockCmgLocalNodes(1L, clusterNodes);
+
+        distributionZoneManager.start();
+
+        assertDataNodesForZone(DEFAULT_ZONE_ID, 
clusterNodes.stream().map(ClusterNode::name).collect(Collectors.toSet()));
+
+        ClusterNode node2 = new ClusterNode("2", "name2", new 
NetworkAddress("localhost", 123));
+
+        topology.putNode(node2);
+
+        Set<ClusterNode> clusterNodes2 = Set.of(node1, node2);
+
+        mockCmgLocalNodes(1L, clusterNodes2);
+
+        assertLogicalTopology(clusterNodes2);
+
+        distributionZoneManager.createZone(
+                new 
DistributionZoneConfigurationParameters.Builder(ZONE_NAME).dataNodesAutoAdjustScaleUp(1).build()
+        ).get();
+
+        assertDataNodesForZone(1, 
clusterNodes.stream().map(ClusterNode::name).collect(Collectors.toSet()));
+
+        assertZoneScaleUpChangeTriggerKey(1, 1);
+
+        
watchListenerOnUpdate(topology.getLogicalTopology().nodes().stream().map(ClusterNode::name).collect(Collectors.toSet()),
 2);
+
+        assertDataNodesForZone(1, 
clusterNodes2.stream().map(ClusterNode::name).collect(Collectors.toSet()));
+    }
+
+    @Test
+    void testDataNodesPropagationForDefaultZoneAfterScaleUpTriggered() throws 
Exception {
+        ClusterNode node1 = new ClusterNode("1", "name1", new 
NetworkAddress("localhost", 123));
+
+        topology.putNode(node1);
+
+        Set<ClusterNode> clusterNodes = Set.of(node1);
+
+        
mockVaultZonesLogicalTopologyKey(clusterNodes.stream().map(ClusterNode::name).collect(Collectors.toSet()));
+
+        mockCmgLocalNodes(1L, clusterNodes);
+
+        distributionZoneManager.start();
+
+        assertDataNodesForZone(DEFAULT_ZONE_ID, 
clusterNodes.stream().map(ClusterNode::name).collect(Collectors.toSet()));
+
+        ClusterNode node2 = new ClusterNode("2", "name2", new 
NetworkAddress("localhost", 123));
+
+        topology.putNode(node2);
+
+        Set<ClusterNode> clusterNodes2 = Set.of(node1, node2);
+
+        mockCmgLocalNodes(1L, clusterNodes2);
+
+        assertLogicalTopology(clusterNodes2);
+
+        distributionZoneManager.alterZone(
+                DEFAULT_ZONE_NAME,
+                new 
DistributionZoneConfigurationParameters.Builder(DEFAULT_ZONE_NAME).dataNodesAutoAdjustScaleUp(0).build()
+        ).get();
+
+        
watchListenerOnUpdate(topology.getLogicalTopology().nodes().stream().map(ClusterNode::name).collect(Collectors.toSet()),
 2);
+
+        assertDataNodesForZone(DEFAULT_ZONE_ID, 
clusterNodes2.stream().map(ClusterNode::name).collect(Collectors.toSet()));
+    }
+
+    @Test
+    void testDropZoneDoNotPropagateDataNodesAfterScaleUp() throws Exception {
+        ClusterNode node1 = new ClusterNode("1", "name1", new 
NetworkAddress("localhost", 123));
+
+        topology.putNode(node1);
+
+        Set<ClusterNode> clusterNodes = Set.of(node1);
+
+        
mockVaultZonesLogicalTopologyKey(clusterNodes.stream().map(ClusterNode::name).collect(Collectors.toSet()));
+
+        mockCmgLocalNodes(1L, clusterNodes);
+
+        distributionZoneManager.start();
+
+        ClusterNode node2 = new ClusterNode("2", "name2", new 
NetworkAddress("localhost", 123));
+
+        topology.putNode(node2);
+
+        Set<ClusterNode> clusterNodes2 = Set.of(node1, node2);
+
+        mockCmgLocalNodes(1L, clusterNodes2);
+
+        assertLogicalTopology(clusterNodes2);
+
+        distributionZoneManager.createZone(
+                new 
DistributionZoneConfigurationParameters.Builder(ZONE_NAME).dataNodesAutoAdjustScaleUp(1).build()
+        ).get();
+
+        assertDataNodesForZone(1, 
clusterNodes.stream().map(ClusterNode::name).collect(Collectors.toSet()));
+
+        assertZoneScaleUpChangeTriggerKey(1, 1);
+
+        
watchListenerOnUpdate(topology.getLogicalTopology().nodes().stream().map(ClusterNode::name).collect(Collectors.toSet()),
 2);
+
+        distributionZoneManager.dropZone(ZONE_NAME).get();
+
+        assertNotEqualsDataNodesForZone(1, 
clusterNodes2.stream().map(ClusterNode::name).collect(Collectors.toSet()));
+    }
+
+    @Test
+    void testTwoScaleUpTimersSecondTimerRunFirst() throws Exception {
+        preparePrerequisites();
+
+        NamedConfigurationTree<DistributionZoneConfiguration, 
DistributionZoneView, DistributionZoneChange> zones =
+                zonesConfiguration.distributionZones();
+
+        DistributionZoneView zoneView = zones.value().get(0);
+
+        CountDownLatch in1 = new CountDownLatch(1);
+        CountDownLatch in2 = new CountDownLatch(1);
+        CountDownLatch out1 = new CountDownLatch(1);
+        CountDownLatch out2 = new CountDownLatch(1);
+
+        distributionZoneManager.scheduleTimers(
+                zoneView,
+                Set.of("B"),
+                Set.of(),
+                2,
+                (zoneId, revision) -> {
+                    try {
+                        in1.await();
+                    } catch (InterruptedException e) {
+                        fail();
+                    }
+
+                    return testSaveDataNodesOnScaleUp(zoneId, 
revision).thenRun(out1::countDown);
+                }
+        );
+
+        // Assert that the first task was run and the event about adding node
"B" with revision 2 was added
+        // to the topologyAugmentationMap of the zone.
+        assertTrue(
+                waitForCondition(
+                        () -> 
distributionZoneManager.zonesTimers().get(1).topologyAugmentationMap().containsKey(2L),
+                        1000
+                )
+        );
+
+        distributionZoneManager.scheduleTimers(
+                zoneView,
+                Set.of("C"),
+                Set.of(),
+                3,
+                (zoneId, revision) -> {
+                    try {
+                        in2.await();
+                    } catch (InterruptedException e) {
+                        fail();
+                    }
+
+                    return testSaveDataNodesOnScaleUp(zoneId, 
revision).thenRun(() -> {
+                        try {
+                            out2.await();
+                        } catch (InterruptedException e) {
+                            fail();
+                        }
+                    });
+                }
+        );
+
+        // Assert that second task was run and event about adding node "C" 
with revision 3 was added
+        // to the topologyAugmentationMap of the zone.
+        assertTrue(
+                waitForCondition(
+                        () -> 
distributionZoneManager.zonesTimers().get(1).topologyAugmentationMap().containsKey(3L),
+                        1000
+                )
+        );
+
+
+        // The second task propagates data nodes first.
+        in2.countDown();
+
+        assertZoneScaleUpChangeTriggerKey(3, 1);
+
+        assertDataNodesForZone(1, Set.of("A", "B", "C"));
+
+        out2.countDown();
+
+        in1.countDown();
+
+        // Wait for the first scheduler to finish its work.
+        out1.countDown();
+
+        // Assert that nothing has been changed.
+        assertZoneScaleUpChangeTriggerKey(3, 1);
+
+        assertDataNodesForZone(1, Set.of("A", "B", "C"));
+    }
+
+    @Test
+    void testTwoScaleUpTimersFirstTimerRunFirst() throws Exception {
+        preparePrerequisites();
+
+        NamedConfigurationTree<DistributionZoneConfiguration, 
DistributionZoneView, DistributionZoneChange> zones =
+                zonesConfiguration.distributionZones();
+
+        DistributionZoneView zoneView = zones.value().get(0);
+
+        CountDownLatch in1 = new CountDownLatch(1);
+        CountDownLatch in2 = new CountDownLatch(1);
+        CountDownLatch out1 = new CountDownLatch(1);
+
+        distributionZoneManager.scheduleTimers(
+                zoneView,
+                Set.of("B"),
+                Set.of(),
+                2,
+                (zoneId, revision) -> {
+                    in1.countDown();
+
+                    return testSaveDataNodesOnScaleUp(zoneId, 
revision).thenRun(() -> {
+                        try {
+                            out1.await();
+                        } catch (InterruptedException e) {
+                            fail();
+                        }
+                    });
+                }
+        );
+
+        // Waiting for the first task to be run. We have to do that to be sure 
that watch events,
+        // which we try to emulate, are handled sequentially.
+        in1.await();
+
+        distributionZoneManager.scheduleTimers(
+                zoneView,
+                Set.of("C"),
+                Set.of(),
+                3,
+                (zoneId, revision) -> {
+                    try {
+                        in2.await();
+                    } catch (InterruptedException e) {
+                        fail();
+                    }
+
+                    return testSaveDataNodesOnScaleUp(zoneId, revision);
+                }
+        );
+
+        // Assert that the second task was run and the event about adding node
"C" with revision 3 was added
+        // to the topologyAugmentationMap of the zone.
+        assertTrue(
+                waitForCondition(
+                        () -> 
distributionZoneManager.zonesTimers().get(1).topologyAugmentationMap().containsKey(3L),
+                        1000
+                )
+        );
+
+        assertZoneScaleUpChangeTriggerKey(2, 1);
+
+        assertDataNodesForZone(1, Set.of("A", "B"));
+
+        // The second task is run, and we expect that the data nodes will be
changed from ["A", "B"] to ["A", "B", "C"]
+        in2.countDown();
+
+        assertZoneScaleUpChangeTriggerKey(3, 1);
+
+        assertDataNodesForZone(1, Set.of("A", "B", "C"));
+
+        out1.countDown();
+    }
+
+
+
+    /**
+     * Creates a zone with the auto adjust scale up trigger equal to 0 and the data nodes equal to ["A"].
+     *
+     * @throws Exception when something goes wrong.
+     */
+    private void preparePrerequisites() throws Exception {
+        ClusterNode node1 = new ClusterNode("1", "A", new 
NetworkAddress("localhost", 123));
+
+        topology.putNode(node1);
+
+        Set<ClusterNode> clusterNodes = Set.of(node1);
+
+        
mockVaultZonesLogicalTopologyKey(clusterNodes.stream().map(ClusterNode::name).collect(Collectors.toSet()));
+
+        mockCmgLocalNodes(1L, clusterNodes);
+
+        distributionZoneManager.start();
+
+        distributionZoneManager.createZone(
+                new 
DistributionZoneConfigurationParameters.Builder(ZONE_NAME).dataNodesAutoAdjustScaleUp(0).build()
+        ).get();
+
+        assertDataNodesForZone(1, 
clusterNodes.stream().map(ClusterNode::name).collect(Collectors.toSet()));
+
+        assertZoneScaleUpChangeTriggerKey(1, 1);
+    }
+
+    private CompletableFuture<Void> testSaveDataNodesOnScaleUp(int zoneId, 
long revision) {
+        return 
distributionZoneManager.saveDataNodesToMetaStorageOnScaleUp(zoneId, revision);
+    }
+
+    private void assertDataNodesForZone(int zoneId, @Nullable Set<String> 
clusterNodes) throws InterruptedException {
+        assertTrue(waitForCondition(
+                () -> {
+                    byte[] dataNodes = 
keyValueStorage.get(zoneDataNodesKey(zoneId).bytes()).value();
+
+                    if (dataNodes == null) {
+                        return clusterNodes == null;
+                    }
+
+                    Set<String> res = ByteUtils.fromBytes(dataNodes);
+
+                    return res.equals(clusterNodes);
+                },
+                2000
+        ));
+    }
+
+    private void assertNotEqualsDataNodesForZone(int zoneId, @Nullable 
Set<String> clusterNodes) throws InterruptedException {
+        assertFalse(waitForCondition(
+                () -> {
+                    byte[] dataNodes = 
keyValueStorage.get(zoneDataNodesKey(zoneId).bytes()).value();
+
+                    if (dataNodes == null) {
+                        return clusterNodes == null;
+                    }
+
+                    Set<String> res = ByteUtils.fromBytes(dataNodes);
+
+                    return res.equals(clusterNodes);
+                },
+                2000
+        ));
+    }
+
+    private void assertZoneScaleUpChangeTriggerKey(int revision, int zoneId) 
throws InterruptedException {
+        assertTrue(
+                waitForCondition(
+                        () -> 
ByteUtils.bytesToLong(keyValueStorage.get(zoneScaleUpChangeTriggerKey(zoneId).bytes()).value())
 == revision,
+                        2000
+                )
+        );
+    }
+
+    private void assertZonesChangeTriggerKey(int revision, int zoneId) throws 
InterruptedException {
+        assertTrue(
+                waitForCondition(
+                        () -> 
ByteUtils.bytesToLong(keyValueStorage.get(zonesChangeTriggerKey(zoneId).bytes()).value())
 == revision, 1000
+                )
+        );
+    }
+
+    private void mockVaultAppliedRevision(long revision) {
+        when(metaStorageManager.appliedRevision()).thenReturn(revision);
+    }
+
+    private void watchListenerOnUpdate(Set<String> nodes, long rev) {
+        byte[] newLogicalTopology = toBytes(nodes);
+
+        Entry newEntry = new EntryImpl(zonesLogicalTopologyKey().bytes(), 
newLogicalTopology, rev, 1);
+
+        EntryEvent entryEvent = new EntryEvent(null, newEntry);
+
+        WatchEvent evt = new WatchEvent(entryEvent);
+
+        watchListener.onUpdate(evt);
+    }
+
+    private LogicalTopologySnapshot mockCmgLocalNodes(long version, 
Set<ClusterNode> clusterNodes) {
+        LogicalTopologySnapshot logicalTopologySnapshot = new 
LogicalTopologySnapshot(version, clusterNodes);
+
+        
when(cmgManager.logicalTopology()).thenReturn(completedFuture(logicalTopologySnapshot));
+
+        return logicalTopologySnapshot;
+    }
+
+    private void assertLogicalTopology(@Nullable Set<ClusterNode> 
clusterNodes) throws InterruptedException {
+        byte[] nodes = clusterNodes == null
+                ? null
+                : 
toBytes(clusterNodes.stream().map(ClusterNode::name).collect(Collectors.toSet()));
+
+        assertTrue(waitForCondition(() -> 
Arrays.equals(keyValueStorage.get(zonesLogicalTopologyKey().bytes()).value(), 
nodes), 1000));
+    }
+
+    private void mockVaultZonesLogicalTopologyKey(Set<String> nodes) {
+        byte[] newLogicalTopology = toBytes(nodes);
+
+        when(vaultMgr.get(zonesLogicalTopologyKey()))
+                .thenReturn(completedFuture(new 
VaultEntry(zonesLogicalTopologyKey(), newLogicalTopology)));
+    }
+}
diff --git 
a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerTest.java
 
b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerTest.java
index 0b0a19086d..7eada92853 100644
--- 
a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerTest.java
+++ 
b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerTest.java
@@ -103,7 +103,8 @@ class DistributionZoneManagerTest extends 
IgniteAbstractTest {
                 tablesConfiguration,
                 null,
                 null,
-                null
+                null,
+                "node"
         );
     }
 
diff --git 
a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerWatchListenerTest.java
 
b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerWatchListenerTest.java
index e36870e966..80dc99b4a8 100644
--- 
a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerWatchListenerTest.java
+++ 
b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerWatchListenerTest.java
@@ -22,9 +22,11 @@ import static 
org.apache.ignite.configuration.annotation.ConfigurationType.DISTR
 import static 
org.apache.ignite.internal.distributionzones.DistributionZoneManager.DEFAULT_ZONE_ID;
 import static 
org.apache.ignite.internal.distributionzones.DistributionZoneManager.DEFAULT_ZONE_NAME;
 import static 
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zoneDataNodesKey;
+import static 
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zoneScaleUpChangeTriggerKey;
 import static 
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zonesChangeTriggerKey;
 import static 
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zonesLogicalTopologyKey;
 import static 
org.apache.ignite.internal.metastorage.impl.MetaStorageServiceImpl.toIfInfo;
+import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
 import static org.apache.ignite.internal.util.ByteUtils.fromBytes;
 import static org.apache.ignite.internal.util.ByteUtils.longToBytes;
 import static org.apache.ignite.internal.util.ByteUtils.toBytes;
@@ -44,11 +46,15 @@ import static org.mockito.Mockito.when;
 
 import java.io.Serializable;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
 import org.apache.ignite.configuration.ConfigurationValue;
 import org.apache.ignite.configuration.NamedConfigurationTree;
 import org.apache.ignite.configuration.NamedListView;
@@ -66,8 +72,11 @@ import org.apache.ignite.internal.metastorage.EntryEvent;
 import org.apache.ignite.internal.metastorage.MetaStorageManager;
 import org.apache.ignite.internal.metastorage.WatchEvent;
 import org.apache.ignite.internal.metastorage.WatchListener;
+import org.apache.ignite.internal.metastorage.command.GetAllCommand;
 import 
org.apache.ignite.internal.metastorage.command.MetaStorageCommandsFactory;
 import org.apache.ignite.internal.metastorage.command.MultiInvokeCommand;
+import org.apache.ignite.internal.metastorage.command.MultipleEntryResponse;
+import org.apache.ignite.internal.metastorage.command.SingleEntryResponse;
 import org.apache.ignite.internal.metastorage.command.info.StatementResultInfo;
 import org.apache.ignite.internal.metastorage.dsl.If;
 import org.apache.ignite.internal.metastorage.dsl.StatementResult;
@@ -75,6 +84,7 @@ import org.apache.ignite.internal.metastorage.impl.EntryImpl;
 import 
org.apache.ignite.internal.metastorage.server.SimpleInMemoryKeyValueStorage;
 import org.apache.ignite.internal.metastorage.server.raft.MetaStorageListener;
 import org.apache.ignite.internal.raft.Command;
+import org.apache.ignite.internal.raft.ReadCommand;
 import org.apache.ignite.internal.raft.WriteCommand;
 import org.apache.ignite.internal.raft.service.CommandClosure;
 import org.apache.ignite.internal.raft.service.RaftGroupService;
@@ -85,6 +95,7 @@ import 
org.apache.ignite.internal.schema.configuration.TablesConfiguration;
 import org.apache.ignite.internal.testframework.IgniteAbstractTest;
 import org.apache.ignite.internal.vault.VaultEntry;
 import org.apache.ignite.internal.vault.VaultManager;
+import org.apache.ignite.lang.ByteArray;
 import org.apache.ignite.lang.IgniteInternalException;
 import org.jetbrains.annotations.Nullable;
 import org.junit.jupiter.api.AfterEach;
@@ -154,7 +165,8 @@ public class DistributionZoneManagerWatchListenerTest 
extends IgniteAbstractTest
                 tablesConfiguration,
                 metaStorageManager,
                 logicalTopologyService,
-                vaultMgr
+                vaultMgr,
+                "node"
         );
 
         clusterCfgMgr.start();
@@ -223,7 +235,49 @@ public class DistributionZoneManagerWatchListenerTest 
extends IgniteAbstractTest
 
                     return res;
                 }
-        ).when(metaStorageService).run(any());
+        ).when(metaStorageService).run(any(WriteCommand.class));
+
+        lenient().doAnswer(
+                invocationClose -> {
+                    Command cmd = invocationClose.getArgument(0);
+
+                    long commandIndex = raftIndex.incrementAndGet();
+
+                    CompletableFuture<Serializable> res = new 
CompletableFuture<>();
+
+                    CommandClosure<ReadCommand> clo = new CommandClosure<>() {
+                        /** {@inheritDoc} */
+                        @Override
+                        public long index() {
+                            return commandIndex;
+                        }
+
+                        /** {@inheritDoc} */
+                        @Override
+                        public ReadCommand command() {
+                            return (ReadCommand) cmd;
+                        }
+
+                        /** {@inheritDoc} */
+                        @Override
+                        public void result(@Nullable Serializable r) {
+                            if (r instanceof Throwable) {
+                                res.completeExceptionally((Throwable) r);
+                            } else {
+                                res.complete(r);
+                            }
+                        }
+                    };
+
+                    try {
+                        metaStorageListener.onRead(List.of(clo).iterator());
+                    } catch (Throwable e) {
+                        res.completeExceptionally(new 
IgniteInternalException(e));
+                    }
+
+                    return res;
+                }
+        ).when(metaStorageService).run(any(ReadCommand.class));
 
         MetaStorageCommandsFactory commandsFactory = new 
MetaStorageCommandsFactory();
 
@@ -234,6 +288,28 @@ public class DistributionZoneManagerWatchListenerTest 
extends IgniteAbstractTest
 
             return metaStorageService.run(multiInvokeCommand).thenApply(bi -> 
new StatementResult(((StatementResultInfo) bi).result()));
         }).when(metaStorageManager).invoke(any());
+
+        lenient().doAnswer(invocationClose -> {
+            Set<ByteArray> keysSet = invocationClose.getArgument(0);
+
+            GetAllCommand getAllCommand = commandsFactory.getAllCommand().keys(
+                    
keysSet.stream().map(ByteArray::bytes).collect(Collectors.toList())
+            ).revision(0).build();
+
+            return metaStorageService.run(getAllCommand).thenApply(bi -> {
+                MultipleEntryResponse resp = (MultipleEntryResponse) bi;
+
+                Map<ByteArray, org.apache.ignite.internal.metastorage.Entry> 
res = new HashMap<>();
+
+                for (SingleEntryResponse e : resp.entries()) {
+                    ByteArray key = new ByteArray(e.key());
+
+                    res.put(key, new EntryImpl(key.bytes(), e.value(), 
e.revision(), e.updateCounter()));
+                }
+
+                return res;
+            });
+        }).when(metaStorageManager).getAll(any());
     }
 
     @AfterEach
@@ -271,6 +347,9 @@ public class DistributionZoneManagerWatchListenerTest 
extends IgniteAbstractTest
 
         verify(keyValueStorage, timeout(1000).times(3)).invoke(any());
 
+        nodes = Set.of("node1", "node2", "node3");
+
+        // Scale up just adds the new node to the data nodes.
         checkDataNodesOfZone(DEFAULT_ZONE_ID, nodes);
 
         //third event
@@ -279,22 +358,33 @@ public class DistributionZoneManagerWatchListenerTest 
extends IgniteAbstractTest
 
         watchListenerOnUpdate(nodes, 4);
 
-        verify(keyValueStorage, timeout(1000).times(4)).invoke(any());
+        verify(keyValueStorage, timeout(1000).times(3)).invoke(any());
+
+        nodes = Set.of("node1", "node2", "node3");
 
+        // Scale up wasn't triggered
         checkDataNodesOfZone(DEFAULT_ZONE_ID, nodes);
     }
 
+    private void assertDataNodesForZone(int zoneId, @Nullable Set<String> 
clusterNodes) throws InterruptedException {
+        byte[] nodes = clusterNodes == null ? null : toBytes(clusterNodes);
+
+        assertTrue(waitForCondition(() -> 
Arrays.equals(keyValueStorage.get(zoneDataNodesKey(zoneId).bytes()).value(), 
nodes), 1000));
+    }
+
     @Test
     void testStaleWatchEvent() {
         mockVaultZonesLogicalTopologyKey(Set.of());
 
+        mockZones(mockZoneWithAutoAdjustScaleUp(100));
+
         distributionZoneManager.start();
 
         mockVaultAppliedRevision(1);
 
         long revision = 100;
 
-        keyValueStorage.put(zonesChangeTriggerKey().bytes(), 
longToBytes(revision));
+        keyValueStorage.put(zoneScaleUpChangeTriggerKey(1).bytes(), 
longToBytes(revision));
 
         Set<String> nodes = Set.of("node1", "node2");
 
@@ -309,7 +399,7 @@ public class DistributionZoneManagerWatchListenerTest 
extends IgniteAbstractTest
     void testStaleVaultRevisionOnZoneManagerStart() {
         long revision = 100;
 
-        keyValueStorage.put(zonesChangeTriggerKey().bytes(), 
longToBytes(revision));
+        keyValueStorage.put(zonesChangeTriggerKey(0).bytes(), 
longToBytes(revision));
 
         Set<String> nodes = Set.of("node1", "node2");
 
@@ -466,7 +556,7 @@ public class DistributionZoneManagerWatchListenerTest 
extends IgniteAbstractTest
     }
 
     private DistributionZoneView mockDefaultZoneView() {
-        DistributionZoneView defaultZone = mockZoneView(DEFAULT_ZONE_ID, 
DEFAULT_ZONE_NAME, 100, Integer.MAX_VALUE,
+        DistributionZoneView defaultZone = mockZoneView(DEFAULT_ZONE_ID, 
DEFAULT_ZONE_NAME, Integer.MAX_VALUE, 0,
                 Integer.MAX_VALUE);
 
         DistributionZonesView zonesView = mock(DistributionZonesView.class);
@@ -477,6 +567,11 @@ public class DistributionZoneManagerWatchListenerTest 
extends IgniteAbstractTest
         return defaultZone;
     }
 
+    private DistributionZoneConfiguration mockZoneWithAutoAdjustScaleUp(int 
scaleUp) {
+        return mockZoneConfiguration(1, ZONE_NAME_1, Integer.MAX_VALUE, 
scaleUp, Integer.MAX_VALUE);
+    }
+
+
     private void mockVaultZonesLogicalTopologyKey(Set<String> nodes) {
         byte[] newLogicalTopology = toBytes(nodes);
 
diff --git 
a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZonesSchedulersTest.java
 
b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZonesSchedulersTest.java
new file mode 100644
index 0000000000..2e14cc044e
--- /dev/null
+++ 
b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZonesSchedulersTest.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.distributionzones;
+
+import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
+import static 
org.apache.ignite.internal.util.IgniteUtils.shutdownAndAwaitTermination;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.after;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.BiConsumer;
+import 
org.apache.ignite.internal.distributionzones.DistributionZoneManager.ZoneState;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.thread.NamedThreadFactory;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Test scenarios for the distribution zone schedulers.
+ */
+public class DistributionZonesSchedulersTest {
+    private static final IgniteLogger LOG = 
Loggers.forClass(DistributionZonesSchedulersTest.class);
+
+    private static final ScheduledExecutorService executor = new 
ScheduledThreadPoolExecutor(
+            Math.min(Runtime.getRuntime().availableProcessors() * 3, 20),
+            new NamedThreadFactory("test-dst-zones-scheduler", LOG),
+            new ThreadPoolExecutor.DiscardPolicy()
+    );
+
+    @AfterAll
+    public static void afterAll() {
+        shutdownAndAwaitTermination(executor, 2, TimeUnit.SECONDS);
+    }
+
+    @Test
+    void testScaleUpSchedule() throws InterruptedException {
+        ZoneState state = new ZoneState(executor);
+
+        testSchedule(state::rescheduleScaleUp);
+    }
+
+    @Test
+    void testScaleDownSchedule() throws InterruptedException {
+        ZoneState state = new ZoneState(executor);
+
+        testSchedule(state::rescheduleScaleDown);
+    }
+
+    private static void testSchedule(BiConsumer<Long, Runnable> fn) throws 
InterruptedException {
+        CountDownLatch latch = new CountDownLatch(1);
+
+        assertEquals(1L, latch.getCount());
+
+        fn.accept(0L, latch::countDown);
+
+        latch.await();
+
+        assertEquals(0L, latch.getCount());
+    }
+
+    @Test
+    void testScaleUpReScheduleNotStartedTask() {
+        ZoneState state = new DistributionZoneManager.ZoneState(executor);
+
+        testReScheduleNotStartedTask(state::rescheduleScaleUp);
+    }
+
+    @Test
+    void testScaleDownReScheduleNotStartedTask() {
+        ZoneState state = new DistributionZoneManager.ZoneState(executor);
+
+        testReScheduleNotStartedTask(state::rescheduleScaleDown);
+    }
+
+    private static void testReScheduleNotStartedTask(BiConsumer<Long, 
Runnable> fn) {
+        Runnable runnable = mock(Runnable.class);
+
+        fn.accept(1L, runnable);
+
+        fn.accept(0L, runnable);
+
+        verify(runnable, after(1200).times(1)).run();
+    }
+
+    @Test
+    void testScaleUpReScheduleWhenTaskIsEnded() throws InterruptedException {
+        ZoneState state = new DistributionZoneManager.ZoneState(executor);
+
+        testReScheduleWhenTaskIsEnded(state::rescheduleScaleUp);
+    }
+
+    @Test
+    void testScaleDownReScheduleWhenTaskIsEnded() throws InterruptedException {
+        ZoneState state = new ZoneState(executor);
+
+        // Fixed copy-paste from the scale-up test: this test must exercise the
+        // scale-DOWN reschedule path (was state::rescheduleScaleUp).
+        testReScheduleWhenTaskIsEnded(state::rescheduleScaleDown);
+    }
+
+    /**
+     * Schedules a first task with a zero delay, waits for it to complete, then schedules a
+     * second task and asserts that both tasks were eventually executed.
+     *
+     * @param fn Scheduling function under test (delay in seconds, task to run).
+     * @throws InterruptedException If the waiting is interrupted.
+     */
+    private static void testReScheduleWhenTaskIsEnded(BiConsumer<Long, Runnable> fn) throws InterruptedException {
+        CountDownLatch latch = new CountDownLatch(1);
+
+        AtomicBoolean flag = new AtomicBoolean();
+
+        assertEquals(1L, latch.getCount());
+
+        fn.accept(0L, latch::countDown);
+
+        // Fail fast if the first task did not run: await() returns false on timeout,
+        // and ignoring that result would let the test continue in a broken state.
+        assertTrue(latch.await(1000, TimeUnit.MILLISECONDS));
+
+        fn.accept(0L, () -> flag.set(true));
+
+        assertTrue(waitForCondition(() -> 0L == latch.getCount(), 1500));
+        assertTrue(waitForCondition(flag::get, 1500));
+    }
+}
diff --git 
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java 
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
index 6d40dc0e95..1affcc2468 100644
--- 
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
+++ 
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
@@ -400,7 +400,8 @@ public class IgniteImpl implements Ignite {
                 tablesConfiguration,
                 metaStorageMgr,
                 logicalTopologyService,
-                vaultMgr
+                vaultMgr,
+                name
         );
 
         volatileLogStorageFactoryCreator = new 
VolatileLogStorageFactoryCreator(workDir.resolve("volatile-log-spillout"));
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ddl/DdlCommandHandlerExceptionHandlingTest.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ddl/DdlCommandHandlerExceptionHandlingTest.java
index 6a61aae43e..e339a97dd6 100644
--- 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ddl/DdlCommandHandlerExceptionHandlingTest.java
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ddl/DdlCommandHandlerExceptionHandlingTest.java
@@ -108,7 +108,8 @@ public class DdlCommandHandlerExceptionHandlingTest extends 
IgniteAbstractTest {
                 null,
                 null,
                 null,
-                null
+                null,
+                "node"
         );
 
         commandHandler = new DdlCommandHandler(distributionZoneManager, 
tableManager, indexManager, dataStorageManager);

Reply via email to