This is an automated email from the ASF dual-hosted git repository.

vpyatkov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 3e8d1aa383 IGNITE-20605 Restore scaleUp/scaleDown timers properly 
(#2899)
3e8d1aa383 is described below

commit 3e8d1aa383c1434f55c4d97c3a2539c264b5c859
Author: Mirza Aliev <[email protected]>
AuthorDate: Mon Dec 4 11:14:15 2023 +0400

    IGNITE-20605 Restore scaleUp/scaleDown timers properly (#2899)
---
 ...niteDistributionZoneManagerNodeRestartTest.java |  41 ++++
 .../distributionzones/DistributionZoneManager.java | 218 ++++++++-------------
 .../distributionzones/DistributionZonesUtil.java   |  14 --
 3 files changed, 118 insertions(+), 155 deletions(-)

diff --git 
a/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/distributionzones/ItIgniteDistributionZoneManagerNodeRestartTest.java
 
b/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/distributionzones/ItIgniteDistributionZoneManagerNodeRestartTest.java
index 35aa5537f1..eda13535f8 100644
--- 
a/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/distributionzones/ItIgniteDistributionZoneManagerNodeRestartTest.java
+++ 
b/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/distributionzones/ItIgniteDistributionZoneManagerNodeRestartTest.java
@@ -61,6 +61,7 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.function.Consumer;
 import java.util.function.LongFunction;
 import java.util.stream.Stream;
@@ -83,6 +84,8 @@ import 
org.apache.ignite.internal.configuration.storage.DistributedConfiguration
 import 
org.apache.ignite.internal.configuration.storage.LocalFileConfigurationStorage;
 import 
org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
 import 
org.apache.ignite.internal.configuration.validation.ConfigurationValidatorImpl;
+import org.apache.ignite.internal.distributionzones.DistributionZoneManager;
+import 
org.apache.ignite.internal.distributionzones.DistributionZoneManager.Augmentation;
 import 
org.apache.ignite.internal.distributionzones.DistributionZoneManager.ZoneState;
 import org.apache.ignite.internal.hlc.HybridClockImpl;
 import org.apache.ignite.internal.lang.ByteArray;
@@ -330,6 +333,44 @@ public class 
ItIgniteDistributionZoneManagerNodeRestartTest extends BaseIgniteRe
         assertEquals(nodeAttributesBeforeRestart, nodeAttributesAfterRestart);
     }
 
+    @ParameterizedTest
+    @MethodSource("provideArgumentsRestartTests")
+    public void testTopologyAugmentationMapRestoredAfterRestart(String 
zoneName) throws Exception {
+        PartialNode node = startPartialNode(0);
+
+        node.logicalTopology().putNode(A);
+
+        createZoneOrAlterDefaultZone(node, zoneName, IMMEDIATE_TIMER_VALUE, 
IMMEDIATE_TIMER_VALUE);
+
+        node.logicalTopology().putNode(B);
+        node.logicalTopology().putNode(C);
+
+        int zoneId = getZoneId(node, zoneName);
+
+        DistributionZoneManager distributionZoneManager = 
getDistributionZoneManager(node);
+
+        assertDataNodesFromManager(distributionZoneManager, 
metastore::appliedRevision, zoneId, Set.of(A, B, C), TIMEOUT_MILLIS);
+
+        ConcurrentSkipListMap<Long, Augmentation> nodeAttributesBeforeRestart =
+                
distributionZoneManager.zonesState().get(zoneId).topologyAugmentationMap();
+
+        node.stop();
+
+        node = startPartialNode(0);
+
+        distributionZoneManager = getDistributionZoneManager(node);
+
+        ConcurrentSkipListMap<Long, Augmentation> nodeAttributesAfterRestart =
+                
distributionZoneManager.zonesState().get(zoneId).topologyAugmentationMap();
+
+        assertEquals(2, nodeAttributesAfterRestart.size());
+
+        assertEquals(
+                
nodeAttributesBeforeRestart.values().stream().map(Augmentation::nodes).collect(toSet()),
+                
nodeAttributesAfterRestart.values().stream().map(Augmentation::nodes).collect(toSet())
+        );
+    }
+
     @Test
     public void testLogicalTopologyRestoredAfterRestart() throws Exception {
         PartialNode node = startPartialNode(0);
diff --git 
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java
 
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java
index ec4f607dc8..ebe0ac7b85 100644
--- 
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java
+++ 
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java
@@ -46,7 +46,6 @@ import static 
org.apache.ignite.internal.distributionzones.DistributionZonesUtil
 import static 
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zoneScaleDownChangeTriggerKey;
 import static 
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zoneScaleUpChangeTriggerKey;
 import static 
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zoneTopologyAugmentation;
-import static 
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zonesFilterUpdateRevision;
 import static 
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zonesLastHandledTopology;
 import static 
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zonesLogicalTopologyKey;
 import static 
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zonesLogicalTopologyPrefix;
@@ -82,7 +81,6 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.function.BiFunction;
 import java.util.function.Consumer;
 import java.util.function.LongFunction;
 import org.apache.ignite.internal.catalog.CatalogManager;
@@ -119,7 +117,6 @@ import org.apache.ignite.internal.metastorage.dsl.Update;
 import org.apache.ignite.internal.thread.NamedThreadFactory;
 import org.apache.ignite.internal.thread.StripedScheduledThreadPoolExecutor;
 import org.apache.ignite.internal.util.IgniteSpinBusyLock;
-import org.apache.ignite.internal.vault.VaultEntry;
 import org.apache.ignite.internal.vault.VaultManager;
 import org.jetbrains.annotations.TestOnly;
 
@@ -133,9 +130,6 @@ public class DistributionZoneManager implements 
IgniteComponent {
     /** Meta Storage manager. */
     private final MetaStorageManager metaStorageManager;
 
-    /** Vault manager. */
-    private final VaultManager vaultMgr;
-
     /** Busy lock to stop synchronously. */
     private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
 
@@ -219,7 +213,6 @@ public class DistributionZoneManager implements 
IgniteComponent {
     ) {
         this.metaStorageManager = metaStorageManager;
         this.logicalTopologyService = logicalTopologyService;
-        this.vaultMgr = vaultMgr;
         this.catalogManager = catalogManager;
 
         this.topologyWatchListener = createMetastorageTopologyListener();
@@ -270,7 +263,7 @@ public class DistributionZoneManager implements 
IgniteComponent {
 
             createOrRestoreZonesStates(recoveryRevision);
 
-            restoreLogicalTopologyChangeEvent(recoveryRevision);
+            restoreLogicalTopologyChangeEventAndStartTimers(recoveryRevision);
 
             rebalanceEngine.start();
         });
@@ -396,132 +389,106 @@ public class DistributionZoneManager implements 
IgniteComponent {
 
         long causalityToken = parameters.causalityToken();
 
-        VaultEntry filterUpdateRevision = 
vaultMgr.get(zonesFilterUpdateRevision()).join();
-
-        if (filterUpdateRevision != null) {
-            // This means that we have already handled event with this 
causalityToken.
-            // It is possible when node was restarted after this listener 
completed,
-            // but applied causalityToken didn't have time to be propagated to 
the Vault.
-            if (bytesToLong(filterUpdateRevision.value()) >= causalityToken) {
-                return nullCompletedFuture();
-            }
-        }
-
-        vaultMgr.put(zonesFilterUpdateRevision(), 
longToBytes(causalityToken)).join();
-
         causalityDataNodesEngine.onUpdateFilter(causalityToken, zoneId, 
newFilter);
 
         return saveDataNodesToMetaStorageOnScaleUp(zoneId, causalityToken);
     }
 
     /**
-     * Creates or restores zone's state depending on the {@link 
ZoneState#topologyAugmentationMap()} existence in the Vault.
-     * We save {@link ZoneState#topologyAugmentationMap()} in the Vault every 
time we receive logical topology changes from the metastore.
+     * Restores a zone's state, or creates it if the zone's creation was interrupted by a restart.
      *
      * @param zone Zone descriptor.
      * @param causalityToken Causality token.
      * @return Future reflecting the completion of creation or restoring a 
zone.
      */
-    private CompletableFuture<Void> 
createOrRestoreZoneStateBusy(CatalogZoneDescriptor zone, long causalityToken) {
+    private CompletableFuture<Void> restoreZoneStateBusy(CatalogZoneDescriptor 
zone, long causalityToken) {
         int zoneId = zone.id();
 
-        Entry topologyAugmentationMapLocalMetaStorage =
-                
metaStorageManager.getLocally(zoneTopologyAugmentation(zoneId), causalityToken);
+        Entry zoneDataNodesLocalMetaStorage = 
metaStorageManager.getLocally(zoneDataNodesKey(zoneId), causalityToken);
 
-        // First creation of a zone, or first call on the manager start for 
the default zone.
-        if (topologyAugmentationMapLocalMetaStorage.value() == null) {
-            ZoneState zoneState = new ZoneState(executor);
-
-            ZoneState prevZoneState = zonesState.putIfAbsent(zoneId, 
zoneState);
-
-            assert prevZoneState == null : "Zone's state was created twice 
[zoneId = " + zoneId + ']';
-
-            Set<Node> dataNodes = 
logicalTopology.stream().map(NodeWithAttributes::node).collect(toSet());
+        if (zoneDataNodesLocalMetaStorage.value() == null) {
+            // In this case, creation of a zone was interrupted during restart.
+            return onCreateZone(zone, causalityToken);
+        } else {
+            Entry topologyAugmentationMapLocalMetaStorage = 
metaStorageManager.getLocally(zoneTopologyAugmentation(zoneId), causalityToken);
 
-            
causalityDataNodesEngine.onCreateOrRestoreZoneState(causalityToken, zone);
+            ConcurrentSkipListMap<Long, Augmentation> topologyAugmentationMap;
 
-            return initDataNodesAndTriggerKeysInMetaStorage(zoneId, 
causalityToken, dataNodes);
-        } else {
-            // Restart case, when topologyAugmentationMap has already been 
saved during a cluster work.
-            ConcurrentSkipListMap<Long, Augmentation> topologyAugmentationMap 
= fromBytes(topologyAugmentationMapLocalMetaStorage.value());
+            if (topologyAugmentationMapLocalMetaStorage.value() == null) {
+                // This case means that there weren't any logical topology 
updates before restart.
+                topologyAugmentationMap = new ConcurrentSkipListMap<>();
+            } else {
+                topologyAugmentationMap = 
fromBytes(topologyAugmentationMapLocalMetaStorage.value());
+            }
 
             ZoneState zoneState = new ZoneState(executor, 
topologyAugmentationMap);
 
             ZoneState prevZoneState = zonesState.putIfAbsent(zoneId, 
zoneState);
 
             assert prevZoneState == null : "Zone's state was created twice 
[zoneId = " + zoneId + ']';
+        }
 
-            Optional<Long> maxScaleUpRevision = 
zoneState.highestRevision(true);
+        causalityDataNodesEngine.onCreateOrRestoreZoneState(causalityToken, 
zone);
+
+        return nullCompletedFuture();
+    }
 
-            Optional<Long> maxScaleDownRevision = 
zoneState.highestRevision(false);
+    private CompletableFuture<Void> onCreateZone(CatalogZoneDescriptor zone, 
long causalityToken) {
+        int zoneId = zone.id();
 
-            VaultEntry filterUpdateRevision = 
vaultMgr.get(zonesFilterUpdateRevision()).join();
+        ConcurrentSkipListMap<Long, Augmentation> topologyAugmentationMap = 
new ConcurrentSkipListMap<>();
 
-            restoreTimers(zone, zoneState, maxScaleUpRevision, 
maxScaleDownRevision, filterUpdateRevision);
-        }
+        ZoneState zoneState = new ZoneState(executor, topologyAugmentationMap);
+
+        ZoneState prevZoneState = zonesState.putIfAbsent(zoneId, zoneState);
+
+        assert prevZoneState == null : "Zone's state was created twice [zoneId 
= " + zoneId + ']';
+
+        Set<Node> dataNodes = 
logicalTopology.stream().map(NodeWithAttributes::node).collect(toSet());
 
         causalityDataNodesEngine.onCreateOrRestoreZoneState(causalityToken, 
zone);
 
-        return nullCompletedFuture();
+        return initDataNodesAndTriggerKeysInMetaStorage(zoneId, 
causalityToken, dataNodes);
     }
 
     /**
-     * Restores timers that were scheduled before a node's restart.
-     * Take the highest revision from the {@link 
ZoneState#topologyAugmentationMap()}, compare it with the revision
-     * of the last update of the zone's filter and schedule scale up/scale 
down timers. Filter revision is taken into account because
-     * any filter update triggers immediate scale up.
+     * Restores timers that were scheduled before a node's restart. Take the 
highest revision from the
+     * {@link ZoneState#topologyAugmentationMap()}, schedule scale up/scale 
down timers.
      *
-     * @param zone Zone descriptor.
-     * @param zoneState Zone's state from Distribution Zone Manager
-     * @param maxScaleUpRevisionOptional Max revision from the {@link 
ZoneState#topologyAugmentationMap()} for node joins.
-     * @param maxScaleDownRevisionOptional Max revision from the {@link 
ZoneState#topologyAugmentationMap()} for node removals.
-     * @param filterUpdateRevisionVaultEntry Revision of the last update of 
the zone's filter.
+     * @param catalogVersion Catalog version.
+     * @return Future that represents the pending completion of the operation.
+     *         For the immediate timers it will be completed when data nodes 
will be updated in Meta Storage.
      */
-    private void restoreTimers(
-            CatalogZoneDescriptor zone,
-            ZoneState zoneState,
-            Optional<Long> maxScaleUpRevisionOptional,
-            Optional<Long> maxScaleDownRevisionOptional,
-            VaultEntry filterUpdateRevisionVaultEntry
-    ) {
-        int zoneId = zone.id();
+    private CompletableFuture<Void> restoreTimers(int catalogVersion) {
+        List<CompletableFuture<Void>> futures = new ArrayList<>();
 
-        maxScaleUpRevisionOptional.ifPresent(
-                maxScaleUpRevision -> {
-                    if (filterUpdateRevisionVaultEntry != null) {
-                        long filterUpdateRevision = 
bytesToLong(filterUpdateRevisionVaultEntry.value());
+        for (CatalogZoneDescriptor zone : 
catalogManager.zones(catalogVersion)) {
+            ZoneState zoneState = zonesState.get(zone.id());
 
-                        // Immediately trigger scale up, that was planned to 
be invoked before restart.
-                        // If this invoke was successful before restart, then 
current call just will be skipped.
-                        saveDataNodesToMetaStorageOnScaleUp(zoneId, 
filterUpdateRevision);
+            // Max revision from the {@link 
ZoneState#topologyAugmentationMap()} for node joins.
+            Optional<Long> maxScaleUpRevisionOptional = 
zoneState.highestRevision(true);
 
-                        if (maxScaleUpRevision < filterUpdateRevision) {
-                            // Don't need to trigger additional scale up for 
the scenario, when filter update event happened after the last
-                            // node join event.
-                            return;
-                        }
+            // Max revision from the {@link 
ZoneState#topologyAugmentationMap()} for node removals.
+            Optional<Long> maxScaleDownRevisionOptional = 
zoneState.highestRevision(false);
+
+            maxScaleUpRevisionOptional.ifPresent(
+                    maxScaleUpRevision -> {
+                        // Take the highest revision from the 
topologyAugmentationMap and schedule scale up/scale down,
+                        // meaning that all augmentations of nodes will be 
taken into account in newly created timers.
+                        // If augmentations have already been proposed to data 
nodes in the metastorage before restart,
+                        // that means we have updated the corresponding trigger 
key and its value will be greater than or equal to
+                        // the highest revision from the 
topologyAugmentationMap, and current timer won't affect data nodes.
+
+                        futures.add(scheduleTimers(zone, true, false, 
maxScaleUpRevision));
                     }
+            );
 
-                    // Take the highest revision from the 
topologyAugmentationMap and schedule scale up/scale down,
-                    // meaning that all augmentations of nodes will be taken 
into account in newly created timers.
-                    // If augmentations have already been proposed to data 
nodes in the metastorage before restart,
-                    // that means we have updated corresponding trigger key 
and it's value will be greater than
-                    // the highest revision from the topologyAugmentationMap, 
and current timer won't affect data nodes.
-                    zoneState.rescheduleScaleUp(
-                            zone.dataNodesAutoAdjustScaleUp(),
-                            () -> saveDataNodesToMetaStorageOnScaleUp(zoneId, 
maxScaleUpRevision),
-                            zoneId
-                    );
-                }
-        );
+            maxScaleDownRevisionOptional.ifPresent(
+                    maxScaleDownRevision -> futures.add(scheduleTimers(zone, 
false, true, maxScaleDownRevision))
+            );
+        }
 
-        maxScaleDownRevisionOptional.ifPresent(
-                maxScaleDownRevision -> zoneState.rescheduleScaleDown(
-                        zone.dataNodesAutoAdjustScaleDown(),
-                        () -> saveDataNodesToMetaStorageOnScaleDown(zoneId, 
maxScaleDownRevision),
-                        zoneId
-                )
-        );
+        return allOf(futures.toArray(CompletableFuture[]::new));
     }
 
     /**
@@ -795,7 +762,7 @@ public class DistributionZoneManager implements 
IgniteComponent {
 
             updateLocalTopologyAugmentationMap(addedNodes, removedNodes, 
revision, zoneId);
 
-            futures.add(scheduleTimers(zone, addedNodes, removedNodes, 
revision));
+            futures.add(scheduleTimers(zone, !addedNodes.isEmpty(), 
!removedNodes.isEmpty(), revision));
 
             zoneIds.add(zone.id());
         }
@@ -880,48 +847,13 @@ public class DistributionZoneManager implements 
IgniteComponent {
      * Schedules scale up and scale down timers.
      *
      * @param zone Zone descriptor.
-     * @param addedNodes Nodes that was added to a topology and should be 
added to zones data nodes.
-     * @param removedNodes Nodes that was removed from a topology and should 
be removed from zones data nodes.
-     * @param revision Revision that triggered that event.
-     * @return Future that represents the pending completion of the operation.
-     *         For the immediate timers it will be completed when data nodes 
will be updated in Meta Storage.
-     */
-    private CompletableFuture<Void> scheduleTimers(
-            CatalogZoneDescriptor zone,
-            Set<Node> addedNodes,
-            Set<Node> removedNodes,
-            long revision
-    ) {
-        return scheduleTimers(
-                zone,
-                addedNodes,
-                removedNodes,
-                revision,
-                this::saveDataNodesToMetaStorageOnScaleUp,
-                this::saveDataNodesToMetaStorageOnScaleDown
-        );
-    }
-
-    /**
-     * Schedules scale up and scale down timers. This method is needed also 
for test purposes.
-     *
-     * @param zone Zone descriptor.
-     * @param addedNodes Nodes that was added to a topology and should be 
added to zones data nodes.
-     * @param removedNodes Nodes that was removed from a topology and should 
be removed from zones data nodes.
+     * @param nodesAdded Flag indicating that nodes were added to a topology 
and should be added to zones data nodes.
+     * @param nodesRemoved Flag indicating that nodes were removed from a 
topology and should be removed from zones data nodes.
      * @param revision Revision that triggered that event.
-     * @param saveDataNodesOnScaleUp Function that saves nodes to a zone's 
data nodes in case of scale up was triggered.
-     * @param saveDataNodesOnScaleDown Function that saves nodes to a zone's 
data nodes in case of scale down was triggered.
      * @return Future that represents the pending completion of the operation.
      *         For the immediate timers it will be completed when data nodes 
will be updated in Meta Storage.
      */
-    private CompletableFuture<Void> scheduleTimers(
-            CatalogZoneDescriptor zone,
-            Set<Node> addedNodes,
-            Set<Node> removedNodes,
-            long revision,
-            BiFunction<Integer, Long, CompletableFuture<Void>> 
saveDataNodesOnScaleUp,
-            BiFunction<Integer, Long, CompletableFuture<Void>> 
saveDataNodesOnScaleDown
-    ) {
+    private CompletableFuture<Void> scheduleTimers(CatalogZoneDescriptor zone, 
boolean nodesAdded, boolean nodesRemoved, long revision) {
         int autoAdjust = zone.dataNodesAutoAdjust();
         int autoAdjustScaleDown = zone.dataNodesAutoAdjustScaleDown();
         int autoAdjustScaleUp = zone.dataNodesAutoAdjustScaleUp();
@@ -930,11 +862,11 @@ public class DistributionZoneManager implements 
IgniteComponent {
 
         List<CompletableFuture<Void>> futures = new ArrayList<>();
 
-        if ((!addedNodes.isEmpty() || !removedNodes.isEmpty()) && autoAdjust 
!= INFINITE_TIMER_VALUE) {
+        if ((nodesAdded || nodesRemoved) && autoAdjust != 
INFINITE_TIMER_VALUE) {
             //TODO: IGNITE-18134 Create scheduler with dataNodesAutoAdjust 
timer.
             throw new UnsupportedOperationException("Data nodes auto adjust is 
not supported.");
         } else {
-            if (!addedNodes.isEmpty()) {
+            if (nodesAdded) {
                 if (autoAdjustScaleUp == IMMEDIATE_TIMER_VALUE) {
                     futures.add(saveDataNodesToMetaStorageOnScaleUp(zoneId, 
revision));
                 }
@@ -942,13 +874,13 @@ public class DistributionZoneManager implements 
IgniteComponent {
                 if (autoAdjustScaleUp != INFINITE_TIMER_VALUE) {
                     zonesState.get(zoneId).rescheduleScaleUp(
                             autoAdjustScaleUp,
-                            () -> saveDataNodesOnScaleUp.apply(zoneId, 
revision),
+                            () -> saveDataNodesToMetaStorageOnScaleUp(zoneId, 
revision),
                             zoneId
                     );
                 }
             }
 
-            if (!removedNodes.isEmpty()) {
+            if (nodesRemoved) {
                 if (zone.dataNodesAutoAdjustScaleDown() == 
IMMEDIATE_TIMER_VALUE) {
                     futures.add(saveDataNodesToMetaStorageOnScaleDown(zoneId, 
revision));
                 }
@@ -956,7 +888,7 @@ public class DistributionZoneManager implements 
IgniteComponent {
                 if (autoAdjustScaleDown != INFINITE_TIMER_VALUE) {
                     zonesState.get(zoneId).rescheduleScaleDown(
                             autoAdjustScaleDown,
-                            () -> saveDataNodesOnScaleDown.apply(zoneId, 
revision),
+                            () -> 
saveDataNodesToMetaStorageOnScaleDown(zoneId, revision),
                             zoneId
                     );
                 }
@@ -1461,7 +1393,7 @@ public class DistributionZoneManager implements 
IgniteComponent {
 
             CreateZoneEventParameters params = (CreateZoneEventParameters) 
parameters;
 
-            return createOrRestoreZoneStateBusy(params.zoneDescriptor(), 
params.causalityToken())
+            return onCreateZone(params.zoneDescriptor(), 
params.causalityToken())
                     .thenCompose((ignored) -> falseCompletedFuture());
         }));
 
@@ -1480,16 +1412,17 @@ public class DistributionZoneManager implements 
IgniteComponent {
         // TODO: IGNITE-20287 Clean up abandoned resources for dropped zones 
from volt and metastore
         for (CatalogZoneDescriptor zone : 
catalogManager.zones(catalogVersion)) {
             // TODO: return this futures 
https://issues.apache.org/jira/browse/IGNITE-20477
-            createOrRestoreZoneStateBusy(zone, recoveryRevision);
+            restoreZoneStateBusy(zone, recoveryRevision);
         }
     }
 
     /**
      * Restore the event of the updating the logical topology from Meta 
Storage, that has not been completed before restart.
+     * Also start scale up/scale down timers.
      *
      * @param recoveryRevision Revision of the Meta Storage after its recovery.
      */
-    private void restoreLogicalTopologyChangeEvent(long recoveryRevision) {
+    private void restoreLogicalTopologyChangeEventAndStartTimers(long 
recoveryRevision) {
         Entry topologyEntry = 
metaStorageManager.getLocally(zonesLogicalTopologyKey(), recoveryRevision);
 
         if (topologyEntry.value() != null) {
@@ -1505,6 +1438,9 @@ public class DistributionZoneManager implements 
IgniteComponent {
             if (lastUpdateRevisionEntry.value() == null || topologyRevision > 
bytesToLong(lastUpdateRevisionEntry.value())) {
                 // TODO: return this futures 
https://issues.apache.org/jira/browse/IGNITE-20477
                 onLogicalTopologyUpdate(newLogicalTopology, recoveryRevision, 
catalogVersion);
+            } else {
+                // TODO: return this futures 
https://issues.apache.org/jira/browse/IGNITE-20477
+                restoreTimers(catalogVersion);
             }
         }
     }
diff --git 
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZonesUtil.java
 
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZonesUtil.java
index d162cc0e26..6deeab392e 100644
--- 
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZonesUtil.java
+++ 
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZonesUtil.java
@@ -87,9 +87,6 @@ public class DistributionZonesUtil {
     /** Key value for the last handled logical topology by Distribution zone 
manager. */
     private static final String DISTRIBUTION_ZONES_LAST_HANDLED_TOPOLOGY = 
"distributionZones.lastHandledTopology";
 
-    /** Key value for zones' filter update revision in vault. */
-    private static final String 
DISTRIBUTION_ZONES_FILTER_UPDATE_REVISION_VAULT = 
"vault.distributionZones.filterUpdate.revision";
-
     /** Key prefix for zones' logical topology nodes. */
     private static final String DISTRIBUTION_ZONES_LOGICAL_TOPOLOGY = 
DISTRIBUTION_ZONES_LOGICAL_TOPOLOGY_PREFIX + "nodes";
 
@@ -117,10 +114,6 @@ public class DistributionZonesUtil {
     private static final ByteArray 
DISTRIBUTION_ZONES_LAST_HANDLED_TOPOLOGY_KEY =
             new ByteArray(DISTRIBUTION_ZONES_LAST_HANDLED_TOPOLOGY);
 
-    /** ByteArray representation of {@link 
DistributionZonesUtil#DISTRIBUTION_ZONES_FILTER_UPDATE_REVISION_VAULT}. */
-    private static final ByteArray 
DISTRIBUTION_ZONES_FILTER_UPDATE_REVISION_VAULT_KEY =
-            new ByteArray(DISTRIBUTION_ZONES_FILTER_UPDATE_REVISION_VAULT);
-
     /** ByteArray representation of {@link 
DistributionZonesUtil#DISTRIBUTION_ZONES_LOGICAL_TOPOLOGY_VERSION}. */
     private static final ByteArray 
DISTRIBUTION_ZONES_LOGICAL_TOPOLOGY_VERSION_KEY =
             new ByteArray(DISTRIBUTION_ZONES_LOGICAL_TOPOLOGY_VERSION);
@@ -248,13 +241,6 @@ public class DistributionZonesUtil {
         return DISTRIBUTION_ZONES_LAST_HANDLED_TOPOLOGY_KEY;
     }
 
-    /**
-     * The key represents the last revision of the zone's filter update.
-     */
-    public static ByteArray zonesFilterUpdateRevision() {
-        return DISTRIBUTION_ZONES_FILTER_UPDATE_REVISION_VAULT_KEY;
-    }
-
     /**
      * The key that represents {@link ZoneState#topologyAugmentationMap()} in 
the Meta Storage.
      */

Reply via email to