This is an automated email from the ASF dual-hosted git repository.
vpyatkov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 3e8d1aa383 IGNITE-20605 Restore scaleUp/scaleDown timers properly
(#2899)
3e8d1aa383 is described below
commit 3e8d1aa383c1434f55c4d97c3a2539c264b5c859
Author: Mirza Aliev <[email protected]>
AuthorDate: Mon Dec 4 11:14:15 2023 +0400
IGNITE-20605 Restore scaleUp/scaleDown timers properly (#2899)
---
...niteDistributionZoneManagerNodeRestartTest.java | 41 ++++
.../distributionzones/DistributionZoneManager.java | 218 ++++++++-------------
.../distributionzones/DistributionZonesUtil.java | 14 --
3 files changed, 118 insertions(+), 155 deletions(-)
diff --git
a/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/distributionzones/ItIgniteDistributionZoneManagerNodeRestartTest.java
b/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/distributionzones/ItIgniteDistributionZoneManagerNodeRestartTest.java
index 35aa5537f1..eda13535f8 100644
---
a/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/distributionzones/ItIgniteDistributionZoneManagerNodeRestartTest.java
+++
b/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/distributionzones/ItIgniteDistributionZoneManagerNodeRestartTest.java
@@ -61,6 +61,7 @@ import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentSkipListMap;
import java.util.function.Consumer;
import java.util.function.LongFunction;
import java.util.stream.Stream;
@@ -83,6 +84,8 @@ import
org.apache.ignite.internal.configuration.storage.DistributedConfiguration
import
org.apache.ignite.internal.configuration.storage.LocalFileConfigurationStorage;
import
org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
import
org.apache.ignite.internal.configuration.validation.ConfigurationValidatorImpl;
+import org.apache.ignite.internal.distributionzones.DistributionZoneManager;
+import
org.apache.ignite.internal.distributionzones.DistributionZoneManager.Augmentation;
import
org.apache.ignite.internal.distributionzones.DistributionZoneManager.ZoneState;
import org.apache.ignite.internal.hlc.HybridClockImpl;
import org.apache.ignite.internal.lang.ByteArray;
@@ -330,6 +333,44 @@ public class
ItIgniteDistributionZoneManagerNodeRestartTest extends BaseIgniteRe
assertEquals(nodeAttributesBeforeRestart, nodeAttributesAfterRestart);
}
+ @ParameterizedTest
+ @MethodSource("provideArgumentsRestartTests")
+ public void testTopologyAugmentationMapRestoredAfterRestart(String
zoneName) throws Exception {
+ PartialNode node = startPartialNode(0);
+
+ node.logicalTopology().putNode(A);
+
+ createZoneOrAlterDefaultZone(node, zoneName, IMMEDIATE_TIMER_VALUE,
IMMEDIATE_TIMER_VALUE);
+
+ node.logicalTopology().putNode(B);
+ node.logicalTopology().putNode(C);
+
+ int zoneId = getZoneId(node, zoneName);
+
+ DistributionZoneManager distributionZoneManager =
getDistributionZoneManager(node);
+
+ assertDataNodesFromManager(distributionZoneManager,
metastore::appliedRevision, zoneId, Set.of(A, B, C), TIMEOUT_MILLIS);
+
+ ConcurrentSkipListMap<Long, Augmentation> nodeAttributesBeforeRestart =
+
distributionZoneManager.zonesState().get(zoneId).topologyAugmentationMap();
+
+ node.stop();
+
+ node = startPartialNode(0);
+
+ distributionZoneManager = getDistributionZoneManager(node);
+
+ ConcurrentSkipListMap<Long, Augmentation> nodeAttributesAfterRestart =
+
distributionZoneManager.zonesState().get(zoneId).topologyAugmentationMap();
+
+ assertEquals(2, nodeAttributesAfterRestart.size());
+
+ assertEquals(
+
nodeAttributesBeforeRestart.values().stream().map(Augmentation::nodes).collect(toSet()),
+
nodeAttributesAfterRestart.values().stream().map(Augmentation::nodes).collect(toSet())
+ );
+ }
+
@Test
public void testLogicalTopologyRestoredAfterRestart() throws Exception {
PartialNode node = startPartialNode(0);
diff --git
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java
index ec4f607dc8..ebe0ac7b85 100644
---
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java
+++
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java
@@ -46,7 +46,6 @@ import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil
import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zoneScaleDownChangeTriggerKey;
import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zoneScaleUpChangeTriggerKey;
import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zoneTopologyAugmentation;
-import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zonesFilterUpdateRevision;
import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zonesLastHandledTopology;
import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zonesLogicalTopologyKey;
import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zonesLogicalTopologyPrefix;
@@ -82,7 +81,6 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.LongFunction;
import org.apache.ignite.internal.catalog.CatalogManager;
@@ -119,7 +117,6 @@ import org.apache.ignite.internal.metastorage.dsl.Update;
import org.apache.ignite.internal.thread.NamedThreadFactory;
import org.apache.ignite.internal.thread.StripedScheduledThreadPoolExecutor;
import org.apache.ignite.internal.util.IgniteSpinBusyLock;
-import org.apache.ignite.internal.vault.VaultEntry;
import org.apache.ignite.internal.vault.VaultManager;
import org.jetbrains.annotations.TestOnly;
@@ -133,9 +130,6 @@ public class DistributionZoneManager implements
IgniteComponent {
/** Meta Storage manager. */
private final MetaStorageManager metaStorageManager;
- /** Vault manager. */
- private final VaultManager vaultMgr;
-
/** Busy lock to stop synchronously. */
private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
@@ -219,7 +213,6 @@ public class DistributionZoneManager implements
IgniteComponent {
) {
this.metaStorageManager = metaStorageManager;
this.logicalTopologyService = logicalTopologyService;
- this.vaultMgr = vaultMgr;
this.catalogManager = catalogManager;
this.topologyWatchListener = createMetastorageTopologyListener();
@@ -270,7 +263,7 @@ public class DistributionZoneManager implements
IgniteComponent {
createOrRestoreZonesStates(recoveryRevision);
- restoreLogicalTopologyChangeEvent(recoveryRevision);
+ restoreLogicalTopologyChangeEventAndStartTimers(recoveryRevision);
rebalanceEngine.start();
});
@@ -396,132 +389,106 @@ public class DistributionZoneManager implements
IgniteComponent {
long causalityToken = parameters.causalityToken();
- VaultEntry filterUpdateRevision =
vaultMgr.get(zonesFilterUpdateRevision()).join();
-
- if (filterUpdateRevision != null) {
- // This means that we have already handled event with this
causalityToken.
- // It is possible when node was restarted after this listener
completed,
- // but applied causalityToken didn't have time to be propagated to
the Vault.
- if (bytesToLong(filterUpdateRevision.value()) >= causalityToken) {
- return nullCompletedFuture();
- }
- }
-
- vaultMgr.put(zonesFilterUpdateRevision(),
longToBytes(causalityToken)).join();
-
causalityDataNodesEngine.onUpdateFilter(causalityToken, zoneId,
newFilter);
return saveDataNodesToMetaStorageOnScaleUp(zoneId, causalityToken);
}
/**
- * Creates or restores zone's state depending on the {@link
ZoneState#topologyAugmentationMap()} existence in the Vault.
- * We save {@link ZoneState#topologyAugmentationMap()} in the Vault every
time we receive logical topology changes from the metastore.
+ * Restores zones' states.
*
* @param zone Zone descriptor.
* @param causalityToken Causality token.
* @return Future reflecting the completion of creation or restoring a
zone.
*/
- private CompletableFuture<Void>
createOrRestoreZoneStateBusy(CatalogZoneDescriptor zone, long causalityToken) {
+ private CompletableFuture<Void> restoreZoneStateBusy(CatalogZoneDescriptor
zone, long causalityToken) {
int zoneId = zone.id();
- Entry topologyAugmentationMapLocalMetaStorage =
-
metaStorageManager.getLocally(zoneTopologyAugmentation(zoneId), causalityToken);
+ Entry zoneDataNodesLocalMetaStorage =
metaStorageManager.getLocally(zoneDataNodesKey(zoneId), causalityToken);
- // First creation of a zone, or first call on the manager start for
the default zone.
- if (topologyAugmentationMapLocalMetaStorage.value() == null) {
- ZoneState zoneState = new ZoneState(executor);
-
- ZoneState prevZoneState = zonesState.putIfAbsent(zoneId,
zoneState);
-
- assert prevZoneState == null : "Zone's state was created twice
[zoneId = " + zoneId + ']';
-
- Set<Node> dataNodes =
logicalTopology.stream().map(NodeWithAttributes::node).collect(toSet());
+ if (zoneDataNodesLocalMetaStorage.value() == null) {
+ // In this case, creation of a zone was interrupted during restart.
+ return onCreateZone(zone, causalityToken);
+ } else {
+ Entry topologyAugmentationMapLocalMetaStorage =
metaStorageManager.getLocally(zoneTopologyAugmentation(zoneId), causalityToken);
-
causalityDataNodesEngine.onCreateOrRestoreZoneState(causalityToken, zone);
+ ConcurrentSkipListMap<Long, Augmentation> topologyAugmentationMap;
- return initDataNodesAndTriggerKeysInMetaStorage(zoneId,
causalityToken, dataNodes);
- } else {
- // Restart case, when topologyAugmentationMap has already been
saved during a cluster work.
- ConcurrentSkipListMap<Long, Augmentation> topologyAugmentationMap
= fromBytes(topologyAugmentationMapLocalMetaStorage.value());
+ if (topologyAugmentationMapLocalMetaStorage.value() == null) {
+ // This case means that there won't any logical topology
updates before restart.
+ topologyAugmentationMap = new ConcurrentSkipListMap<>();
+ } else {
+ topologyAugmentationMap =
fromBytes(topologyAugmentationMapLocalMetaStorage.value());
+ }
ZoneState zoneState = new ZoneState(executor,
topologyAugmentationMap);
ZoneState prevZoneState = zonesState.putIfAbsent(zoneId,
zoneState);
assert prevZoneState == null : "Zone's state was created twice
[zoneId = " + zoneId + ']';
+ }
- Optional<Long> maxScaleUpRevision =
zoneState.highestRevision(true);
+ causalityDataNodesEngine.onCreateOrRestoreZoneState(causalityToken,
zone);
+
+ return nullCompletedFuture();
+ }
- Optional<Long> maxScaleDownRevision =
zoneState.highestRevision(false);
+ private CompletableFuture<Void> onCreateZone(CatalogZoneDescriptor zone,
long causalityToken) {
+ int zoneId = zone.id();
- VaultEntry filterUpdateRevision =
vaultMgr.get(zonesFilterUpdateRevision()).join();
+ ConcurrentSkipListMap<Long, Augmentation> topologyAugmentationMap =
new ConcurrentSkipListMap<>();
- restoreTimers(zone, zoneState, maxScaleUpRevision,
maxScaleDownRevision, filterUpdateRevision);
- }
+ ZoneState zoneState = new ZoneState(executor, topologyAugmentationMap);
+
+ ZoneState prevZoneState = zonesState.putIfAbsent(zoneId, zoneState);
+
+ assert prevZoneState == null : "Zone's state was created twice [zoneId
= " + zoneId + ']';
+
+ Set<Node> dataNodes =
logicalTopology.stream().map(NodeWithAttributes::node).collect(toSet());
causalityDataNodesEngine.onCreateOrRestoreZoneState(causalityToken,
zone);
- return nullCompletedFuture();
+ return initDataNodesAndTriggerKeysInMetaStorage(zoneId,
causalityToken, dataNodes);
}
/**
- * Restores timers that were scheduled before a node's restart.
- * Take the highest revision from the {@link
ZoneState#topologyAugmentationMap()}, compare it with the revision
- * of the last update of the zone's filter and schedule scale up/scale
down timers. Filter revision is taken into account because
- * any filter update triggers immediate scale up.
+ * Restores timers that were scheduled before a node's restart. Take the
highest revision from the
+ * {@link ZoneState#topologyAugmentationMap()}, schedule scale up/scale
down timers.
*
- * @param zone Zone descriptor.
- * @param zoneState Zone's state from Distribution Zone Manager
- * @param maxScaleUpRevisionOptional Max revision from the {@link
ZoneState#topologyAugmentationMap()} for node joins.
- * @param maxScaleDownRevisionOptional Max revision from the {@link
ZoneState#topologyAugmentationMap()} for node removals.
- * @param filterUpdateRevisionVaultEntry Revision of the last update of
the zone's filter.
+ * @param catalogVersion Catalog version.
+ * @return Future that represents the pending completion of the operation.
+ * For the immediate timers it will be completed when data nodes
will be updated in Meta Storage.
*/
- private void restoreTimers(
- CatalogZoneDescriptor zone,
- ZoneState zoneState,
- Optional<Long> maxScaleUpRevisionOptional,
- Optional<Long> maxScaleDownRevisionOptional,
- VaultEntry filterUpdateRevisionVaultEntry
- ) {
- int zoneId = zone.id();
+ private CompletableFuture<Void> restoreTimers(int catalogVersion) {
+ List<CompletableFuture<Void>> futures = new ArrayList<>();
- maxScaleUpRevisionOptional.ifPresent(
- maxScaleUpRevision -> {
- if (filterUpdateRevisionVaultEntry != null) {
- long filterUpdateRevision =
bytesToLong(filterUpdateRevisionVaultEntry.value());
+ for (CatalogZoneDescriptor zone :
catalogManager.zones(catalogVersion)) {
+ ZoneState zoneState = zonesState.get(zone.id());
- // Immediately trigger scale up, that was planned to
be invoked before restart.
- // If this invoke was successful before restart, then
current call just will be skipped.
- saveDataNodesToMetaStorageOnScaleUp(zoneId,
filterUpdateRevision);
+ // Max revision from the {@link
ZoneState#topologyAugmentationMap()} for node joins.
+ Optional<Long> maxScaleUpRevisionOptional =
zoneState.highestRevision(true);
- if (maxScaleUpRevision < filterUpdateRevision) {
- // Don't need to trigger additional scale up for
the scenario, when filter update event happened after the last
- // node join event.
- return;
- }
+ //Max revision from the {@link
ZoneState#topologyAugmentationMap()} for node removals.
+ Optional<Long> maxScaleDownRevisionOptional =
zoneState.highestRevision(false);
+
+ maxScaleUpRevisionOptional.ifPresent(
+ maxScaleUpRevision -> {
+ // Take the highest revision from the
topologyAugmentationMap and schedule scale up/scale down,
+ // meaning that all augmentations of nodes will be
taken into account in newly created timers.
+ // If augmentations have already been proposed to data
nodes in the metastorage before restart,
+ // that means we have updated corresponding trigger
key and it's value will be greater or equal to
+ // the highest revision from the
topologyAugmentationMap, and current timer won't affect data nodes.
+
+ futures.add(scheduleTimers(zone, true, false,
maxScaleUpRevision));
}
+ );
- // Take the highest revision from the
topologyAugmentationMap and schedule scale up/scale down,
- // meaning that all augmentations of nodes will be taken
into account in newly created timers.
- // If augmentations have already been proposed to data
nodes in the metastorage before restart,
- // that means we have updated corresponding trigger key
and it's value will be greater than
- // the highest revision from the topologyAugmentationMap,
and current timer won't affect data nodes.
- zoneState.rescheduleScaleUp(
- zone.dataNodesAutoAdjustScaleUp(),
- () -> saveDataNodesToMetaStorageOnScaleUp(zoneId,
maxScaleUpRevision),
- zoneId
- );
- }
- );
+ maxScaleDownRevisionOptional.ifPresent(
+ maxScaleDownRevision -> futures.add(scheduleTimers(zone,
false, true, maxScaleDownRevision))
+ );
+ }
- maxScaleDownRevisionOptional.ifPresent(
- maxScaleDownRevision -> zoneState.rescheduleScaleDown(
- zone.dataNodesAutoAdjustScaleDown(),
- () -> saveDataNodesToMetaStorageOnScaleDown(zoneId,
maxScaleDownRevision),
- zoneId
- )
- );
+ return allOf(futures.toArray(CompletableFuture[]::new));
}
/**
@@ -795,7 +762,7 @@ public class DistributionZoneManager implements
IgniteComponent {
updateLocalTopologyAugmentationMap(addedNodes, removedNodes,
revision, zoneId);
- futures.add(scheduleTimers(zone, addedNodes, removedNodes,
revision));
+ futures.add(scheduleTimers(zone, !addedNodes.isEmpty(),
!removedNodes.isEmpty(), revision));
zoneIds.add(zone.id());
}
@@ -880,48 +847,13 @@ public class DistributionZoneManager implements
IgniteComponent {
* Schedules scale up and scale down timers.
*
* @param zone Zone descriptor.
- * @param addedNodes Nodes that was added to a topology and should be
added to zones data nodes.
- * @param removedNodes Nodes that was removed from a topology and should
be removed from zones data nodes.
- * @param revision Revision that triggered that event.
- * @return Future that represents the pending completion of the operation.
- * For the immediate timers it will be completed when data nodes
will be updated in Meta Storage.
- */
- private CompletableFuture<Void> scheduleTimers(
- CatalogZoneDescriptor zone,
- Set<Node> addedNodes,
- Set<Node> removedNodes,
- long revision
- ) {
- return scheduleTimers(
- zone,
- addedNodes,
- removedNodes,
- revision,
- this::saveDataNodesToMetaStorageOnScaleUp,
- this::saveDataNodesToMetaStorageOnScaleDown
- );
- }
-
- /**
- * Schedules scale up and scale down timers. This method is needed also
for test purposes.
- *
- * @param zone Zone descriptor.
- * @param addedNodes Nodes that was added to a topology and should be
added to zones data nodes.
- * @param removedNodes Nodes that was removed from a topology and should
be removed from zones data nodes.
+ * @param nodesAdded Flag indicating that nodes was added to a topology
and should be added to zones data nodes.
+ * @param nodesRemoved Flag indicating that nodes was removed from a
topology and should be removed from zones data nodes.
* @param revision Revision that triggered that event.
- * @param saveDataNodesOnScaleUp Function that saves nodes to a zone's
data nodes in case of scale up was triggered.
- * @param saveDataNodesOnScaleDown Function that saves nodes to a zone's
data nodes in case of scale down was triggered.
* @return Future that represents the pending completion of the operation.
* For the immediate timers it will be completed when data nodes
will be updated in Meta Storage.
*/
- private CompletableFuture<Void> scheduleTimers(
- CatalogZoneDescriptor zone,
- Set<Node> addedNodes,
- Set<Node> removedNodes,
- long revision,
- BiFunction<Integer, Long, CompletableFuture<Void>>
saveDataNodesOnScaleUp,
- BiFunction<Integer, Long, CompletableFuture<Void>>
saveDataNodesOnScaleDown
- ) {
+ private CompletableFuture<Void> scheduleTimers(CatalogZoneDescriptor zone,
boolean nodesAdded, boolean nodesRemoved, long revision) {
int autoAdjust = zone.dataNodesAutoAdjust();
int autoAdjustScaleDown = zone.dataNodesAutoAdjustScaleDown();
int autoAdjustScaleUp = zone.dataNodesAutoAdjustScaleUp();
@@ -930,11 +862,11 @@ public class DistributionZoneManager implements
IgniteComponent {
List<CompletableFuture<Void>> futures = new ArrayList<>();
- if ((!addedNodes.isEmpty() || !removedNodes.isEmpty()) && autoAdjust
!= INFINITE_TIMER_VALUE) {
+ if ((nodesAdded || nodesRemoved) && autoAdjust !=
INFINITE_TIMER_VALUE) {
//TODO: IGNITE-18134 Create scheduler with dataNodesAutoAdjust
timer.
throw new UnsupportedOperationException("Data nodes auto adjust is
not supported.");
} else {
- if (!addedNodes.isEmpty()) {
+ if (nodesAdded) {
if (autoAdjustScaleUp == IMMEDIATE_TIMER_VALUE) {
futures.add(saveDataNodesToMetaStorageOnScaleUp(zoneId,
revision));
}
@@ -942,13 +874,13 @@ public class DistributionZoneManager implements
IgniteComponent {
if (autoAdjustScaleUp != INFINITE_TIMER_VALUE) {
zonesState.get(zoneId).rescheduleScaleUp(
autoAdjustScaleUp,
- () -> saveDataNodesOnScaleUp.apply(zoneId,
revision),
+ () -> saveDataNodesToMetaStorageOnScaleUp(zoneId,
revision),
zoneId
);
}
}
- if (!removedNodes.isEmpty()) {
+ if (nodesRemoved) {
if (zone.dataNodesAutoAdjustScaleDown() ==
IMMEDIATE_TIMER_VALUE) {
futures.add(saveDataNodesToMetaStorageOnScaleDown(zoneId,
revision));
}
@@ -956,7 +888,7 @@ public class DistributionZoneManager implements
IgniteComponent {
if (autoAdjustScaleDown != INFINITE_TIMER_VALUE) {
zonesState.get(zoneId).rescheduleScaleDown(
autoAdjustScaleDown,
- () -> saveDataNodesOnScaleDown.apply(zoneId,
revision),
+ () ->
saveDataNodesToMetaStorageOnScaleDown(zoneId, revision),
zoneId
);
}
@@ -1461,7 +1393,7 @@ public class DistributionZoneManager implements
IgniteComponent {
CreateZoneEventParameters params = (CreateZoneEventParameters)
parameters;
- return createOrRestoreZoneStateBusy(params.zoneDescriptor(),
params.causalityToken())
+ return onCreateZone(params.zoneDescriptor(),
params.causalityToken())
.thenCompose((ignored) -> falseCompletedFuture());
}));
@@ -1480,16 +1412,17 @@ public class DistributionZoneManager implements
IgniteComponent {
// TODO: IGNITE-20287 Clean up abandoned resources for dropped zones
from volt and metastore
for (CatalogZoneDescriptor zone :
catalogManager.zones(catalogVersion)) {
// TODO: return this futures
https://issues.apache.org/jira/browse/IGNITE-20477
- createOrRestoreZoneStateBusy(zone, recoveryRevision);
+ restoreZoneStateBusy(zone, recoveryRevision);
}
}
/**
* Restore the event of the updating the logical topology from Meta
Storage, that has not been completed before restart.
+ * Also start scale up/scale down timers.
*
* @param recoveryRevision Revision of the Meta Storage after its recovery.
*/
- private void restoreLogicalTopologyChangeEvent(long recoveryRevision) {
+ private void restoreLogicalTopologyChangeEventAndStartTimers(long
recoveryRevision) {
Entry topologyEntry =
metaStorageManager.getLocally(zonesLogicalTopologyKey(), recoveryRevision);
if (topologyEntry.value() != null) {
@@ -1505,6 +1438,9 @@ public class DistributionZoneManager implements
IgniteComponent {
if (lastUpdateRevisionEntry.value() == null || topologyRevision >
bytesToLong(lastUpdateRevisionEntry.value())) {
// TODO: return this futures
https://issues.apache.org/jira/browse/IGNITE-20477
onLogicalTopologyUpdate(newLogicalTopology, recoveryRevision,
catalogVersion);
+ } else {
+ // TODO: return this futures
https://issues.apache.org/jira/browse/IGNITE-20477
+ restoreTimers(catalogVersion);
}
}
}
diff --git
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZonesUtil.java
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZonesUtil.java
index d162cc0e26..6deeab392e 100644
---
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZonesUtil.java
+++
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZonesUtil.java
@@ -87,9 +87,6 @@ public class DistributionZonesUtil {
/** Key value for the last handled logical topology by Distribution zone
manager. */
private static final String DISTRIBUTION_ZONES_LAST_HANDLED_TOPOLOGY =
"distributionZones.lastHandledTopology";
- /** Key value for zones' filter update revision in vault. */
- private static final String
DISTRIBUTION_ZONES_FILTER_UPDATE_REVISION_VAULT =
"vault.distributionZones.filterUpdate.revision";
-
/** Key prefix for zones' logical topology nodes. */
private static final String DISTRIBUTION_ZONES_LOGICAL_TOPOLOGY =
DISTRIBUTION_ZONES_LOGICAL_TOPOLOGY_PREFIX + "nodes";
@@ -117,10 +114,6 @@ public class DistributionZonesUtil {
private static final ByteArray
DISTRIBUTION_ZONES_LAST_HANDLED_TOPOLOGY_KEY =
new ByteArray(DISTRIBUTION_ZONES_LAST_HANDLED_TOPOLOGY);
- /** ByteArray representation of {@link
DistributionZonesUtil#DISTRIBUTION_ZONES_FILTER_UPDATE_REVISION_VAULT}. */
- private static final ByteArray
DISTRIBUTION_ZONES_FILTER_UPDATE_REVISION_VAULT_KEY =
- new ByteArray(DISTRIBUTION_ZONES_FILTER_UPDATE_REVISION_VAULT);
-
/** ByteArray representation of {@link
DistributionZonesUtil#DISTRIBUTION_ZONES_LOGICAL_TOPOLOGY_VERSION}. */
private static final ByteArray
DISTRIBUTION_ZONES_LOGICAL_TOPOLOGY_VERSION_KEY =
new ByteArray(DISTRIBUTION_ZONES_LOGICAL_TOPOLOGY_VERSION);
@@ -248,13 +241,6 @@ public class DistributionZonesUtil {
return DISTRIBUTION_ZONES_LAST_HANDLED_TOPOLOGY_KEY;
}
- /**
- * The key represents the last revision of the zone's filter update.
- */
- public static ByteArray zonesFilterUpdateRevision() {
- return DISTRIBUTION_ZONES_FILTER_UPDATE_REVISION_VAULT_KEY;
- }
-
/**
* The key that represents {@link ZoneState#topologyAugmentationMap()} in
the Meta Storage.
*/