This is an automated email from the ASF dual-hosted git repository.
sanpwc pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 54da917864 IGNITE-18121 Scale up scheduler added (#1508)
54da917864 is described below
commit 54da917864cfc896e960f6de85dbf8f01f336388
Author: Mirza Aliev <[email protected]>
AuthorDate: Thu Jan 26 16:16:23 2023 +0400
IGNITE-18121 Scale up scheduler added (#1508)
---
.../distributionzones/DistributionZoneManager.java | 552 ++++++++++++---
.../distributionzones/DistributionZonesUtil.java | 110 ++-
...ibutionZoneManagerConfigurationChangesTest.java | 181 ++---
...butionZoneManagerLogicalTopologyEventsTest.java | 3 +-
.../DistributionZoneManagerScaleUpTest.java | 745 +++++++++++++++++++++
.../DistributionZoneManagerTest.java | 3 +-
.../DistributionZoneManagerWatchListenerTest.java | 107 ++-
.../DistributionZonesSchedulersTest.java | 141 ++++
.../org/apache/ignite/internal/app/IgniteImpl.java | 3 +-
.../DdlCommandHandlerExceptionHandlingTest.java | 3 +-
10 files changed, 1623 insertions(+), 225 deletions(-)
diff --git
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java
index 7adb3370a3..0855973f9f 100644
---
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java
+++
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java
@@ -20,28 +20,45 @@ package org.apache.ignite.internal.distributionzones;
import static java.util.concurrent.CompletableFuture.completedFuture;
import static java.util.concurrent.CompletableFuture.failedFuture;
import static java.util.stream.Collectors.toList;
-import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.deleteDataNodesKeyAndUpdateTriggerKey;
-import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.triggerKeyCondition;
-import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.updateDataNodesAndTriggerKey;
+import static java.util.stream.Collectors.toSet;
+import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.deleteDataNodesAndUpdateTriggerKeys;
+import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.triggerKeyConditionForZonesChanges;
+import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.triggerScaleUpScaleDownKeysCondition;
+import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.updateDataNodesAndScaleUpTriggerKey;
+import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.updateDataNodesAndTriggerKeys;
import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.updateLogicalTopologyAndVersion;
-import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.updateTriggerKey;
+import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zoneDataNodesKey;
+import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zoneScaleDownChangeTriggerKey;
+import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zoneScaleUpChangeTriggerKey;
import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zonesLogicalTopologyKey;
import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zonesLogicalTopologyVersionKey;
import static org.apache.ignite.internal.metastorage.dsl.Conditions.notExists;
import static org.apache.ignite.internal.metastorage.dsl.Conditions.value;
import static org.apache.ignite.internal.metastorage.dsl.Operations.ops;
import static org.apache.ignite.internal.util.ByteUtils.bytesToLong;
+import static org.apache.ignite.internal.util.ByteUtils.fromBytes;
import static org.apache.ignite.internal.util.ByteUtils.toBytes;
+import static org.apache.ignite.internal.util.IgniteUtils.inBusyLock;
+import static
org.apache.ignite.internal.util.IgniteUtils.shutdownAndAwaitTermination;
import static org.apache.ignite.lang.ErrorGroups.Common.NODE_STOPPING_ERR;
import java.util.Arrays;
import java.util.Collections;
-import java.util.List;
+import java.util.HashSet;
+import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.stream.Collectors;
+import java.util.function.BiFunction;
+import java.util.function.Function;
import org.apache.ignite.configuration.ConfigurationChangeException;
import org.apache.ignite.configuration.NamedConfigurationTree;
import org.apache.ignite.configuration.NamedListChange;
@@ -69,18 +86,22 @@ import org.apache.ignite.internal.metastorage.WatchListener;
import org.apache.ignite.internal.metastorage.dsl.CompoundCondition;
import org.apache.ignite.internal.metastorage.dsl.Condition;
import org.apache.ignite.internal.metastorage.dsl.If;
+import org.apache.ignite.internal.metastorage.dsl.StatementResult;
import org.apache.ignite.internal.metastorage.dsl.Update;
import org.apache.ignite.internal.schema.configuration.TableChange;
import org.apache.ignite.internal.schema.configuration.TableConfiguration;
import org.apache.ignite.internal.schema.configuration.TableView;
import org.apache.ignite.internal.schema.configuration.TablesConfiguration;
+import org.apache.ignite.internal.thread.NamedThreadFactory;
import org.apache.ignite.internal.util.ByteUtils;
import org.apache.ignite.internal.util.IgniteSpinBusyLock;
import org.apache.ignite.internal.vault.VaultManager;
+import org.apache.ignite.lang.ByteArray;
import org.apache.ignite.lang.IgniteException;
import org.apache.ignite.lang.IgniteInternalException;
import org.apache.ignite.lang.NodeStoppingException;
import org.apache.ignite.network.ClusterNode;
+import org.jetbrains.annotations.TestOnly;
/**
* Distribution zones manager.
@@ -89,6 +110,8 @@ public class DistributionZoneManager implements
IgniteComponent {
/** Name of the default distribution zone. */
public static final String DEFAULT_ZONE_NAME = "Default";
+ private static final String DISTRIBUTION_ZONE_MANAGER_POOL_NAME =
"dst-zones-scheduler";
+
/** Id of the default distribution zone. */
public static final int DEFAULT_ZONE_ID = 0;
@@ -116,6 +139,15 @@ public class DistributionZoneManager implements
IgniteComponent {
/** Logical topology service to track topology changes. */
private final LogicalTopologyService logicalTopologyService;
+ /** Executor for scheduling tasks for scale up and scale down processes. */
+ private final ScheduledExecutorService executor;
+
+ /**
+ * Map with states for distribution zones. States are needed to track
nodes that we want to add or remove from the data nodes,
+ * schedule and stop scale up and scale down processes.
+ */
+ private final Map<Integer, ZoneState> zonesState;
+
/** Listener for a topology events. */
private final LogicalTopologyEventListener topologyEventListener = new
LogicalTopologyEventListener() {
@Override
@@ -134,8 +166,9 @@ public class DistributionZoneManager implements
IgniteComponent {
}
};
- /** The logical topology on the last watch event.
- * It's enough to mark this field by volatile because we don't update the
collection after it is assigned to the field.
+ /**
+ * The logical topology on the last watch event.
+ * It's enough to mark this field by volatile because we don't update the
collection after it is assigned to the field.
*/
private volatile Set<String> logicalTopology;
@@ -156,7 +189,8 @@ public class DistributionZoneManager implements
IgniteComponent {
TablesConfiguration tablesConfiguration,
MetaStorageManager metaStorageManager,
LogicalTopologyService logicalTopologyService,
- VaultManager vaultMgr
+ VaultManager vaultMgr,
+ String nodeName
) {
this.zonesConfiguration = zonesConfiguration;
this.tablesConfiguration = tablesConfiguration;
@@ -166,7 +200,20 @@ public class DistributionZoneManager implements
IgniteComponent {
this.watchListener = createMetastorageListener();
+ zonesState = new ConcurrentHashMap<>();
+
logicalTopology = Collections.emptySet();
+
+ executor = new ScheduledThreadPoolExecutor(
+ Math.min(Runtime.getRuntime().availableProcessors() * 3, 20),
+ new
NamedThreadFactory(NamedThreadFactory.threadPrefix(nodeName,
DISTRIBUTION_ZONE_MANAGER_POOL_NAME), LOG),
+ new ThreadPoolExecutor.DiscardPolicy()
+ );
+ }
+
+ /**
+ * Returns the map with the states of distribution zones, keyed by zone id.
+ * The internal map itself is returned (not a copy); intended for tests only.
+ *
+ * @return Map zone id -> zone state.
+ */
+ @TestOnly
+ Map<Integer, ZoneState> zonesTimers() {
+ return zonesState;
 }
/**
@@ -435,13 +482,23 @@ public class DistributionZoneManager implements
IgniteComponent {
zonesConfiguration.defaultDistributionZone().listen(zonesConfigurationListener);
+ // Init timers after restart.
+ zonesState.putIfAbsent(DEFAULT_ZONE_ID, new ZoneState(executor));
+
+ zonesConfiguration.distributionZones().value().namedListKeys()
+ .forEach(zoneName -> {
+ int zoneId =
zonesConfiguration.distributionZones().get(zoneName).zoneId().value();
+
+ zonesState.putIfAbsent(zoneId, new
ZoneState(executor));
+ });
+
logicalTopologyService.addEventListener(topologyEventListener);
metaStorageManager.registerExactWatch(zonesLogicalTopologyKey(),
watchListener);
initDataNodesFromVaultManager();
- initMetaStorageKeysOnStart();
+ initLogicalTopologyAndVersionInMetaStorageOnStart();
} finally {
busyLock.leaveBusy();
}
@@ -459,41 +516,77 @@ public class DistributionZoneManager implements
IgniteComponent {
logicalTopologyService.removeEventListener(topologyEventListener);
metaStorageManager.unregisterWatch(watchListener);
+
+ shutdownAndAwaitTermination(executor, 10, TimeUnit.SECONDS);
}
private class ZonesConfigurationListener implements
ConfigurationNamedListListener<DistributionZoneView> {
@Override
public CompletableFuture<?>
onCreate(ConfigurationNotificationEvent<DistributionZoneView> ctx) {
- updateMetaStorageOnZoneCreate(ctx.newValue().zoneId(),
ctx.storageRevision());
+ int zoneId = ctx.newValue().zoneId();
+
+ ZoneState zoneState = new ZoneState(executor);
+
+ zonesState.putIfAbsent(zoneId, zoneState);
+
+ saveDataNodesAndUpdateTriggerKeysInMetaStorage(zoneId,
ctx.storageRevision(), toBytes(logicalTopology));
return completedFuture(null);
}
@Override
public CompletableFuture<?>
onDelete(ConfigurationNotificationEvent<DistributionZoneView> ctx) {
- updateMetaStorageOnZoneDelete(ctx.oldValue().zoneId(),
ctx.storageRevision());
+ int zoneId = ctx.oldValue().zoneId();
+
+ zonesState.get(zoneId).stopTimers();
+
+ removeTriggerKeysAndDataNodes(zoneId, ctx.storageRevision());
+
+ zonesState.remove(zoneId);
return completedFuture(null);
}
@Override
public CompletableFuture<?>
onUpdate(ConfigurationNotificationEvent<DistributionZoneView> ctx) {
- updateMetaStorageOnZoneUpdate(ctx.storageRevision());
+ int zoneId = ctx.newValue().zoneId();
- //TODO: Also add here rescheduling for the existing timers
https://issues.apache.org/jira/browse/IGNITE-18121
+ int oldScaleUp;
+
+ // ctx.oldValue() could be null for the default zone on a first
start.
+ if (ctx.oldValue() == null) {
+ oldScaleUp = Integer.MAX_VALUE;
+ } else {
+ oldScaleUp = ctx.oldValue().dataNodesAutoAdjustScaleUp();
+ }
+
+ int newScaleUp = ctx.newValue().dataNodesAutoAdjustScaleUp();
+
+ if (newScaleUp != Integer.MAX_VALUE && oldScaleUp != newScaleUp) {
+ // It is safe to zonesTimers.get(zoneId) in term of NPE
because meta storage notifications are one-threaded
+ zonesState.get(zoneId).rescheduleScaleUp(
+ newScaleUp,
+ () -> CompletableFuture.supplyAsync(
+ () ->
saveDataNodesToMetaStorageOnScaleUp(zoneId, ctx.storageRevision()),
+ Runnable::run
+ )
+ );
+ }
return completedFuture(null);
}
}
/**
- * Method updates data nodes value for the specified zone,
- * also sets {@code revision} to the {@link
DistributionZonesUtil#zonesChangeTriggerKey()} if it passes the condition.
+ * Method updates data nodes value for the specified zone, also sets
{@code revision} to the
+ * {@link DistributionZonesUtil#zoneScaleUpChangeTriggerKey(int)}, {@link
DistributionZonesUtil#zoneScaleDownChangeTriggerKey(int)}
+ * and {@link DistributionZonesUtil#zonesChangeTriggerKey(int)} if it
passes the condition.
*
* @param zoneId Unique id of a zone
* @param revision Revision of an event that has triggered this method.
+ * @param dataNodes Data nodes.
*/
- private void updateMetaStorageOnZoneCreate(int zoneId, long revision) {
+ private void saveDataNodesAndUpdateTriggerKeysInMetaStorage(int zoneId,
long revision, byte[] dataNodes) {
if (!busyLock.enterBusy()) {
throw new IgniteInternalException(NODE_STOPPING_ERR, new
NodeStoppingException());
}
@@ -501,20 +594,15 @@ public class DistributionZoneManager implements
IgniteComponent {
try {
// Update data nodes for a zone only if the revision of the event
is newer than value in that trigger key,
// so we do not react on a stale events
- CompoundCondition triggerKeyCondition =
triggerKeyCondition(revision);
-
- // logicalTopology can be updated concurrently by the watch
listener.
- Set<String> logicalTopology0 = logicalTopology;
+ CompoundCondition triggerKeyCondition =
triggerKeyConditionForZonesChanges(revision, zoneId);
- byte[] logicalTopologyBytes = toBytes(logicalTopology0);
-
- Update dataNodesAndTriggerKeyUpd =
updateDataNodesAndTriggerKey(zoneId, revision, logicalTopologyBytes);
+ Update dataNodesAndTriggerKeyUpd =
updateDataNodesAndTriggerKeys(zoneId, revision, dataNodes);
If iif = If.iif(triggerKeyCondition, dataNodesAndTriggerKeyUpd,
ops().yield(false));
metaStorageManager.invoke(iif).thenAccept(res -> {
if (res.getAsBoolean()) {
- LOG.debug("Update zones' dataNodes value [zoneId = {},
dataNodes = {}", zoneId, logicalTopology0);
+ LOG.debug("Update zones' dataNodes value [zoneId = {},
dataNodes = {}", zoneId, dataNodes);
} else {
LOG.debug("Failed to update zones' dataNodes value [zoneId
= {}]", zoneId);
}
@@ -525,52 +613,22 @@ public class DistributionZoneManager implements
IgniteComponent {
}
/**
- * Sets {@code revision} to the {@link
DistributionZonesUtil#zonesChangeTriggerKey()} if it passes the condition.
- *
- * @param revision Revision of an event that has triggered this method.
- */
- private void updateMetaStorageOnZoneUpdate(long revision) {
- if (!busyLock.enterBusy()) {
- throw new IgniteInternalException(NODE_STOPPING_ERR, new
NodeStoppingException());
- }
-
- try {
- CompoundCondition triggerKeyCondition =
triggerKeyCondition(revision);
-
- Update triggerKeyUpd = updateTriggerKey(revision);
-
- If iif = If.iif(triggerKeyCondition, triggerKeyUpd,
ops().yield(false));
-
- metaStorageManager.invoke(iif).thenAccept(res -> {
- if (res.getAsBoolean()) {
- LOG.debug("Distribution zones' trigger key was updated
with the revision [revision = {}]", revision);
- } else {
- LOG.debug("Failed to update distribution zones' trigger
key with the revision [revision = {}]", revision);
- }
- });
- } finally {
- busyLock.leaveBusy();
- }
- }
-
- /**
- * Method deletes data nodes value for the specified zone,
- * also sets {@code revision} to the {@link
DistributionZonesUtil#zonesChangeTriggerKey()} if it passes the condition.
+ * Method deletes data nodes value for the specified zone.
*
* @param zoneId Unique id of a zone
* @param revision Revision of an event that has triggered this method.
*/
- private void updateMetaStorageOnZoneDelete(int zoneId, long revision) {
+ private void removeTriggerKeysAndDataNodes(int zoneId, long revision) {
if (!busyLock.enterBusy()) {
throw new IgniteInternalException(NODE_STOPPING_ERR, new
NodeStoppingException());
}
try {
- CompoundCondition triggerKeyCondition =
triggerKeyCondition(revision);
+ CompoundCondition triggerKeyCondition =
triggerKeyConditionForZonesChanges(revision, zoneId);
- Update dataNodesRemoveUpd =
deleteDataNodesKeyAndUpdateTriggerKey(zoneId, revision);
+ Update removeKeysUpd = deleteDataNodesAndUpdateTriggerKeys(zoneId,
revision);
- If iif = If.iif(triggerKeyCondition, dataNodesRemoveUpd,
ops().yield(false));
+ If iif = If.iif(triggerKeyCondition, removeKeysUpd,
ops().yield(false));
metaStorageManager.invoke(iif).thenAccept(res -> {
if (res.getAsBoolean()) {
@@ -624,7 +682,7 @@ public class DistributionZoneManager implements
IgniteComponent {
}
try {
- Set<String> topologyFromCmg =
newTopology.nodes().stream().map(ClusterNode::name).collect(Collectors.toSet());
+ Set<String> topologyFromCmg =
newTopology.nodes().stream().map(ClusterNode::name).collect(toSet());
Condition updateCondition;
@@ -665,7 +723,7 @@ public class DistributionZoneManager implements
IgniteComponent {
* Initialises {@link DistributionZonesUtil#zonesLogicalTopologyKey()} and
* {@link DistributionZonesUtil#zonesLogicalTopologyVersionKey()} from
meta storage on the start of {@link DistributionZoneManager}.
*/
- private void initMetaStorageKeysOnStart() {
+ private void initLogicalTopologyAndVersionInMetaStorageOnStart() {
if (!busyLock.enterBusy()) {
throw new IgniteInternalException(NODE_STOPPING_ERR, new
NodeStoppingException());
}
@@ -688,7 +746,7 @@ public class DistributionZoneManager implements
IgniteComponent {
byte[] topVerFromMetaStorage = topVerEntry.value();
if (topVerFromMetaStorage == null ||
bytesToLong(topVerFromMetaStorage) < topologyVersionFromCmg) {
- Set<String> topologyFromCmg =
snapshot.nodes().stream().map(ClusterNode::name).collect(Collectors.toSet());
+ Set<String> topologyFromCmg =
snapshot.nodes().stream().map(ClusterNode::name).collect(toSet());
Condition topologyVersionCondition =
topVerFromMetaStorage == null
?
notExists(zonesLogicalTopologyVersionKey()) :
@@ -751,16 +809,25 @@ public class DistributionZoneManager implements
IgniteComponent {
try {
if (vaultEntry != null && vaultEntry.value() !=
null) {
- logicalTopology =
ByteUtils.fromBytes(vaultEntry.value());
+ logicalTopology =
fromBytes(vaultEntry.value());
+
+ // init keys and data nodes for default zone
+ saveDataNodesAndUpdateTriggerKeysInMetaStorage(
+ DEFAULT_ZONE_ID,
+ appliedRevision,
+ vaultEntry.value()
+ );
zonesConfiguration.distributionZones().value().namedListKeys()
.forEach(zoneName -> {
int zoneId =
zonesConfiguration.distributionZones().get(zoneName).zoneId().value();
- saveDataNodesToMetaStorage(zoneId,
vaultEntry.value(), appliedRevision);
+
saveDataNodesAndUpdateTriggerKeysInMetaStorage(
+ zoneId,
+ appliedRevision,
+ vaultEntry.value()
+ );
});
-
- saveDataNodesToMetaStorage(DEFAULT_ZONE_ID,
vaultEntry.value(), appliedRevision);
}
} finally {
busyLock.leaveBusy();
@@ -790,13 +857,13 @@ public class DistributionZoneManager implements
IgniteComponent {
byte[] newLogicalTopologyBytes = newEntry.value();
- Set<String> newLogicalTopology =
ByteUtils.fromBytes(newLogicalTopologyBytes);
+ Set<String> newLogicalTopology =
fromBytes(newLogicalTopologyBytes);
- List<String> removedNodes =
- logicalTopology.stream().filter(node ->
!newLogicalTopology.contains(node)).collect(toList());
+ Set<String> removedNodes =
+ logicalTopology.stream().filter(node ->
!newLogicalTopology.contains(node)).collect(toSet());
- List<String> addedNodes =
- newLogicalTopology.stream().filter(node ->
!logicalTopology.contains(node)).collect(toList());
+ Set<String> addedNodes =
+ newLogicalTopology.stream().filter(node ->
!logicalTopology.contains(node)).collect(toSet());
logicalTopology = newLogicalTopology;
@@ -806,12 +873,12 @@ public class DistributionZoneManager implements
IgniteComponent {
for (int i = 0; i < zones.value().size(); i++) {
DistributionZoneView zoneView = zones.value().get(i);
- scheduleTimers(zoneView, addedNodes, removedNodes,
newLogicalTopologyBytes, revision);
+ scheduleTimers(zoneView, addedNodes, removedNodes,
revision);
}
DistributionZoneView defaultZoneView =
zonesConfiguration.value().defaultDistributionZone();
- scheduleTimers(defaultZoneView, addedNodes, removedNodes,
newLogicalTopologyBytes, revision);
+ scheduleTimers(defaultZoneView, addedNodes, removedNodes,
revision);
} finally {
busyLock.leaveBusy();
}
@@ -824,11 +891,44 @@ public class DistributionZoneManager implements
IgniteComponent {
};
}
+ /**
+ * Schedules scale up and scale down timers.
+ *
+ * @param zoneCfg Zone's configuration.
+ * @param addedNodes Nodes that were added to a topology and should be
added to the zone's data nodes.
+ * @param removedNodes Nodes that were removed from a topology and should
be removed from the zone's data nodes.
+ * @param revision Revision that triggered that event.
+ */
private void scheduleTimers(
DistributionZoneView zoneCfg,
- List<String> addedNodes, List<String> removedNodes,
- byte[] newLogicalTopologyBytes,
+ Set<String> addedNodes,
+ Set<String> removedNodes,
long revision
+ ) {
+ scheduleTimers(
+ zoneCfg,
+ addedNodes,
+ removedNodes,
+ revision,
+ this::saveDataNodesToMetaStorageOnScaleUp
+ );
+ }
+
+ /**
+ * Schedules scale up and scale down timers. This method is needed also
for test purposes.
+ *
+ * @param zoneCfg Zone's configuration.
+ * @param addedNodes Nodes that were added to a topology and should be
added to the zone's data nodes.
+ * @param removedNodes Nodes that were removed from a topology and should
be removed from the zone's data nodes.
+ * @param revision Revision that triggered that event.
+ * @param saveDataNodes Function that saves nodes to a zone's data nodes.
+ */
+ void scheduleTimers(
+ DistributionZoneView zoneCfg,
+ Set<String> addedNodes,
+ Set<String> removedNodes,
+ long revision,
+ BiFunction<Integer, Long, CompletableFuture<Void>> saveDataNodes
) {
int autoAdjust = zoneCfg.dataNodesAutoAdjust();
int autoAdjustScaleDown = zoneCfg.dataNodesAutoAdjustScaleDown();
@@ -838,48 +938,28 @@ public class DistributionZoneManager implements
IgniteComponent {
if ((!addedNodes.isEmpty() || !removedNodes.isEmpty()) && autoAdjust
!= Integer.MAX_VALUE) {
//TODO: IGNITE-18134 Create scheduler with dataNodesAutoAdjust
timer.
- saveDataNodesToMetaStorage(
- zoneId, newLogicalTopologyBytes, revision
- );
+ throw new UnsupportedOperationException("Data nodes auto adjust is
not supported.");
} else {
if (!addedNodes.isEmpty() && autoAdjustScaleUp !=
Integer.MAX_VALUE) {
//TODO: IGNITE-18121 Create scale up scheduler with
dataNodesAutoAdjustScaleUp timer.
- saveDataNodesToMetaStorage(
- zoneId, newLogicalTopologyBytes, revision
+ zonesState.get(zoneId).addNodesToDataNodes(addedNodes,
revision);
+
+ zonesState.get(zoneId).rescheduleScaleUp(
+ autoAdjustScaleUp,
+ () -> CompletableFuture.supplyAsync(
+ () -> saveDataNodes.apply(zoneId, revision),
+ Runnable::run
+ )
);
}
if (!removedNodes.isEmpty() && autoAdjustScaleDown !=
Integer.MAX_VALUE) {
//TODO: IGNITE-18132 Create scale down scheduler with
dataNodesAutoAdjustScaleDown timer.
- saveDataNodesToMetaStorage(
- zoneId, newLogicalTopologyBytes, revision
- );
+ throw new UnsupportedOperationException("Data nodes auto
adjust scale down is not supported.");
}
}
}
- /**
- * Method updates data nodes value for the specified zone,
- * also sets {@code revision} to the {@link
DistributionZonesUtil#zonesChangeTriggerKey()} if it passes the condition.
- *
- * @param zoneId Unique id of a zone
- * @param dataNodes Data nodes of a zone
- * @param revision Revision of an event that has triggered this method.
- */
- private void saveDataNodesToMetaStorage(int zoneId, byte[] dataNodes, long
revision) {
- Update dataNodesAndTriggerKeyUpd =
updateDataNodesAndTriggerKey(zoneId, revision, dataNodes);
-
- var iif = If.iif(triggerKeyCondition(revision),
dataNodesAndTriggerKeyUpd, ops().yield(false));
-
- metaStorageManager.invoke(iif).thenAccept(res -> {
- if (res.getAsBoolean()) {
- LOG.debug("Delete zones' dataNodes key [zoneId = {}", zoneId);
- } else {
- LOG.debug("Failed to delete zones' dataNodes key [zoneId =
{}]", zoneId);
- }
- });
- }
-
/**
* Unwraps distribution zone exception from {@link
ConfigurationChangeException} if it is possible.
*
@@ -906,4 +986,264 @@ public class DistributionZoneManager implements
IgniteComponent {
return null;
}
+
+    /**
+     * Updates the data nodes value of the specified zone after the scale up timer has fired.
+     *
+     * <p>The method reads the zone's data nodes and its scale up / scale down trigger keys from the meta storage, computes the nodes
+     * accumulated in the zone's {@link ZoneState} that have to be added, and conditionally applies the update built by
+     * {@code updateDataNodesAndScaleUpTriggerKey}. The update is guarded by {@code triggerScaleUpScaleDownKeysCondition}, evaluated
+     * against the trigger revisions that were read. If the conditional invoke does not succeed, the whole procedure is retried.
+     *
+     * <p>Nothing is changed when the zone has no state (it was deleted), when any of the requested keys has no entry,
+     * when {@code revision} is not newer than the stored scale up trigger revision, or when there are no nodes to add.
+     *
+     * @param zoneId Unique id of a zone
+     * @param revision Revision of an event that has triggered this method.
+     * @return Future that is completed when the update (including any retry) is finished, or is completed exceptionally
+     *         if the update has failed.
+     * @throws IgniteInternalException If the node is stopping.
+     */
+    CompletableFuture<Void> saveDataNodesToMetaStorageOnScaleUp(int zoneId, long revision) {
+        if (!busyLock.enterBusy()) {
+            throw new IgniteInternalException(NODE_STOPPING_ERR, new NodeStoppingException());
+        }
+
+        try {
+            ZoneState zoneState = zonesState.get(zoneId);
+
+            if (zoneState == null) {
+                // Zone was deleted
+                return completedFuture(null);
+            }
+
+            Set<ByteArray> keysToGetFromMs = Set.of(
+                    zoneDataNodesKey(zoneId),
+                    zoneScaleUpChangeTriggerKey(zoneId),
+                    zoneScaleDownChangeTriggerKey(zoneId)
+            );
+
+            return metaStorageManager.getAll(keysToGetFromMs).thenCompose(values -> inBusyLock(busyLock, () -> {
+                if (values.containsValue(null)) {
+                    // Zone was deleted
+                    return completedFuture(null);
+                }
+
+                Set<String> dataNodesFromMetaStorage = fromBytes(values.get(zoneDataNodesKey(zoneId)).value());
+
+                long scaleUpTriggerRevision = bytesToLong(values.get(zoneScaleUpChangeTriggerKey(zoneId)).value());
+
+                long scaleDownTriggerRevision = bytesToLong(values.get(zoneScaleDownChangeTriggerKey(zoneId)).value());
+
+                // The event is stale: a scale up with the same or a newer revision has already been applied.
+                if (revision <= scaleUpTriggerRevision) {
+                    return completedFuture(null);
+                }
+
+                Set<String> deltaToAdd = zoneState.nodesToAddToDataNodes(scaleUpTriggerRevision, scaleDownTriggerRevision, revision);
+
+                if (deltaToAdd.isEmpty()) {
+                    return completedFuture(null);
+                }
+
+                Set<String> newDataNodes = new HashSet<>(dataNodesFromMetaStorage);
+
+                newDataNodes.addAll(deltaToAdd);
+
+                Update dataNodesAndTriggerKeyUpd = updateDataNodesAndScaleUpTriggerKey(zoneId, revision, toBytes(newDataNodes));
+
+                If iif = If.iif(
+                        triggerScaleUpScaleDownKeysCondition(scaleUpTriggerRevision, scaleDownTriggerRevision, zoneId),
+                        dataNodesAndTriggerKeyUpd,
+                        ops().yield(false)
+                );
+
+                // thenCompose (not thenApply) is required here: the lambda returns a future, and the retry must be chained
+                // into the resulting future so that callers wait for it and observe its failure.
+                return metaStorageManager.invoke(iif)
+                        .thenApply(StatementResult::getAsBoolean)
+                        .thenCompose(invokeResult -> inBusyLock(busyLock, () -> {
+                            if (invokeResult) {
+                                zoneState.cleanUp(Math.min(scaleDownTriggerRevision, revision));
+                            } else {
+                                LOG.debug("Updating data nodes for a zone has not succeeded [zoneId = {}]", zoneId);
+
+                                // The trigger keys were changed between the read and the invoke, so start over.
+                                return saveDataNodesToMetaStorageOnScaleUp(zoneId, revision);
+                            }
+
+                            return completedFuture(null);
+                        }));
+            })).handle((v, e) -> {
+                if (e != null) {
+                    LOG.warn("Failed to update zones' dataNodes value [zoneId = {}]", e, zoneId);
+
+                    return CompletableFuture.<Void>failedFuture(e);
+                }
+
+                return CompletableFuture.<Void>completedFuture(null);
+            }).thenCompose(Function.identity());
+        } finally {
+            busyLock.leaveBusy();
+        }
+    }
+
+    /**
+     * Class responsible for storing state for a distribution zone.
+     * States are needed to track nodes that we want to add or remove from the data nodes,
+     * to schedule and stop scale up and scale down processes.
+     */
+    static class ZoneState {
+        /** Schedule task for a scale up process. Accessed only from {@code synchronized} methods. */
+        private ScheduledFuture<?> scaleUpTask;
+
+        /** Schedule task for a scale down process. Accessed only from {@code synchronized} methods. */
+        private ScheduledFuture<?> scaleDownTask;
+
+        /**
+         * Map that stores pairs revision -> {@link Augmentation} for a zone. With this map we can track which nodes
+         * should be added or removed in the processes of scale up or scale down. Revision helps to track visibility of the events
+         * of adding or removing nodes because any process of scale up or scale down has a revision that triggered this process.
+         */
+        private final ConcurrentSkipListMap<Long, Augmentation> topologyAugmentationMap;
+
+        /** Executor for scheduling tasks for scale up and scale down processes. */
+        private final ScheduledExecutorService executor;
+
+        /**
+         * Constructor.
+         *
+         * @param executor Executor for scheduling tasks for scale up and scale down processes.
+         */
+        ZoneState(ScheduledExecutorService executor) {
+            this.executor = executor;
+            topologyAugmentationMap = new ConcurrentSkipListMap<>();
+        }
+
+        /**
+         * Returns the map revision -> {@link Augmentation} of this zone. The internal map itself is returned, not a copy.
+         *
+         * @return Map with the nodes that should be added to or removed from the zone's data nodes, keyed by revision.
+         */
+        ConcurrentSkipListMap<Long, Augmentation> topologyAugmentationMap() {
+            return topologyAugmentationMap;
+        }
+
+        /**
+         * Reschedules existing scale up task, if it is not started yet, or schedules new one, if the current task cannot be canceled.
+         * A task that is already running is not interrupted.
+         *
+         * @param delay Delay to start runnable in seconds.
+         * @param runnable Custom logic to run.
+         */
+        synchronized void rescheduleScaleUp(long delay, Runnable runnable) {
+            if (scaleUpTask != null) {
+                scaleUpTask.cancel(false);
+            }
+
+            scaleUpTask = executor.schedule(runnable, delay, TimeUnit.SECONDS);
+        }
+
+        /**
+         * Reschedules existing scale down task, if it is not started yet, or schedules new one, if the current task cannot be canceled.
+         * A task that is already running is not interrupted.
+         *
+         * @param delay Delay to start runnable in seconds.
+         * @param runnable Custom logic to run.
+         */
+        synchronized void rescheduleScaleDown(long delay, Runnable runnable) {
+            //TODO: IGNITE-18132 Create scale down scheduler with dataNodesAutoAdjustScaleDown timer.
+            if (scaleDownTask != null) {
+                scaleDownTask.cancel(false);
+            }
+
+            scaleDownTask = executor.schedule(runnable, delay, TimeUnit.SECONDS);
+        }
+
+        /**
+         * Cancels task for scale up and scale down. Tasks that are already running are not interrupted.
+         */
+        synchronized void stopTimers() {
+            if (scaleUpTask != null) {
+                scaleUpTask.cancel(false);
+            }
+
+            if (scaleDownTask != null) {
+                scaleDownTask.cancel(false);
+            }
+        }
+
+        /**
+         * Returns a set of nodes that should be added to zone's data nodes.
+         *
+         * <p>The result is the union of the nodes recorded for addition in the revisions from {@code scaleUpRevision} up to the
+         * greatest recorded revision that is not greater than {@code revision}, minus the nodes recorded for removal in the
+         * revisions from {@code scaleUpRevision} up to the maximum of that revision and {@code scaleDownRevision}.
+         *
+         * @param scaleUpRevision Last revision of the scale up event.
+         * @param scaleDownRevision Last revision of the scale down event.
+         * @param revision Revision of the event for which this data nodes is needed.
+         * @return Set of nodes that should be added to zone's data nodes.
+         */
+        Set<String> nodesToAddToDataNodes(long scaleUpRevision, long scaleDownRevision, long revision) {
+            Long toKey = topologyAugmentationMap.floorKey(revision);
+
+            if (toKey == null) {
+                // Nothing has been recorded at or before the given revision.
+                return Set.of();
+            }
+
+            Set<String> nodesToAdd = accumulateNodes(scaleUpRevision, toKey, true);
+
+            Set<String> nodesToRemove = accumulateNodes(scaleUpRevision, Math.max(toKey, scaleDownRevision), false);
+
+            nodesToAdd.removeAll(nodesToRemove);
+
+            return nodesToAdd;
+        }
+
+        /**
+         * Not implemented yet, see the TODO in the body.
+         *
+         * @return Currently always {@code null}.
+         */
+        public Set<String> nodesToRemoveFromDataNodes() {
+            //TODO: IGNITE-18132 Create scale down scheduler with dataNodesAutoAdjustScaleDown timer.
+            return null;
+        }
+
+        /**
+         * Add nodes to the map where nodes that must be added to the zone's data nodes are accumulated.
+         *
+         * @param nodes Nodes to add to zone's data nodes.
+         * @param revision Revision of the event that triggered this addition.
+         */
+        void addNodesToDataNodes(Set<String> nodes, long revision) {
+            topologyAugmentationMap.put(revision, new Augmentation(nodes, true));
+        }
+
+        /**
+         * Add nodes to the map where nodes that must be removed from the zone's data nodes are accumulated.
+         *
+         * @param nodes Nodes to remove from zone's data nodes.
+         * @param revision Revision of the event that triggered this removal.
+         */
+        void removeNodesFromDataNodes(Set<String> nodes, long revision) {
+            // The flag must be false here: true marks the nodes as an addition and they would be treated as nodes to add.
+            topologyAugmentationMap.put(revision, new Augmentation(nodes, false));
+        }
+
+        /**
+         * Accumulate nodes from the {@link ZoneState#topologyAugmentationMap} starting from the {@code fromKey} (including)
+         * to the {@code toKey} (including), where flag {@code addition} indicates whether we should accumulate nodes that should be
+         * added to data nodes, or removed.
+         *
+         * @param fromKey Starting key (including).
+         * @param toKey Ending key (including).
+         * @param addition Indicates whether we should accumulate nodes that should be added to data nodes, or removed.
+         * @return Accumulated nodes; an empty set if {@code fromKey} is greater than {@code toKey}.
+         */
+        private Set<String> accumulateNodes(long fromKey, long toKey, boolean addition) {
+            if (fromKey > toKey) {
+                // ConcurrentSkipListMap#subMap throws IllegalArgumentException for an inverted range, while such a range
+                // simply contains no augmentations. A fresh mutable set is returned because the caller modifies the result.
+                return new HashSet<>();
+            }
+
+            return topologyAugmentationMap.subMap(fromKey, true, toKey, true).values()
+                    .stream()
+                    .filter(a -> a.addition == addition)
+                    .flatMap(a -> a.nodeNames.stream())
+                    .collect(toSet());
+        }
+
+        /**
+         * Not implemented yet (currently a no-op), see the TODO in the body.
+         *
+         * @param toKey Revision passed by the caller after a successful data nodes update.
+         */
+        private void cleanUp(long toKey) {
+            //TODO: IGNITE-18132 Create scale down scheduler with dataNodesAutoAdjustScaleDown timer.
+        }
+    }
+
+ /**
+ * Stores the names of the nodes that should be added to or removed from
the data nodes of a zone.
+ * The flag {@code addition} tells whether {@code nodeNames} should be
added ({@code true}) or removed ({@code false}).
+ */
+ private static class Augmentation {
+ /** Names of the nodes. The set passed to the constructor is stored as is, without copying. */
+ Set<String> nodeNames;
+
+ /** Flag that indicates whether {@code nodeNames} should be added ({@code true}) or
removed ({@code false}). */
+ boolean addition;
+
+ /**
+ * Constructor.
+ *
+ * @param nodeNames Names of the nodes.
+ * @param addition {@code true} if the nodes should be added, {@code false} if they should be removed.
+ */
+ Augmentation(Set<String> nodeNames, boolean addition) {
+ this.nodeNames = nodeNames;
+ this.addition = addition;
+ }
+ }
}
diff --git
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZonesUtil.java
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZonesUtil.java
index 3d74a0ea79..335eaa005e 100644
---
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZonesUtil.java
+++
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZonesUtil.java
@@ -17,6 +17,7 @@
package org.apache.ignite.internal.distributionzones;
+import static org.apache.ignite.internal.metastorage.dsl.CompoundCondition.and;
import static org.apache.ignite.internal.metastorage.dsl.CompoundCondition.or;
import static org.apache.ignite.internal.metastorage.dsl.Conditions.notExists;
import static org.apache.ignite.internal.metastorage.dsl.Conditions.value;
@@ -37,14 +38,20 @@ class DistributionZonesUtil {
/** Key prefix for zone's data nodes. */
private static final String DISTRIBUTION_ZONE_DATA_NODES_PREFIX =
"distributionZone.dataNodes.";
+ /** Key prefix for zone's scale up change trigger key. */
+ private static final String
DISTRIBUTION_ZONE_SCALE_UP_CHANGE_TRIGGER_PREFIX =
"distributionZone.scaleUp.change.trigger.";
+
+ /** Key prefix for zone's scale down change trigger key. */
+ private static final String
DISTRIBUTION_ZONE_SCALE_DOWN_CHANGE_TRIGGER_PREFIX =
"distributionZone.scaleDown.change.trigger.";
+
/** Key prefix for zones' logical topology nodes. */
private static final String DISTRIBUTION_ZONES_LOGICAL_TOPOLOGY =
"distributionZones.logicalTopology";
/** Key prefix for zones' logical topology version. */
private static final String DISTRIBUTION_ZONES_LOGICAL_TOPOLOGY_VERSION =
"distributionZones.logicalTopologyVersion";
- /** The key, needed for processing the event about zones' update was
triggered only once. */
- private static final ByteArray DISTRIBUTION_ZONES_CHANGE_TRIGGER_KEY = new
ByteArray("distributionZones.change.trigger");
+ /** Prefix of the per-zone trigger keys that guarantee an event about a zone's update is
processed only once. */
+ private static final String DISTRIBUTION_ZONES_CHANGE_TRIGGER_KEY_PREFIX =
"distributionZones.change.trigger.";
/** ByteArray representation of {@link
DistributionZonesUtil#DISTRIBUTION_ZONES_LOGICAL_TOPOLOGY}. */
private static final ByteArray DISTRIBUTION_ZONE_LOGICAL_TOPOLOGY_KEY =
new ByteArray(DISTRIBUTION_ZONES_LOGICAL_TOPOLOGY);
@@ -59,10 +66,27 @@ class DistributionZonesUtil {
}
/**
- * The key, needed for processing the event about zones' update was
triggered only once.
+ * The key needed for processing an event about zone's creation and
deletion.
+ * With this key we can be sure that event was triggered only once.
+ */
+ static ByteArray zonesChangeTriggerKey(int zoneId) {
+ return new ByteArray(DISTRIBUTION_ZONES_CHANGE_TRIGGER_KEY_PREFIX +
zoneId);
+ }
+
+ /**
+ * The key needed for processing an event about zone's data node
propagation on scale up.
+ * With this key we can be sure that event was triggered only once.
+ */
+ static ByteArray zoneScaleUpChangeTriggerKey(int zoneId) {
+ return new ByteArray(DISTRIBUTION_ZONE_SCALE_UP_CHANGE_TRIGGER_PREFIX
+ zoneId);
+ }
+
+ /**
+ * The key needed for processing an event about zone's data node
propagation on scale down.
+ * With this key we can be sure that event was triggered only once.
*/
- static ByteArray zonesChangeTriggerKey() {
- return DISTRIBUTION_ZONES_CHANGE_TRIGGER_KEY;
+ static ByteArray zoneScaleDownChangeTriggerKey(int zoneId) {
+ return new
ByteArray(DISTRIBUTION_ZONE_SCALE_DOWN_CHANGE_TRIGGER_PREFIX + zoneId);
}
/**
@@ -82,69 +106,97 @@ class DistributionZonesUtil {
}
/**
- * Condition for updating {@link
DistributionZonesUtil#zonesChangeTriggerKey()} key.
+ * Condition for updating {@link
DistributionZonesUtil#zonesChangeTriggerKey(int)} key.
* Update only if the revision of the event is newer than value in that
trigger key.
*
* @param revision Event revision.
* @return Update condition.
*/
- static CompoundCondition triggerKeyCondition(long revision) {
+ static CompoundCondition triggerKeyConditionForZonesChanges(long revision,
int zoneId) {
return or(
- notExists(zonesChangeTriggerKey()),
-
value(zonesChangeTriggerKey()).lt(ByteUtils.longToBytes(revision))
+ notExists(zonesChangeTriggerKey(zoneId)),
+
value(zonesChangeTriggerKey(zoneId)).lt(ByteUtils.longToBytes(revision))
);
}
/**
- * Updates data nodes value for a zone and set {@code revision} to {@link
DistributionZonesUtil#zonesChangeTriggerKey()}.
+ * Condition that holds only if the values of both {@link
DistributionZonesUtil#zoneScaleUpChangeTriggerKey(int)} and
+ * {@link DistributionZonesUtil#zoneScaleDownChangeTriggerKey(int)} are equal to the passed
trigger revisions.
+ *
+ * @param scaleUpTriggerRevision Trigger revision of scale up.
+ * @param scaleDownTriggerRevision Trigger revision of scale down.
+ * @param zoneId Zone id.
+ * @return Update condition.
+ */
+ static CompoundCondition triggerScaleUpScaleDownKeysCondition(long
scaleUpTriggerRevision, long scaleDownTriggerRevision, int zoneId) {
+ return and(
+
value(zoneScaleUpChangeTriggerKey(zoneId)).eq(ByteUtils.longToBytes(scaleUpTriggerRevision)),
+
value(zoneScaleDownChangeTriggerKey(zoneId)).eq(ByteUtils.longToBytes(scaleDownTriggerRevision))
+ );
+ }
+
+ /**
+ * Updates data nodes value for a zone and sets {@code revision} to {@link
DistributionZonesUtil#zoneScaleUpChangeTriggerKey(int)}.
*
* @param zoneId Distribution zone id
* @param revision Revision of the event.
- * @param logicalTopology Logical topology.
+ * @param nodes Data nodes.
* @return Update command for the meta storage.
*/
- static Update updateDataNodesAndTriggerKey(int zoneId, long revision,
byte[] logicalTopology) {
+ static Update updateDataNodesAndScaleUpTriggerKey(int zoneId, long
revision, byte[] nodes) {
return ops(
- put(zoneDataNodesKey(zoneId), logicalTopology),
- put(zonesChangeTriggerKey(), ByteUtils.longToBytes(revision))
+ put(zoneDataNodesKey(zoneId), nodes),
+ put(zoneScaleUpChangeTriggerKey(zoneId),
ByteUtils.longToBytes(revision))
).yield(true);
}
/**
- * Updates logical topology and logical topology version values for zones.
+ * Updates data nodes value for a zone and sets {@code revision} to {@link
DistributionZonesUtil#zoneScaleUpChangeTriggerKey(int)},
+ * {@link DistributionZonesUtil#zoneScaleDownChangeTriggerKey(int)} and
{@link DistributionZonesUtil#zonesChangeTriggerKey(int)}.
*
- * @param logicalTopology Logical topology.
- * @param topologyVersion Logical topology version.
+ * @param zoneId Distribution zone id
+ * @param revision Revision of the event.
+ * @param nodes Data nodes.
* @return Update command for the meta storage.
*/
- static Update updateLogicalTopologyAndVersion(Set<String> logicalTopology,
long topologyVersion) {
+ static Update updateDataNodesAndTriggerKeys(int zoneId, long revision,
byte[] nodes) {
return ops(
- put(zonesLogicalTopologyVersionKey(),
ByteUtils.longToBytes(topologyVersion)),
- put(zonesLogicalTopologyKey(),
ByteUtils.toBytes(logicalTopology))
+ put(zoneDataNodesKey(zoneId), nodes),
+ put(zoneScaleUpChangeTriggerKey(zoneId),
ByteUtils.longToBytes(revision)),
+ put(zoneScaleDownChangeTriggerKey(zoneId),
ByteUtils.longToBytes(revision)),
+ put(zonesChangeTriggerKey(zoneId),
ByteUtils.longToBytes(revision))
).yield(true);
}
/**
- * Sets {@code revision} to {@link
DistributionZonesUtil#zonesChangeTriggerKey()}.
+ * Deletes data nodes, {@link
DistributionZonesUtil#zoneScaleUpChangeTriggerKey(int)},
+ * {@link DistributionZonesUtil#zoneScaleDownChangeTriggerKey(int)} values
for a zone. Also sets {@code revision} to
+ * {@link DistributionZonesUtil#zonesChangeTriggerKey(int)}.
*
+ * @param zoneId Distribution zone id
* @param revision Revision of the event.
* @return Update command for the meta storage.
*/
- static Update updateTriggerKey(long revision) {
- return ops(put(zonesChangeTriggerKey(),
ByteUtils.longToBytes(revision))).yield(true);
+ static Update deleteDataNodesAndUpdateTriggerKeys(int zoneId, long
revision) {
+ return ops(
+ remove(zoneDataNodesKey(zoneId)),
+ remove(zoneScaleUpChangeTriggerKey(zoneId)),
+ remove(zoneScaleDownChangeTriggerKey(zoneId)),
+ put(zonesChangeTriggerKey(zoneId),
ByteUtils.longToBytes(revision))
+ ).yield(true);
}
/**
- * Deletes data nodes value for a zone and set {@code revision} to {@link
DistributionZonesUtil#zonesChangeTriggerKey()}.
+ * Updates logical topology and logical topology version values for zones.
*
- * @param zoneId Distribution zone id
- * @param revision Revision of the event.
+ * @param logicalTopology Logical topology.
+ * @param topologyVersion Logical topology version.
* @return Update command for the meta storage.
*/
- static Update deleteDataNodesKeyAndUpdateTriggerKey(int zoneId, long
revision) {
+ static Update updateLogicalTopologyAndVersion(Set<String> logicalTopology,
long topologyVersion) {
return ops(
- remove(zoneDataNodesKey(zoneId)),
- put(zonesChangeTriggerKey(), ByteUtils.longToBytes(revision))
+ put(zonesLogicalTopologyVersionKey(),
ByteUtils.longToBytes(topologyVersion)),
+ put(zonesLogicalTopologyKey(),
ByteUtils.toBytes(logicalTopology))
).yield(true);
}
}
diff --git
a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerConfigurationChangesTest.java
b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerConfigurationChangesTest.java
index ba1dea44be..d05c7165a2 100644
---
a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerConfigurationChangesTest.java
+++
b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerConfigurationChangesTest.java
@@ -19,8 +19,8 @@ package org.apache.ignite.internal.distributionzones;
import static java.util.concurrent.CompletableFuture.completedFuture;
import static
org.apache.ignite.configuration.annotation.ConfigurationType.DISTRIBUTED;
-import static
org.apache.ignite.internal.distributionzones.DistributionZoneManager.DEFAULT_ZONE_NAME;
import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zoneDataNodesKey;
+import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zoneScaleUpChangeTriggerKey;
import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zonesChangeTriggerKey;
import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zonesLogicalTopologyKey;
import static
org.apache.ignite.internal.metastorage.impl.MetaStorageServiceImpl.toIfInfo;
@@ -41,10 +41,13 @@ import static org.mockito.Mockito.when;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
import org.apache.ignite.configuration.NamedConfigurationTree;
import org.apache.ignite.configuration.NamedListView;
import
org.apache.ignite.internal.cluster.management.topology.LogicalTopologyServiceImpl;
@@ -52,15 +55,21 @@ import
org.apache.ignite.internal.cluster.management.topology.api.LogicalTopolog
import org.apache.ignite.internal.configuration.ConfigurationManager;
import
org.apache.ignite.internal.configuration.storage.TestConfigurationStorage;
import
org.apache.ignite.internal.distributionzones.configuration.DistributionZonesConfiguration;
+import org.apache.ignite.internal.metastorage.Entry;
import org.apache.ignite.internal.metastorage.MetaStorageManager;
+import org.apache.ignite.internal.metastorage.command.GetAllCommand;
import
org.apache.ignite.internal.metastorage.command.MetaStorageCommandsFactory;
import org.apache.ignite.internal.metastorage.command.MultiInvokeCommand;
+import org.apache.ignite.internal.metastorage.command.MultipleEntryResponse;
+import org.apache.ignite.internal.metastorage.command.SingleEntryResponse;
import org.apache.ignite.internal.metastorage.command.info.StatementResultInfo;
import org.apache.ignite.internal.metastorage.dsl.If;
import org.apache.ignite.internal.metastorage.dsl.StatementResult;
+import org.apache.ignite.internal.metastorage.impl.EntryImpl;
import
org.apache.ignite.internal.metastorage.server.SimpleInMemoryKeyValueStorage;
import org.apache.ignite.internal.metastorage.server.raft.MetaStorageListener;
import org.apache.ignite.internal.raft.Command;
+import org.apache.ignite.internal.raft.ReadCommand;
import org.apache.ignite.internal.raft.WriteCommand;
import org.apache.ignite.internal.raft.service.CommandClosure;
import org.apache.ignite.internal.raft.service.RaftGroupService;
@@ -72,6 +81,7 @@ import
org.apache.ignite.internal.testframework.IgniteAbstractTest;
import org.apache.ignite.internal.util.ByteUtils;
import org.apache.ignite.internal.vault.VaultEntry;
import org.apache.ignite.internal.vault.VaultManager;
+import org.apache.ignite.lang.ByteArray;
import org.apache.ignite.lang.IgniteInternalException;
import org.jetbrains.annotations.Nullable;
import org.junit.jupiter.api.AfterEach;
@@ -138,7 +148,8 @@ public class
DistributionZoneManagerConfigurationChangesTest extends IgniteAbstr
tablesConfiguration,
metaStorageManager,
logicalTopologyService,
- vaultMgr
+ vaultMgr,
+ "node"
);
clusterCfgMgr.start();
@@ -200,7 +211,49 @@ public class
DistributionZoneManagerConfigurationChangesTest extends IgniteAbstr
return res;
}
- ).when(metaStorageService).run(any());
+ ).when(metaStorageService).run(any(WriteCommand.class));
+
+ lenient().doAnswer(
+ invocationClose -> {
+ Command cmd = invocationClose.getArgument(0);
+
+ long commandIndex = raftIndex.incrementAndGet();
+
+ CompletableFuture<Serializable> res = new
CompletableFuture<>();
+
+ CommandClosure<ReadCommand> clo = new CommandClosure<>() {
+ /** {@inheritDoc} */
+ @Override
+ public long index() {
+ return commandIndex;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public ReadCommand command() {
+ return (ReadCommand) cmd;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public void result(@Nullable Serializable r) {
+ if (r instanceof Throwable) {
+ res.completeExceptionally((Throwable) r);
+ } else {
+ res.complete(r);
+ }
+ }
+ };
+
+ try {
+ metaStorageListener.onRead(List.of(clo).iterator());
+ } catch (Throwable e) {
+ res.completeExceptionally(new
IgniteInternalException(e));
+ }
+
+ return res;
+ }
+ ).when(metaStorageService).run(any(ReadCommand.class));
MetaStorageCommandsFactory commandsFactory = new
MetaStorageCommandsFactory();
@@ -211,6 +264,28 @@ public class
DistributionZoneManagerConfigurationChangesTest extends IgniteAbstr
return metaStorageService.run(multiInvokeCommand).thenApply(bi ->
new StatementResult(((StatementResultInfo) bi).result()));
}).when(metaStorageManager).invoke(any());
+
+ lenient().doAnswer(invocationClose -> {
+ Set<ByteArray> keysSet = invocationClose.getArgument(0);
+
+ GetAllCommand getAllCommand = commandsFactory.getAllCommand().keys(
+
keysSet.stream().map(ByteArray::bytes).collect(Collectors.toList())
+ ).revision(0).build();
+
+ return metaStorageService.run(getAllCommand).thenApply(bi -> {
+ MultipleEntryResponse resp = (MultipleEntryResponse) bi;
+
+ Map<ByteArray, Entry> res = new HashMap<>();
+
+ for (SingleEntryResponse e : resp.entries()) {
+ ByteArray key = new ByteArray(e.key());
+
+ res.put(key, new EntryImpl(key.bytes(), e.value(),
e.revision(), e.updateCounter()));
+ }
+
+ return res;
+ });
+ }).when(metaStorageManager).getAll(any());
}
@AfterEach
@@ -230,36 +305,9 @@ public class
DistributionZoneManagerConfigurationChangesTest extends IgniteAbstr
assertDataNodesForZone(1, nodes);
- assertZonesChangeTriggerKey(1);
- }
+ assertZoneScaleUpChangeTriggerKey(1, 1);
- @Test
- void testTriggerKeyPropagationAfterDefaultZoneUpdate() throws Exception {
- testTriggerKeyPropagationAfterZoneUpdate(DEFAULT_ZONE_NAME);
- }
-
- @Test
- void testTriggerKeyPropagationAfterNotDefaultZoneUpdate() throws Exception
{
- testTriggerKeyPropagationAfterZoneUpdate(ZONE_NAME);
- }
-
- void testTriggerKeyPropagationAfterZoneUpdate(String zoneName) throws
Exception {
-
assertNull(keyValueStorage.get(zonesChangeTriggerKey().bytes()).value());
-
- int triggerKey = 0;
-
- if (!DEFAULT_ZONE_NAME.equals(zoneName)) {
- distributionZoneManager.createZone(new
DistributionZoneConfigurationParameters.Builder(zoneName).build()).get();
-
- assertZonesChangeTriggerKey(++triggerKey);
- }
-
- distributionZoneManager.alterZone(
- zoneName,
- new
DistributionZoneConfigurationParameters.Builder(zoneName).dataNodesAutoAdjust(100).build()
- ).get();
-
- assertZonesChangeTriggerKey(++triggerKey);
+ assertZonesChangeTriggerKey(1, 1);
}
@Test
@@ -275,74 +323,39 @@ public class
DistributionZoneManagerConfigurationChangesTest extends IgniteAbstr
@Test
void testSeveralZoneCreationsUpdatesTriggerKey() throws Exception {
-
assertNull(keyValueStorage.get(zonesChangeTriggerKey().bytes()).value());
+
assertNull(keyValueStorage.get(zoneScaleUpChangeTriggerKey(1).bytes()).value());
+
assertNull(keyValueStorage.get(zoneScaleUpChangeTriggerKey(2).bytes()).value());
distributionZoneManager.createZone(new
DistributionZoneConfigurationParameters.Builder(ZONE_NAME).build()).get();
distributionZoneManager.createZone(new
DistributionZoneConfigurationParameters.Builder(NEW_ZONE_NAME).build()).get();
- assertZonesChangeTriggerKey(2);
- }
-
- @Test
- void testSeveralZoneUpdatesUpdatesTriggerKey() throws Exception {
-
assertNull(keyValueStorage.get(zonesChangeTriggerKey().bytes()).value());
-
- distributionZoneManager.createZone(new
DistributionZoneConfigurationParameters.Builder(ZONE_NAME).build()).get();
-
- distributionZoneManager.alterZone(
- ZONE_NAME,
- new
DistributionZoneConfigurationParameters.Builder(ZONE_NAME).dataNodesAutoAdjust(100).build()
- ).get();
-
- distributionZoneManager.alterZone(
- ZONE_NAME,
- new
DistributionZoneConfigurationParameters.Builder(ZONE_NAME).dataNodesAutoAdjust(1000).build()
- ).get();
-
- assertZonesChangeTriggerKey(3);
+ assertZoneScaleUpChangeTriggerKey(1, 1);
+ assertZoneScaleUpChangeTriggerKey(2, 2);
+ assertZonesChangeTriggerKey(1, 1);
+ assertZonesChangeTriggerKey(2, 2);
}
@Test
void testDataNodesNotPropagatedAfterZoneCreation() throws Exception {
- keyValueStorage.put(zonesChangeTriggerKey().bytes(), longToBytes(100));
+ keyValueStorage.put(zonesChangeTriggerKey(1).bytes(),
longToBytes(100));
distributionZoneManager.createZone(new
DistributionZoneConfigurationParameters.Builder(ZONE_NAME).build()).get();
verify(keyValueStorage, timeout(1000).times(1)).invoke(any());
- assertZonesChangeTriggerKey(100);
+ assertZonesChangeTriggerKey(100, 1);
assertDataNodesForZone(1, null);
}
- @Test
- void testTriggerKeyNotPropagatedAfterZoneUpdate() throws Exception {
- distributionZoneManager.createZone(new
DistributionZoneConfigurationParameters.Builder(ZONE_NAME).build()).get();
-
- assertDataNodesForZone(1, nodes);
-
- keyValueStorage.put(zonesChangeTriggerKey().bytes(), longToBytes(100));
-
- distributionZoneManager.alterZone(
- ZONE_NAME,
- new
DistributionZoneConfigurationParameters.Builder(ZONE_NAME).dataNodesAutoAdjust(100).build()
- ).get();
-
- verify(keyValueStorage, timeout(1000).times(2)).invoke(any());
-
- assertZonesChangeTriggerKey(100);
-
- assertDataNodesForZone(1, nodes);
- }
-
@Test
void testZoneDeleteDoNotRemoveMetaStorageKey() throws Exception {
distributionZoneManager.createZone(new
DistributionZoneConfigurationParameters.Builder(ZONE_NAME).build()).get();
assertDataNodesForZone(1, nodes);
- keyValueStorage.put(zonesChangeTriggerKey().bytes(), longToBytes(100));
+ keyValueStorage.put(zonesChangeTriggerKey(1).bytes(),
longToBytes(100));
distributionZoneManager.dropZone(ZONE_NAME).get();
@@ -361,14 +374,22 @@ public class
DistributionZoneManagerConfigurationChangesTest extends IgniteAbstr
private void assertDataNodesForZone(int zoneId, @Nullable Set<String>
clusterNodes) throws InterruptedException {
byte[] nodes = clusterNodes == null ? null : toBytes(clusterNodes);
- assertTrue(waitForCondition(() ->
Arrays.equals(keyValueStorage.get(zoneDataNodesKey(zoneId).bytes()).value(),
nodes),
- 1000));
+ assertTrue(waitForCondition(() ->
Arrays.equals(keyValueStorage.get(zoneDataNodesKey(zoneId).bytes()).value(),
nodes), 1000));
+ }
+
+ private void assertZoneScaleUpChangeTriggerKey(int revision, int zoneId)
throws InterruptedException {
+ assertTrue(
+ waitForCondition(
+ () ->
ByteUtils.bytesToLong(keyValueStorage.get(zoneScaleUpChangeTriggerKey(zoneId).bytes()).value())
== revision,
+ 2000
+ )
+ );
}
- private void assertZonesChangeTriggerKey(int revision) throws
InterruptedException {
+ private void assertZonesChangeTriggerKey(int revision, int zoneId) throws
InterruptedException {
assertTrue(
waitForCondition(
- () ->
ByteUtils.bytesToLong(keyValueStorage.get(zonesChangeTriggerKey().bytes()).value())
== revision, 1000
+ () ->
ByteUtils.bytesToLong(keyValueStorage.get(zonesChangeTriggerKey(zoneId).bytes()).value())
== revision, 1000
)
);
}
diff --git
a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerLogicalTopologyEventsTest.java
b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerLogicalTopologyEventsTest.java
index de897b0708..80f90f754f 100644
---
a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerLogicalTopologyEventsTest.java
+++
b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerLogicalTopologyEventsTest.java
@@ -146,7 +146,8 @@ public class
DistributionZoneManagerLogicalTopologyEventsTest {
tablesConfiguration,
metaStorageManager,
logicalTopologyService,
- vaultMgr
+ vaultMgr,
+ "node"
);
clusterCfgMgr.start();
diff --git
a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerScaleUpTest.java
b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerScaleUpTest.java
new file mode 100644
index 0000000000..e5cd7a8f13
--- /dev/null
+++
b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerScaleUpTest.java
@@ -0,0 +1,745 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.distributionzones;
+
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static
org.apache.ignite.configuration.annotation.ConfigurationType.DISTRIBUTED;
+import static
org.apache.ignite.internal.distributionzones.DistributionZoneManager.DEFAULT_ZONE_ID;
+import static
org.apache.ignite.internal.distributionzones.DistributionZoneManager.DEFAULT_ZONE_NAME;
+import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zoneDataNodesKey;
+import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zoneScaleUpChangeTriggerKey;
+import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zonesChangeTriggerKey;
+import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zonesLogicalTopologyKey;
+import static
org.apache.ignite.internal.metastorage.impl.MetaStorageServiceImpl.toIfInfo;
+import static
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
+import static org.apache.ignite.internal.util.ByteUtils.toBytes;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.lenient;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
+import org.apache.ignite.configuration.NamedConfigurationTree;
+import org.apache.ignite.configuration.NamedListView;
+import
org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager;
+import org.apache.ignite.internal.cluster.management.raft.ClusterStateStorage;
+import
org.apache.ignite.internal.cluster.management.raft.TestClusterStateStorage;
+import org.apache.ignite.internal.cluster.management.topology.LogicalTopology;
+import
org.apache.ignite.internal.cluster.management.topology.LogicalTopologyImpl;
+import
org.apache.ignite.internal.cluster.management.topology.LogicalTopologyServiceImpl;
+import
org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologySnapshot;
+import org.apache.ignite.internal.configuration.ConfigurationManager;
+import
org.apache.ignite.internal.configuration.storage.TestConfigurationStorage;
+import
org.apache.ignite.internal.distributionzones.configuration.DistributionZoneChange;
+import
org.apache.ignite.internal.distributionzones.configuration.DistributionZoneConfiguration;
+import
org.apache.ignite.internal.distributionzones.configuration.DistributionZoneView;
+import
org.apache.ignite.internal.distributionzones.configuration.DistributionZonesConfiguration;
+import org.apache.ignite.internal.metastorage.Entry;
+import org.apache.ignite.internal.metastorage.EntryEvent;
+import org.apache.ignite.internal.metastorage.MetaStorageManager;
+import org.apache.ignite.internal.metastorage.WatchEvent;
+import org.apache.ignite.internal.metastorage.WatchListener;
+import org.apache.ignite.internal.metastorage.command.GetAllCommand;
+import org.apache.ignite.internal.metastorage.command.GetCommand;
+import
org.apache.ignite.internal.metastorage.command.MetaStorageCommandsFactory;
+import org.apache.ignite.internal.metastorage.command.MultiInvokeCommand;
+import org.apache.ignite.internal.metastorage.command.MultipleEntryResponse;
+import org.apache.ignite.internal.metastorage.command.SingleEntryResponse;
+import org.apache.ignite.internal.metastorage.command.info.StatementResultInfo;
+import org.apache.ignite.internal.metastorage.dsl.If;
+import org.apache.ignite.internal.metastorage.dsl.StatementResult;
+import org.apache.ignite.internal.metastorage.impl.EntryImpl;
+import
org.apache.ignite.internal.metastorage.server.SimpleInMemoryKeyValueStorage;
+import org.apache.ignite.internal.metastorage.server.raft.MetaStorageListener;
+import org.apache.ignite.internal.raft.Command;
+import org.apache.ignite.internal.raft.ReadCommand;
+import org.apache.ignite.internal.raft.WriteCommand;
+import org.apache.ignite.internal.raft.service.CommandClosure;
+import org.apache.ignite.internal.raft.service.RaftGroupService;
+import org.apache.ignite.internal.schema.configuration.TableChange;
+import org.apache.ignite.internal.schema.configuration.TableConfiguration;
+import org.apache.ignite.internal.schema.configuration.TableView;
+import org.apache.ignite.internal.schema.configuration.TablesConfiguration;
+import org.apache.ignite.internal.util.ByteUtils;
+import org.apache.ignite.internal.vault.VaultEntry;
+import org.apache.ignite.internal.vault.VaultManager;
+import org.apache.ignite.lang.ByteArray;
+import org.apache.ignite.lang.IgniteInternalException;
+import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.network.NetworkAddress;
+import org.jetbrains.annotations.Nullable;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mock;
+
+/**
+ * Test scenarios for the distribution zone scale up.
+ */
+public class DistributionZoneManagerScaleUpTest {
+ private static final String ZONE_NAME = "zone1";
+
+ private DistributionZoneManager distributionZoneManager;
+
+ private SimpleInMemoryKeyValueStorage keyValueStorage;
+
+ private ConfigurationManager clusterCfgMgr;
+
+ @Mock
+ private LogicalTopologyServiceImpl logicalTopologyService;
+
+ private LogicalTopology topology;
+
+ private ClusterStateStorage clusterStateStorage;
+
+ private VaultManager vaultMgr;
+
+ private MetaStorageManager metaStorageManager;
+
+ private WatchListener watchListener;
+
+ private DistributionZonesConfiguration zonesConfiguration;
+
+ @Mock
+ private ClusterManagementGroupManager cmgManager;
+
+ @BeforeEach
+ public void setUp() {
+ clusterCfgMgr = new ConfigurationManager(
+ List.of(DistributionZonesConfiguration.KEY),
+ Set.of(),
+ new TestConfigurationStorage(DISTRIBUTED),
+ List.of(),
+ List.of()
+ );
+
+ zonesConfiguration = clusterCfgMgr.configurationRegistry()
+ .getConfiguration(DistributionZonesConfiguration.KEY);
+
+ metaStorageManager = mock(MetaStorageManager.class);
+
+ cmgManager = mock(ClusterManagementGroupManager.class);
+
+ clusterStateStorage = new TestClusterStateStorage();
+
+ topology = new LogicalTopologyImpl(clusterStateStorage);
+
+ LogicalTopologyServiceImpl logicalTopologyService = new
LogicalTopologyServiceImpl(topology, cmgManager);
+
+ vaultMgr = mock(VaultManager.class);
+
+ TablesConfiguration tablesConfiguration =
mock(TablesConfiguration.class);
+
+ NamedConfigurationTree<TableConfiguration, TableView, TableChange>
tables = mock(NamedConfigurationTree.class);
+
+ when(tablesConfiguration.tables()).thenReturn(tables);
+
+ NamedListView<TableView> value = mock(NamedListView.class);
+
+ when(tables.value()).thenReturn(value);
+
+ when(value.namedListKeys()).thenReturn(new ArrayList<>());
+
+ distributionZoneManager = new DistributionZoneManager(
+ zonesConfiguration,
+ tablesConfiguration,
+ metaStorageManager,
+ logicalTopologyService,
+ vaultMgr,
+ "node"
+ );
+
+ clusterCfgMgr.start();
+
+ mockVaultAppliedRevision(1);
+
+
when(vaultMgr.get(zonesLogicalTopologyKey())).thenReturn(completedFuture(new
VaultEntry(zonesLogicalTopologyKey(), null)));
+ when(vaultMgr.put(any(), any())).thenReturn(completedFuture(null));
+
+ doAnswer(invocation -> {
+ watchListener = invocation.getArgument(1);
+
+ return null;
+ }).when(metaStorageManager).registerExactWatch(any(), any());
+
+ AtomicLong raftIndex = new AtomicLong();
+
+ keyValueStorage = spy(new SimpleInMemoryKeyValueStorage("test"));
+
+ MetaStorageListener metaStorageListener = new
MetaStorageListener(keyValueStorage);
+
+ RaftGroupService metaStorageService = mock(RaftGroupService.class);
+
+ // Delegate directly to listener.
+ lenient().doAnswer(
+ invocationClose -> {
+ Command cmd = invocationClose.getArgument(0);
+
+ long commandIndex = raftIndex.incrementAndGet();
+
+ CompletableFuture<Serializable> res = new
CompletableFuture<>();
+
+ CommandClosure<WriteCommand> clo = new CommandClosure<>() {
+ /** {@inheritDoc} */
+ @Override
+ public long index() {
+ return commandIndex;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public WriteCommand command() {
+ return (WriteCommand) cmd;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public void result(@Nullable Serializable r) {
+ if (r instanceof Throwable) {
+ res.completeExceptionally((Throwable) r);
+ } else {
+ res.complete(r);
+ }
+ }
+ };
+
+ try {
+ metaStorageListener.onWrite(List.of(clo).iterator());
+ } catch (Throwable e) {
+ res.completeExceptionally(new
IgniteInternalException(e));
+ }
+
+ return res;
+ }
+ ).when(metaStorageService).run(any(WriteCommand.class));
+
+ lenient().doAnswer(
+ invocationClose -> {
+ Command cmd = invocationClose.getArgument(0);
+
+ long commandIndex = raftIndex.incrementAndGet();
+
+ CompletableFuture<Serializable> res = new
CompletableFuture<>();
+
+ CommandClosure<ReadCommand> clo = new CommandClosure<>() {
+ /** {@inheritDoc} */
+ @Override
+ public long index() {
+ return commandIndex;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public ReadCommand command() {
+ return (ReadCommand) cmd;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public void result(@Nullable Serializable r) {
+ if (r instanceof Throwable) {
+ res.completeExceptionally((Throwable) r);
+ } else {
+ res.complete(r);
+ }
+ }
+ };
+
+ try {
+ metaStorageListener.onRead(List.of(clo).iterator());
+ } catch (Throwable e) {
+ res.completeExceptionally(new
IgniteInternalException(e));
+ }
+
+ return res;
+ }
+ ).when(metaStorageService).run(any(ReadCommand.class));
+
+ MetaStorageCommandsFactory commandsFactory = new
MetaStorageCommandsFactory();
+
+ lenient().doAnswer(invocationClose -> {
+ If iif = invocationClose.getArgument(0);
+
+ MultiInvokeCommand multiInvokeCommand =
commandsFactory.multiInvokeCommand().iif(toIfInfo(iif,
commandsFactory)).build();
+
+ return metaStorageService.run(multiInvokeCommand).thenApply(bi ->
new StatementResult(((StatementResultInfo) bi).result()));
+ }).when(metaStorageManager).invoke(any());
+
+ lenient().doAnswer(invocationClose -> {
+ Set<ByteArray> keysSet = invocationClose.getArgument(0);
+
+ GetAllCommand getAllCommand = commandsFactory.getAllCommand().keys(
+
keysSet.stream().map(ByteArray::bytes).collect(Collectors.toList())
+ ).revision(0).build();
+
+ return metaStorageService.run(getAllCommand).thenApply(bi -> {
+ MultipleEntryResponse resp = (MultipleEntryResponse) bi;
+
+ Map<ByteArray, Entry> res = new HashMap<>();
+
+ for (SingleEntryResponse e : resp.entries()) {
+ ByteArray key = new ByteArray(e.key());
+
+ res.put(key, new EntryImpl(key.bytes(), e.value(),
e.revision(), e.updateCounter()));
+ }
+
+ return res;
+ });
+ }).when(metaStorageManager).getAll(any());
+
+ lenient().doAnswer(invocationClose -> {
+ ByteArray key = invocationClose.getArgument(0);
+
+ GetCommand getCommand =
commandsFactory.getCommand().key(key.bytes()).build();
+
+ return metaStorageService.run(getCommand).thenApply(bi -> {
+ SingleEntryResponse resp = (SingleEntryResponse) bi;
+
+ return new EntryImpl(resp.key(), resp.value(),
resp.revision(), resp.updateCounter());
+ });
+ }).when(metaStorageManager).get(any());
+ }
+
+ /**
+  * Stops the distribution zone manager and the cluster configuration manager, then closes the key-value storage
+  * and destroys the cluster state storage.
+  *
+  * @throws Exception If stopping or closing any of the components fails.
+  */
+ @AfterEach
+ public void tearDown() throws Exception {
+ distributionZoneManager.stop();
+
+ clusterCfgMgr.stop();
+
+ keyValueStorage.close();
+
+ clusterStateStorage.destroy();
+ }
+
+ /**
+  * Verifies that a node added to the logical topology reaches the data nodes of a zone created with
+  * {@code dataNodesAutoAdjustScaleUp(1)} once the logical topology watch event is delivered.
+  *
+  * @throws Exception If the test fails unexpectedly.
+  */
+ @Test
+ void testDataNodesPropagationAfterScaleUpTriggered() throws Exception {
+     ClusterNode firstNode = new ClusterNode("1", "name1", new NetworkAddress("localhost", 123));
+
+     topology.putNode(firstNode);
+
+     Set<ClusterNode> singleNodeTopology = Set.of(firstNode);
+     Set<String> singleNodeNames = singleNodeTopology.stream().map(ClusterNode::name).collect(Collectors.toSet());
+
+     mockVaultZonesLogicalTopologyKey(singleNodeNames);
+
+     mockCmgLocalNodes(1L, singleNodeTopology);
+
+     distributionZoneManager.start();
+
+     assertDataNodesForZone(DEFAULT_ZONE_ID, singleNodeNames);
+
+     ClusterNode secondNode = new ClusterNode("2", "name2", new NetworkAddress("localhost", 123));
+
+     topology.putNode(secondNode);
+
+     Set<ClusterNode> twoNodeTopology = Set.of(firstNode, secondNode);
+     Set<String> twoNodeNames = twoNodeTopology.stream().map(ClusterNode::name).collect(Collectors.toSet());
+
+     mockCmgLocalNodes(1L, twoNodeTopology);
+
+     assertLogicalTopology(twoNodeTopology);
+
+     distributionZoneManager.createZone(
+             new DistributionZoneConfigurationParameters.Builder(ZONE_NAME).dataNodesAutoAdjustScaleUp(1).build()
+     ).get();
+
+     // Right after creation the zone is expected to hold only the first node.
+     assertDataNodesForZone(1, singleNodeNames);
+
+     assertZoneScaleUpChangeTriggerKey(1, 1);
+
+     // The topology names are read here, after the second node has been put into the topology.
+     Set<String> currentTopologyNames = topology.getLogicalTopology().nodes().stream()
+             .map(ClusterNode::name)
+             .collect(Collectors.toSet());
+
+     watchListenerOnUpdate(currentTopologyNames, 2);
+
+     assertDataNodesForZone(1, twoNodeNames);
+ }
+
+ /**
+  * Verifies that a node added to the logical topology reaches the data nodes of the default zone after the zone
+  * is altered with {@code dataNodesAutoAdjustScaleUp(0)} and the logical topology watch event is delivered.
+  *
+  * @throws Exception If the test fails unexpectedly.
+  */
+ @Test
+ void testDataNodesPropagationForDefaultZoneAfterScaleUpTriggered() throws Exception {
+     ClusterNode firstNode = new ClusterNode("1", "name1", new NetworkAddress("localhost", 123));
+
+     topology.putNode(firstNode);
+
+     Set<ClusterNode> singleNodeTopology = Set.of(firstNode);
+     Set<String> singleNodeNames = singleNodeTopology.stream().map(ClusterNode::name).collect(Collectors.toSet());
+
+     mockVaultZonesLogicalTopologyKey(singleNodeNames);
+
+     mockCmgLocalNodes(1L, singleNodeTopology);
+
+     distributionZoneManager.start();
+
+     assertDataNodesForZone(DEFAULT_ZONE_ID, singleNodeNames);
+
+     ClusterNode secondNode = new ClusterNode("2", "name2", new NetworkAddress("localhost", 123));
+
+     topology.putNode(secondNode);
+
+     Set<ClusterNode> twoNodeTopology = Set.of(firstNode, secondNode);
+     Set<String> twoNodeNames = twoNodeTopology.stream().map(ClusterNode::name).collect(Collectors.toSet());
+
+     mockCmgLocalNodes(1L, twoNodeTopology);
+
+     assertLogicalTopology(twoNodeTopology);
+
+     distributionZoneManager.alterZone(
+             DEFAULT_ZONE_NAME,
+             new DistributionZoneConfigurationParameters.Builder(DEFAULT_ZONE_NAME).dataNodesAutoAdjustScaleUp(0).build()
+     ).get();
+
+     // The topology names are read here, after the second node has been put into the topology.
+     Set<String> currentTopologyNames = topology.getLogicalTopology().nodes().stream()
+             .map(ClusterNode::name)
+             .collect(Collectors.toSet());
+
+     watchListenerOnUpdate(currentTopologyNames, 2);
+
+     assertDataNodesForZone(DEFAULT_ZONE_ID, twoNodeNames);
+ }
+
+ /**
+  * Verifies that when a zone is dropped after a logical topology watch event was delivered, the data nodes stored
+  * for that zone do not become equal to the extended topology.
+  *
+  * @throws Exception If the test fails unexpectedly.
+  */
+ @Test
+ void testDropZoneDoNotPropagateDataNodesAfterScaleUp() throws Exception {
+     ClusterNode firstNode = new ClusterNode("1", "name1", new NetworkAddress("localhost", 123));
+
+     topology.putNode(firstNode);
+
+     Set<ClusterNode> singleNodeTopology = Set.of(firstNode);
+     Set<String> singleNodeNames = singleNodeTopology.stream().map(ClusterNode::name).collect(Collectors.toSet());
+
+     mockVaultZonesLogicalTopologyKey(singleNodeNames);
+
+     mockCmgLocalNodes(1L, singleNodeTopology);
+
+     distributionZoneManager.start();
+
+     ClusterNode secondNode = new ClusterNode("2", "name2", new NetworkAddress("localhost", 123));
+
+     topology.putNode(secondNode);
+
+     Set<ClusterNode> twoNodeTopology = Set.of(firstNode, secondNode);
+     Set<String> twoNodeNames = twoNodeTopology.stream().map(ClusterNode::name).collect(Collectors.toSet());
+
+     mockCmgLocalNodes(1L, twoNodeTopology);
+
+     assertLogicalTopology(twoNodeTopology);
+
+     distributionZoneManager.createZone(
+             new DistributionZoneConfigurationParameters.Builder(ZONE_NAME).dataNodesAutoAdjustScaleUp(1).build()
+     ).get();
+
+     // Right after creation the zone is expected to hold only the first node.
+     assertDataNodesForZone(1, singleNodeNames);
+
+     assertZoneScaleUpChangeTriggerKey(1, 1);
+
+     // The topology names are read here, after the second node has been put into the topology.
+     Set<String> currentTopologyNames = topology.getLogicalTopology().nodes().stream()
+             .map(ClusterNode::name)
+             .collect(Collectors.toSet());
+
+     watchListenerOnUpdate(currentTopologyNames, 2);
+
+     distributionZoneManager.dropZone(ZONE_NAME).get();
+
+     assertNotEqualsDataNodesForZone(1, twoNodeNames);
+ }
+
+ /**
+  * Schedules two scale up tasks for the same zone (revisions 2 and 3) and lets the task with revision 3 save the
+  * data nodes first. The task with revision 2 is released afterwards, and the test checks that neither the scale up
+  * change trigger key nor the data nodes change once that task has finished.
+  *
+  * @throws Exception If the test fails unexpectedly.
+  */
+ @Test
+ void testTwoScaleUpTimersSecondTimerRunFirst() throws Exception {
+     preparePrerequisites();
+
+     NamedConfigurationTree<DistributionZoneConfiguration, DistributionZoneView, DistributionZoneChange> zones =
+             zonesConfiguration.distributionZones();
+
+     DistributionZoneView zoneView = zones.value().get(0);
+
+     // in1/in2 gate the start of the saving step of each task; out1 signals that the first task has finished,
+     // out2 holds the second task's completion callback until the test releases it.
+     CountDownLatch in1 = new CountDownLatch(1);
+     CountDownLatch in2 = new CountDownLatch(1);
+     CountDownLatch out1 = new CountDownLatch(1);
+     CountDownLatch out2 = new CountDownLatch(1);
+
+     distributionZoneManager.scheduleTimers(
+             zoneView,
+             Set.of("B"),
+             Set.of(),
+             2,
+             (zoneId, revision) -> {
+                 try {
+                     in1.await();
+                 } catch (InterruptedException e) {
+                     fail();
+                 }
+
+                 return testSaveDataNodesOnScaleUp(zoneId, revision).thenRun(out1::countDown);
+             }
+     );
+
+     // Assert that the first task was run and the event about adding node "B" with revision 2 was added
+     // to the topologyAugmentationMap of the zone.
+     assertTrue(
+             waitForCondition(
+                     () -> distributionZoneManager.zonesTimers().get(1).topologyAugmentationMap().containsKey(2L),
+                     1000
+             )
+     );
+
+     distributionZoneManager.scheduleTimers(
+             zoneView,
+             Set.of("C"),
+             Set.of(),
+             3,
+             (zoneId, revision) -> {
+                 try {
+                     in2.await();
+                 } catch (InterruptedException e) {
+                     fail();
+                 }
+
+                 return testSaveDataNodesOnScaleUp(zoneId, revision).thenRun(() -> {
+                     try {
+                         out2.await();
+                     } catch (InterruptedException e) {
+                         fail();
+                     }
+                 });
+             }
+     );
+
+     // Assert that the second task was run and the event about adding node "C" with revision 3 was added
+     // to the topologyAugmentationMap of the zone.
+     assertTrue(
+             waitForCondition(
+                     () -> distributionZoneManager.zonesTimers().get(1).topologyAugmentationMap().containsKey(3L),
+                     1000
+             )
+     );
+
+     // The second task propagates data nodes first.
+     in2.countDown();
+
+     assertZoneScaleUpChangeTriggerKey(3, 1);
+
+     assertDataNodesForZone(1, Set.of("A", "B", "C"));
+
+     out2.countDown();
+
+     in1.countDown();
+
+     // Wait until the first scheduler finishes its work. The latch is counted down by the first task itself, so the
+     // test must await it; counting it down here would let the assertions below run before the task has completed.
+     assertTrue(out1.await(2, java.util.concurrent.TimeUnit.SECONDS));
+
+     // Assert that nothing has been changed.
+     assertZoneScaleUpChangeTriggerKey(3, 1);
+
+     assertDataNodesForZone(1, Set.of("A", "B", "C"));
+ }
+
+ /**
+  * Schedules two scale up tasks for the same zone (revisions 2 and 3) and lets them save data nodes in scheduling
+  * order: the data nodes are expected to become ["A", "B"] first and ["A", "B", "C"] afterwards.
+  *
+  * @throws Exception If the test fails unexpectedly.
+  */
+ @Test
+ void testTwoScaleUpTimersFirstTimerRunFirst() throws Exception {
+     preparePrerequisites();
+
+     NamedConfigurationTree<DistributionZoneConfiguration, DistributionZoneView, DistributionZoneChange> zonesTree =
+             zonesConfiguration.distributionZones();
+
+     DistributionZoneView firstZoneView = zonesTree.value().get(0);
+
+     CountDownLatch firstTaskStarted = new CountDownLatch(1);
+     CountDownLatch secondTaskGate = new CountDownLatch(1);
+     CountDownLatch firstTaskRelease = new CountDownLatch(1);
+
+     distributionZoneManager.scheduleTimers(
+             firstZoneView,
+             Set.of("B"),
+             Set.of(),
+             2,
+             (zoneId, revision) -> {
+                 firstTaskStarted.countDown();
+
+                 // The completion callback of the first task stays blocked until the end of the test.
+                 return testSaveDataNodesOnScaleUp(zoneId, revision).thenRun(() -> {
+                     try {
+                         firstTaskRelease.await();
+                     } catch (InterruptedException e) {
+                         fail();
+                     }
+                 });
+             }
+     );
+
+     // Wait for the first task to start. This makes sure that the watch events we try to emulate
+     // are handled sequentially.
+     firstTaskStarted.await();
+
+     distributionZoneManager.scheduleTimers(
+             firstZoneView,
+             Set.of("C"),
+             Set.of(),
+             3,
+             (zoneId, revision) -> {
+                 try {
+                     secondTaskGate.await();
+                 } catch (InterruptedException e) {
+                     fail();
+                 }
+
+                 return testSaveDataNodesOnScaleUp(zoneId, revision);
+             }
+     );
+
+     // Assert that the second task was run and the event about adding node "C" with revision 3 was added
+     // to the topologyAugmentationMap of the zone.
+     assertTrue(
+             waitForCondition(
+                     () -> distributionZoneManager.zonesTimers().get(1).topologyAugmentationMap().containsKey(3L),
+                     1000
+             )
+     );
+
+     assertZoneScaleUpChangeTriggerKey(2, 1);
+
+     assertDataNodesForZone(1, Set.of("A", "B"));
+
+     // Release the second task and expect the data nodes to change from ["A", "B"] to ["A", "B", "C"].
+     secondTaskGate.countDown();
+
+     assertZoneScaleUpChangeTriggerKey(3, 1);
+
+     assertDataNodesForZone(1, Set.of("A", "B", "C"));
+
+     firstTaskRelease.countDown();
+ }
+
+
+
+ /**
+  * Starts the distribution zone manager on a single-node topology (node name "A") and creates a zone with
+  * {@code dataNodesAutoAdjustScaleUp(0)}. Afterwards checks that the data nodes of zone 1 are ["A"] and that
+  * its scale up change trigger key equals 1.
+  *
+  * @throws Exception When something goes wrong.
+  */
+ private void preparePrerequisites() throws Exception {
+     ClusterNode nodeA = new ClusterNode("1", "A", new NetworkAddress("localhost", 123));
+
+     topology.putNode(nodeA);
+
+     Set<ClusterNode> initialTopology = Set.of(nodeA);
+     Set<String> initialNames = initialTopology.stream().map(ClusterNode::name).collect(Collectors.toSet());
+
+     mockVaultZonesLogicalTopologyKey(initialNames);
+
+     mockCmgLocalNodes(1L, initialTopology);
+
+     distributionZoneManager.start();
+
+     distributionZoneManager.createZone(
+             new DistributionZoneConfigurationParameters.Builder(ZONE_NAME).dataNodesAutoAdjustScaleUp(0).build()
+     ).get();
+
+     assertDataNodesForZone(1, initialNames);
+
+     assertZoneScaleUpChangeTriggerKey(1, 1);
+ }
+
+ /**
+  * Delegates to {@code DistributionZoneManager#saveDataNodesToMetaStorageOnScaleUp} with the same arguments.
+  * Used by the tests as the saving step of the tasks passed to {@code scheduleTimers}.
+  *
+  * @param zoneId Zone id.
+  * @param revision Revision passed through to the manager.
+  * @return Future returned by the manager.
+  */
+ private CompletableFuture<Void> testSaveDataNodesOnScaleUp(int zoneId,
long revision) {
+ return
distributionZoneManager.saveDataNodesToMetaStorageOnScaleUp(zoneId, revision);
+ }
+
+ /**
+  * Asserts that the data nodes stored in the key-value storage for the zone become equal to the expected set
+  * before the wait (timeout argument 2000) expires.
+  *
+  * @param zoneId Zone id.
+  * @param clusterNodes Expected node names, or {@code null} if no value is expected to be stored.
+  * @throws InterruptedException If the waiting thread is interrupted.
+  */
+ private void assertDataNodesForZone(int zoneId, @Nullable Set<String> clusterNodes) throws InterruptedException {
+     assertTrue(waitForCondition(() -> dataNodesForZoneEqualTo(zoneId, clusterNodes), 2000));
+ }
+
+ /**
+  * Asserts that the data nodes stored in the key-value storage for the zone do NOT become equal to the given set
+  * during the whole wait (timeout argument 2000).
+  *
+  * @param zoneId Zone id.
+  * @param clusterNodes Node names that must not be observed, or {@code null} to require that a value is stored.
+  * @throws InterruptedException If the waiting thread is interrupted.
+  */
+ private void assertNotEqualsDataNodesForZone(int zoneId, @Nullable Set<String> clusterNodes) throws InterruptedException {
+     assertFalse(waitForCondition(() -> dataNodesForZoneEqualTo(zoneId, clusterNodes), 2000));
+ }
+
+ /**
+  * Reads the data nodes of the zone from the key-value storage and compares them with the expected set.
+  * Shared by both assertions above so that they cannot drift apart.
+  *
+  * @param zoneId Zone id.
+  * @param expected Expected node names, or {@code null} if no value is expected to be stored.
+  * @return {@code true} if the stored value matches the expectation.
+  */
+ private boolean dataNodesForZoneEqualTo(int zoneId, @Nullable Set<String> expected) {
+     byte[] dataNodes = keyValueStorage.get(zoneDataNodesKey(zoneId).bytes()).value();
+
+     if (dataNodes == null) {
+         return expected == null;
+     }
+
+     Set<String> actual = ByteUtils.fromBytes(dataNodes);
+
+     return actual.equals(expected);
+ }
+
+ /**
+  * Asserts that the scale up change trigger key of the zone eventually holds the given revision.
+  *
+  * @param revision Expected revision.
+  * @param zoneId Zone id.
+  * @throws InterruptedException If the waiting thread is interrupted.
+  */
+ private void assertZoneScaleUpChangeTriggerKey(int revision, int zoneId) throws InterruptedException {
+     assertTrue(
+             waitForCondition(
+                     () -> {
+                         byte[] storedRevision = keyValueStorage.get(zoneScaleUpChangeTriggerKey(zoneId).bytes()).value();
+
+                         // A key that has not been written yet means "not there yet": keep polling instead of
+                         // failing inside the condition on a null value.
+                         return storedRevision != null && ByteUtils.bytesToLong(storedRevision) == revision;
+                     },
+                     2000
+             )
+     );
+ }
+
+ /**
+  * Asserts that the change trigger key of the zone eventually holds the given revision.
+  *
+  * @param revision Expected revision.
+  * @param zoneId Zone id.
+  * @throws InterruptedException If the waiting thread is interrupted.
+  */
+ private void assertZonesChangeTriggerKey(int revision, int zoneId) throws InterruptedException {
+     assertTrue(
+             waitForCondition(
+                     () -> {
+                         byte[] storedRevision = keyValueStorage.get(zonesChangeTriggerKey(zoneId).bytes()).value();
+
+                         // A key that has not been written yet means "not there yet": keep polling instead of
+                         // failing inside the condition on a null value.
+                         return storedRevision != null && ByteUtils.bytesToLong(storedRevision) == revision;
+                     },
+                     1000
+             )
+     );
+ }
+
+ /**
+  * Stubs {@code metaStorageManager.appliedRevision()} to return the given revision.
+  * NOTE(review): despite the "Vault" in the name, the stub is installed on the meta storage manager mock.
+  *
+  * @param revision Revision to be reported as applied.
+  */
+ private void mockVaultAppliedRevision(long revision) {
+ when(metaStorageManager.appliedRevision()).thenReturn(revision);
+ }
+
+ /**
+  * Feeds the captured watch listener with an update of the zones logical topology key.
+  *
+  * @param nodes Node names to serialize as the new value of the key.
+  * @param rev Revision of the new entry; its update counter is always 1.
+  */
+ private void watchListenerOnUpdate(Set<String> nodes, long rev) {
+     byte[] serializedTopology = toBytes(nodes);
+
+     Entry updatedEntry = new EntryImpl(zonesLogicalTopologyKey().bytes(), serializedTopology, rev, 1);
+
+     // The event carries no old entry, only the new one.
+     watchListener.onUpdate(new WatchEvent(new EntryEvent(null, updatedEntry)));
+ }
+
+ /**
+  * Stubs {@code cmgManager.logicalTopology()} to return a completed future holding a snapshot built from the arguments.
+  *
+  * @param version Version of the snapshot.
+  * @param clusterNodes Nodes of the snapshot.
+  * @return The snapshot that the stub returns.
+  */
+ private LogicalTopologySnapshot mockCmgLocalNodes(long version, Set<ClusterNode> clusterNodes) {
+     LogicalTopologySnapshot snapshot = new LogicalTopologySnapshot(version, clusterNodes);
+
+     when(cmgManager.logicalTopology()).thenReturn(completedFuture(snapshot));
+
+     return snapshot;
+ }
+
+ /**
+  * Asserts that the value stored under the zones logical topology key eventually equals the serialized names
+  * of the given nodes.
+  *
+  * @param clusterNodes Expected nodes, or {@code null} if no value is expected to be stored.
+  * @throws InterruptedException If the waiting thread is interrupted.
+  */
+ private void assertLogicalTopology(@Nullable Set<ClusterNode> clusterNodes) throws InterruptedException {
+     final byte[] expectedTopology;
+
+     if (clusterNodes == null) {
+         expectedTopology = null;
+     } else {
+         Set<String> expectedNames = clusterNodes.stream().map(ClusterNode::name).collect(Collectors.toSet());
+
+         expectedTopology = toBytes(expectedNames);
+     }
+
+     assertTrue(waitForCondition(
+             () -> Arrays.equals(keyValueStorage.get(zonesLogicalTopologyKey().bytes()).value(), expectedTopology),
+             1000
+     ));
+ }
+
+ /**
+  * Stubs the vault manager so that reading the zones logical topology key returns a completed future
+  * with an entry holding the serialized node names.
+  *
+  * @param nodes Node names to serialize into the vault entry.
+  */
+ private void mockVaultZonesLogicalTopologyKey(Set<String> nodes) {
+     VaultEntry topologyEntry = new VaultEntry(zonesLogicalTopologyKey(), toBytes(nodes));
+
+     when(vaultMgr.get(zonesLogicalTopologyKey())).thenReturn(completedFuture(topologyEntry));
+ }
+}
diff --git
a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerTest.java
b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerTest.java
index 0b0a19086d..7eada92853 100644
---
a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerTest.java
+++
b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerTest.java
@@ -103,7 +103,8 @@ class DistributionZoneManagerTest extends
IgniteAbstractTest {
tablesConfiguration,
null,
null,
- null
+ null,
+ "node"
);
}
diff --git
a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerWatchListenerTest.java
b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerWatchListenerTest.java
index e36870e966..80dc99b4a8 100644
---
a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerWatchListenerTest.java
+++
b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerWatchListenerTest.java
@@ -22,9 +22,11 @@ import static
org.apache.ignite.configuration.annotation.ConfigurationType.DISTR
import static
org.apache.ignite.internal.distributionzones.DistributionZoneManager.DEFAULT_ZONE_ID;
import static
org.apache.ignite.internal.distributionzones.DistributionZoneManager.DEFAULT_ZONE_NAME;
import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zoneDataNodesKey;
+import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zoneScaleUpChangeTriggerKey;
import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zonesChangeTriggerKey;
import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zonesLogicalTopologyKey;
import static
org.apache.ignite.internal.metastorage.impl.MetaStorageServiceImpl.toIfInfo;
+import static
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
import static org.apache.ignite.internal.util.ByteUtils.fromBytes;
import static org.apache.ignite.internal.util.ByteUtils.longToBytes;
import static org.apache.ignite.internal.util.ByteUtils.toBytes;
@@ -44,11 +46,15 @@ import static org.mockito.Mockito.when;
import java.io.Serializable;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
import org.apache.ignite.configuration.ConfigurationValue;
import org.apache.ignite.configuration.NamedConfigurationTree;
import org.apache.ignite.configuration.NamedListView;
@@ -66,8 +72,11 @@ import org.apache.ignite.internal.metastorage.EntryEvent;
import org.apache.ignite.internal.metastorage.MetaStorageManager;
import org.apache.ignite.internal.metastorage.WatchEvent;
import org.apache.ignite.internal.metastorage.WatchListener;
+import org.apache.ignite.internal.metastorage.command.GetAllCommand;
import
org.apache.ignite.internal.metastorage.command.MetaStorageCommandsFactory;
import org.apache.ignite.internal.metastorage.command.MultiInvokeCommand;
+import org.apache.ignite.internal.metastorage.command.MultipleEntryResponse;
+import org.apache.ignite.internal.metastorage.command.SingleEntryResponse;
import org.apache.ignite.internal.metastorage.command.info.StatementResultInfo;
import org.apache.ignite.internal.metastorage.dsl.If;
import org.apache.ignite.internal.metastorage.dsl.StatementResult;
@@ -75,6 +84,7 @@ import org.apache.ignite.internal.metastorage.impl.EntryImpl;
import
org.apache.ignite.internal.metastorage.server.SimpleInMemoryKeyValueStorage;
import org.apache.ignite.internal.metastorage.server.raft.MetaStorageListener;
import org.apache.ignite.internal.raft.Command;
+import org.apache.ignite.internal.raft.ReadCommand;
import org.apache.ignite.internal.raft.WriteCommand;
import org.apache.ignite.internal.raft.service.CommandClosure;
import org.apache.ignite.internal.raft.service.RaftGroupService;
@@ -85,6 +95,7 @@ import
org.apache.ignite.internal.schema.configuration.TablesConfiguration;
import org.apache.ignite.internal.testframework.IgniteAbstractTest;
import org.apache.ignite.internal.vault.VaultEntry;
import org.apache.ignite.internal.vault.VaultManager;
+import org.apache.ignite.lang.ByteArray;
import org.apache.ignite.lang.IgniteInternalException;
import org.jetbrains.annotations.Nullable;
import org.junit.jupiter.api.AfterEach;
@@ -154,7 +165,8 @@ public class DistributionZoneManagerWatchListenerTest
extends IgniteAbstractTest
tablesConfiguration,
metaStorageManager,
logicalTopologyService,
- vaultMgr
+ vaultMgr,
+ "node"
);
clusterCfgMgr.start();
@@ -223,7 +235,49 @@ public class DistributionZoneManagerWatchListenerTest
extends IgniteAbstractTest
return res;
}
- ).when(metaStorageService).run(any());
+ ).when(metaStorageService).run(any(WriteCommand.class));
+
+ lenient().doAnswer(
+ invocationClose -> {
+ Command cmd = invocationClose.getArgument(0);
+
+ long commandIndex = raftIndex.incrementAndGet();
+
+ CompletableFuture<Serializable> res = new
CompletableFuture<>();
+
+ CommandClosure<ReadCommand> clo = new CommandClosure<>() {
+ /** {@inheritDoc} */
+ @Override
+ public long index() {
+ return commandIndex;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public ReadCommand command() {
+ return (ReadCommand) cmd;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public void result(@Nullable Serializable r) {
+ if (r instanceof Throwable) {
+ res.completeExceptionally((Throwable) r);
+ } else {
+ res.complete(r);
+ }
+ }
+ };
+
+ try {
+ metaStorageListener.onRead(List.of(clo).iterator());
+ } catch (Throwable e) {
+ res.completeExceptionally(new
IgniteInternalException(e));
+ }
+
+ return res;
+ }
+ ).when(metaStorageService).run(any(ReadCommand.class));
MetaStorageCommandsFactory commandsFactory = new
MetaStorageCommandsFactory();
@@ -234,6 +288,28 @@ public class DistributionZoneManagerWatchListenerTest
extends IgniteAbstractTest
return metaStorageService.run(multiInvokeCommand).thenApply(bi ->
new StatementResult(((StatementResultInfo) bi).result()));
}).when(metaStorageManager).invoke(any());
+
+ lenient().doAnswer(invocationClose -> {
+ Set<ByteArray> keysSet = invocationClose.getArgument(0);
+
+ GetAllCommand getAllCommand = commandsFactory.getAllCommand().keys(
+
keysSet.stream().map(ByteArray::bytes).collect(Collectors.toList())
+ ).revision(0).build();
+
+ return metaStorageService.run(getAllCommand).thenApply(bi -> {
+ MultipleEntryResponse resp = (MultipleEntryResponse) bi;
+
+ Map<ByteArray, org.apache.ignite.internal.metastorage.Entry>
res = new HashMap<>();
+
+ for (SingleEntryResponse e : resp.entries()) {
+ ByteArray key = new ByteArray(e.key());
+
+ res.put(key, new EntryImpl(key.bytes(), e.value(),
e.revision(), e.updateCounter()));
+ }
+
+ return res;
+ });
+ }).when(metaStorageManager).getAll(any());
}
@AfterEach
@@ -271,6 +347,9 @@ public class DistributionZoneManagerWatchListenerTest
extends IgniteAbstractTest
verify(keyValueStorage, timeout(1000).times(3)).invoke(any());
+ nodes = Set.of("node1", "node2", "node3");
+
+ // Scale up just adds node to data nodes
checkDataNodesOfZone(DEFAULT_ZONE_ID, nodes);
//third event
@@ -279,22 +358,33 @@ public class DistributionZoneManagerWatchListenerTest
extends IgniteAbstractTest
watchListenerOnUpdate(nodes, 4);
- verify(keyValueStorage, timeout(1000).times(4)).invoke(any());
+ verify(keyValueStorage, timeout(1000).times(3)).invoke(any());
+
+ nodes = Set.of("node1", "node2", "node3");
+ // Scale up wasn't triggered
checkDataNodesOfZone(DEFAULT_ZONE_ID, nodes);
}
+ /**
+  * Asserts that the raw value stored under the data nodes key of the zone eventually equals the serialized
+  * form of the expected node names.
+  *
+  * @param zoneId Zone id.
+  * @param clusterNodes Expected node names, or {@code null} if no value is expected to be stored.
+  * @throws InterruptedException If the waiting thread is interrupted.
+  */
+ private void assertDataNodesForZone(int zoneId, @Nullable Set<String> clusterNodes) throws InterruptedException {
+     final byte[] expectedBytes;
+
+     if (clusterNodes == null) {
+         expectedBytes = null;
+     } else {
+         expectedBytes = toBytes(clusterNodes);
+     }
+
+     assertTrue(waitForCondition(
+             () -> Arrays.equals(keyValueStorage.get(zoneDataNodesKey(zoneId).bytes()).value(), expectedBytes),
+             1000
+     ));
+ }
+
+
@Test
void testStaleWatchEvent() {
mockVaultZonesLogicalTopologyKey(Set.of());
+ mockZones(mockZoneWithAutoAdjustScaleUp(100));
+
distributionZoneManager.start();
mockVaultAppliedRevision(1);
long revision = 100;
- keyValueStorage.put(zonesChangeTriggerKey().bytes(),
longToBytes(revision));
+ keyValueStorage.put(zoneScaleUpChangeTriggerKey(1).bytes(),
longToBytes(revision));
Set<String> nodes = Set.of("node1", "node2");
@@ -309,7 +399,7 @@ public class DistributionZoneManagerWatchListenerTest
extends IgniteAbstractTest
void testStaleVaultRevisionOnZoneManagerStart() {
long revision = 100;
- keyValueStorage.put(zonesChangeTriggerKey().bytes(),
longToBytes(revision));
+ keyValueStorage.put(zonesChangeTriggerKey(0).bytes(),
longToBytes(revision));
Set<String> nodes = Set.of("node1", "node2");
@@ -466,7 +556,7 @@ public class DistributionZoneManagerWatchListenerTest
extends IgniteAbstractTest
}
private DistributionZoneView mockDefaultZoneView() {
- DistributionZoneView defaultZone = mockZoneView(DEFAULT_ZONE_ID,
DEFAULT_ZONE_NAME, 100, Integer.MAX_VALUE,
+ DistributionZoneView defaultZone = mockZoneView(DEFAULT_ZONE_ID,
DEFAULT_ZONE_NAME, Integer.MAX_VALUE, 0,
Integer.MAX_VALUE);
DistributionZonesView zonesView = mock(DistributionZonesView.class);
@@ -477,6 +567,11 @@ public class DistributionZoneManagerWatchListenerTest
extends IgniteAbstractTest
return defaultZone;
}
+ /**
+  * Creates a mocked configuration of the zone with id 1 and name {@code ZONE_NAME_1} via {@code mockZoneConfiguration}.
+  * The given value is passed as the fourth argument; the third and the fifth ones are {@code Integer.MAX_VALUE}.
+  * NOTE(review): presumably the third/fourth/fifth arguments are auto adjust / scale up / scale down - confirm
+  * against {@code mockZoneConfiguration}.
+  *
+  * @param scaleUp Scale up value for the mocked zone.
+  * @return Mocked zone configuration.
+  */
+ private DistributionZoneConfiguration mockZoneWithAutoAdjustScaleUp(int
scaleUp) {
+ return mockZoneConfiguration(1, ZONE_NAME_1, Integer.MAX_VALUE,
scaleUp, Integer.MAX_VALUE);
+ }
+
+
private void mockVaultZonesLogicalTopologyKey(Set<String> nodes) {
byte[] newLogicalTopology = toBytes(nodes);
diff --git
a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZonesSchedulersTest.java
b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZonesSchedulersTest.java
new file mode 100644
index 0000000000..2e14cc044e
--- /dev/null
+++
b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZonesSchedulersTest.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.distributionzones;
+
+import static
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
+import static
org.apache.ignite.internal.util.IgniteUtils.shutdownAndAwaitTermination;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.after;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.BiConsumer;
+import
org.apache.ignite.internal.distributionzones.DistributionZoneManager.ZoneState;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.thread.NamedThreadFactory;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Test scenarios for the distribution zone schedulers: each scenario is run against both the scale up and
+ * the scale down rescheduling methods of {@link ZoneState}.
+ */
+public class DistributionZonesSchedulersTest {
+    private static final IgniteLogger LOG = Loggers.forClass(DistributionZonesSchedulersTest.class);
+
+    /** Executor shared by all tests of the class; it is shut down in {@link #afterAll()}. */
+    private static final ScheduledExecutorService executor = new ScheduledThreadPoolExecutor(
+            Math.min(Runtime.getRuntime().availableProcessors() * 3, 20),
+            new NamedThreadFactory("test-dst-zones-scheduler", LOG),
+            new ThreadPoolExecutor.DiscardPolicy()
+    );
+
+    /** Shuts the shared executor down, waiting up to 2 seconds for its termination. */
+    @AfterAll
+    public static void afterAll() {
+        shutdownAndAwaitTermination(executor, 2, TimeUnit.SECONDS);
+    }
+
+    @Test
+    void testScaleUpSchedule() throws InterruptedException {
+        ZoneState state = new ZoneState(executor);
+
+        testSchedule(state::rescheduleScaleUp);
+    }
+
+    @Test
+    void testScaleDownSchedule() throws InterruptedException {
+        ZoneState state = new ZoneState(executor);
+
+        testSchedule(state::rescheduleScaleDown);
+    }
+
+    /**
+     * Checks that a task scheduled with delay 0 is executed.
+     *
+     * @param fn Rescheduling method under test, accepting a delay and a task.
+     * @throws InterruptedException If the waiting thread is interrupted.
+     */
+    private static void testSchedule(BiConsumer<Long, Runnable> fn) throws InterruptedException {
+        CountDownLatch latch = new CountDownLatch(1);
+
+        assertEquals(1L, latch.getCount());
+
+        fn.accept(0L, latch::countDown);
+
+        // Bounded wait: a scheduler that never runs the task fails the test instead of hanging the suite.
+        assertTrue(latch.await(1000, TimeUnit.MILLISECONDS));
+
+        assertEquals(0L, latch.getCount());
+    }
+
+    @Test
+    void testScaleUpReScheduleNotStartedTask() {
+        ZoneState state = new ZoneState(executor);
+
+        testReScheduleNotStartedTask(state::rescheduleScaleUp);
+    }
+
+    @Test
+    void testScaleDownReScheduleNotStartedTask() {
+        ZoneState state = new ZoneState(executor);
+
+        testReScheduleNotStartedTask(state::rescheduleScaleDown);
+    }
+
+    /**
+     * Schedules the same task twice, first with delay 1 and then with delay 0, and checks that after 1200 ms
+     * the task has been run exactly once.
+     *
+     * @param fn Rescheduling method under test, accepting a delay and a task.
+     */
+    private static void testReScheduleNotStartedTask(BiConsumer<Long, Runnable> fn) {
+        Runnable runnable = mock(Runnable.class);
+
+        fn.accept(1L, runnable);
+
+        fn.accept(0L, runnable);
+
+        verify(runnable, after(1200).times(1)).run();
+    }
+
+    @Test
+    void testScaleUpReScheduleWhenTaskIsEnded() throws InterruptedException {
+        ZoneState state = new ZoneState(executor);
+
+        testReScheduleWhenTaskIsEnded(state::rescheduleScaleUp);
+    }
+
+    @Test
+    void testScaleDownReScheduleWhenTaskIsEnded() throws InterruptedException {
+        ZoneState state = new ZoneState(executor);
+
+        // The scale down scenario must exercise the scale down scheduler.
+        testReScheduleWhenTaskIsEnded(state::rescheduleScaleDown);
+    }
+
+    /**
+     * Schedules a task, waits for it to finish, schedules another one and checks that both have been executed.
+     *
+     * @param fn Rescheduling method under test, accepting a delay and a task.
+     * @throws InterruptedException If the waiting thread is interrupted.
+     */
+    private static void testReScheduleWhenTaskIsEnded(BiConsumer<Long, Runnable> fn) throws InterruptedException {
+        CountDownLatch latch = new CountDownLatch(1);
+
+        AtomicBoolean flag = new AtomicBoolean();
+
+        assertEquals(1L, latch.getCount());
+
+        fn.accept(0L, latch::countDown);
+
+        // The scenario requires the first task to be finished before rescheduling, so a timeout here is a failure.
+        assertTrue(latch.await(1000, TimeUnit.MILLISECONDS));
+
+        fn.accept(0L, () -> flag.set(true));
+
+        assertTrue(waitForCondition(() -> 0L == latch.getCount(), 1500));
+        assertTrue(waitForCondition(flag::get, 1500));
+    }
+}
diff --git
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
index 6d40dc0e95..1affcc2468 100644
---
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
+++
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
@@ -400,7 +400,8 @@ public class IgniteImpl implements Ignite {
tablesConfiguration,
metaStorageMgr,
logicalTopologyService,
- vaultMgr
+ vaultMgr,
+ name
);
volatileLogStorageFactoryCreator = new
VolatileLogStorageFactoryCreator(workDir.resolve("volatile-log-spillout"));
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ddl/DdlCommandHandlerExceptionHandlingTest.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ddl/DdlCommandHandlerExceptionHandlingTest.java
index 6a61aae43e..e339a97dd6 100644
---
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ddl/DdlCommandHandlerExceptionHandlingTest.java
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ddl/DdlCommandHandlerExceptionHandlingTest.java
@@ -108,7 +108,8 @@ public class DdlCommandHandlerExceptionHandlingTest extends
IgniteAbstractTest {
null,
null,
null,
- null
+ null,
+ "node"
);
commandHandler = new DdlCommandHandler(distributionZoneManager,
tableManager, indexManager, dataStorageManager);