This is an automated email from the ASF dual-hosted git repository.
sanpwc pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new e302bf2f1f IGNITE-19580 Add restoring of zones state after restart
(#2253)
e302bf2f1f is described below
commit e302bf2f1f9ca86603135570834c40dad20fcf3b
Author: Mirza Aliev <[email protected]>
AuthorDate: Wed Jun 28 16:53:19 2023 +0400
IGNITE-19580 Add restoring of zones state after restart (#2253)
---
.../distributionzones/DistributionZoneManager.java | 273 +++++++++++++++------
.../distributionzones/DistributionZonesUtil.java | 25 +-
.../DistributionZoneManagerScaleUpTest.java | 4 +-
.../server/raft/MetaStorageWriteHandler.java | 4 +-
...niteDistributionZoneManagerNodeRestartTest.java | 184 +++++++++++++-
5 files changed, 398 insertions(+), 92 deletions(-)
diff --git
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java
index aeaf81936c..5673101862 100644
---
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java
+++
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java
@@ -42,6 +42,7 @@ import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil
import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zoneDataNodesKey;
import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zoneScaleDownChangeTriggerKey;
import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zoneScaleUpChangeTriggerKey;
+import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zoneTopologyAugmentationVault;
import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zonesDataNodesPrefix;
import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zonesGlobalStateRevision;
import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zonesLogicalTopologyKey;
@@ -62,8 +63,10 @@ import static
org.apache.ignite.internal.util.IgniteUtils.shutdownAndAwaitTermin
import static org.apache.ignite.internal.util.IgniteUtils.startsWith;
import static org.apache.ignite.lang.ErrorGroups.Common.NODE_STOPPING_ERR;
+import java.io.Serializable;
import java.util.Arrays;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
@@ -320,15 +323,6 @@ public class DistributionZoneManager implements
IgniteComponent {
rebalanceEngine.start();
- // Init timers after restart.
- zonesState.putIfAbsent(DEFAULT_ZONE_ID, new ZoneState(executor));
-
- zonesConfiguration.distributionZones().value().forEach(zone -> {
- int zoneId = zone.zoneId();
-
- zonesState.putIfAbsent(zoneId, new ZoneState(executor));
- });
-
logicalTopologyService.addEventListener(topologyEventListener);
metaStorageManager.registerPrefixWatch(zonesLogicalTopologyPrefix(),
topologyWatchListener);
@@ -336,7 +330,10 @@ public class DistributionZoneManager implements
IgniteComponent {
restoreGlobalStateFromVault();
- initDataNodesFromVaultManager();
+ long appliedRevision = metaStorageManager.appliedRevision();
+
+ // onCreate for default zone is not called, so we have to restore
its state on start
+
createOrRestoreZoneState(zonesConfiguration.defaultDistributionZone().value(),
appliedRevision);
} finally {
busyLock.leaveBusy();
}
@@ -765,17 +762,9 @@ public class DistributionZoneManager implements
IgniteComponent {
private class ZonesConfigurationListener implements
ConfigurationNamedListListener<DistributionZoneView> {
@Override
public CompletableFuture<?>
onCreate(ConfigurationNotificationEvent<DistributionZoneView> ctx) {
- int zoneId = ctx.newValue().zoneId();
+ DistributionZoneView zone = ctx.newValue();
- ZoneState zoneState = new ZoneState(executor);
-
- zonesState.putIfAbsent(zoneId, zoneState);
-
- saveDataNodesAndUpdateTriggerKeysInMetaStorage(
- zoneId,
- ctx.storageRevision(),
-
logicalTopology.stream().map(NodeWithAttributes::node).collect(toSet())
- );
+ createOrRestoreZoneState(zone, ctx.storageRevision());
return completedFuture(null);
}
@@ -795,15 +784,85 @@ public class DistributionZoneManager implements
IgniteComponent {
}
/**
- * Method updates data nodes value for the specified zone, also sets
{@code revision} to the
- * {@link DistributionZonesUtil#zoneScaleUpChangeTriggerKey(int)}, {@link
DistributionZonesUtil#zoneScaleDownChangeTriggerKey(int)}
- * and {@link DistributionZonesUtil#zonesChangeTriggerKey(int)} if it
passes the condition.
+ * Creates or restores zone's state depending on the {@link
ZoneState#topologyAugmentationMap()} existence in the Vault.
+ * We save {@link ZoneState#topologyAugmentationMap()} in the Vault every
time we receive logical topology changes from the metastore.
+ *
+ * @param zone Zone's view.
+ * @param revision Revision for which we restore zone's state.
+ */
+ private void createOrRestoreZoneState(DistributionZoneView zone, long
revision) {
+ int zoneId = zone.zoneId();
+
+ VaultEntry topologyAugmentationMapFromVault =
vaultMgr.get(zoneTopologyAugmentationVault(zoneId)).join();
+
+ // First creation of a zone, or first call on the manager start for
the default zone.
+ if (topologyAugmentationMapFromVault == null) {
+ ZoneState zoneState = new ZoneState(executor);
+
+ ZoneState prevZoneState = zonesState.putIfAbsent(zoneId,
zoneState);
+
+ assert prevZoneState == null : "Zone's state was created twice
[zoneId = " + zoneId + ']';
+
+ Set<Node> dataNodes =
logicalTopology.stream().map(NodeWithAttributes::node).collect(toSet());
+
+ initDataNodesAndTriggerKeysInMetaStorage(zoneId, revision,
dataNodes);
+ } else {
+ // Restart case, when topologyAugmentationMap has already been
saved during a cluster work.
+ ConcurrentSkipListMap<Long, Augmentation> topologyAugmentationMap
= fromBytes(topologyAugmentationMapFromVault.value());
+
+ ZoneState zoneState = new ZoneState(executor,
topologyAugmentationMap);
+
+ VaultEntry dataNodes =
vaultMgr.get(zoneDataNodesKey(zoneId)).join();
+
+ if (dataNodes != null) {
+ String filter = zone.filter();
+
+
zoneState.nodes(filterDataNodes(DistributionZonesUtil.dataNodes(fromBytes(dataNodes.value())),
filter, nodesAttributes()));
+ }
+
+ ZoneState prevZoneState = zonesState.putIfAbsent(zoneId,
zoneState);
+
+ assert prevZoneState == null : "Zone's state was created twice
[zoneId = " + zoneId + ']';
+
+ Optional<Long> maxScaleUpRevision =
zoneState.highestRevision(true);
+
+ // Take the highest revision from the topologyAugmentationMap and
schedule scale up/scale down,
+ // meaning that all augmentation of nodes will be taken into
account in newly created timers. If the augmentation has already
+ // been proposed to data nodes in the metastorage before restart,
+ // that means we have updated corresponding trigger key and it's
value will be greater than
+ // the highest revision from the topologyAugmentationMap, and
current timer won't affect data nodes.
+ maxScaleUpRevision.ifPresent(
+ rev -> zoneState.rescheduleScaleUp(
+ zone.dataNodesAutoAdjustScaleUp(),
+ () -> saveDataNodesToMetaStorageOnScaleUp(zoneId,
rev)
+ )
+ );
+
+ Optional<Long> maxScaleDownRevision =
zoneState.highestRevision(false);
+
+ maxScaleDownRevision.ifPresent(
+ rev -> zoneState.rescheduleScaleDown(
+ zone.dataNodesAutoAdjustScaleDown(),
+ () ->
saveDataNodesToMetaStorageOnScaleDown(zoneId, rev)
+ )
+ );
+ }
+ }
+
+ /**
+ * Method initialises data nodes value for the specified zone, also sets
{@code revision} to the
+ * {@link DistributionZonesUtil#zoneScaleUpChangeTriggerKey(int)}, {@link
DistributionZonesUtil#zoneScaleDownChangeTriggerKey(int)} and
+ * {@link DistributionZonesUtil#zonesChangeTriggerKey(int)} if it passes
the condition. It is called on the first creation of a zone.
*
* @param zoneId Unique id of a zone
* @param revision Revision of an event that has triggered this method.
* @param dataNodes Data nodes.
*/
- private void saveDataNodesAndUpdateTriggerKeysInMetaStorage(int zoneId,
long revision, Set<Node> dataNodes) {
+ private void initDataNodesAndTriggerKeysInMetaStorage(
+ int zoneId,
+ long revision,
+ Set<Node> dataNodes
+ ) {
if (!busyLock.enterBusy()) {
throw new IgniteInternalException(NODE_STOPPING_ERR, new
NodeStoppingException());
}
@@ -819,11 +878,22 @@ public class DistributionZoneManager implements
IgniteComponent {
metaStorageManager.invoke(iif).whenComplete((res, e) -> {
if (e != null) {
- LOG.error("Failed to update zones' dataNodes value [zoneId
= {}]", e, zoneId);
+ LOG.error(
+ "Failed to update zones' dataNodes value [zoneId =
{}, dataNodes = {}, revision = {}]",
+ e,
+ zoneId,
+ dataNodes,
+ revision
+ );
} else if (res.getAsBoolean()) {
- LOG.debug("Update zones' dataNodes value [zoneId = {},
dataNodes = {}", zoneId, dataNodes);
+ LOG.debug("Update zones' dataNodes value [zoneId = {},
dataNodes = {}, revision = {}]", zoneId, dataNodes, revision);
} else {
- LOG.debug("Failed to update zones' dataNodes value [zoneId
= {}]", zoneId);
+ LOG.debug(
+ "Failed to update zones' dataNodes value [zoneId =
{}, dataNodes = {}, revision = {}]",
+ zoneId,
+ dataNodes,
+ revision
+ );
}
});
} finally {
@@ -849,11 +919,18 @@ public class DistributionZoneManager implements
IgniteComponent {
Iif iif = iif(triggerKeyCondition, removeKeysUpd,
ops().yield(false));
- metaStorageManager.invoke(iif).thenAccept(res -> {
- if (res.getAsBoolean()) {
- LOG.debug("Delete zones' dataNodes key [zoneId = {}",
zoneId);
+ metaStorageManager.invoke(iif).whenComplete((res, e) -> {
+ if (e != null) {
+ LOG.error(
+ "Failed to delete zone's dataNodes keys [zoneId =
{}, revision = {}]",
+ e,
+ zoneId,
+ revision
+ );
+ } else if (res.getAsBoolean()) {
+ LOG.debug("Delete zone's dataNodes keys [zoneId = {},
revision = {}]", zoneId, revision);
} else {
- LOG.debug("Failed to delete zones' dataNodes key [zoneId =
{}]", zoneId);
+ LOG.debug("Failed to delete zone's dataNodes keys [zoneId
= {}, revision = {}]", zoneId, revision);
}
});
} finally {
@@ -967,37 +1044,6 @@ public class DistributionZoneManager implements
IgniteComponent {
}
}
- /**
- * Initialises data nodes of distribution zones in meta storage
- * from {@link DistributionZonesUtil#zonesLogicalTopologyKey()} in vault.
- */
- // TODO: will be refactored in
https://issues.apache.org/jira/browse/IGNITE-19580
- private void initDataNodesFromVaultManager() {
- if (!busyLock.enterBusy()) {
- throw new IgniteInternalException(NODE_STOPPING_ERR, new
NodeStoppingException());
- }
-
- try {
- long appliedRevision = metaStorageManager.appliedRevision();
-
- if (!logicalTopology.isEmpty()) {
- // init keys and data nodes for default zone
- saveDataNodesAndUpdateTriggerKeysInMetaStorage(
- DEFAULT_ZONE_ID,
- appliedRevision,
-
logicalTopology.stream().map(NodeWithAttributes::node).collect(toSet())
- );
- }
-
- zonesState.values().forEach(zoneState -> {
-
zoneState.nodes(logicalTopology.stream().map(NodeWithAttributes::nodeName).collect(toSet()));
- });
-
- } finally {
- busyLock.leaveBusy();
- }
- }
-
/**
* Restores from vault logical topology and nodes' attributes fields in
{@link DistributionZoneManager} after restart.
*/
@@ -1092,17 +1138,27 @@ public class DistributionZoneManager implements
IgniteComponent {
NamedConfigurationTree<DistributionZoneConfiguration,
DistributionZoneView, DistributionZoneChange> zones =
zonesConfiguration.distributionZones();
+ Set<Integer> zoneIds = new HashSet<>();
+
for (int i = 0; i < zones.value().size(); i++) {
DistributionZoneView zoneView = zones.value().get(i);
scheduleTimers(zoneView, addedNodes, removedNodes,
revision);
+
+ zoneIds.add(zoneView.zoneId());
}
DistributionZoneView defaultZoneView =
zonesConfiguration.value().defaultDistributionZone();
scheduleTimers(defaultZoneView, addedNodes, removedNodes,
revision);
-
updateLogicalTopologyNodeAttributesAndSaveToVault(newLogicalTopology, revision);
+ zoneIds.add(defaultZoneView.zoneId());
+
+ newLogicalTopology.forEach(n ->
nodesAttributes.put(n.nodeId(), n.nodeAttributes()));
+
+ logicalTopology = newLogicalTopology;
+
+ saveStatesToVault(newLogicalTopology, revision, zoneIds);
return completedFuture(null);
} finally {
@@ -1118,23 +1174,24 @@ public class DistributionZoneManager implements
IgniteComponent {
}
/**
- * Updates local representation of logical topology and nodes' attributes
map and saves it to vault atomically in one batch.
+ * Saves states of the Distribution Zone Manager to vault atomically in
one batch.
* After restart it could be used to restore these fields.
*
* @param newLogicalTopology Logical topology.
* @param revision Revision of the event, which triggers update.
+ * @param zoneIds Set of zone ids whose states will be saved in the
Vault
*/
- private void
updateLogicalTopologyNodeAttributesAndSaveToVault(Set<NodeWithAttributes>
newLogicalTopology, long revision) {
- newLogicalTopology.forEach(n -> nodesAttributes.put(n.nodeId(),
n.nodeAttributes()));
-
- logicalTopology = newLogicalTopology;
-
- Map<ByteArray, byte[]> batch = IgniteUtils.newHashMap(2);
+ private void saveStatesToVault(Set<NodeWithAttributes> newLogicalTopology,
long revision, Set<Integer> zoneIds) {
+ Map<ByteArray, byte[]> batch = IgniteUtils.newHashMap(3 +
zoneIds.size());
batch.put(zonesLogicalTopologyVault(), toBytes(newLogicalTopology));
batch.put(zonesNodesAttributesVault(), toBytes(nodesAttributes()));
+ for (Integer zoneId : zoneIds) {
+ batch.put(zoneTopologyAugmentationVault(zoneId),
toBytes(zonesState.get(zoneId).topologyAugmentationMap));
+ }
+
batch.put(zonesGlobalStateRevision(), longToBytes(revision));
vaultMgr.putAll(batch).join();
@@ -1343,6 +1400,14 @@ public class DistributionZoneManager implements
IgniteComponent {
long scaleDownTriggerRevision =
extractChangeTriggerRevision(values.get(zoneScaleDownChangeTriggerKey(zoneId)));
if (revision <= scaleUpTriggerRevision) {
+ LOG.debug(
+ "Revision of the event is less than the scale up
revision from the metastorage "
+ + "[zoneId = {}, revision = {},
scaleUpTriggerRevision = {}]",
+ zoneId,
+ revision,
+ scaleUpTriggerRevision
+ );
+
return completedFuture(null);
}
@@ -1373,9 +1438,21 @@ public class DistributionZoneManager implements
IgniteComponent {
if (invokeResult) {
// TODO:
https://issues.apache.org/jira/browse/IGNITE-19491 Properly utilise this map
// Currently we call clean up only on a node
that successfully writes data nodes.
+ LOG.debug(
+ "Updating data nodes for a zone after
scale up has succeeded "
+ + "[zoneId = {}, dataNodes =
{}, revision = {}]",
+ zoneId,
+ newDataNodes,
+ revision
+ );
zoneState.cleanUp(Math.min(scaleDownTriggerRevision, revision));
} else {
- LOG.debug("Updating data nodes for a zone has
not succeeded [zoneId = {}]", zoneId);
+ LOG.debug("Updating data nodes for a zone
after scale up has not succeeded "
+ + "[zoneId = {}, dataNodes =
{}, revision = {}]",
+ zoneId,
+ newDataNodes,
+ revision
+ );
return
saveDataNodesToMetaStorageOnScaleUp(zoneId, revision);
}
@@ -1384,7 +1461,7 @@ public class DistributionZoneManager implements
IgniteComponent {
}));
})).whenComplete((v, e) -> {
if (e != null) {
- LOG.warn("Failed to update zones' dataNodes value [zoneId
= {}]", e, zoneId);
+ LOG.warn("Failed to update zones' dataNodes value after
scale up [zoneId = {}, revision = {}]", e, zoneId, revision);
}
});
} finally {
@@ -1432,6 +1509,14 @@ public class DistributionZoneManager implements
IgniteComponent {
long scaleDownTriggerRevision =
extractChangeTriggerRevision(values.get(zoneScaleDownChangeTriggerKey(zoneId)));
if (revision <= scaleDownTriggerRevision) {
+ LOG.debug(
+ "Revision of the event is less than the scale down
revision from the metastorage "
+ + "[zoneId = {}, revision = {},
scaleUpTriggerRevision = {}]",
+ zoneId,
+ revision,
+ scaleDownTriggerRevision
+ );
+
return completedFuture(null);
}
@@ -1456,11 +1541,24 @@ public class DistributionZoneManager implements
IgniteComponent {
.thenApply(StatementResult::getAsBoolean)
.thenCompose(invokeResult -> inBusyLock(busyLock, ()
-> {
if (invokeResult) {
+ LOG.debug(
+ "Updating data nodes for a zone after
scale down has succeeded "
+ + "[zoneId = {}, dataNodes =
{}, revision = {}]",
+ zoneId,
+ newDataNodes,
+ revision
+ );
+
// TODO:
https://issues.apache.org/jira/browse/IGNITE-19491 Properly utilise this map
// Currently we call clean up only on a node
that successfully writes data nodes.
zoneState.cleanUp(Math.min(scaleUpTriggerRevision, revision));
} else {
- LOG.debug("Updating data nodes for a zone has
not succeeded [zoneId = {}]", zoneId);
+ LOG.debug("Updating data nodes for a zone
after scale down has not succeeded "
+ + "[zoneId = {}, dataNodes =
{}, revision = {}]",
+ zoneId,
+ newDataNodes,
+ revision
+ );
return
saveDataNodesToMetaStorageOnScaleDown(zoneId, revision);
}
@@ -1469,7 +1567,7 @@ public class DistributionZoneManager implements
IgniteComponent {
}));
})).whenComplete((v, e) -> {
if (e != null) {
- LOG.warn("Failed to update zones' dataNodes value [zoneId
= {}]", e, zoneId);
+ LOG.warn("Failed to update zones' dataNodes value after
scale down [zoneId = {}]", e, zoneId);
}
});
} finally {
@@ -1599,7 +1697,7 @@ public class DistributionZoneManager implements
IgniteComponent {
* States are needed to track nodes that we want to add or remove from the
data nodes,
* to schedule and stop scale up and scale down processes.
*/
- static class ZoneState {
+ public static class ZoneState {
/** Schedule task for a scale up process. */
private ScheduledFuture<?> scaleUpTask;
@@ -1636,6 +1734,21 @@ public class DistributionZoneManager implements
IgniteComponent {
nodes = emptySet();
}
+ /**
+ * Constructor.
+ *
+ * @param executor Executor for scheduling tasks for scale up and
scale down processes.
+ * @param topologyAugmentationMap Map that stores pairs revision ->
{@link Augmentation} for a zone. With this map we can
+ * track which nodes should be added or removed in the
processes of scale up or scale down. Revision helps to track
+ * visibility of the events of adding or removing nodes
because any process of scale up or scale down has a revision that
+ * triggered this process.
+ */
+ ZoneState(ScheduledExecutorService executor,
ConcurrentSkipListMap<Long, Augmentation> topologyAugmentationMap) {
+ this.executor = executor;
+ this.topologyAugmentationMap = topologyAugmentationMap;
+ nodes = emptySet();
+ }
+
/**
* Map that stores pairs revision -> {@link Augmentation} for a zone.
With this map we can track which nodes
* should be added or removed in the processes of scale up or scale
down. Revision helps to track visibility of the events
@@ -1817,12 +1930,12 @@ public class DistributionZoneManager implements
IgniteComponent {
}
@TestOnly
- synchronized ScheduledFuture<?> scaleUpTask() {
+ public synchronized ScheduledFuture<?> scaleUpTask() {
return scaleUpTask;
}
@TestOnly
- synchronized ScheduledFuture<?> scaleDownTask() {
+ public synchronized ScheduledFuture<?> scaleDownTask() {
return scaleDownTask;
}
}
@@ -1831,7 +1944,9 @@ public class DistributionZoneManager implements
IgniteComponent {
* Class stores the info about nodes that should be added or removed from
the data nodes of a zone.
* With flag {@code addition} we can track whether {@code nodeNames}
should be added or removed.
*/
- private static class Augmentation {
+ private static class Augmentation implements Serializable {
+ private static final long serialVersionUID = -7957428671075739621L;
+
/** Names of the node. */
Set<Node> nodes;
@@ -1855,7 +1970,7 @@ public class DistributionZoneManager implements
IgniteComponent {
}
@TestOnly
- Map<Integer, ZoneState> zonesState() {
+ public Map<Integer, ZoneState> zonesState() {
return zonesState;
}
diff --git
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZonesUtil.java
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZonesUtil.java
index 32d3571683..7f87409174 100644
---
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZonesUtil.java
+++
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZonesUtil.java
@@ -44,6 +44,7 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadPoolExecutor;
import org.apache.ignite.internal.cluster.management.topology.api.LogicalNode;
+import
org.apache.ignite.internal.distributionzones.DistributionZoneManager.ZoneState;
import
org.apache.ignite.internal.distributionzones.configuration.DistributionZoneConfiguration;
import
org.apache.ignite.internal.distributionzones.configuration.DistributionZonesConfiguration;
import org.apache.ignite.internal.metastorage.Entry;
@@ -95,6 +96,9 @@ public class DistributionZonesUtil {
/** Key prefix, needed for processing the event about zone's update was
triggered only once. */
private static final String DISTRIBUTION_ZONES_CHANGE_TRIGGER_KEY_PREFIX =
"distributionZones.change.trigger.";
+ /** Key prefix that represents {@link ZoneState#topologyAugmentationMap()}
in the Vault.*/
+ private static final String
DISTRIBUTION_ZONES_TOPOLOGY_AUGMENTATION_VAULT_PREFIX =
"vault.distributionZones.topologyAugmentation.";
+
/** ByteArray representation of {@link
DistributionZonesUtil#DISTRIBUTION_ZONES_LOGICAL_TOPOLOGY}. */
private static final ByteArray DISTRIBUTION_ZONES_LOGICAL_TOPOLOGY_KEY =
new ByteArray(DISTRIBUTION_ZONES_LOGICAL_TOPOLOGY);
@@ -105,7 +109,7 @@ public class DistributionZonesUtil {
private static final ByteArray
DISTRIBUTION_ZONES_NODES_ATTRIBUTES_VAULT_KEY = new
ByteArray(DISTRIBUTION_ZONES_NODES_ATTRIBUTES_VAULT);
/** ByteArray representation of {@link
DistributionZonesUtil#DISTRIBUTION_ZONES_GLOBAL_STATE_REVISION_VAULT}. */
- public static final ByteArray
DISTRIBUTION_ZONES_GLOBAL_STATE_REVISION_VAULT_KEY =
+ private static final ByteArray
DISTRIBUTION_ZONES_GLOBAL_STATE_REVISION_VAULT_KEY =
new ByteArray(DISTRIBUTION_ZONES_GLOBAL_STATE_REVISION_VAULT);
/** ByteArray representation of {@link
DistributionZonesUtil#DISTRIBUTION_ZONES_LOGICAL_TOPOLOGY_VERSION}. */
@@ -114,9 +118,11 @@ public class DistributionZonesUtil {
/**
* The initial value of trigger revision in case when it is not
initialized in the meta storage.
- * The trigger revision in the meta storage can be uninitialized for the
default distribution zone.
+ * It is possible because the invoke to the metastorage with the initialisation is
async, and a scale up/down change could be
+ * propagated first. Initial value is -1, because for default zone, we
initialise trigger keys with metastorage's applied revision,
+ * which is 0 on a start.
*/
- private static final long INITIAL_TRIGGER_REVISION_VALUE = 0;
+ private static final long INITIAL_TRIGGER_REVISION_VALUE = -1;
/** ByteArray representation of {@link
DistributionZonesUtil#DISTRIBUTION_ZONE_DATA_NODES_PREFIX}. */
private static final ByteArray DISTRIBUTION_ZONES_DATA_NODES_KEY =
@@ -202,6 +208,13 @@ public class DistributionZonesUtil {
return DISTRIBUTION_ZONES_LOGICAL_TOPOLOGY_VERSION_KEY;
}
+ /**
+ * The key prefix needed for processing an event about zone's data nodes.
+ */
+ static ByteArray zonesDataNodesPrefix() {
+ return DISTRIBUTION_ZONES_DATA_NODES_KEY;
+ }
+
/**
* The key that represents logical topology nodes in vault.
*/
@@ -225,10 +238,10 @@ public class DistributionZonesUtil {
}
/**
- * The key prefix needed for processing an event about zone's data nodes.
+ * The key that represents {@link ZoneState#topologyAugmentationMap()} in
the Vault.
*/
- static ByteArray zonesDataNodesPrefix() {
- return DISTRIBUTION_ZONES_DATA_NODES_KEY;
+ static ByteArray zoneTopologyAugmentationVault(int zoneId) {
+ return new
ByteArray(DISTRIBUTION_ZONES_TOPOLOGY_AUGMENTATION_VAULT_PREFIX + zoneId);
}
/**
diff --git
a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerScaleUpTest.java
b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerScaleUpTest.java
index 53f853cad0..af7e98a271 100644
---
a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerScaleUpTest.java
+++
b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerScaleUpTest.java
@@ -470,7 +470,7 @@ public class DistributionZoneManagerScaleUpTest extends
BaseDistributionZoneMana
assertDataNodesForZone(ZONE_1_ID, Set.of(), keyValueStorage);
- assertZoneScaleUpChangeTriggerKey(2L, ZONE_1_ID, keyValueStorage);
+ assertZoneScaleUpChangeTriggerKey(3L, ZONE_1_ID, keyValueStorage);
doAnswer(invocation -> {
If iif = invocation.getArgument(0);
@@ -516,7 +516,7 @@ public class DistributionZoneManagerScaleUpTest extends
BaseDistributionZoneMana
assertDataNodesForZone(ZONE_1_ID, Set.of(NODE_1), keyValueStorage);
- assertZoneScaleDownChangeTriggerKey(4L, ZONE_1_ID, keyValueStorage);
+ assertZoneScaleDownChangeTriggerKey(5L, ZONE_1_ID, keyValueStorage);
doAnswer(invocation -> {
If iif = invocation.getArgument(0);
diff --git
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageWriteHandler.java
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageWriteHandler.java
index 6354ece930..d84c675aa4 100644
---
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageWriteHandler.java
+++
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageWriteHandler.java
@@ -61,7 +61,7 @@ import org.apache.ignite.lang.IgniteInternalException;
/**
* Class containing some common logic for Meta Storage Raft group listeners.
*/
-class MetaStorageWriteHandler {
+public class MetaStorageWriteHandler {
/** Logger. */
private static final IgniteLogger LOG =
Loggers.forClass(MetaStorageWriteHandler.class);
@@ -176,7 +176,7 @@ class MetaStorageWriteHandler {
}
}
- private static If toIf(Iif iif) {
+ public static If toIf(Iif iif) {
return new If(toCondition(iif.condition()),
toConditionBranch(iif.andThen()), toConditionBranch(iif.orElse()));
}
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/distribution/zones/ItIgniteDistributionZoneManagerNodeRestartTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/distribution/zones/ItIgniteDistributionZoneManagerNodeRestartTest.java
index 81e210c9b9..502af7de66 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/distribution/zones/ItIgniteDistributionZoneManagerNodeRestartTest.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/distribution/zones/ItIgniteDistributionZoneManagerNodeRestartTest.java
@@ -19,8 +19,10 @@ package org.apache.ignite.internal.distribution.zones;
import static java.util.concurrent.CompletableFuture.completedFuture;
import static
org.apache.ignite.internal.distributionzones.DistributionZoneManager.DEFAULT_ZONE_ID;
+import static
org.apache.ignite.internal.distributionzones.DistributionZoneManager.DEFAULT_ZONE_NAME;
import static
org.apache.ignite.internal.distributionzones.DistributionZoneManager.IMMEDIATE_TIMER_VALUE;
import static
org.apache.ignite.internal.distributionzones.DistributionZonesTestUtil.assertDataNodesFromManager;
+import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zoneScaleDownChangeTriggerKey;
import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zoneScaleUpChangeTriggerKey;
import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zonesGlobalStateRevision;
import static
org.apache.ignite.internal.recovery.ConfigurationCatchUpListener.CONFIGURATION_CATCH_UP_DIFFERENCE_PROPERTY;
@@ -30,18 +32,22 @@ import static
org.apache.ignite.internal.util.ByteUtils.bytesToLong;
import static
org.apache.ignite.utils.ClusterServiceTestUtils.defaultSerializationRegistry;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.argThat;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
+import java.util.stream.Stream;
import org.apache.ignite.internal.BaseIgniteRestartTest;
import
org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager;
import
org.apache.ignite.internal.cluster.management.raft.TestClusterStateStorage;
@@ -59,12 +65,15 @@ import
org.apache.ignite.internal.configuration.testframework.ConfigurationExten
import
org.apache.ignite.internal.configuration.validation.ConfigurationValidatorImpl;
import
org.apache.ignite.internal.distributionzones.DistributionZoneConfigurationParameters;
import org.apache.ignite.internal.distributionzones.DistributionZoneManager;
+import
org.apache.ignite.internal.distributionzones.DistributionZoneManager.ZoneState;
import org.apache.ignite.internal.distributionzones.NodeWithAttributes;
import
org.apache.ignite.internal.distributionzones.configuration.DistributionZonesConfiguration;
import org.apache.ignite.internal.manager.IgniteComponent;
import org.apache.ignite.internal.metastorage.MetaStorageManager;
import
org.apache.ignite.internal.metastorage.impl.StandaloneMetaStorageManager;
+import org.apache.ignite.internal.metastorage.server.If;
import
org.apache.ignite.internal.metastorage.server.TestRocksDbKeyValueStorage;
+import
org.apache.ignite.internal.metastorage.server.raft.MetaStorageWriteHandler;
import org.apache.ignite.internal.network.configuration.NetworkConfiguration;
import org.apache.ignite.internal.network.recovery.VaultStateIds;
import org.apache.ignite.internal.schema.configuration.TablesConfiguration;
@@ -77,6 +86,9 @@ import org.apache.ignite.network.NetworkAddress;
import org.apache.ignite.network.scalecube.TestScaleCubeClusterServiceFactory;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
/**
* Tests for checking {@link DistributionZoneManager} behavior after node's
restart.
@@ -101,6 +113,8 @@ public class ItIgniteDistributionZoneManagerNodeRestartTest
extends BaseIgniteRe
private static final String ZONE_NAME = "zone1";
+ private static final int ZONE_ID = 1;
+
/**
* Start some of Ignite components that are able to serve as Ignite node
for test purposes.
*
@@ -159,10 +173,10 @@ public class
ItIgniteDistributionZoneManagerNodeRestartTest extends BaseIgniteRe
when(cmgManager.logicalTopology()).thenAnswer(invocation ->
completedFuture(logicalTopology.getLogicalTopology()));
- var metaStorageMgr = StandaloneMetaStorageManager.create(
+ var metaStorageMgr = spy(StandaloneMetaStorageManager.create(
vault,
- new TestRocksDbKeyValueStorage("test",
workDir.resolve("metastorage"))
- );
+ new TestRocksDbKeyValueStorage(name,
workDir.resolve("metastorage"))
+ ));
var cfgStorage = new DistributedConfigurationStorage(metaStorageMgr,
vault);
@@ -340,4 +354,168 @@ public class
ItIgniteDistributionZoneManagerNodeRestartTest extends BaseIgniteRe
assertEquals(scaleUpChangeTriggerRevision, globalStateRevision);
}
+
+    /**
+     * Checks that the data nodes observed through {@link DistributionZoneManager} before a node is stopped
+     * are observed again after the same node is started anew.
+     *
+     * @param zoneId Id of the zone under test.
+     * @param zoneName Name of the zone under test.
+     * @throws Exception If failed.
+     */
+    @ParameterizedTest
+    @MethodSource("provideArgumentsRestartTests")
+    public void testLocalDataNodesAreRestoredAfterRestart(int zoneId, String zoneName) throws Exception {
+        PartialNode node = startPartialNode(0);
+
+        DistributionZoneManager zoneManager = findComponent(node.startedComponents(), DistributionZoneManager.class);
+
+        createZoneOrAlterDefaultZone(zoneManager, zoneName);
+
+        // B is added and then removed again, so only A is expected among the data nodes.
+        node.logicalTopology().putNode(A);
+        node.logicalTopology().putNode(B);
+        node.logicalTopology().removeNodes(Set.of(B));
+
+        assertDataNodesFromManager(zoneManager, zoneId, Set.of(A.name()), TIMEOUT_MILLIS);
+
+        node.stop();
+
+        // Start the node with the same index again and look the manager up among the freshly started components.
+        PartialNode restartedNode = startPartialNode(0);
+
+        DistributionZoneManager restartedZoneManager =
+                findComponent(restartedNode.startedComponents(), DistributionZoneManager.class);
+
+        assertDataNodesFromManager(restartedZoneManager, zoneId, Set.of(A.name()), TIMEOUT_MILLIS);
+    }
+
+    /**
+     * Checks that a scale up which could not be applied before the node was stopped is applied after restart.
+     *
+     * <p>Scenario: data nodes become {A, B}; then every meta storage {@code invoke} whose update writes the zone's
+     * scale up change trigger key is stubbed to throw. After C is added and B is removed, data nodes are expected
+     * to be {A}. After the node is restarted, data nodes are expected to be {A, C}.
+     *
+     * @param zoneId Id of the zone under test.
+     * @param zoneName Name of the zone under test.
+     * @throws Exception If failed.
+     */
+    @ParameterizedTest
+    @MethodSource("provideArgumentsRestartTests")
+    public void testScaleUpTimerIsRestoredAfterRestart(int zoneId, String zoneName) throws Exception {
+        PartialNode partialNode = startPartialNode(0);
+
+        DistributionZoneManager distributionZoneManager =
+                findComponent(partialNode.startedComponents(), DistributionZoneManager.class);
+
+        createZoneOrAlterDefaultZone(distributionZoneManager, zoneName);
+
+        partialNode.logicalTopology().putNode(A);
+        partialNode.logicalTopology().putNode(B);
+
+        assertDataNodesFromManager(distributionZoneManager, zoneId, Set.of(A.name(), B.name()), TIMEOUT_MILLIS);
+
+        MetaStorageManager metaStorageManager = findComponent(partialNode.startedComponents(), MetaStorageManager.class);
+
+        // From now on, fail any conditional update that touches the scale up change trigger key of this zone.
+        when(metaStorageManager.invoke(argThat(iif -> {
+            If iif1 = MetaStorageWriteHandler.toIf(iif);
+
+            byte[] keyScaleUp = zoneScaleUpChangeTriggerKey(zoneId).bytes();
+
+            return iif1.andThen().update().operations().stream().anyMatch(op -> Arrays.equals(keyScaleUp, op.key()));
+        }))).thenThrow(new RuntimeException("Expected"));
+
+        partialNode.logicalTopology().putNode(C);
+        partialNode.logicalTopology().removeNodes(Set.of(B));
+
+        // B is gone, while C is not expected to show up because of the stubbed failure above.
+        assertDataNodesFromManager(distributionZoneManager, zoneId, Set.of(A.name()), TIMEOUT_MILLIS);
+
+        partialNode.stop();
+
+        partialNode = startPartialNode(0);
+
+        distributionZoneManager = findComponent(partialNode.startedComponents(), DistributionZoneManager.class);
+
+        // After the restart C is expected among the data nodes.
+        assertDataNodesFromManager(distributionZoneManager, zoneId, Set.of(A.name(), C.name()), TIMEOUT_MILLIS);
+    }
+
+    /**
+     * Checks that a scale down which could not be applied before the node was stopped is applied after restart.
+     *
+     * <p>Scenario: every meta storage {@code invoke} whose update writes the zone's scale down change trigger key
+     * is stubbed to throw. A, B are added, B is removed and C is added; data nodes are expected to be {A, B, C}.
+     * After the node is restarted, data nodes are expected to be {A, C}.
+     *
+     * @param zoneId Id of the zone under test.
+     * @param zoneName Name of the zone under test.
+     * @throws Exception If failed.
+     */
+    @ParameterizedTest
+    @MethodSource("provideArgumentsRestartTests")
+    public void testScaleDownTimerIsRestoredAfterRestart(int zoneId, String zoneName) throws Exception {
+        PartialNode partialNode = startPartialNode(0);
+
+        DistributionZoneManager distributionZoneManager =
+                findComponent(partialNode.startedComponents(), DistributionZoneManager.class);
+
+        MetaStorageManager metaStorageManager = findComponent(partialNode.startedComponents(), MetaStorageManager.class);
+
+        createZoneOrAlterDefaultZone(distributionZoneManager, zoneName);
+
+        // From now on, fail any conditional update that touches the scale down change trigger key of this zone.
+        when(metaStorageManager.invoke(argThat(iif -> {
+            If iif1 = MetaStorageWriteHandler.toIf(iif);
+
+            byte[] keyScaleDown = zoneScaleDownChangeTriggerKey(zoneId).bytes();
+
+            return iif1.andThen().update().operations().stream().anyMatch(op -> Arrays.equals(keyScaleDown, op.key()));
+        }))).thenThrow(new RuntimeException("Expected"));
+
+        partialNode.logicalTopology().putNode(A);
+        partialNode.logicalTopology().putNode(B);
+        partialNode.logicalTopology().removeNodes(Set.of(B));
+        partialNode.logicalTopology().putNode(C);
+
+        // B was removed from the topology but is still expected among the data nodes because of the stubbed failure.
+        assertDataNodesFromManager(distributionZoneManager, zoneId, Set.of(A.name(), B.name(), C.name()), TIMEOUT_MILLIS);
+
+        partialNode.stop();
+
+        partialNode = startPartialNode(0);
+
+        distributionZoneManager = findComponent(partialNode.startedComponents(), DistributionZoneManager.class);
+
+        // After the restart B is expected to be gone from the data nodes.
+        assertDataNodesFromManager(distributionZoneManager, zoneId, Set.of(A.name(), C.name()), TIMEOUT_MILLIS);
+    }
+
+    /**
+     * Supplies the arguments for the parameterized restart tests.
+     *
+     * @return Stream of (zone id, zone name) pairs: first for the default zone, then for {@code ZONE_ID}/{@code ZONE_NAME}.
+     */
+    private static Stream<Arguments> provideArgumentsRestartTests() {
+        return Stream.of(
+                Arguments.of(DEFAULT_ZONE_ID, DEFAULT_ZONE_NAME),
+                Arguments.of(ZONE_ID, ZONE_NAME)
+        );
+    }
+
+    /**
+     * Prepares the zone under test with both auto adjust delays (scale up and scale down) set to {@code 0}.
+     *
+     * <p>If {@code zoneName} is the default zone name, the default zone is altered and the method waits until the
+     * scale up/down tasks found in the zone state, if any, are done. Otherwise a new zone named {@code zoneName}
+     * is created.
+     *
+     * @param distributionZoneManager Manager used to create or alter the zone.
+     * @param zoneName Name of the zone to prepare.
+     * @throws Exception If the create/alter future fails or does not complete within {@code TIMEOUT_MILLIS}.
+     */
+    private static void createZoneOrAlterDefaultZone(
+            DistributionZoneManager distributionZoneManager,
+            String zoneName
+    ) throws Exception {
+        if (zoneName.equals(DEFAULT_ZONE_NAME)) {
+            distributionZoneManager.alterZone(
+                    DEFAULT_ZONE_NAME,
+                    new DistributionZoneConfigurationParameters.Builder(DEFAULT_ZONE_NAME)
+                            .dataNodesAutoAdjustScaleUp(0)
+                            .dataNodesAutoAdjustScaleDown(0)
+                            .build()
+            ).get(TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
+
+            ZoneState zoneState = distributionZoneManager.zonesState().get(DEFAULT_ZONE_ID);
+
+            // This is needed because we want to wait for the end of scale up/down triggered by altering delays.
+            if (zoneState.scaleUpTask() != null) {
+                assertTrue(waitForCondition(() -> zoneState.scaleUpTask().isDone(), TIMEOUT_MILLIS));
+            }
+
+            if (zoneState.scaleDownTask() != null) {
+                assertTrue(waitForCondition(() -> zoneState.scaleDownTask().isDone(), TIMEOUT_MILLIS));
+            }
+        } else {
+            // Honor the requested name instead of a hard-coded constant, so the helper works for any non-default zone.
+            distributionZoneManager.createZone(
+                    new DistributionZoneConfigurationParameters.Builder(zoneName)
+                            .dataNodesAutoAdjustScaleUp(0)
+                            .dataNodesAutoAdjustScaleDown(0)
+                            .build()
+            ).get(TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
+        }
+    }
}