This is an automated email from the ASF dual-hosted git repository.

sanpwc pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new e302bf2f1f IGNITE-19580 Add restoring of zones state after restart (#2253)
e302bf2f1f is described below

commit e302bf2f1f9ca86603135570834c40dad20fcf3b
Author: Mirza Aliev <[email protected]>
AuthorDate: Wed Jun 28 16:53:19 2023 +0400

    IGNITE-19580 Add restoring of zones state after restart (#2253)
---
 .../distributionzones/DistributionZoneManager.java | 273 +++++++++++++++------
 .../distributionzones/DistributionZonesUtil.java   |  25 +-
 .../DistributionZoneManagerScaleUpTest.java        |   4 +-
 .../server/raft/MetaStorageWriteHandler.java       |   4 +-
 ...niteDistributionZoneManagerNodeRestartTest.java | 184 +++++++++++++-
 5 files changed, 398 insertions(+), 92 deletions(-)

diff --git 
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java
 
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java
index aeaf81936c..5673101862 100644
--- 
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java
+++ 
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java
@@ -42,6 +42,7 @@ import static 
org.apache.ignite.internal.distributionzones.DistributionZonesUtil
 import static 
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zoneDataNodesKey;
 import static 
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zoneScaleDownChangeTriggerKey;
 import static 
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zoneScaleUpChangeTriggerKey;
+import static 
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zoneTopologyAugmentationVault;
 import static 
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zonesDataNodesPrefix;
 import static 
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zonesGlobalStateRevision;
 import static 
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zonesLogicalTopologyKey;
@@ -62,8 +63,10 @@ import static 
org.apache.ignite.internal.util.IgniteUtils.shutdownAndAwaitTermin
 import static org.apache.ignite.internal.util.IgniteUtils.startsWith;
 import static org.apache.ignite.lang.ErrorGroups.Common.NODE_STOPPING_ERR;
 
+import java.io.Serializable;
 import java.util.Arrays;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.NoSuchElementException;
@@ -320,15 +323,6 @@ public class DistributionZoneManager implements 
IgniteComponent {
 
             rebalanceEngine.start();
 
-            // Init timers after restart.
-            zonesState.putIfAbsent(DEFAULT_ZONE_ID, new ZoneState(executor));
-
-            zonesConfiguration.distributionZones().value().forEach(zone -> {
-                int zoneId = zone.zoneId();
-
-                zonesState.putIfAbsent(zoneId, new ZoneState(executor));
-            });
-
             logicalTopologyService.addEventListener(topologyEventListener);
 
             
metaStorageManager.registerPrefixWatch(zonesLogicalTopologyPrefix(), 
topologyWatchListener);
@@ -336,7 +330,10 @@ public class DistributionZoneManager implements 
IgniteComponent {
 
             restoreGlobalStateFromVault();
 
-            initDataNodesFromVaultManager();
+            long appliedRevision = metaStorageManager.appliedRevision();
+
+            // onCreate is not called for the default zone, so we have to restore 
its state on start.
+            
createOrRestoreZoneState(zonesConfiguration.defaultDistributionZone().value(), 
appliedRevision);
         } finally {
             busyLock.leaveBusy();
         }
@@ -765,17 +762,9 @@ public class DistributionZoneManager implements 
IgniteComponent {
     private class ZonesConfigurationListener implements 
ConfigurationNamedListListener<DistributionZoneView> {
         @Override
         public CompletableFuture<?> 
onCreate(ConfigurationNotificationEvent<DistributionZoneView> ctx) {
-            int zoneId = ctx.newValue().zoneId();
+            DistributionZoneView zone = ctx.newValue();
 
-            ZoneState zoneState = new ZoneState(executor);
-
-            zonesState.putIfAbsent(zoneId, zoneState);
-
-            saveDataNodesAndUpdateTriggerKeysInMetaStorage(
-                    zoneId,
-                    ctx.storageRevision(),
-                    
logicalTopology.stream().map(NodeWithAttributes::node).collect(toSet())
-            );
+            createOrRestoreZoneState(zone, ctx.storageRevision());
 
             return completedFuture(null);
         }
@@ -795,15 +784,85 @@ public class DistributionZoneManager implements 
IgniteComponent {
     }
 
     /**
-     * Method updates data nodes value for the specified zone, also sets 
{@code revision} to the
-     * {@link DistributionZonesUtil#zoneScaleUpChangeTriggerKey(int)}, {@link 
DistributionZonesUtil#zoneScaleDownChangeTriggerKey(int)}
-     * and {@link DistributionZonesUtil#zonesChangeTriggerKey(int)} if it 
passes the condition.
+     * Creates or restores zone's state depending on the {@link 
ZoneState#topologyAugmentationMap()} existence in the Vault.
+     * We save {@link ZoneState#topologyAugmentationMap()} in the Vault every 
time we receive logical topology changes from the metastore.
+     *
+     * @param zone Zone's view.
+     * @param revision Revision for which we restore zone's state.
+     */
+    private void createOrRestoreZoneState(DistributionZoneView zone, long 
revision) {
+        int zoneId = zone.zoneId();
+
+        VaultEntry topologyAugmentationMapFromVault = 
vaultMgr.get(zoneTopologyAugmentationVault(zoneId)).join();
+
+        // First creation of a zone, or first call on the manager start for 
the default zone.
+        if (topologyAugmentationMapFromVault == null) {
+            ZoneState zoneState = new ZoneState(executor);
+
+            ZoneState prevZoneState = zonesState.putIfAbsent(zoneId, 
zoneState);
+
+            assert prevZoneState == null : "Zone's state was created twice 
[zoneId = " + zoneId + ']';
+
+            Set<Node> dataNodes = 
logicalTopology.stream().map(NodeWithAttributes::node).collect(toSet());
+
+            initDataNodesAndTriggerKeysInMetaStorage(zoneId, revision, 
dataNodes);
+        } else {
+            // Restart case, when topologyAugmentationMap has already been 
saved while the cluster was running.
+            ConcurrentSkipListMap<Long, Augmentation> topologyAugmentationMap 
= fromBytes(topologyAugmentationMapFromVault.value());
+
+            ZoneState zoneState = new ZoneState(executor, 
topologyAugmentationMap);
+
+            VaultEntry dataNodes = 
vaultMgr.get(zoneDataNodesKey(zoneId)).join();
+
+            if (dataNodes != null) {
+                String filter = zone.filter();
+
+                
zoneState.nodes(filterDataNodes(DistributionZonesUtil.dataNodes(fromBytes(dataNodes.value())),
 filter, nodesAttributes()));
+            }
+
+            ZoneState prevZoneState = zonesState.putIfAbsent(zoneId, 
zoneState);
+
+            assert prevZoneState == null : "Zone's state was created twice 
[zoneId = " + zoneId + ']';
+
+            Optional<Long> maxScaleUpRevision = 
zoneState.highestRevision(true);
+
+            // Take the highest revision from the topologyAugmentationMap and 
schedule scale up/scale down,
+            // meaning that all augmentation of nodes will be taken into 
account in newly created timers. If the augmentation has already
+            // been proposed to data nodes in the metastorage before restart,
+            // that means we have updated the corresponding trigger key and its 
value will be greater than
+            // the highest revision from the topologyAugmentationMap, and 
current timer won't affect data nodes.
+            maxScaleUpRevision.ifPresent(
+                    rev -> zoneState.rescheduleScaleUp(
+                            zone.dataNodesAutoAdjustScaleUp(),
+                            () -> saveDataNodesToMetaStorageOnScaleUp(zoneId, 
rev)
+                    )
+            );
+
+            Optional<Long> maxScaleDownRevision = 
zoneState.highestRevision(false);
+
+            maxScaleDownRevision.ifPresent(
+                    rev -> zoneState.rescheduleScaleDown(
+                            zone.dataNodesAutoAdjustScaleDown(),
+                            () -> 
saveDataNodesToMetaStorageOnScaleDown(zoneId, rev)
+                    )
+            );
+        }
+    }
+
+    /**
+     * Initialises the data nodes value for the specified zone and sets 
{@code revision} to the
+     * {@link DistributionZonesUtil#zoneScaleUpChangeTriggerKey(int)}, {@link 
DistributionZonesUtil#zoneScaleDownChangeTriggerKey(int)} and
+     * {@link DistributionZonesUtil#zonesChangeTriggerKey(int)} if it passes 
the condition. It is called on the first creation of a zone.
      *
      * @param zoneId Unique id of a zone
      * @param revision Revision of an event that has triggered this method.
      * @param dataNodes Data nodes.
      */
-    private void saveDataNodesAndUpdateTriggerKeysInMetaStorage(int zoneId, 
long revision, Set<Node> dataNodes) {
+    private void initDataNodesAndTriggerKeysInMetaStorage(
+            int zoneId,
+            long revision,
+            Set<Node> dataNodes
+    ) {
         if (!busyLock.enterBusy()) {
             throw new IgniteInternalException(NODE_STOPPING_ERR, new 
NodeStoppingException());
         }
@@ -819,11 +878,22 @@ public class DistributionZoneManager implements 
IgniteComponent {
 
             metaStorageManager.invoke(iif).whenComplete((res, e) -> {
                 if (e != null) {
-                    LOG.error("Failed to update zones' dataNodes value [zoneId 
= {}]", e, zoneId);
+                    LOG.error(
+                            "Failed to update zones' dataNodes value [zoneId = 
{}, dataNodes = {}, revision = {}]",
+                            e,
+                            zoneId,
+                            dataNodes,
+                            revision
+                    );
                 } else if (res.getAsBoolean()) {
-                    LOG.debug("Update zones' dataNodes value [zoneId = {}, 
dataNodes = {}", zoneId, dataNodes);
+                    LOG.debug("Update zones' dataNodes value [zoneId = {}, 
dataNodes = {}, revision = {}]", zoneId, dataNodes, revision);
                 } else {
-                    LOG.debug("Failed to update zones' dataNodes value [zoneId 
= {}]", zoneId);
+                    LOG.debug(
+                            "Failed to update zones' dataNodes value [zoneId = 
{}, dataNodes = {}, revision = {}]",
+                            zoneId,
+                            dataNodes,
+                            revision
+                    );
                 }
             });
         } finally {
@@ -849,11 +919,18 @@ public class DistributionZoneManager implements 
IgniteComponent {
 
             Iif iif = iif(triggerKeyCondition, removeKeysUpd, 
ops().yield(false));
 
-            metaStorageManager.invoke(iif).thenAccept(res -> {
-                if (res.getAsBoolean()) {
-                    LOG.debug("Delete zones' dataNodes key [zoneId = {}", 
zoneId);
+            metaStorageManager.invoke(iif).whenComplete((res, e) -> {
+                if (e != null) {
+                    LOG.error(
+                            "Failed to delete zone's dataNodes keys [zoneId = 
{}, revision = {}]",
+                            e,
+                            zoneId,
+                            revision
+                    );
+                } else if (res.getAsBoolean()) {
+                    LOG.debug("Delete zone's dataNodes keys [zoneId = {}, 
revision = {}]", zoneId, revision);
                 } else {
-                    LOG.debug("Failed to delete zones' dataNodes key [zoneId = 
{}]", zoneId);
+                    LOG.debug("Failed to delete zone's dataNodes keys [zoneId 
= {}, revision = {}]", zoneId, revision);
                 }
             });
         } finally {
@@ -967,37 +1044,6 @@ public class DistributionZoneManager implements 
IgniteComponent {
         }
     }
 
-    /**
-     * Initialises data nodes of distribution zones in meta storage
-     * from {@link DistributionZonesUtil#zonesLogicalTopologyKey()} in vault.
-     */
-    // TODO: will be refactored in 
https://issues.apache.org/jira/browse/IGNITE-19580
-    private void initDataNodesFromVaultManager() {
-        if (!busyLock.enterBusy()) {
-            throw new IgniteInternalException(NODE_STOPPING_ERR, new 
NodeStoppingException());
-        }
-
-        try {
-            long appliedRevision = metaStorageManager.appliedRevision();
-
-            if (!logicalTopology.isEmpty()) {
-                // init keys and data nodes for default zone
-                saveDataNodesAndUpdateTriggerKeysInMetaStorage(
-                        DEFAULT_ZONE_ID,
-                        appliedRevision,
-                        
logicalTopology.stream().map(NodeWithAttributes::node).collect(toSet())
-                );
-            }
-
-            zonesState.values().forEach(zoneState -> {
-                
zoneState.nodes(logicalTopology.stream().map(NodeWithAttributes::nodeName).collect(toSet()));
-            });
-
-        } finally {
-            busyLock.leaveBusy();
-        }
-    }
-
     /**
      * Restores from vault logical topology and nodes' attributes fields in 
{@link DistributionZoneManager} after restart.
      */
@@ -1092,17 +1138,27 @@ public class DistributionZoneManager implements 
IgniteComponent {
                     NamedConfigurationTree<DistributionZoneConfiguration, 
DistributionZoneView, DistributionZoneChange> zones =
                             zonesConfiguration.distributionZones();
 
+                    Set<Integer> zoneIds = new HashSet<>();
+
                     for (int i = 0; i < zones.value().size(); i++) {
                         DistributionZoneView zoneView = zones.value().get(i);
 
                         scheduleTimers(zoneView, addedNodes, removedNodes, 
revision);
+
+                        zoneIds.add(zoneView.zoneId());
                     }
 
                     DistributionZoneView defaultZoneView = 
zonesConfiguration.value().defaultDistributionZone();
 
                     scheduleTimers(defaultZoneView, addedNodes, removedNodes, 
revision);
 
-                    
updateLogicalTopologyNodeAttributesAndSaveToVault(newLogicalTopology, revision);
+                    zoneIds.add(defaultZoneView.zoneId());
+
+                    newLogicalTopology.forEach(n -> 
nodesAttributes.put(n.nodeId(), n.nodeAttributes()));
+
+                    logicalTopology = newLogicalTopology;
+
+                    saveStatesToVault(newLogicalTopology, revision, zoneIds);
 
                     return completedFuture(null);
                 } finally {
@@ -1118,23 +1174,24 @@ public class DistributionZoneManager implements 
IgniteComponent {
     }
 
     /**
-     * Updates local representation of logical topology and nodes' attributes 
map and saves it to vault atomically in one batch.
+     * Saves states of the Distribution Zone Manager to vault atomically in 
one batch.
      * After restart it could be used to restore these fields.
      *
      * @param newLogicalTopology Logical topology.
      * @param revision Revision of the event, which triggers update.
+     * @param zoneIds Set of zone ids whose states will be saved in the 
Vault.
      */
-    private void 
updateLogicalTopologyNodeAttributesAndSaveToVault(Set<NodeWithAttributes> 
newLogicalTopology, long revision) {
-        newLogicalTopology.forEach(n -> nodesAttributes.put(n.nodeId(), 
n.nodeAttributes()));
-
-        logicalTopology = newLogicalTopology;
-
-        Map<ByteArray, byte[]> batch = IgniteUtils.newHashMap(2);
+    private void saveStatesToVault(Set<NodeWithAttributes> newLogicalTopology, 
long revision, Set<Integer> zoneIds) {
+        Map<ByteArray, byte[]> batch = IgniteUtils.newHashMap(3 + 
zoneIds.size());
 
         batch.put(zonesLogicalTopologyVault(), toBytes(newLogicalTopology));
 
         batch.put(zonesNodesAttributesVault(), toBytes(nodesAttributes()));
 
+        for (Integer zoneId : zoneIds) {
+            batch.put(zoneTopologyAugmentationVault(zoneId), 
toBytes(zonesState.get(zoneId).topologyAugmentationMap));
+        }
+
         batch.put(zonesGlobalStateRevision(), longToBytes(revision));
 
         vaultMgr.putAll(batch).join();
@@ -1343,6 +1400,14 @@ public class DistributionZoneManager implements 
IgniteComponent {
                 long scaleDownTriggerRevision = 
extractChangeTriggerRevision(values.get(zoneScaleDownChangeTriggerKey(zoneId)));
 
                 if (revision <= scaleUpTriggerRevision) {
+                    LOG.debug(
+                            "Revision of the event is less than the scale up 
revision from the metastorage "
+                                    + "[zoneId = {}, revision = {}, 
scaleUpTriggerRevision = {}]",
+                            zoneId,
+                            revision,
+                            scaleUpTriggerRevision
+                    );
+
                     return completedFuture(null);
                 }
 
@@ -1373,9 +1438,21 @@ public class DistributionZoneManager implements 
IgniteComponent {
                             if (invokeResult) {
                                 // TODO: 
https://issues.apache.org/jira/browse/IGNITE-19491 Properly utilise this map
                                 // Currently we call clean up only on a node 
that successfully writes data nodes.
+                                LOG.debug(
+                                        "Updating data nodes for a zone after 
scale up has succeeded "
+                                                + "[zoneId = {}, dataNodes = 
{}, revision = {}]",
+                                        zoneId,
+                                        newDataNodes,
+                                        revision
+                                );
                                 
zoneState.cleanUp(Math.min(scaleDownTriggerRevision, revision));
                             } else {
-                                LOG.debug("Updating data nodes for a zone has 
not succeeded [zoneId = {}]", zoneId);
+                                LOG.debug("Updating data nodes for a zone 
after scale up has not succeeded "
+                                                + "[zoneId = {}, dataNodes = 
{}, revision = {}]",
+                                        zoneId,
+                                        newDataNodes,
+                                        revision
+                                );
 
                                 return 
saveDataNodesToMetaStorageOnScaleUp(zoneId, revision);
                             }
@@ -1384,7 +1461,7 @@ public class DistributionZoneManager implements 
IgniteComponent {
                         }));
             })).whenComplete((v, e) -> {
                 if (e != null) {
-                    LOG.warn("Failed to update zones' dataNodes value [zoneId 
= {}]", e, zoneId);
+                    LOG.warn("Failed to update zones' dataNodes value after 
scale up [zoneId = {}, revision = {}]", e, zoneId, revision);
                 }
             });
         } finally {
@@ -1432,6 +1509,14 @@ public class DistributionZoneManager implements 
IgniteComponent {
                 long scaleDownTriggerRevision = 
extractChangeTriggerRevision(values.get(zoneScaleDownChangeTriggerKey(zoneId)));
 
                 if (revision <= scaleDownTriggerRevision) {
+                    LOG.debug(
+                            "Revision of the event is less than the scale down 
revision from the metastorage "
+                                    + "[zoneId = {}, revision = {}, 
scaleDownTriggerRevision = {}]",
+                            zoneId,
+                            revision,
+                            scaleDownTriggerRevision
+                    );
+
                     return completedFuture(null);
                 }
 
@@ -1456,11 +1541,24 @@ public class DistributionZoneManager implements 
IgniteComponent {
                         .thenApply(StatementResult::getAsBoolean)
                         .thenCompose(invokeResult -> inBusyLock(busyLock, () 
-> {
                             if (invokeResult) {
+                                LOG.debug(
+                                        "Updating data nodes for a zone after 
scale down has succeeded "
+                                                + "[zoneId = {}, dataNodes = 
{}, revision = {}]",
+                                        zoneId,
+                                        newDataNodes,
+                                        revision
+                                );
+
                                 // TODO: 
https://issues.apache.org/jira/browse/IGNITE-19491 Properly utilise this map
                                 // Currently we call clean up only on a node 
that successfully writes data nodes.
                                 
zoneState.cleanUp(Math.min(scaleUpTriggerRevision, revision));
                             } else {
-                                LOG.debug("Updating data nodes for a zone has 
not succeeded [zoneId = {}]", zoneId);
+                                LOG.debug("Updating data nodes for a zone 
after scale down has not succeeded "
+                                                + "[zoneId = {}, dataNodes = 
{}, revision = {}]",
+                                        zoneId,
+                                        newDataNodes,
+                                        revision
+                                );
 
                                 return 
saveDataNodesToMetaStorageOnScaleDown(zoneId, revision);
                             }
@@ -1469,7 +1567,7 @@ public class DistributionZoneManager implements 
IgniteComponent {
                         }));
             })).whenComplete((v, e) -> {
                 if (e != null) {
-                    LOG.warn("Failed to update zones' dataNodes value [zoneId 
= {}]", e, zoneId);
+                    LOG.warn("Failed to update zones' dataNodes value after 
scale down [zoneId = {}]", e, zoneId);
                 }
             });
         } finally {
@@ -1599,7 +1697,7 @@ public class DistributionZoneManager implements 
IgniteComponent {
      * States are needed to track nodes that we want to add or remove from the 
data nodes,
      * to schedule and stop scale up and scale down processes.
      */
-    static class ZoneState {
+    public static class ZoneState {
         /** Schedule task for a scale up process. */
         private ScheduledFuture<?> scaleUpTask;
 
@@ -1636,6 +1734,21 @@ public class DistributionZoneManager implements 
IgniteComponent {
             nodes = emptySet();
         }
 
+        /**
+         * Constructor.
+         *
+         * @param executor Executor for scheduling tasks for scale up and 
scale down processes.
+         * @param topologyAugmentationMap Map that stores pairs revision -> 
{@link Augmentation} for a zone. With this map we can
+         *         track which nodes should be added or removed in the 
processes of scale up or scale down. Revision helps to track
+         *         visibility of the events of adding or removing nodes 
because any process of scale up or scale down has a revision that
+         *         triggered this process.
+         */
+        ZoneState(ScheduledExecutorService executor, 
ConcurrentSkipListMap<Long, Augmentation> topologyAugmentationMap) {
+            this.executor = executor;
+            this.topologyAugmentationMap = topologyAugmentationMap;
+            nodes = emptySet();
+        }
+
         /**
          * Map that stores pairs revision -> {@link Augmentation} for a zone. 
With this map we can track which nodes
          * should be added or removed in the processes of scale up or scale 
down. Revision helps to track visibility of the events
@@ -1817,12 +1930,12 @@ public class DistributionZoneManager implements 
IgniteComponent {
         }
 
         @TestOnly
-        synchronized ScheduledFuture<?> scaleUpTask() {
+        public synchronized ScheduledFuture<?> scaleUpTask() {
             return scaleUpTask;
         }
 
         @TestOnly
-        synchronized ScheduledFuture<?> scaleDownTask() {
+        public synchronized ScheduledFuture<?> scaleDownTask() {
             return scaleDownTask;
         }
     }
@@ -1831,7 +1944,9 @@ public class DistributionZoneManager implements 
IgniteComponent {
      * Class stores the info about nodes that should be added or removed from 
the data nodes of a zone.
      * With flag {@code addition} we can track whether {@code nodeNames} 
should be added or removed.
      */
-    private static class Augmentation {
+    private static class Augmentation implements Serializable {
+        private static final long serialVersionUID = -7957428671075739621L;
+
         /** Names of the node. */
         Set<Node> nodes;
 
@@ -1855,7 +1970,7 @@ public class DistributionZoneManager implements 
IgniteComponent {
     }
 
     @TestOnly
-    Map<Integer, ZoneState> zonesState() {
+    public Map<Integer, ZoneState> zonesState() {
         return zonesState;
     }
 
diff --git 
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZonesUtil.java
 
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZonesUtil.java
index 32d3571683..7f87409174 100644
--- 
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZonesUtil.java
+++ 
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZonesUtil.java
@@ -44,6 +44,7 @@ import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.ThreadPoolExecutor;
 import org.apache.ignite.internal.cluster.management.topology.api.LogicalNode;
+import 
org.apache.ignite.internal.distributionzones.DistributionZoneManager.ZoneState;
 import 
org.apache.ignite.internal.distributionzones.configuration.DistributionZoneConfiguration;
 import 
org.apache.ignite.internal.distributionzones.configuration.DistributionZonesConfiguration;
 import org.apache.ignite.internal.metastorage.Entry;
@@ -95,6 +96,9 @@ public class DistributionZonesUtil {
     /** Key prefix, needed for processing the event about zone's update was 
triggered only once. */
     private static final String DISTRIBUTION_ZONES_CHANGE_TRIGGER_KEY_PREFIX = 
"distributionZones.change.trigger.";
 
+    /** Key prefix that represents {@link ZoneState#topologyAugmentationMap()} 
in the Vault.*/
+    private static final String 
DISTRIBUTION_ZONES_TOPOLOGY_AUGMENTATION_VAULT_PREFIX = 
"vault.distributionZones.topologyAugmentation.";
+
     /** ByteArray representation of {@link 
DistributionZonesUtil#DISTRIBUTION_ZONES_LOGICAL_TOPOLOGY}. */
     private static final ByteArray DISTRIBUTION_ZONES_LOGICAL_TOPOLOGY_KEY = 
new ByteArray(DISTRIBUTION_ZONES_LOGICAL_TOPOLOGY);
 
@@ -105,7 +109,7 @@ public class DistributionZonesUtil {
     private static final ByteArray 
DISTRIBUTION_ZONES_NODES_ATTRIBUTES_VAULT_KEY = new 
ByteArray(DISTRIBUTION_ZONES_NODES_ATTRIBUTES_VAULT);
 
     /** ByteArray representation of {@link 
DistributionZonesUtil#DISTRIBUTION_ZONES_GLOBAL_STATE_REVISION_VAULT}. */
-    public static final ByteArray 
DISTRIBUTION_ZONES_GLOBAL_STATE_REVISION_VAULT_KEY =
+    private static final ByteArray 
DISTRIBUTION_ZONES_GLOBAL_STATE_REVISION_VAULT_KEY =
             new ByteArray(DISTRIBUTION_ZONES_GLOBAL_STATE_REVISION_VAULT);
 
     /** ByteArray representation of {@link 
DistributionZonesUtil#DISTRIBUTION_ZONES_LOGICAL_TOPOLOGY_VERSION}. */
@@ -114,9 +118,11 @@ public class DistributionZonesUtil {
 
     /**
      * The initial value of trigger revision in case when it is not 
initialized in the meta storage.
-     * The trigger revision in the meta storage can be uninitialized for the 
default distribution zone.
+     * It is possible because the meta storage invoke performing the initialisation is async, so a scale up/down event
+     * may be processed first. The initial value is -1 because trigger keys of the default zone are initialised with
+     * the meta storage's applied revision, which is 0 on start.
      */
-    private static final long INITIAL_TRIGGER_REVISION_VALUE = 0;
+    private static final long INITIAL_TRIGGER_REVISION_VALUE = -1;
 
     /** ByteArray representation of {@link 
DistributionZonesUtil#DISTRIBUTION_ZONE_DATA_NODES_PREFIX}. */
     private static final ByteArray DISTRIBUTION_ZONES_DATA_NODES_KEY =
@@ -202,6 +208,13 @@ public class DistributionZonesUtil {
         return DISTRIBUTION_ZONES_LOGICAL_TOPOLOGY_VERSION_KEY;
     }
 
+    /**
+     * The key prefix needed for processing an event about zone's data nodes.
+     */
+    static ByteArray zonesDataNodesPrefix() {
+        return DISTRIBUTION_ZONES_DATA_NODES_KEY;
+    }
+
     /**
      * The key that represents logical topology nodes in vault.
      */
@@ -225,10 +238,10 @@ public class DistributionZonesUtil {
     }
 
     /**
-     * The key prefix needed for processing an event about zone's data nodes.
+     * The key that represents {@link ZoneState#topologyAugmentationMap()} in 
the Vault.
      */
-    static ByteArray zonesDataNodesPrefix() {
-        return DISTRIBUTION_ZONES_DATA_NODES_KEY;
+    static ByteArray zoneTopologyAugmentationVault(int zoneId) {
+        return new 
ByteArray(DISTRIBUTION_ZONES_TOPOLOGY_AUGMENTATION_VAULT_PREFIX + zoneId);
     }
 
     /**
diff --git 
a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerScaleUpTest.java
 
b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerScaleUpTest.java
index 53f853cad0..af7e98a271 100644
--- 
a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerScaleUpTest.java
+++ 
b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerScaleUpTest.java
@@ -470,7 +470,7 @@ public class DistributionZoneManagerScaleUpTest extends 
BaseDistributionZoneMana
 
         assertDataNodesForZone(ZONE_1_ID, Set.of(), keyValueStorage);
 
-        assertZoneScaleUpChangeTriggerKey(2L, ZONE_1_ID, keyValueStorage);
+        assertZoneScaleUpChangeTriggerKey(3L, ZONE_1_ID, keyValueStorage);
 
         doAnswer(invocation -> {
             If iif = invocation.getArgument(0);
@@ -516,7 +516,7 @@ public class DistributionZoneManagerScaleUpTest extends 
BaseDistributionZoneMana
 
         assertDataNodesForZone(ZONE_1_ID, Set.of(NODE_1), keyValueStorage);
 
-        assertZoneScaleDownChangeTriggerKey(4L, ZONE_1_ID, keyValueStorage);
+        assertZoneScaleDownChangeTriggerKey(5L, ZONE_1_ID, keyValueStorage);
 
         doAnswer(invocation -> {
             If iif = invocation.getArgument(0);
diff --git 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageWriteHandler.java
 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageWriteHandler.java
index 6354ece930..d84c675aa4 100644
--- 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageWriteHandler.java
+++ 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageWriteHandler.java
@@ -61,7 +61,7 @@ import org.apache.ignite.lang.IgniteInternalException;
 /**
  * Class containing some common logic for Meta Storage Raft group listeners.
  */
-class MetaStorageWriteHandler {
+public class MetaStorageWriteHandler {
     /** Logger. */
     private static final IgniteLogger LOG = 
Loggers.forClass(MetaStorageWriteHandler.class);
 
@@ -176,7 +176,7 @@ class MetaStorageWriteHandler {
         }
     }
 
-    private static If toIf(Iif iif) {
+    /**
+     * Converts an {@link Iif} into an {@link If} by converting its condition and both of its branches.
+     *
+     * <p>Public so that code outside this package (for example, tests that match on meta storage invocations) can inspect
+     * the statements an {@link Iif} would execute.
+     *
+     * @param iif Conditional command to convert.
+     * @return Equivalent {@link If}.
+     */
+    public static If toIf(Iif iif) {
         return new If(toCondition(iif.condition()), 
toConditionBranch(iif.andThen()), toConditionBranch(iif.orElse()));
     }
 
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/distribution/zones/ItIgniteDistributionZoneManagerNodeRestartTest.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/distribution/zones/ItIgniteDistributionZoneManagerNodeRestartTest.java
index 81e210c9b9..502af7de66 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/distribution/zones/ItIgniteDistributionZoneManagerNodeRestartTest.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/distribution/zones/ItIgniteDistributionZoneManagerNodeRestartTest.java
@@ -19,8 +19,10 @@ package org.apache.ignite.internal.distribution.zones;
 
 import static java.util.concurrent.CompletableFuture.completedFuture;
 import static 
org.apache.ignite.internal.distributionzones.DistributionZoneManager.DEFAULT_ZONE_ID;
+import static 
org.apache.ignite.internal.distributionzones.DistributionZoneManager.DEFAULT_ZONE_NAME;
 import static 
org.apache.ignite.internal.distributionzones.DistributionZoneManager.IMMEDIATE_TIMER_VALUE;
 import static 
org.apache.ignite.internal.distributionzones.DistributionZonesTestUtil.assertDataNodesFromManager;
+import static 
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zoneScaleDownChangeTriggerKey;
 import static 
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zoneScaleUpChangeTriggerKey;
 import static 
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zonesGlobalStateRevision;
 import static 
org.apache.ignite.internal.recovery.ConfigurationCatchUpListener.CONFIGURATION_CATCH_UP_DIFFERENCE_PROPERTY;
@@ -30,18 +32,22 @@ import static 
org.apache.ignite.internal.util.ByteUtils.bytesToLong;
 import static 
org.apache.ignite.utils.ClusterServiceTestUtils.defaultSerializationRegistry;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.argThat;
+import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.when;
 
 import java.io.IOException;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
+import java.util.stream.Stream;
 import org.apache.ignite.internal.BaseIgniteRestartTest;
 import 
org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager;
 import 
org.apache.ignite.internal.cluster.management.raft.TestClusterStateStorage;
@@ -59,12 +65,15 @@ import 
org.apache.ignite.internal.configuration.testframework.ConfigurationExten
 import 
org.apache.ignite.internal.configuration.validation.ConfigurationValidatorImpl;
 import 
org.apache.ignite.internal.distributionzones.DistributionZoneConfigurationParameters;
 import org.apache.ignite.internal.distributionzones.DistributionZoneManager;
+import 
org.apache.ignite.internal.distributionzones.DistributionZoneManager.ZoneState;
 import org.apache.ignite.internal.distributionzones.NodeWithAttributes;
 import 
org.apache.ignite.internal.distributionzones.configuration.DistributionZonesConfiguration;
 import org.apache.ignite.internal.manager.IgniteComponent;
 import org.apache.ignite.internal.metastorage.MetaStorageManager;
 import 
org.apache.ignite.internal.metastorage.impl.StandaloneMetaStorageManager;
+import org.apache.ignite.internal.metastorage.server.If;
 import 
org.apache.ignite.internal.metastorage.server.TestRocksDbKeyValueStorage;
+import 
org.apache.ignite.internal.metastorage.server.raft.MetaStorageWriteHandler;
 import org.apache.ignite.internal.network.configuration.NetworkConfiguration;
 import org.apache.ignite.internal.network.recovery.VaultStateIds;
 import org.apache.ignite.internal.schema.configuration.TablesConfiguration;
@@ -77,6 +86,9 @@ import org.apache.ignite.network.NetworkAddress;
 import org.apache.ignite.network.scalecube.TestScaleCubeClusterServiceFactory;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
 
 /**
  * Tests for checking {@link DistributionZoneManager} behavior after node's 
restart.
@@ -101,6 +113,8 @@ public class ItIgniteDistributionZoneManagerNodeRestartTest 
extends BaseIgniteRe
 
     private static final String ZONE_NAME = "zone1";
 
+    private static final int ZONE_ID = 1;
+
     /**
      * Start some of Ignite components that are able to serve as Ignite node 
for test purposes.
      *
@@ -159,10 +173,10 @@ public class 
ItIgniteDistributionZoneManagerNodeRestartTest extends BaseIgniteRe
 
         when(cmgManager.logicalTopology()).thenAnswer(invocation -> 
completedFuture(logicalTopology.getLogicalTopology()));
 
-        var metaStorageMgr = StandaloneMetaStorageManager.create(
+        var metaStorageMgr = spy(StandaloneMetaStorageManager.create(
                 vault,
-                new TestRocksDbKeyValueStorage("test", 
workDir.resolve("metastorage"))
-        );
+                new TestRocksDbKeyValueStorage(name, 
workDir.resolve("metastorage"))
+        ));
 
         var cfgStorage = new DistributedConfigurationStorage(metaStorageMgr, 
vault);
 
@@ -340,4 +354,168 @@ public class 
ItIgniteDistributionZoneManagerNodeRestartTest extends BaseIgniteRe
 
         assertEquals(scaleUpChangeTriggerRevision, globalStateRevision);
     }
+
+    /**
+     * Checks that the data nodes a zone had before a restart are reported again by the zone manager after the node restarts.
+     *
+     * @param zoneId Identifier of the zone under test.
+     * @param zoneName Name of the zone under test.
+     * @throws Exception If starting the node or preparing the zone fails.
+     */
+    @ParameterizedTest
+    @MethodSource("provideArgumentsRestartTests")
+    public void testLocalDataNodesAreRestoredAfterRestart(int zoneId, String zoneName) throws Exception {
+        Set<String> expectedDataNodes = Set.of(A.name());
+
+        PartialNode node = startPartialNode(0);
+
+        DistributionZoneManager zoneManager = findComponent(node.startedComponents(), DistributionZoneManager.class);
+
+        createZoneOrAlterDefaultZone(zoneManager, zoneName);
+
+        // B joins and then leaves again, so only A is expected to stay a data node.
+        node.logicalTopology().putNode(A);
+        node.logicalTopology().putNode(B);
+        node.logicalTopology().removeNodes(Set.of(B));
+
+        assertDataNodesFromManager(zoneManager, zoneId, expectedDataNodes, TIMEOUT_MILLIS);
+
+        node.stop();
+
+        PartialNode restartedNode = startPartialNode(0);
+
+        DistributionZoneManager restartedZoneManager = findComponent(restartedNode.startedComponents(), DistributionZoneManager.class);
+
+        assertDataNodesFromManager(restartedZoneManager, zoneId, expectedDataNodes, TIMEOUT_MILLIS);
+    }
+
+    /**
+     * Checks that a scale up which could not be written to the meta storage before the node was stopped is applied after the
+     * node restarts.
+     *
+     * @param zoneId Identifier of the zone under test.
+     * @param zoneName Name of the zone under test.
+     * @throws Exception If starting the node or preparing the zone fails.
+     */
+    @ParameterizedTest
+    @MethodSource("provideArgumentsRestartTests")
+    public void testScaleUpTimerIsRestoredAfterRestart(int zoneId, String zoneName) throws Exception {
+        PartialNode partialNode = startPartialNode(0);
+
+        DistributionZoneManager distributionZoneManager = findComponent(partialNode.startedComponents(), DistributionZoneManager.class);
+
+        createZoneOrAlterDefaultZone(distributionZoneManager, zoneName);
+
+        partialNode.logicalTopology().putNode(A);
+        partialNode.logicalTopology().putNode(B);
+
+        assertDataNodesFromManager(distributionZoneManager, zoneId, Set.of(A.name(), B.name()), TIMEOUT_MILLIS);
+
+        MetaStorageManager metaStorageManager = findComponent(partialNode.startedComponents(), MetaStorageManager.class);
+
+        byte[] scaleUpTriggerKey = zoneScaleUpChangeTriggerKey(zoneId).bytes();
+
+        // The meta storage manager is a Mockito spy. Stub it with doThrow(), because when(spy.invoke(...)) would call the real
+        // invoke() with the null placeholder returned by argThat(). Every invoke whose andThen branch writes the scale up trigger
+        // key now throws, so the scale up caused by C is not written before the restart.
+        doThrow(new RuntimeException("Expected")).when(metaStorageManager).invoke(argThat(iif -> {
+            If ifStatement = MetaStorageWriteHandler.toIf(iif);
+
+            return ifStatement.andThen().update().operations().stream().anyMatch(op -> Arrays.equals(scaleUpTriggerKey, op.key()));
+        }));
+
+        partialNode.logicalTopology().putNode(C);
+        partialNode.logicalTopology().removeNodes(Set.of(B));
+
+        // The scale down of B succeeds, while C cannot be scaled up.
+        assertDataNodesFromManager(distributionZoneManager, zoneId, Set.of(A.name()), TIMEOUT_MILLIS);
+
+        partialNode.stop();
+
+        partialNode = startPartialNode(0);
+
+        distributionZoneManager = findComponent(partialNode.startedComponents(), DistributionZoneManager.class);
+
+        // After the restart the addition of C is applied.
+        assertDataNodesFromManager(distributionZoneManager, zoneId, Set.of(A.name(), C.name()), TIMEOUT_MILLIS);
+    }
+
+    /**
+     * Checks that a scale down which could not be written to the meta storage before the node was stopped is applied after the
+     * node restarts.
+     *
+     * @param zoneId Identifier of the zone under test.
+     * @param zoneName Name of the zone under test.
+     * @throws Exception If starting the node or preparing the zone fails.
+     */
+    @ParameterizedTest
+    @MethodSource("provideArgumentsRestartTests")
+    public void testScaleDownTimerIsRestoredAfterRestart(int zoneId, String zoneName) throws Exception {
+        PartialNode partialNode = startPartialNode(0);
+
+        DistributionZoneManager distributionZoneManager = findComponent(partialNode.startedComponents(), DistributionZoneManager.class);
+
+        MetaStorageManager metaStorageManager = findComponent(partialNode.startedComponents(), MetaStorageManager.class);
+
+        createZoneOrAlterDefaultZone(distributionZoneManager, zoneName);
+
+        byte[] scaleDownTriggerKey = zoneScaleDownChangeTriggerKey(zoneId).bytes();
+
+        // The meta storage manager is a Mockito spy. Stub it with doThrow(), because when(spy.invoke(...)) would call the real
+        // invoke() with the null placeholder returned by argThat(). Every invoke whose andThen branch writes the scale down
+        // trigger key now throws, so the scale down of B is not written before the restart.
+        doThrow(new RuntimeException("Expected")).when(metaStorageManager).invoke(argThat(iif -> {
+            If ifStatement = MetaStorageWriteHandler.toIf(iif);
+
+            return ifStatement.andThen().update().operations().stream().anyMatch(op -> Arrays.equals(scaleDownTriggerKey, op.key()));
+        }));
+
+        partialNode.logicalTopology().putNode(A);
+        partialNode.logicalTopology().putNode(B);
+        partialNode.logicalTopology().removeNodes(Set.of(B));
+        partialNode.logicalTopology().putNode(C);
+
+        // Scale ups succeed, but B cannot be scaled down.
+        assertDataNodesFromManager(distributionZoneManager, zoneId, Set.of(A.name(), B.name(), C.name()), TIMEOUT_MILLIS);
+
+        partialNode.stop();
+
+        partialNode = startPartialNode(0);
+
+        distributionZoneManager = findComponent(partialNode.startedComponents(), DistributionZoneManager.class);
+
+        // After the restart the removal of B is applied.
+        assertDataNodesFromManager(distributionZoneManager, zoneId, Set.of(A.name(), C.name()), TIMEOUT_MILLIS);
+    }
+
+    /**
+     * Supplies the zones the restart tests run against: the default zone first, then the custom zone {@link #ZONE_NAME}.
+     *
+     * @return Stream of {@code (zoneId, zoneName)} argument pairs.
+     */
+    private static Stream<Arguments> provideArgumentsRestartTests() {
+        return Stream.of(
+                Arguments.of(DEFAULT_ZONE_ID, DEFAULT_ZONE_NAME),
+                Arguments.of(ZONE_ID, ZONE_NAME)
+        );
+    }
+
+    /**
+     * Prepares the zone under test so that its scale up and scale down auto adjust timers are both 0.
+     *
+     * <p>If {@code zoneName} is the default zone name, the existing default zone is altered. The method then waits for any
+     * scale up/down tasks triggered by that alteration to finish. Otherwise a new zone named {@code zoneName} is created
+     * with both timers set to 0.
+     *
+     * @param distributionZoneManager Distribution zone manager of the started node.
+     * @param zoneName Name of the zone to prepare.
+     * @throws Exception If the zone operation fails or does not complete within {@code TIMEOUT_MILLIS}.
+     */
+    private static void createZoneOrAlterDefaultZone(
+            DistributionZoneManager distributionZoneManager,
+            String zoneName
+    ) throws Exception {
+        if (DEFAULT_ZONE_NAME.equals(zoneName)) {
+            distributionZoneManager.alterZone(
+                    DEFAULT_ZONE_NAME,
+                    new DistributionZoneConfigurationParameters.Builder(DEFAULT_ZONE_NAME)
+                            .dataNodesAutoAdjustScaleUp(0)
+                            .dataNodesAutoAdjustScaleDown(0)
+                            .build()
+            ).get(TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
+
+            ZoneState zoneState = distributionZoneManager.zonesState().get(DEFAULT_ZONE_ID);
+
+            // Altering the delays may trigger a scale up/down; wait for those tasks to finish before the test continues.
+            if (zoneState.scaleUpTask() != null) {
+                assertTrue(waitForCondition(() -> zoneState.scaleUpTask().isDone(), TIMEOUT_MILLIS));
+            }
+
+            if (zoneState.scaleDownTask() != null) {
+                assertTrue(waitForCondition(() -> zoneState.scaleDownTask().isDone(), TIMEOUT_MILLIS));
+            }
+        } else {
+            // Use the requested name rather than a hard-coded constant, so every non-default argument creates the zone it names.
+            distributionZoneManager.createZone(
+                    new DistributionZoneConfigurationParameters.Builder(zoneName)
+                            .dataNodesAutoAdjustScaleUp(0)
+                            .dataNodesAutoAdjustScaleDown(0)
+                            .build()
+            ).get(TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
+        }
+    }
 }


Reply via email to