This is an automated email from the ASF dual-hosted git repository.
vpyatkov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 6ada47270d IGNITE-18756 Awaiting for data nodes updating in
distribution zone manager (#1729)
6ada47270d is described below
commit 6ada47270d21795ba583d6be415bf91f5c64e319
Author: Sergey Uttsel <[email protected]>
AuthorDate: Mon Apr 17 11:28:09 2023 +0300
IGNITE-18756 Awaiting for data nodes updating in distribution zone manager
(#1729)
---
.../management/raft/TestClusterStateStorage.java | 7 +-
.../apache/ignite/internal/util/IgniteUtils.java | 12 +
.../distributionzones/DistributionZoneManager.java | 463 ++++++++++++++--
.../distributionzones/DistributionZonesUtil.java | 64 ++-
.../DistributionZoneWasRemovedException.java | 49 ++
.../DistributionZoneAwaitDataNodesTest.java | 582 +++++++++++++++++++++
...ibutionZoneManagerConfigurationChangesTest.java | 15 +-
...butionZoneManagerLogicalTopologyEventsTest.java | 21 +-
.../DistributionZoneManagerScaleUpTest.java | 33 +-
.../DistributionZoneManagerWatchListenerTest.java | 37 +-
.../ItRaftCommandLeftInLogUntilRestartTest.java | 3 +-
.../internal/table/distributed/TableManager.java | 4 +-
.../TableManagerDistributionZonesTest.java | 11 +-
13 files changed, 1198 insertions(+), 103 deletions(-)
diff --git
a/modules/cluster-management/src/testFixtures/java/org/apache/ignite/internal/cluster/management/raft/TestClusterStateStorage.java
b/modules/cluster-management/src/testFixtures/java/org/apache/ignite/internal/cluster/management/raft/TestClusterStateStorage.java
index 736715a34a..6ab873c5a7 100644
---
a/modules/cluster-management/src/testFixtures/java/org/apache/ignite/internal/cluster/management/raft/TestClusterStateStorage.java
+++
b/modules/cluster-management/src/testFixtures/java/org/apache/ignite/internal/cluster/management/raft/TestClusterStateStorage.java
@@ -19,13 +19,13 @@ package org.apache.ignite.internal.cluster.management.raft;
import static java.util.stream.Collectors.collectingAndThen;
import static java.util.stream.Collectors.toList;
+import static org.apache.ignite.internal.util.IgniteUtils.startsWith;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
@@ -130,11 +130,6 @@ public class TestClusterStateStorage implements
ClusterStateStorage {
}
}
- private static boolean startsWith(byte[] key, byte[] prefix) {
- return key.length >= prefix.length
- && Arrays.equals(key, 0, prefix.length, prefix, 0,
prefix.length);
- }
-
@Override
public CompletableFuture<Void> snapshot(Path snapshotPath) {
List<byte[]> keys;
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
index ea16c07854..e93e9908e4 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
@@ -1055,4 +1055,16 @@ public class IgniteUtils {
}
});
}
+
+ /**
+ * Utility method to check if one byte array starts with a specified
sequence of bytes.
+ *
+ * @param key The array to check.
+ * @param prefix The prefix bytes to test for.
+ * @return {@code true} if the key starts with the bytes from the prefix.
+ */
+ public static boolean startsWith(byte[] key, byte[] prefix) {
+ return key.length >= prefix.length
+ && Arrays.equals(key, 0, prefix.length, prefix, 0,
prefix.length);
+ }
}
diff --git
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java
index 9732870967..00cc8e6b92 100644
---
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java
+++
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java
@@ -17,6 +17,8 @@
package org.apache.ignite.internal.distributionzones;
+import static java.util.Collections.emptySet;
+import static java.util.concurrent.CompletableFuture.allOf;
import static java.util.concurrent.CompletableFuture.completedFuture;
import static java.util.concurrent.CompletableFuture.failedFuture;
import static java.util.stream.Collectors.toList;
@@ -24,6 +26,8 @@ import static java.util.stream.Collectors.toSet;
import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.deleteDataNodesAndUpdateTriggerKeys;
import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.extractChangeTriggerRevision;
import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.extractDataNodes;
+import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.extractZoneId;
+import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.getZoneById;
import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.toDataNodesMap;
import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.triggerKeyConditionForZonesChanges;
import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.triggerScaleUpScaleDownKeysCondition;
@@ -32,8 +36,10 @@ import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil
import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.updateDataNodesAndTriggerKeys;
import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.updateLogicalTopologyAndVersion;
import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zoneDataNodesKey;
+import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zoneLogicalTopologyPrefix;
import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zoneScaleDownChangeTriggerKey;
import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zoneScaleUpChangeTriggerKey;
+import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zonesDataNodesPrefix;
import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zonesLogicalTopologyKey;
import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zonesLogicalTopologyVersionKey;
import static org.apache.ignite.internal.metastorage.dsl.Conditions.notExists;
@@ -45,10 +51,10 @@ import static
org.apache.ignite.internal.util.ByteUtils.fromBytes;
import static org.apache.ignite.internal.util.ByteUtils.toBytes;
import static org.apache.ignite.internal.util.IgniteUtils.inBusyLock;
import static
org.apache.ignite.internal.util.IgniteUtils.shutdownAndAwaitTermination;
+import static org.apache.ignite.internal.util.IgniteUtils.startsWith;
import static org.apache.ignite.lang.ErrorGroups.Common.NODE_STOPPING_ERR;
import java.util.Arrays;
-import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -82,15 +88,18 @@ import
org.apache.ignite.internal.cluster.management.topology.api.LogicalTopolog
import
org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologySnapshot;
import
org.apache.ignite.internal.distributionzones.configuration.DistributionZoneChange;
import
org.apache.ignite.internal.distributionzones.configuration.DistributionZoneConfiguration;
+import
org.apache.ignite.internal.distributionzones.configuration.DistributionZoneConfigurationSchema;
import
org.apache.ignite.internal.distributionzones.configuration.DistributionZoneView;
import
org.apache.ignite.internal.distributionzones.configuration.DistributionZonesConfiguration;
import
org.apache.ignite.internal.distributionzones.exception.DistributionZoneAlreadyExistsException;
import
org.apache.ignite.internal.distributionzones.exception.DistributionZoneBindTableException;
import
org.apache.ignite.internal.distributionzones.exception.DistributionZoneNotFoundException;
+import
org.apache.ignite.internal.distributionzones.exception.DistributionZoneWasRemovedException;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.manager.IgniteComponent;
import org.apache.ignite.internal.metastorage.Entry;
+import org.apache.ignite.internal.metastorage.EntryEvent;
import org.apache.ignite.internal.metastorage.MetaStorageManager;
import org.apache.ignite.internal.metastorage.WatchEvent;
import org.apache.ignite.internal.metastorage.WatchListener;
@@ -106,8 +115,11 @@ import
org.apache.ignite.internal.schema.configuration.TablesConfiguration;
import org.apache.ignite.internal.thread.NamedThreadFactory;
import org.apache.ignite.internal.util.ByteUtils;
import org.apache.ignite.internal.util.IgniteSpinBusyLock;
+import org.apache.ignite.internal.util.PendingComparableValuesTracker;
+import org.apache.ignite.internal.vault.VaultEntry;
import org.apache.ignite.internal.vault.VaultManager;
import org.apache.ignite.lang.ByteArray;
+import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.lang.IgniteException;
import org.apache.ignite.lang.IgniteInternalException;
import org.apache.ignite.lang.NodeStoppingException;
@@ -138,6 +150,12 @@ public class DistributionZoneManager implements
IgniteComponent {
/** Default number of zone partitions. */
public static final int DEFAULT_PARTITION_COUNT = 25;
+ /**
+ * Value for the distribution zones' timers which means that data nodes
changing for distribution zone
+ * will be started without waiting.
+ */
+ public static final int IMMEDIATE_TIMER_VALUE = 0;
+
/** Default infinite value for the distribution zones' timers. */
public static final int INFINITE_TIMER_VALUE = Integer.MAX_VALUE;
@@ -174,6 +192,15 @@ public class DistributionZoneManager implements
IgniteComponent {
*/
private final Map<Integer, ZoneState> zonesState;
+ /** The tracker for the last topology version which was observed by
distribution zone manager. */
+ private final PendingComparableValuesTracker<Long> topVerTracker;
+
+ /** The last meta storage revision on which scale up timer was started. */
+ private volatile long lastScaleUpRevision;
+
+ /** The last meta storage revision on which scale down timer was started.
*/
+ private volatile long lastScaleDownRevision;
+
/** Listener for a topology events. */
private final LogicalTopologyEventListener topologyEventListener = new
LogicalTopologyEventListener() {
@Override
@@ -198,8 +225,11 @@ public class DistributionZoneManager implements
IgniteComponent {
*/
private volatile Set<String> logicalTopology;
- /** Watch listener. Needed to unregister it on {@link
DistributionZoneManager#stop()}. */
- private final WatchListener watchListener;
+ /** Watch listener for logical topology keys. */
+ private final WatchListener topologyWatchListener;
+
+ /** Watch listener for data nodes keys. */
+ private final WatchListener dataNodesWatchListener;
/**
* Creates a new distribution zone manager.
@@ -224,17 +254,21 @@ public class DistributionZoneManager implements
IgniteComponent {
this.logicalTopologyService = logicalTopologyService;
this.vaultMgr = vaultMgr;
- this.watchListener = createMetastorageListener();
+ this.topologyWatchListener = createMetastorageTopologyListener();
+
+ this.dataNodesWatchListener = createMetastorageDataNodesListener();
zonesState = new ConcurrentHashMap<>();
- logicalTopology = Collections.emptySet();
+ logicalTopology = emptySet();
executor = new ScheduledThreadPoolExecutor(
Math.min(Runtime.getRuntime().availableProcessors() * 3, 20),
new
NamedThreadFactory(NamedThreadFactory.threadPrefix(nodeName,
DISTRIBUTION_ZONE_MANAGER_POOL_NAME), LOG),
new ThreadPoolExecutor.DiscardPolicy()
);
+
+ topVerTracker = new PendingComparableValuesTracker<>(0L);
}
@TestOnly
@@ -511,6 +545,148 @@ public class DistributionZoneManager implements
IgniteComponent {
}
}
+ /**
+ * The method for obtaining the data nodes of the specified zone.
+ * If {@link
DistributionZoneConfigurationSchema#dataNodesAutoAdjustScaleUp} and
+ * {@link
DistributionZoneConfigurationSchema#dataNodesAutoAdjustScaleDown} are immediate
then it waits that the data nodes
+ * are up-to-date for the passed topology version.
+ *
+ * <p>If the values of auto adjust scale up and auto adjust scale down are
zero, then on the cluster topology changes
+ * the data nodes for the zone should be updated immediately. Therefore,
this method must return the data nodes which is calculated
+ * based on the topology with passed or greater version. Since the date
nodes value is updated asynchronously, this method waits for
+ * the date nodes to be updated with new nodes in the topology if the
value of auto adjust scale up is 0, and also waits for
+ * the date nodes to be updated with nodes that have left the topology if
the value of auto adjust scale down is 0.
+ * After the zone manager has observed the logical topology change and the
data nodes value is updated according to cluster topology,
+ * then this method completes the returned future with the current value
of data nodes.
+ *
+ * <p>If the value of auto adjust scale up is greater than zero, then it
is not necessary to wait for the data nodes update triggered
+ * by new nodes in cluster topology. Similarly if the value of auto adjust
scale down is greater than zero, then it is not necessary to
+ * wait for the data nodes update triggered by new nodes that have left
the topology in cluster topology.
+ *
+ * <p>The returned future can be completed with {@link
DistributionZoneNotFoundException} if zone with the provided {@code zoneId}
+ * cannot be found or {@link DistributionZoneWasRemovedException} in case
when the distribution zone was removed during
+ * method execution.
+ *
+ * @param zoneId Zone id.
+ * @param topVer Topology version.
+ * @return The data nodes future which will be completed with data nodes
for the zoneId or with exception.
+ */
+ public CompletableFuture<Set<String>> topologyVersionedDataNodes(int
zoneId, long topVer) {
+ CompletableFuture<IgniteBiTuple<Boolean, Boolean>> timerValuesFut =
awaitTopologyVersion(topVer)
+ .thenCompose(ignored -> getImmediateTimers(zoneId));
+
+ return allOf(
+ timerValuesFut.thenCompose(timerValues ->
scaleUpAwaiting(zoneId, timerValues.get1())),
+ timerValuesFut.thenCompose(timerValues ->
scaleDownAwaiting(zoneId, timerValues.get2()))
+ ).thenCompose(ignored -> getDataNodesFuture(zoneId));
+ }
+
+ /**
+ * Waits for observing passed topology version or greater version in
{@link DistributionZoneManager#topologyWatchListener}.
+ *
+ * @param topVer Topology version.
+ * @return Future for chaining.
+ */
+ private CompletableFuture<Void> awaitTopologyVersion(long topVer) {
+ return inBusyLock(busyLock, () -> topVerTracker.waitFor(topVer));
+ }
+
+ /**
+ * Transforms {@link
DistributionZoneConfigurationSchema#dataNodesAutoAdjustScaleUp}
+ * and {@link
DistributionZoneConfigurationSchema#dataNodesAutoAdjustScaleDown} values to
boolean values.
+ * True if it equals to zero and false if it greater than zero. Zero means
that data nodes changing must be started immediately.
+ *
+ * <p>The returned future can be completed with {@link
DistributionZoneNotFoundException}
+ * in case when the distribution zone was removed.
+ *
+ * @param zoneId Zone id.
+ * @return Future with the boolean values for immediate auto adjust scale
up and immediate auto adjust scale down.
+ */
+ private CompletableFuture<IgniteBiTuple<Boolean, Boolean>>
getImmediateTimers(int zoneId) {
+ return inBusyLock(busyLock, () -> {
+ DistributionZoneConfiguration zoneCfg =
getZoneById(zonesConfiguration, zoneId);
+
+ return completedFuture(new IgniteBiTuple<>(
+ zoneCfg.dataNodesAutoAdjustScaleUp().value() ==
IMMEDIATE_TIMER_VALUE,
+ zoneCfg.dataNodesAutoAdjustScaleDown().value() ==
IMMEDIATE_TIMER_VALUE
+ ));
+ });
+ }
+
+ /**
+ * If the {@link
DistributionZoneConfigurationSchema#dataNodesAutoAdjustScaleUp} equals to 0
then waits for the zone manager processes
+ * the data nodes update triggered by started nodes with passed or greater
revision.
+ * Else does nothing.
+ *
+ * <p>The returned future can be completed with {@link
DistributionZoneWasRemovedException}
+ * in case when the distribution zone was removed during method execution.
+ *
+ * @param zoneId Zone id.
+ * @param immediateScaleUp True in case of immediate scale up.
+ * @return Future for chaining.
+ */
+ private CompletableFuture<Void> scaleUpAwaiting(int zoneId, boolean
immediateScaleUp) {
+ return inBusyLock(busyLock, () -> {
+ if (immediateScaleUp) {
+ ZoneState zoneState = zonesState.get(zoneId);
+
+ if (zoneState != null) {
+ return
zoneState.scaleUpRevisionTracker().waitFor(lastScaleUpRevision);
+ } else {
+ return failedFuture(new
DistributionZoneWasRemovedException(zoneId));
+ }
+ } else {
+ return completedFuture(null);
+ }
+ });
+ }
+
+ /**
+ * If the {@link
DistributionZoneConfigurationSchema#dataNodesAutoAdjustScaleDown} equals to 0
then waits for the zone manager processes
+ * the data nodes update triggered by stopped nodes with passed or greater
revision.
+ * Else does nothing.
+ *
+ * <p>The returned future can be completed with {@link
DistributionZoneWasRemovedException}
+ * in case when the distribution zone was removed during method execution.
+ *
+ * @param zoneId Zone id.
+ * @param immediateScaleDown True in case of immediate scale down.
+ * @return Future for chaining.
+ */
+ private CompletableFuture<Void> scaleDownAwaiting(int zoneId, boolean
immediateScaleDown) {
+ return inBusyLock(busyLock, () -> {
+ if (immediateScaleDown) {
+ ZoneState zoneState = zonesState.get(zoneId);
+
+ if (zoneState != null) {
+ return
zoneState.scaleDownRevisionTracker().waitFor(lastScaleDownRevision);
+ } else {
+ return failedFuture(new
DistributionZoneWasRemovedException(zoneId));
+ }
+ } else {
+ return completedFuture(null);
+ }
+ });
+ }
+
+ /**
+ * Returns the future with data nodes of the specified zone.
+ *
+ * @param zoneId Zone id.
+ * @return Future.
+ */
+ private CompletableFuture<Set<String>> getDataNodesFuture(int zoneId) {
+ return inBusyLock(busyLock, () -> {
+ ZoneState zoneState = zonesState.get(zoneId);
+
+ if (zoneState != null) {
+ return completedFuture(zonesState.get(zoneId).nodes());
+ } else {
+ return failedFuture(new
DistributionZoneWasRemovedException(zoneId));
+ }
+ });
+ }
+
/** {@inheritDoc} */
@Override
public void start() {
@@ -540,7 +716,8 @@ public class DistributionZoneManager implements
IgniteComponent {
logicalTopologyService.addEventListener(topologyEventListener);
- metaStorageManager.registerExactWatch(zonesLogicalTopologyKey(),
watchListener);
+
metaStorageManager.registerPrefixWatch(zoneLogicalTopologyPrefix(),
topologyWatchListener);
+ metaStorageManager.registerPrefixWatch(zonesDataNodesPrefix(),
dataNodesWatchListener);
initDataNodesFromVaultManager();
@@ -584,6 +761,12 @@ public class DistributionZoneManager implements
IgniteComponent {
zoneState.stopScaleUp();
}
+ //Only wait for a scale up revision if the auto adjust scale up
has a zero value.
+ //So if the value of the auto adjust scale up has become non-zero,
then need to complete all futures.
+ if (newScaleUp > 0) {
+ zoneState.scaleUpRevisionTracker().update(lastScaleUpRevision);
+ }
+
return completedFuture(null);
};
}
@@ -622,6 +805,12 @@ public class DistributionZoneManager implements
IgniteComponent {
zoneState.stopScaleDown();
}
+ //Only wait for a scale down revision if the auto adjust scale
down has a zero value.
+ //So if the value of the auto adjust scale down has become
non-zero, then need to complete all futures.
+ if (newScaleDown > 0) {
+
zoneState.scaleDownRevisionTracker().update(lastScaleDownRevision);
+ }
+
return completedFuture(null);
};
}
@@ -637,7 +826,16 @@ public class DistributionZoneManager implements
IgniteComponent {
logicalTopologyService.removeEventListener(topologyEventListener);
- metaStorageManager.unregisterWatch(watchListener);
+ metaStorageManager.unregisterWatch(topologyWatchListener);
+ metaStorageManager.unregisterWatch(dataNodesWatchListener);
+
+ //Need to update trackers with max possible value to complete all
futures that are waiting for trackers.
+ topVerTracker.update(Long.MAX_VALUE);
+
+ zonesState.values().forEach(zoneState -> {
+ zoneState.scaleUpRevisionTracker().update(Long.MAX_VALUE);
+ zoneState.scaleDownRevisionTracker().update(Long.MAX_VALUE);
+ });
shutdownAndAwaitTermination(executor, 10, TimeUnit.SECONDS);
}
@@ -664,7 +862,10 @@ public class DistributionZoneManager implements
IgniteComponent {
removeTriggerKeysAndDataNodes(zoneId, ctx.storageRevision());
- zonesState.remove(zoneId);
+ ZoneState zoneState = zonesState.remove(zoneId);
+
+ zoneState.scaleUpRevisionTracker.update(Long.MAX_VALUE);
+ zoneState.scaleDownRevisionTracker.update(Long.MAX_VALUE);
return completedFuture(null);
}
@@ -904,45 +1105,62 @@ public class DistributionZoneManager implements
IgniteComponent {
}
try {
- vaultMgr.get(zonesLogicalTopologyKey())
- .thenAccept(vaultEntry -> {
- if (!busyLock.enterBusy()) {
- throw new
IgniteInternalException(NODE_STOPPING_ERR, new NodeStoppingException());
- }
+ long appliedRevision = metaStorageManager.appliedRevision();
- try {
- long appliedRevision =
metaStorageManager.appliedRevision();
+ lastScaleUpRevision = appliedRevision;
- if (vaultEntry != null && vaultEntry.value() !=
null) {
- logicalTopology =
fromBytes(vaultEntry.value());
+ lastScaleDownRevision = appliedRevision;
- // init keys and data nodes for default zone
- saveDataNodesAndUpdateTriggerKeysInMetaStorage(
- DEFAULT_ZONE_ID,
- appliedRevision,
- logicalTopology
- );
+ VaultEntry topVerEntry =
vaultMgr.get(zonesLogicalTopologyVersionKey()).join();
-
zonesConfiguration.distributionZones().value().forEach(zone -> {
- int zoneId = zone.zoneId();
+ if (topVerEntry != null && topVerEntry.value() != null) {
+ topVerTracker.update(bytesToLong(topVerEntry.value()));
+ }
-
saveDataNodesAndUpdateTriggerKeysInMetaStorage(
- zoneId,
- appliedRevision,
- logicalTopology
- );
- });
- }
- } finally {
- busyLock.leaveBusy();
- }
- });
+ VaultEntry topologyEntry =
vaultMgr.get(zonesLogicalTopologyKey()).join();
+
+ if (topologyEntry != null && topologyEntry.value() != null) {
+ logicalTopology = fromBytes(topologyEntry.value());
+
+ // init keys and data nodes for default zone
+ saveDataNodesAndUpdateTriggerKeysInMetaStorage(
+ DEFAULT_ZONE_ID,
+ appliedRevision,
+ logicalTopology
+ );
+
+ zonesConfiguration.distributionZones().value().forEach(zone ->
{
+ int zoneId = zone.zoneId();
+
+ saveDataNodesAndUpdateTriggerKeysInMetaStorage(
+ zoneId,
+ appliedRevision,
+ logicalTopology
+ );
+ });
+ }
+
+ zonesState.values().forEach(zoneState -> {
+ zoneState.scaleUpRevisionTracker().update(lastScaleUpRevision);
+
+
zoneState.scaleDownRevisionTracker().update(lastScaleDownRevision);
+
+ zoneState.nodes(logicalTopology);
+ });
+
+ assert topologyEntry == null || topologyEntry.value() == null ||
logicalTopology.equals(fromBytes(topologyEntry.value()))
+ : "Initial value of logical topology was changed after
initialization from the vault manager.";
} finally {
busyLock.leaveBusy();
}
}
- private WatchListener createMetastorageListener() {
+ /**
+ * Creates watch listener which listens logical topology and logical
topology version.
+ *
+ * @return Watch listener.
+ */
+ private WatchListener createMetastorageTopologyListener() {
return new WatchListener() {
@Override
public CompletableFuture<Void> onUpdate(WatchEvent evt) {
@@ -951,24 +1169,55 @@ public class DistributionZoneManager implements
IgniteComponent {
}
try {
- assert evt.single() : "Expected an event with one entry
but was an event with several entries with keys: "
+ assert evt.entryEvents().size() == 2 :
+ "Expected an event with logical topology and
logical topology version entries but was events with keys: "
+ evt.entryEvents().stream().map(entry ->
entry.newEntry() == null ? "null" : entry.newEntry().key())
.collect(toList());
- Entry newEntry = evt.entryEvent().newEntry();
+ long topVer = 0;
+
+ byte[] newLogicalTopologyBytes = null;
+
+ Set<String> newLogicalTopology = null;
+
+ long revision = 0;
+
+ for (EntryEvent event : evt.entryEvents()) {
+ Entry e = event.newEntry();
+
+ if (Arrays.equals(e.key(),
zonesLogicalTopologyVersionKey().bytes())) {
+ topVer = bytesToLong(e.value());
- long revision = newEntry.revision();
+ revision = e.revision();
+ } else if (Arrays.equals(e.key(),
zonesLogicalTopologyKey().bytes())) {
+ newLogicalTopologyBytes = e.value();
- byte[] newLogicalTopologyBytes = newEntry.value();
+ newLogicalTopology =
fromBytes(newLogicalTopologyBytes);
+ }
+ }
+
+ assert newLogicalTopology != null : "The event doesn't
contain logical topology";
+ assert revision > 0 : "The event doesn't contain logical
topology version";
- Set<String> newLogicalTopology =
fromBytes(newLogicalTopologyBytes);
+ Set<String> newLogicalTopology0 = newLogicalTopology;
Set<String> removedNodes =
- logicalTopology.stream().filter(node ->
!newLogicalTopology.contains(node)).collect(toSet());
+ logicalTopology.stream().filter(node ->
!newLogicalTopology0.contains(node)).collect(toSet());
Set<String> addedNodes =
newLogicalTopology.stream().filter(node ->
!logicalTopology.contains(node)).collect(toSet());
+ //Firstly update lastScaleUpRevision and
lastScaleDownRevision then update topVerTracker to ensure thread-safety.
+ if (!addedNodes.isEmpty()) {
+ lastScaleUpRevision = revision;
+ }
+
+ if (!removedNodes.isEmpty()) {
+ lastScaleDownRevision = revision;
+ }
+
+ topVerTracker.update(topVer);
+
logicalTopology = newLogicalTopology;
NamedConfigurationTree<DistributionZoneConfiguration,
DistributionZoneView, DistributionZoneChange> zones =
@@ -997,6 +1246,86 @@ public class DistributionZoneManager implements
IgniteComponent {
};
}
+ /**
+ * Creates watch listener which listens data nodes, scale up revision and
scale down revision.
+ *
+ * @return Watch listener.
+ */
+ private WatchListener createMetastorageDataNodesListener() {
+ return new WatchListener() {
+ @Override
+ public CompletableFuture<Void> onUpdate(WatchEvent evt) {
+ if (!busyLock.enterBusy()) {
+ return failedFuture(new NodeStoppingException());
+ }
+
+ try {
+ int zoneId = 0;
+
+ Set<String> newDataNodes = null;
+
+ long scaleUpRevision = 0;
+
+ long scaleDownRevision = 0;
+
+ for (EntryEvent event : evt.entryEvents()) {
+ Entry e = event.newEntry();
+
+ if (startsWith(e.key(), zoneDataNodesKey().bytes())) {
+ zoneId = extractZoneId(e.key());
+
+ byte[] dataNodesBytes = e.value();
+
+ if (dataNodesBytes != null) {
+ newDataNodes =
DistributionZonesUtil.dataNodes(fromBytes(dataNodesBytes));
+ } else {
+ newDataNodes = emptySet();
+ }
+ } else if (startsWith(e.key(),
zoneScaleUpChangeTriggerKey().bytes())) {
+ if (e.value() != null) {
+ scaleUpRevision = bytesToLong(e.value());
+ }
+ } else if (startsWith(e.key(),
zoneScaleDownChangeTriggerKey().bytes())) {
+ if (e.value() != null) {
+ scaleDownRevision = bytesToLong(e.value());
+ }
+ }
+ }
+
+ ZoneState zoneState = zonesState.get(zoneId);
+
+ if (zoneState == null) {
+ //The zone has been dropped so no need to update
zoneState.
+ return completedFuture(null);
+ }
+
+ assert newDataNodes != null : "Data nodes was not
initialized.";
+
+ zoneState.nodes(newDataNodes);
+
+ //Associates scale up meta storage revision and data nodes.
+ if (scaleUpRevision > 0) {
+
zoneState.scaleUpRevisionTracker.update(scaleUpRevision);
+ }
+
+ //Associates scale down meta storage revision and data
nodes.
+ if (scaleDownRevision > 0) {
+
zoneState.scaleDownRevisionTracker.update(scaleDownRevision);
+ }
+ } finally {
+ busyLock.leaveBusy();
+ }
+
+ return completedFuture(null);
+ }
+
+ @Override
+ public void onError(Throwable e) {
+ LOG.warn("Unable to process data nodes event", e);
+ }
+ };
+ }
+
/**
* Schedules scale up and scale down timers.
*
@@ -1292,6 +1621,15 @@ public class DistributionZoneManager implements
IgniteComponent {
/** Executor for scheduling tasks for scale up and scale down
processes. */
private final ScheduledExecutorService executor;
+ /** Data nodes. */
+ private volatile Set<String> nodes;
+
+ /** The tracker for scale up meta storage revision of current data
nodes value. */
+ private final PendingComparableValuesTracker<Long>
scaleUpRevisionTracker;
+
+ /** The tracker for scale down meta storage revision of current data
nodes value. */
+ private final PendingComparableValuesTracker<Long>
scaleDownRevisionTracker;
+
/**
* Constructor.
*
@@ -1300,6 +1638,9 @@ public class DistributionZoneManager implements
IgniteComponent {
ZoneState(ScheduledExecutorService executor) {
this.executor = executor;
topologyAugmentationMap = new ConcurrentSkipListMap<>();
+ nodes = emptySet();
+ scaleUpRevisionTracker = new PendingComparableValuesTracker<>(0L);
+ scaleDownRevisionTracker = new
PendingComparableValuesTracker<>(0L);
}
/**
@@ -1456,6 +1797,42 @@ public class DistributionZoneManager implements
IgniteComponent {
.map(Map.Entry::getKey);
}
+ /**
+ * Get data nodes.
+ *
+ * @return Data nodes.
+ */
+ private Set<String> nodes() {
+ return nodes;
+ }
+
+ /**
+ * Set data nodes.
+ *
+ * @param nodes Data nodes.
+ */
+ private void nodes(Set<String> nodes) {
+ this.nodes = nodes;
+ }
+
+ /**
+ * The tracker for scale up meta storage revision of current data
nodes value.
+ *
+ * @return The tracker.
+ */
+ private PendingComparableValuesTracker<Long> scaleUpRevisionTracker() {
+ return scaleUpRevisionTracker;
+ }
+
+ /**
+ * The tracker for scale down meta storage revision of current data
nodes value.
+ *
+ * @return The tracker.
+ */
+ private PendingComparableValuesTracker<Long>
scaleDownRevisionTracker() {
+ return scaleDownRevisionTracker;
+ }
+
@TestOnly
synchronized ScheduledFuture<?> scaleUpTask() {
return scaleUpTask;
diff --git
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZonesUtil.java
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZonesUtil.java
index 089f9c29a6..ef1a11a7d4 100644
---
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZonesUtil.java
+++
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZonesUtil.java
@@ -54,20 +54,28 @@ import org.jetbrains.annotations.Nullable;
* Util class for Distribution Zones flow.
*/
public class DistributionZonesUtil {
- /** Key prefix for zone's data nodes. */
+ /** Key prefix for zone's data nodes and trigger keys. */
private static final String DISTRIBUTION_ZONE_DATA_NODES_PREFIX =
"distributionZone.dataNodes.";
+ /** Key prefix for zone's data nodes. */
+ private static final String DISTRIBUTION_ZONE_DATA_NODES_VALUE_PREFIX =
DISTRIBUTION_ZONE_DATA_NODES_PREFIX + "value.";
+
/** Key prefix for zone's scale up change trigger key. */
- private static final String
DISTRIBUTION_ZONE_SCALE_UP_CHANGE_TRIGGER_PREFIX =
"distributionZone.scaleUp.change.trigger.";
+ private static final String
DISTRIBUTION_ZONE_SCALE_UP_CHANGE_TRIGGER_PREFIX =
+ DISTRIBUTION_ZONE_DATA_NODES_PREFIX + "scaleUpChangeTrigger.";
/** Key prefix for zone's scale down change trigger key. */
- private static final String
DISTRIBUTION_ZONE_SCALE_DOWN_CHANGE_TRIGGER_PREFIX =
"distributionZone.scaleDown.change.trigger.";
+ private static final String
DISTRIBUTION_ZONE_SCALE_DOWN_CHANGE_TRIGGER_PREFIX =
+ DISTRIBUTION_ZONE_DATA_NODES_PREFIX + "scaleDownChangeTrigger.";
+
+ /** Key prefix for zones' logical topology nodes and logical topology
version. */
+ private static final String DISTRIBUTION_ZONES_LOGICAL_TOPOLOGY_PREFIX =
"distributionZones.logicalTopology.";
/** Key prefix for zones' logical topology nodes. */
- private static final String DISTRIBUTION_ZONES_LOGICAL_TOPOLOGY =
"distributionZones.logicalTopology";
+ private static final String DISTRIBUTION_ZONES_LOGICAL_TOPOLOGY =
DISTRIBUTION_ZONES_LOGICAL_TOPOLOGY_PREFIX + "nodes";
/** Key prefix for zones' logical topology version. */
- private static final String DISTRIBUTION_ZONES_LOGICAL_TOPOLOGY_VERSION =
"distributionZones.logicalTopologyVersion";
+ private static final String DISTRIBUTION_ZONES_LOGICAL_TOPOLOGY_VERSION =
DISTRIBUTION_ZONES_LOGICAL_TOPOLOGY_PREFIX + "version";
/** Key prefix, needed for processing the event about zone's update was
triggered only once. */
private static final String DISTRIBUTION_ZONES_CHANGE_TRIGGER_KEY_PREFIX =
"distributionZones.change.trigger.";
@@ -85,23 +93,36 @@ public class DistributionZonesUtil {
*/
private static final long INITIAL_TRIGGER_REVISION_VALUE = 0;
+ /** ByteArray representation of {@link
DistributionZonesUtil#DISTRIBUTION_ZONE_DATA_NODES_PREFIX}. */
+ private static final ByteArray DISTRIBUTION_ZONES_DATA_NODES_KEY =
+ new ByteArray(DISTRIBUTION_ZONE_DATA_NODES_PREFIX);
+
/**
- * ByteArray representation of {@link
DistributionZonesUtil#DISTRIBUTION_ZONE_DATA_NODES_PREFIX}.
+ * ByteArray representation of {@link
DistributionZonesUtil#DISTRIBUTION_ZONE_DATA_NODES_VALUE_PREFIX}.
*
* @param zoneId Zone id.
* @return ByteArray representation.
*/
public static ByteArray zoneDataNodesKey(int zoneId) {
- return new ByteArray(DISTRIBUTION_ZONE_DATA_NODES_PREFIX + zoneId);
+ return new ByteArray(DISTRIBUTION_ZONE_DATA_NODES_VALUE_PREFIX +
zoneId);
+ }
+
+ /**
+ * ByteArray representation of {@link
DistributionZonesUtil#DISTRIBUTION_ZONE_DATA_NODES_VALUE_PREFIX}.
+ *
+ * @return ByteArray representation.
+ */
+ public static ByteArray zoneDataNodesKey() {
+ return new ByteArray(DISTRIBUTION_ZONE_DATA_NODES_VALUE_PREFIX);
}
/**
- * ByteArray representation of {@link
DistributionZonesUtil#DISTRIBUTION_ZONE_DATA_NODES_PREFIX}.
+ * ByteArray representation of {@link
DistributionZonesUtil#DISTRIBUTION_ZONES_LOGICAL_TOPOLOGY_PREFIX}.
*
* @return ByteArray representation.
*/
- public static ByteArray zoneDataNodesPrefix() {
- return new ByteArray(DISTRIBUTION_ZONE_DATA_NODES_PREFIX);
+ public static ByteArray zoneLogicalTopologyPrefix() {
+ return new ByteArray(DISTRIBUTION_ZONES_LOGICAL_TOPOLOGY_PREFIX);
}
/**
@@ -113,7 +134,7 @@ public class DistributionZonesUtil {
public static int extractZoneId(byte[] key) {
var strKey = new String(key, StandardCharsets.UTF_8);
- return
Integer.parseInt(strKey.substring(DISTRIBUTION_ZONE_DATA_NODES_PREFIX.length()));
+ return
Integer.parseInt(strKey.substring(DISTRIBUTION_ZONE_DATA_NODES_VALUE_PREFIX.length()));
}
/**
@@ -132,6 +153,13 @@ public class DistributionZonesUtil {
return new ByteArray(DISTRIBUTION_ZONE_SCALE_UP_CHANGE_TRIGGER_PREFIX
+ zoneId);
}
+ /**
+ * The key prefix needed for processing an event about zone's data node
propagation on scale up.
+ */
+ public static ByteArray zoneScaleUpChangeTriggerKey() {
+ return new ByteArray(DISTRIBUTION_ZONE_SCALE_UP_CHANGE_TRIGGER_PREFIX);
+ }
+
/**
* The key needed for processing an event about zone's data node
propagation on scale down.
* With this key we can be sure that event was triggered only once.
@@ -140,6 +168,13 @@ public class DistributionZonesUtil {
return new
ByteArray(DISTRIBUTION_ZONE_SCALE_DOWN_CHANGE_TRIGGER_PREFIX + zoneId);
}
+ /**
+ * The key prefix needed for processing an event about zone's data node
propagation on scale down.
+ */
+ public static ByteArray zoneScaleDownChangeTriggerKey() {
+ return new
ByteArray(DISTRIBUTION_ZONE_SCALE_DOWN_CHANGE_TRIGGER_PREFIX);
+ }
+
/**
* The key that represents logical topology nodes, needed for distribution
zones. It is needed to store them in the metastore
* to serialize data nodes changes triggered by topology changes and
changes of distribution zones configurations.
@@ -156,6 +191,13 @@ public class DistributionZonesUtil {
return DISTRIBUTION_ZONES_LOGICAL_TOPOLOGY_VERSION_KEY;
}
+ /**
+ * The key prefix needed for processing an event about zone's data nodes.
+ */
+ static ByteArray zonesDataNodesPrefix() {
+ return DISTRIBUTION_ZONES_DATA_NODES_KEY;
+ }
+
/**
* Condition for updating {@link
DistributionZonesUtil#zoneScaleUpChangeTriggerKey(int)} key.
* Update only if the revision of the event is newer than value in that
trigger key.
diff --git
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/exception/DistributionZoneWasRemovedException.java
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/exception/DistributionZoneWasRemovedException.java
new file mode 100644
index 0000000000..5bb0063a37
--- /dev/null
+++
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/exception/DistributionZoneWasRemovedException.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.distributionzones.exception;
+
+import static
org.apache.ignite.lang.ErrorGroups.DistributionZones.ZONE_NOT_FOUND_ERR;
+
+import java.util.UUID;
+import org.apache.ignite.lang.IgniteInternalException;
+
+/**
+ * Exception is thrown when appropriate distribution zone was removed.
+ */
+public class DistributionZoneWasRemovedException extends
IgniteInternalException {
+ /**
+ * The constructor.
+ *
+ * @param zoneId Zone id.
+ */
+ public DistributionZoneWasRemovedException(int zoneId) {
+ super(ZONE_NOT_FOUND_ERR, "Distribution zone is not found [zoneId=" +
zoneId + ']', null);
+ }
+
+ /**
+ * The constructor is used for creating an exception instance that is
thrown from a remote server.
+ *
+ * @param traceId Trace id.
+ * @param code Error code.
+ * @param message Error message.
+ * @param cause Cause exception.
+ */
+ public DistributionZoneWasRemovedException(UUID traceId, int code, String
message, Throwable cause) {
+ super(traceId, code, message, cause);
+ }
+}
diff --git
a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneAwaitDataNodesTest.java
b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneAwaitDataNodesTest.java
new file mode 100644
index 0000000000..1ad692df73
--- /dev/null
+++
b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneAwaitDataNodesTest.java
@@ -0,0 +1,582 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.distributionzones;
+
+import static java.util.Collections.emptySet;
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static
org.apache.ignite.internal.distributionzones.DistributionZoneManager.DEFAULT_ZONE_ID;
+import static
org.apache.ignite.internal.distributionzones.DistributionZoneManager.DEFAULT_ZONE_NAME;
+import static
org.apache.ignite.internal.distributionzones.DistributionZoneManager.IMMEDIATE_TIMER_VALUE;
+import static
org.apache.ignite.internal.distributionzones.DistributionZoneManager.INFINITE_TIMER_VALUE;
+import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zonesLogicalTopologyKey;
+import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zonesLogicalTopologyVersionKey;
+import static
org.apache.ignite.internal.testframework.IgniteTestUtils.assertThrowsWithCause;
+import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
+import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static org.apache.ignite.internal.util.ByteUtils.longToBytes;
+import static org.apache.ignite.internal.util.ByteUtils.toBytes;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import
org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager;
+import org.apache.ignite.internal.cluster.management.raft.ClusterStateStorage;
+import
org.apache.ignite.internal.cluster.management.raft.TestClusterStateStorage;
+import org.apache.ignite.internal.cluster.management.topology.LogicalTopology;
+import
org.apache.ignite.internal.cluster.management.topology.LogicalTopologyImpl;
+import
org.apache.ignite.internal.cluster.management.topology.LogicalTopologyServiceImpl;
+import org.apache.ignite.internal.cluster.management.topology.api.LogicalNode;
+import
org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologySnapshot;
+import org.apache.ignite.internal.configuration.ConfigurationManager;
+import
org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
+import
org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
+import
org.apache.ignite.internal.distributionzones.configuration.DistributionZoneConfigurationSchema;
+import
org.apache.ignite.internal.distributionzones.configuration.DistributionZonesConfiguration;
+import
org.apache.ignite.internal.distributionzones.exception.DistributionZoneWasRemovedException;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.manager.IgniteComponent;
+import org.apache.ignite.internal.metastorage.MetaStorageManager;
+import org.apache.ignite.internal.metastorage.WatchListener;
+import org.apache.ignite.internal.metastorage.dsl.Conditions;
+import org.apache.ignite.internal.metastorage.dsl.Operations;
+import
org.apache.ignite.internal.metastorage.impl.StandaloneMetaStorageManager;
+import
org.apache.ignite.internal.metastorage.server.SimpleInMemoryKeyValueStorage;
+import org.apache.ignite.internal.schema.configuration.TablesConfiguration;
+import org.apache.ignite.internal.testframework.IgniteAbstractTest;
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.internal.vault.VaultManager;
+import org.apache.ignite.internal.vault.inmemory.InMemoryVaultService;
+import org.apache.ignite.lang.ByteArray;
+import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.network.NetworkAddress;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+/**
+ * Tests awaiting data nodes algorithm in distribution zone manager in case
when
+ * {@link DistributionZoneConfigurationSchema#dataNodesAutoAdjustScaleUp}
+ * or {@link DistributionZoneConfigurationSchema#dataNodesAutoAdjustScaleDown}
are immediate.
+ */
+@ExtendWith(ConfigurationExtension.class)
+public class DistributionZoneAwaitDataNodesTest extends IgniteAbstractTest {
+ private static final IgniteLogger LOG =
Loggers.forClass(DistributionZoneAwaitDataNodesTest.class);
+
+ private MetaStorageManager metaStorageManager;
+
+ private DistributionZoneManager distributionZoneManager;
+
+ private LogicalTopology logicalTopology;
+
+ private ClusterStateStorage clusterStateStorage;
+
+ private ConfigurationManager clusterCfgMgr;
+
+ private ClusterManagementGroupManager cmgManager;
+
+ private VaultManager vaultManager;
+
+ @InjectConfiguration
+ private TablesConfiguration tablesConfiguration;
+
+ @InjectConfiguration
+ private DistributionZonesConfiguration zonesConfiguration;
+
+ private WatchListener topologyWatchListener;
+
+ private WatchListener dataNodesWatchListener;
+
+ private SimpleInMemoryKeyValueStorage keyValueStorage;
+
+ private final List<IgniteComponent> components = new ArrayList<>();
+
+ @BeforeEach
+ void setUp() throws Exception {
+ vaultManager = new VaultManager(new InMemoryVaultService());
+
+ assertThat(vaultManager.put(zonesLogicalTopologyKey(), null),
willCompleteSuccessfully());
+ assertThat(vaultManager.put(zonesLogicalTopologyVersionKey(),
longToBytes(0)), willCompleteSuccessfully());
+
+ components.add(vaultManager);
+
+ keyValueStorage = spy(new SimpleInMemoryKeyValueStorage("test"));
+
+ metaStorageManager = StandaloneMetaStorageManager.create(vaultManager,
keyValueStorage);
+
+ components.add(metaStorageManager);
+
+ cmgManager = mock(ClusterManagementGroupManager.class);
+
+ clusterStateStorage = new TestClusterStateStorage();
+
+ components.add(clusterStateStorage);
+
+ logicalTopology = new LogicalTopologyImpl(clusterStateStorage);
+
+ distributionZoneManager = new DistributionZoneManager(
+ zonesConfiguration,
+ tablesConfiguration,
+ metaStorageManager,
+ new LogicalTopologyServiceImpl(logicalTopology, cmgManager),
+ vaultManager,
+ "test"
+ );
+
+ mockCmgLocalNodes();
+
+ // Not adding 'distributionZoneManager' on purpose, it's started
manually.
+ components.forEach(IgniteComponent::start);
+
+ metaStorageManager.deployWatches();
+ }
+
+ @AfterEach
+ public void tearDown() throws Exception {
+ components.add(distributionZoneManager);
+
+ Collections.reverse(components);
+
+ IgniteUtils.closeAll(components.stream().map(c -> c::beforeNodeStop));
+ IgniteUtils.closeAll(components.stream().map(c -> c::stop));
+ }
+
+ /**
+ * This test invokes {@link
DistributionZoneManager#topologyVersionedDataNodes(int, long)} with default and
non-default zone id
+ * and different logical topology versions.
+ * Simulates new logical topology with new nodes and with removed nodes.
Check that data nodes futures are completed in right order.
+ */
+ @Disabled("https://issues.apache.org/jira/browse/IGNITE-19288")
+ @Test
+ void testSeveralScaleUpAndSeveralScaleDownThenScaleUpAndScaleDown() throws
Exception {
+ startZoneManager(0);
+
+ distributionZoneManager.createZone(
+ new
DistributionZoneConfigurationParameters.Builder("zone0")
+
.dataNodesAutoAdjustScaleUp(IMMEDIATE_TIMER_VALUE)
+
.dataNodesAutoAdjustScaleDown(IMMEDIATE_TIMER_VALUE)
+ .build()
+ )
+ .get(3, SECONDS);
+ distributionZoneManager.createZone(
+ new
DistributionZoneConfigurationParameters.Builder("zone1")
+
.dataNodesAutoAdjustScaleUp(IMMEDIATE_TIMER_VALUE)
+
.dataNodesAutoAdjustScaleDown(IMMEDIATE_TIMER_VALUE)
+ .build()
+ )
+ .get(3, SECONDS);
+
+ int zoneId0 = distributionZoneManager.getZoneId("zone0");
+ int zoneId1 = distributionZoneManager.getZoneId("zone1");
+
+ LOG.info("Topology with added nodes.");
+
+ CompletableFuture<Set<String>> dataNodesUpFut0 =
distributionZoneManager.topologyVersionedDataNodes(DEFAULT_ZONE_ID, 1);
+ CompletableFuture<Set<String>> dataNodesUpFut1 =
distributionZoneManager.topologyVersionedDataNodes(DEFAULT_ZONE_ID, 1);
+ CompletableFuture<Set<String>> dataNodesUpFut2 =
distributionZoneManager.topologyVersionedDataNodes(DEFAULT_ZONE_ID, 2);
+ CompletableFuture<Set<String>> dataNodesUpFut3 =
distributionZoneManager.topologyVersionedDataNodes(DEFAULT_ZONE_ID, 11);
+ CompletableFuture<Set<String>> dataNodesUpFut4 =
distributionZoneManager.topologyVersionedDataNodes(zoneId0, 1);
+ CompletableFuture<Set<String>> dataNodesUpFut5 =
distributionZoneManager.topologyVersionedDataNodes(zoneId0, 2);
+ CompletableFuture<Set<String>> dataNodesUpFut6 =
distributionZoneManager.topologyVersionedDataNodes(zoneId1, 1);
+ CompletableFuture<Set<String>> dataNodesUpFut7 =
distributionZoneManager.topologyVersionedDataNodes(zoneId1, 2);
+
+ int topVer0 = 2;
+
+ Set<String> threeNodes = Set.of("node0", "node1", "node2");
+
+ setLogicalTopologyInMetaStorage(threeNodes, topVer0);
+
+ assertEquals(threeNodes, dataNodesUpFut0.get(3, SECONDS));
+ assertEquals(threeNodes, dataNodesUpFut1.get(3, SECONDS));
+ assertEquals(threeNodes, dataNodesUpFut2.get(3, SECONDS));
+
+ assertEquals(threeNodes, dataNodesUpFut4.get(3, SECONDS));
+ assertEquals(threeNodes, dataNodesUpFut5.get(3, SECONDS));
+ assertEquals(threeNodes, dataNodesUpFut6.get(3, SECONDS));
+ assertEquals(threeNodes, dataNodesUpFut7.get(3, SECONDS));
+ assertFalse(dataNodesUpFut3.isDone());
+
+
+ LOG.info("Topology with removed nodes.");
+
+ CompletableFuture<Set<String>> dataNodesDownFut0 =
distributionZoneManager.topologyVersionedDataNodes(DEFAULT_ZONE_ID, 4);
+ CompletableFuture<Set<String>> dataNodesDownFut1 =
distributionZoneManager.topologyVersionedDataNodes(DEFAULT_ZONE_ID, 4);
+ CompletableFuture<Set<String>> dataNodesDownFut2 =
distributionZoneManager.topologyVersionedDataNodes(DEFAULT_ZONE_ID, 5);
+ CompletableFuture<Set<String>> dataNodesDownFut3 =
distributionZoneManager.topologyVersionedDataNodes(DEFAULT_ZONE_ID, 6);
+ CompletableFuture<Set<String>> dataNodesDownFut4 =
distributionZoneManager.topologyVersionedDataNodes(zoneId0, 4);
+ CompletableFuture<Set<String>> dataNodesDownFut5 =
distributionZoneManager.topologyVersionedDataNodes(zoneId0, 5);
+ CompletableFuture<Set<String>> dataNodesDownFut6 =
distributionZoneManager.topologyVersionedDataNodes(zoneId1, 4);
+ CompletableFuture<Set<String>> dataNodesDownFut7 =
distributionZoneManager.topologyVersionedDataNodes(zoneId1, 5);
+
+ int topVer1 = 5;
+
+ Set<String> twoNodes = Set.of("node0", "node1");
+
+ setLogicalTopologyInMetaStorage(twoNodes, topVer1);
+
+ assertEquals(twoNodes, dataNodesDownFut0.get(3, SECONDS));
+ assertEquals(twoNodes, dataNodesDownFut1.get(3, SECONDS));
+ assertEquals(twoNodes, dataNodesDownFut2.get(3, SECONDS));
+ assertEquals(twoNodes, dataNodesDownFut4.get(3, SECONDS));
+ assertEquals(twoNodes, dataNodesDownFut5.get(3, SECONDS));
+ assertEquals(twoNodes, dataNodesDownFut6.get(3, SECONDS));
+ assertEquals(twoNodes, dataNodesDownFut7.get(3, SECONDS));
+ assertFalse(dataNodesDownFut3.isDone());
+
+ int topVer2 = 20;
+
+ LOG.info("Topology with added and removed nodes.");
+
+ Set<String> dataNodes = Set.of("node0", "node2");
+
+ setLogicalTopologyInMetaStorage(dataNodes, topVer2);
+
+ assertEquals(dataNodes, dataNodesUpFut3.get(3, SECONDS));
+ assertEquals(dataNodes, dataNodesDownFut3.get(3, SECONDS));
+ }
+
+ /**
+ * Test checks that data nodes futures are completed on topology with
added nodes.
+ */
+ @Test
+ void testScaleUpAndThenScaleDown() throws Exception {
+ startZoneManager(0);
+
+ CompletableFuture<Set<String>> dataNodesFut =
distributionZoneManager.topologyVersionedDataNodes(DEFAULT_ZONE_ID, 5);
+
+ assertFalse(dataNodesFut.isDone());
+
+ long topVer = 100;
+
+ Set<String> dataNodes0 = Set.of("node0", "node1");
+
+ setLogicalTopologyInMetaStorage(dataNodes0, topVer);
+
+ assertFalse(dataNodesFut.isDone());
+
+ assertEquals(dataNodes0, dataNodesFut.get(3, SECONDS));
+
+ dataNodesFut =
distributionZoneManager.topologyVersionedDataNodes(DEFAULT_ZONE_ID, 106);
+
+ Set<String> dataNodes1 = Set.of("node0");
+
+ setLogicalTopologyInMetaStorage(dataNodes1, topVer + 100);
+
+ assertFalse(dataNodesFut.isDone());
+
+ assertEquals(dataNodes1, dataNodesFut.get(3, SECONDS));
+ }
+
+ /**
+ * Test checks that data nodes futures are completed on topology with
added and removed nodes for the zone with
+ * dataNodesAutoAdjustScaleUp is immediate and
dataNodesAutoAdjustScaleDown is non-zero.
+ */
+ @Test
+ void testAwaitingScaleUpOnly() throws Exception {
+ startZoneManager(0);
+
+ distributionZoneManager.alterZone(DEFAULT_ZONE_NAME, new
DistributionZoneConfigurationParameters.Builder(DEFAULT_ZONE_NAME)
+
.dataNodesAutoAdjustScaleUp(INFINITE_TIMER_VALUE).dataNodesAutoAdjustScaleDown(INFINITE_TIMER_VALUE).build())
+ .get(3, SECONDS);
+
+ distributionZoneManager.createZone(
+ new
DistributionZoneConfigurationParameters.Builder("zone1")
+
.dataNodesAutoAdjustScaleUp(IMMEDIATE_TIMER_VALUE)
+
.dataNodesAutoAdjustScaleDown(INFINITE_TIMER_VALUE)
+ .build()
+ )
+ .get(3, SECONDS);
+
+ int zoneId = distributionZoneManager.getZoneId("zone1");
+
+ CompletableFuture<Set<String>> dataNodesFut =
distributionZoneManager.topologyVersionedDataNodes(zoneId, 1);
+
+ Set<String> nodes0 = Set.of("node0", "node1");
+
+ setLogicalTopologyInMetaStorage(nodes0, 1);
+
+ assertEquals(nodes0, dataNodesFut.get(3, SECONDS));
+
+ dataNodesFut =
distributionZoneManager.topologyVersionedDataNodes(zoneId, 2);
+
+ assertFalse(dataNodesFut.isDone());
+
+ setLogicalTopologyInMetaStorage(Set.of("node0"), 2);
+
+ assertEquals(nodes0, dataNodesFut.get(3, SECONDS));
+ }
+
+ /**
+ * Test checks that data nodes futures are completed on topology with
added and removed nodes for the zone with
+ * dataNodesAutoAdjustScaleUp is non-zero and dataNodesAutoAdjustScaleDown
is immediate. And checks that other zones
+ * non-zero timers doesn't affect.
+ */
+ @Test
+ void testAwaitingScaleDownOnly() throws Exception {
+ startZoneManager(0);
+
+ distributionZoneManager.alterZone(DEFAULT_ZONE_NAME, new
DistributionZoneConfigurationParameters.Builder(DEFAULT_ZONE_NAME)
+
.dataNodesAutoAdjustScaleUp(INFINITE_TIMER_VALUE).dataNodesAutoAdjustScaleDown(INFINITE_TIMER_VALUE).build())
+ .get(3, SECONDS);
+
+ distributionZoneManager.createZone(
+ new
DistributionZoneConfigurationParameters.Builder("zone0")
+
.dataNodesAutoAdjustScaleUp(INFINITE_TIMER_VALUE)
+
.dataNodesAutoAdjustScaleDown(INFINITE_TIMER_VALUE)
+ .build()
+ )
+ .get(3, SECONDS);
+
+ distributionZoneManager.createZone(
+ new
DistributionZoneConfigurationParameters.Builder("zone1")
+
.dataNodesAutoAdjustScaleUp(IMMEDIATE_TIMER_VALUE)
+
.dataNodesAutoAdjustScaleDown(IMMEDIATE_TIMER_VALUE)
+ .build()
+ )
+ .get(3, SECONDS);
+
+ distributionZoneManager.createZone(
+ new
DistributionZoneConfigurationParameters.Builder("zone2")
+
.dataNodesAutoAdjustScaleUp(INFINITE_TIMER_VALUE)
+
.dataNodesAutoAdjustScaleDown(INFINITE_TIMER_VALUE)
+ .build()
+ )
+ .get(3, SECONDS);
+
+ int zoneId0 = distributionZoneManager.getZoneId("zone0");
+ int zoneId1 = distributionZoneManager.getZoneId("zone1");
+ int zoneId2 = distributionZoneManager.getZoneId("zone2");
+
+ CompletableFuture<Set<String>> dataNodesFut =
distributionZoneManager.topologyVersionedDataNodes(zoneId1, 1);
+
+ Set<String> nodes0 = Set.of("node0", "node1");
+
+ setLogicalTopologyInMetaStorage(nodes0, 1);
+
+ dataNodesFut.get(3, SECONDS);
+
+ CompletableFuture<Set<String>> dataNodesFut1Zone0 =
distributionZoneManager.topologyVersionedDataNodes(zoneId0, 2);
+ CompletableFuture<Set<String>> dataNodesFut1 =
distributionZoneManager.topologyVersionedDataNodes(zoneId1, 2);
+ CompletableFuture<Set<String>> dataNodesFut1Zone2 =
distributionZoneManager.topologyVersionedDataNodes(zoneId2, 2);
+
+ assertFalse(dataNodesFut1Zone0.isDone());
+ assertFalse(dataNodesFut1.isDone());
+ assertFalse(dataNodesFut1Zone2.isDone());
+
+ Set<String> nodes1 = Set.of("node0");
+
+ distributionZoneManager.alterZone("zone1", new
DistributionZoneConfigurationParameters.Builder("zone1")
+
.dataNodesAutoAdjustScaleUp(INFINITE_TIMER_VALUE).dataNodesAutoAdjustScaleDown(IMMEDIATE_TIMER_VALUE).build())
+ .get(3, SECONDS);
+
+ System.out.println("setLogicalTopologyInMetaStorage_2");
+ setLogicalTopologyInMetaStorage(nodes1, 2);
+
+ assertEquals(nodes1, dataNodesFut1.get(3, SECONDS));
+
+ CompletableFuture<Set<String>> dataNodesFut2 =
distributionZoneManager.topologyVersionedDataNodes(zoneId1, 3);
+
+ Set<String> nodes2 = Set.of("node0", "node1");
+
+ assertFalse(dataNodesFut2.isDone());
+
+ setLogicalTopologyInMetaStorage(nodes2, 3);
+
+ assertEquals(nodes1, dataNodesFut2.get(3, SECONDS));
+ }
+
+ /**
+ * Test checks that data nodes futures are completed immediately for the
zone with
+ * dataNodesAutoAdjustScaleUp is non-zero and dataNodesAutoAdjustScaleDown
is non-zero.
+ */
+ @Test
+ void testWithOutAwaiting() throws Exception {
+ startZoneManager(0);
+
+ distributionZoneManager.alterZone(DEFAULT_ZONE_NAME, new
DistributionZoneConfigurationParameters.Builder(DEFAULT_ZONE_NAME)
+
.dataNodesAutoAdjustScaleUp(INFINITE_TIMER_VALUE).dataNodesAutoAdjustScaleDown(INFINITE_TIMER_VALUE).build())
+ .get(3, SECONDS);
+
+ distributionZoneManager.createZone(
+ new
DistributionZoneConfigurationParameters.Builder("zone1")
+
.dataNodesAutoAdjustScaleUp(INFINITE_TIMER_VALUE)
+
.dataNodesAutoAdjustScaleDown(INFINITE_TIMER_VALUE)
+ .build()
+ )
+ .get(3, SECONDS);
+
+ int zoneId = distributionZoneManager.getZoneId("zone1");
+
+ CompletableFuture<Set<String>> dataNodesFut =
distributionZoneManager.topologyVersionedDataNodes(zoneId, 1);
+
+ assertFalse(dataNodesFut.isDone());
+
+ Set<String> nodes0 = Set.of("node0", "node1");
+
+ setLogicalTopologyInMetaStorage(nodes0, 1);
+
+ assertEquals(emptySet(), dataNodesFut.get(3, SECONDS));
+ }
+
+ /**
+ * Test checks that data nodes futures are completed exceptionally if the
zone was removed while
+ * data nodes awaiting.
+ */
+ @Disabled("https://issues.apache.org/jira/browse/IGNITE-19255")
+ @Test
+ void testRemoveZoneWhileAwaitingDataNodes() throws Exception {
+ startZoneManager(0);
+
+ distributionZoneManager.createZone(
+ new
DistributionZoneConfigurationParameters.Builder("zone0")
+
.dataNodesAutoAdjustScaleUp(IMMEDIATE_TIMER_VALUE)
+
.dataNodesAutoAdjustScaleDown(IMMEDIATE_TIMER_VALUE)
+ .build()
+ )
+ .get(3, SECONDS);
+
+ int zoneId = distributionZoneManager.getZoneId("zone0");
+
+ CompletableFuture<Set<String>> dataNodesFut0 =
distributionZoneManager.topologyVersionedDataNodes(zoneId, 5);
+
+ setLogicalTopologyInMetaStorage(Set.of("node0", "node1"), 100);
+
+ assertFalse(dataNodesFut0.isDone());
+
+ assertEquals(Set.of("node0", "node1"), dataNodesFut0.get(3, SECONDS));
+
+ CompletableFuture<Set<String>> dataNodesFut1 =
distributionZoneManager.topologyVersionedDataNodes(zoneId, 106);
+
+ setLogicalTopologyInMetaStorage(Set.of("node0", "node2"), 200);
+
+ assertFalse(dataNodesFut1.isDone());
+
+ distributionZoneManager.dropZone("zone0").get();
+
+ assertThrowsWithCause(() -> dataNodesFut1.get(3, SECONDS),
DistributionZoneWasRemovedException.class);
+ }
+
+ /**
+ * Test checks that data nodes futures are completed with old data nodes
if dataNodesAutoAdjustScaleUp
+ * and dataNodesAutoAdjustScaleDown timer increased to non-zero value.
+ */
+ @Disabled("https://issues.apache.org/jira/browse/IGNITE-19255")
+ @Test
+ void testScaleUpScaleDownAreChangedWhileAwaitingDataNodes() throws
Exception {
+ startZoneManager(0);
+
+ Set<String> nodes0 = Set.of("node0", "node1");
+
+ setLogicalTopologyInMetaStorage(nodes0, 1);
+
+ CompletableFuture<Set<String>> dataNodesFut =
distributionZoneManager.topologyVersionedDataNodes(DEFAULT_ZONE_ID, 1);
+
+ assertEquals(nodes0, dataNodesFut.get(3, SECONDS));
+
+ Set<String> nodes1 = Set.of("node0", "node2");
+
+ dataNodesFut =
distributionZoneManager.topologyVersionedDataNodes(DEFAULT_ZONE_ID, 2);
+
+ setLogicalTopologyInMetaStorage(nodes1, 2);
+
+ assertFalse(dataNodesFut.isDone());
+
+ //need to create new zone to fix assert invariant which is broken in
this test environment.
+ distributionZoneManager.createZone(new
DistributionZoneConfigurationParameters.Builder("zone0")
+
.dataNodesAutoAdjustScaleUp(1000).dataNodesAutoAdjustScaleDown(1000).build())
+ .get(3, SECONDS);
+
+ assertFalse(dataNodesFut.isDone());
+
+ distributionZoneManager.alterZone(DEFAULT_ZONE_NAME, new
DistributionZoneConfigurationParameters.Builder(DEFAULT_ZONE_NAME)
+
.dataNodesAutoAdjustScaleUp(1000).dataNodesAutoAdjustScaleDown(1000).build())
+ .get(3, SECONDS);
+
+ assertEquals(nodes0, dataNodesFut.get(3, SECONDS));
+ }
+
+ /**
+ * Test checks that data nodes are initialized on zone manager start.
+ */
+ @Test
+ void testInitializedDataNodesOnZoneManagerStart() throws Exception {
+ Set<String> dataNodes = Set.of("node0", "node1");
+
+ Map<ByteArray, byte[]> valEntries = new HashMap<>();
+
+ valEntries.put(zonesLogicalTopologyKey(), toBytes(dataNodes));
+ valEntries.put(zonesLogicalTopologyVersionKey(), longToBytes(3));
+
+ vaultManager.putAll(valEntries);
+
+ Collection<LogicalNode> nodes = new ArrayList<>();
+
+ nodes.add(new LogicalNode(new ClusterNode("node0", "node0", new
NetworkAddress("local", 1))));
+ nodes.add(new LogicalNode(new ClusterNode("node1", "node1", new
NetworkAddress("local", 1))));
+
+ when(cmgManager.logicalTopology()).thenReturn(completedFuture(new
LogicalTopologySnapshot(3, nodes)));
+
+ startZoneManager(10);
+
+ assertEquals(dataNodes,
distributionZoneManager.topologyVersionedDataNodes(DEFAULT_ZONE_ID, 2)
+ .get(3, SECONDS));
+ }
+
+ private void startZoneManager(long revision) throws Exception {
+ vaultManager.put(new ByteArray("applied_revision"),
longToBytes(revision)).get();
+
+ distributionZoneManager.start();
+
+ distributionZoneManager.alterZone(
+ DEFAULT_ZONE_NAME, new
DistributionZoneConfigurationParameters.Builder(DEFAULT_ZONE_NAME)
+
.dataNodesAutoAdjustScaleUp(IMMEDIATE_TIMER_VALUE)
+
.dataNodesAutoAdjustScaleDown(IMMEDIATE_TIMER_VALUE).build())
+ .get(3, SECONDS);
+ }
+
+ private void setLogicalTopologyInMetaStorage(Set<String> nodes, long
topVer) {
+ CompletableFuture<Boolean> invokeFuture = metaStorageManager.invoke(
+ Conditions.exists(zonesLogicalTopologyKey()),
+ List.of(
+ Operations.put(zonesLogicalTopologyKey(),
toBytes(nodes)),
+ Operations.put(zonesLogicalTopologyVersionKey(),
longToBytes(topVer))
+ ),
+ List.of(Operations.noop())
+ );
+
+ assertThat(invokeFuture, willBe(true));
+ }
+
+ private void mockCmgLocalNodes() {
+
when(cmgManager.logicalTopology()).thenReturn(completedFuture(logicalTopology.getLogicalTopology()));
+ }
+}
diff --git
a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerConfigurationChangesTest.java
b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerConfigurationChangesTest.java
index 8251652d32..de4f6f8f62 100644
---
a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerConfigurationChangesTest.java
+++
b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerConfigurationChangesTest.java
@@ -23,6 +23,7 @@ import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil
import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zoneScaleUpChangeTriggerKey;
import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zonesChangeTriggerKey;
import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zonesLogicalTopologyKey;
+import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zonesLogicalTopologyVersionKey;
import static
org.apache.ignite.internal.distributionzones.util.DistributionZonesTestUtil.assertDataNodesForZone;
import static
org.apache.ignite.internal.distributionzones.util.DistributionZonesTestUtil.assertZoneScaleUpChangeTriggerKey;
import static
org.apache.ignite.internal.distributionzones.util.DistributionZonesTestUtil.assertZonesChangeTriggerKey;
@@ -35,8 +36,6 @@ import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.clearInvocations;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.timeout;
-import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.util.List;
@@ -108,13 +107,19 @@ public class
DistributionZoneManagerConfigurationChangesTest extends IgniteAbstr
when(vaultMgr.get(zonesLogicalTopologyKey()))
.thenReturn(completedFuture(new
VaultEntry(zonesLogicalTopologyKey(), toBytes(nodes))));
+ when(vaultMgr.get(zonesLogicalTopologyVersionKey()))
+ .thenReturn(completedFuture(new
VaultEntry(zonesLogicalTopologyVersionKey(), longToBytes(0))));
+
LogicalTopologyService logicalTopologyService =
mock(LogicalTopologyService.class);
-
when(logicalTopologyService.logicalTopologyOnLeader()).thenReturn(completedFuture(new
LogicalTopologySnapshot(1, Set.of())));
+
+
when(logicalTopologyService.logicalTopologyOnLeader()).thenReturn(completedFuture(new
LogicalTopologySnapshot(0, Set.of())));
keyValueStorage = spy(new SimpleInMemoryKeyValueStorage("test"));
metaStorageManager = StandaloneMetaStorageManager.create(vaultMgr,
keyValueStorage);
+ metaStorageManager.put(zonesLogicalTopologyVersionKey(),
longToBytes(0));
+
distributionZoneManager = new DistributionZoneManager(
zonesConfiguration,
tablesConfiguration,
@@ -187,8 +192,6 @@ public class
DistributionZoneManagerConfigurationChangesTest extends IgniteAbstr
distributionZoneManager.createZone(new
DistributionZoneConfigurationParameters.Builder(ZONE_NAME).build()).get();
- verify(keyValueStorage, timeout(1000).times(1)).invoke(any());
-
assertZonesChangeTriggerKey(100, 1, keyValueStorage);
assertDataNodesForZone(1, null, keyValueStorage);
@@ -204,8 +207,6 @@ public class
DistributionZoneManagerConfigurationChangesTest extends IgniteAbstr
distributionZoneManager.dropZone(ZONE_NAME).get();
- verify(keyValueStorage, timeout(1000).times(2)).invoke(any());
-
assertDataNodesForZone(1, nodes, keyValueStorage);
}
}
diff --git
a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerLogicalTopologyEventsTest.java
b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerLogicalTopologyEventsTest.java
index fb71dd9769..88db11eb49 100644
---
a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerLogicalTopologyEventsTest.java
+++
b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerLogicalTopologyEventsTest.java
@@ -20,18 +20,18 @@ package org.apache.ignite.internal.distributionzones;
import static java.util.concurrent.CompletableFuture.completedFuture;
import static
org.apache.ignite.configuration.annotation.ConfigurationType.DISTRIBUTED;
import static
org.apache.ignite.internal.cluster.management.topology.LogicalTopologyImpl.LOGICAL_TOPOLOGY_KEY;
+import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zonesLogicalTopologyKey;
import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zonesLogicalTopologyVersionKey;
import static
org.apache.ignite.internal.distributionzones.util.DistributionZonesTestUtil.assertLogicalTopology;
import static
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
+import static org.apache.ignite.internal.util.ByteUtils.longToBytes;
+import static org.apache.ignite.internal.util.ByteUtils.toBytes;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.Mockito.after;
import static org.mockito.Mockito.lenient;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.timeout;
-import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.io.Serializable;
@@ -74,6 +74,7 @@ import
org.apache.ignite.internal.schema.configuration.TableConfiguration;
import org.apache.ignite.internal.schema.configuration.TableView;
import org.apache.ignite.internal.schema.configuration.TablesConfiguration;
import org.apache.ignite.internal.util.ByteUtils;
+import org.apache.ignite.internal.vault.VaultEntry;
import org.apache.ignite.internal.vault.VaultManager;
import org.apache.ignite.lang.ByteArray;
import org.apache.ignite.lang.IgniteInternalException;
@@ -140,6 +141,12 @@ public class
DistributionZoneManagerLogicalTopologyEventsTest {
when(value.namedListKeys()).thenReturn(new ArrayList<>());
+ when(vaultMgr.get(zonesLogicalTopologyVersionKey()))
+ .thenReturn(completedFuture(new
VaultEntry(zonesLogicalTopologyVersionKey(), longToBytes(0))));
+
+ when(vaultMgr.get(zonesLogicalTopologyKey()))
+ .thenReturn(completedFuture(new
VaultEntry(zonesLogicalTopologyKey(), toBytes(Set.of()))));
+
distributionZoneManager = new DistributionZoneManager(
zonesConfiguration,
tablesConfiguration,
@@ -293,8 +300,6 @@ public class
DistributionZoneManagerLogicalTopologyEventsTest {
distributionZoneManager1.start();
- verify(keyValueStorage, timeout(1000).times(1)).invoke(any());
-
assertLogicalTopVer(1L);
assertLogicalTopology(clusterNodes, keyValueStorage);
@@ -312,8 +317,6 @@ public class
DistributionZoneManagerLogicalTopologyEventsTest {
distributionZoneManager1.start();
- verify(keyValueStorage, timeout(1000).times(1)).invoke(any());
-
assertLogicalTopVer(2L);
assertLogicalTopology(clusterNodes, keyValueStorage);
@@ -331,8 +334,6 @@ public class
DistributionZoneManagerLogicalTopologyEventsTest {
distributionZoneManager1.start();
- verify(keyValueStorage, after(500).never()).invoke(any());
-
assertLogicalTopVer(2L);
assertLogicalTopology(null, keyValueStorage);
@@ -350,8 +351,6 @@ public class
DistributionZoneManagerLogicalTopologyEventsTest {
distributionZoneManager1.start();
- verify(keyValueStorage, after(500).never()).invoke(any());
-
assertLogicalTopVer(3L);
assertLogicalTopology(null, keyValueStorage);
diff --git
a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerScaleUpTest.java
b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerScaleUpTest.java
index 241654ec71..eeb6dad3a0 100644
---
a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerScaleUpTest.java
+++
b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerScaleUpTest.java
@@ -23,9 +23,11 @@ import static
org.apache.ignite.internal.distributionzones.DistributionZoneManag
import static
org.apache.ignite.internal.distributionzones.DistributionZoneManager.DEFAULT_ZONE_NAME;
import static
org.apache.ignite.internal.distributionzones.DistributionZoneManager.INFINITE_TIMER_VALUE;
import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zoneDataNodesKey;
+import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zoneLogicalTopologyPrefix;
import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zoneScaleDownChangeTriggerKey;
import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zoneScaleUpChangeTriggerKey;
import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zonesLogicalTopologyKey;
+import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zonesLogicalTopologyVersionKey;
import static
org.apache.ignite.internal.distributionzones.util.DistributionZonesTestUtil.assertDataNodesForZone;
import static
org.apache.ignite.internal.distributionzones.util.DistributionZonesTestUtil.assertLogicalTopology;
import static
org.apache.ignite.internal.distributionzones.util.DistributionZonesTestUtil.assertZoneScaleDownChangeTriggerKey;
@@ -47,6 +49,7 @@ import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Set;
@@ -91,6 +94,7 @@ import
org.apache.ignite.internal.schema.configuration.TablesConfiguration;
import org.apache.ignite.internal.util.ByteUtils;
import org.apache.ignite.internal.vault.VaultEntry;
import org.apache.ignite.internal.vault.VaultManager;
+import org.apache.ignite.lang.ByteArray;
import org.apache.ignite.network.ClusterNode;
import org.apache.ignite.network.NetworkAddress;
import org.jetbrains.annotations.Nullable;
@@ -126,7 +130,7 @@ public class DistributionZoneManagerScaleUpTest {
private MetaStorageManager metaStorageManager;
- private WatchListener watchListener;
+ private WatchListener topologyWatchListener;
private DistributionZonesConfiguration zonesConfiguration;
@@ -187,13 +191,23 @@ public class DistributionZoneManagerScaleUpTest {
mockVaultAppliedRevision(1);
when(vaultMgr.get(zonesLogicalTopologyKey())).thenReturn(completedFuture(new
VaultEntry(zonesLogicalTopologyKey(), null)));
+
+ when(vaultMgr.get(zonesLogicalTopologyVersionKey()))
+ .thenReturn(completedFuture(new
VaultEntry(zonesLogicalTopologyVersionKey(), longToBytes(0))));
+
when(vaultMgr.put(any(), any())).thenReturn(completedFuture(null));
doAnswer(invocation -> {
- watchListener = invocation.getArgument(1);
+ ByteArray key = invocation.getArgument(0);
+
+ WatchListener watchListener = invocation.getArgument(1);
+
+ if (Arrays.equals(key.bytes(),
zoneLogicalTopologyPrefix().bytes())) {
+ topologyWatchListener = watchListener;
+ }
return null;
- }).when(metaStorageManager).registerExactWatch(any(), any());
+ }).when(metaStorageManager).registerPrefixWatch(any(), any());
AtomicLong raftIndex = new AtomicLong();
@@ -1577,15 +1591,18 @@ public class DistributionZoneManagerScaleUpTest {
}
private void watchListenerOnUpdate(Set<String> nodes, long rev) {
- byte[] newLogicalTopology = toBytes(nodes);
+ byte[] newTopology = toBytes(nodes);
+ byte[] newTopVer = longToBytes(1L);
- Entry newEntry = new EntryImpl(zonesLogicalTopologyKey().bytes(),
newLogicalTopology, rev, 1);
+ Entry topology = new EntryImpl(zonesLogicalTopologyKey().bytes(),
newTopology, rev, 1);
+ Entry topVer = new EntryImpl(zonesLogicalTopologyVersionKey().bytes(),
newTopVer, rev, 1);
- EntryEvent entryEvent = new EntryEvent(null, newEntry);
+ EntryEvent topologyEvent = new EntryEvent(null, topology);
+ EntryEvent topVerEvent = new EntryEvent(null, topVer);
- WatchEvent evt = new WatchEvent(entryEvent);
+ WatchEvent evt = new WatchEvent(List.of(topologyEvent, topVerEvent),
rev);
- watchListener.onUpdate(evt);
+ topologyWatchListener.onUpdate(evt);
}
private void mockCmgLocalNodes() {
diff --git
a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerWatchListenerTest.java
b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerWatchListenerTest.java
index 72b264cb44..07bd6127dc 100644
---
a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerWatchListenerTest.java
+++
b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerWatchListenerTest.java
@@ -22,9 +22,11 @@ import static
org.apache.ignite.configuration.annotation.ConfigurationType.DISTR
import static
org.apache.ignite.internal.distributionzones.DistributionZoneManager.DEFAULT_ZONE_ID;
import static
org.apache.ignite.internal.distributionzones.DistributionZoneManager.DEFAULT_ZONE_NAME;
import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zoneDataNodesKey;
+import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zoneLogicalTopologyPrefix;
import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zoneScaleUpChangeTriggerKey;
import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zonesChangeTriggerKey;
import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zonesLogicalTopologyKey;
+import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zonesLogicalTopologyVersionKey;
import static
org.apache.ignite.internal.distributionzones.util.DistributionZonesTestUtil.assertDataNodesForZone;
import static
org.apache.ignite.internal.distributionzones.util.DistributionZonesTestUtil.mockMetaStorageListener;
import static org.apache.ignite.internal.util.ByteUtils.longToBytes;
@@ -39,6 +41,7 @@ import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Set;
@@ -71,6 +74,7 @@ import
org.apache.ignite.internal.schema.configuration.TablesConfiguration;
import org.apache.ignite.internal.testframework.IgniteAbstractTest;
import org.apache.ignite.internal.vault.VaultEntry;
import org.apache.ignite.internal.vault.VaultManager;
+import org.apache.ignite.lang.ByteArray;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -94,7 +98,7 @@ public class DistributionZoneManagerWatchListenerTest extends
IgniteAbstractTest
private MetaStorageManager metaStorageManager;
- private WatchListener watchListener;
+ private WatchListener topologyWatchListener;
private DistributionZonesConfiguration zonesConfiguration;
@@ -153,13 +157,23 @@ public class DistributionZoneManagerWatchListenerTest
extends IgniteAbstractTest
mockVaultAppliedRevision(1);
when(vaultMgr.get(zonesLogicalTopologyKey())).thenReturn(completedFuture(new
VaultEntry(zonesLogicalTopologyKey(), null)));
+
+ when(vaultMgr.get(zonesLogicalTopologyVersionKey()))
+ .thenReturn(completedFuture(new
VaultEntry(zonesLogicalTopologyVersionKey(), longToBytes(0))));
+
when(vaultMgr.put(any(), any())).thenReturn(completedFuture(null));
doAnswer(invocation -> {
- watchListener = invocation.getArgument(1);
+ ByteArray key = invocation.getArgument(0);
+
+ WatchListener watchListener = invocation.getArgument(1);
+
+ if (Arrays.equals(key.bytes(),
zoneLogicalTopologyPrefix().bytes())) {
+ topologyWatchListener = watchListener;
+ }
return null;
- }).when(metaStorageManager).registerExactWatch(any(), any());
+ }).when(metaStorageManager).registerPrefixWatch(any(), any());
AtomicLong raftIndex = new AtomicLong();
@@ -193,7 +207,9 @@ public class DistributionZoneManagerWatchListenerTest
extends IgniteAbstractTest
distributionZoneManager.alterZone(
DEFAULT_ZONE_NAME,
- new
DistributionZoneConfigurationParameters.Builder(DEFAULT_ZONE_NAME).dataNodesAutoAdjustScaleUp(0).build()
+ new
DistributionZoneConfigurationParameters.Builder(DEFAULT_ZONE_NAME)
+ .dataNodesAutoAdjustScaleUp(0)
+ .build()
).get();
//first event
@@ -322,15 +338,18 @@ public class DistributionZoneManagerWatchListenerTest
extends IgniteAbstractTest
}
private void watchListenerOnUpdate(Set<String> nodes, long rev) {
- byte[] newLogicalTopology = toBytes(nodes);
+ byte[] newTopology = toBytes(nodes);
+ byte[] newTopVer = longToBytes(1L);
- Entry newEntry = new EntryImpl(zonesLogicalTopologyKey().bytes(),
newLogicalTopology, rev, 1);
+ Entry topology = new EntryImpl(zonesLogicalTopologyKey().bytes(),
newTopology, rev, 1);
+ Entry topVer = new EntryImpl(zonesLogicalTopologyVersionKey().bytes(),
newTopVer, rev, 1);
- EntryEvent entryEvent = new EntryEvent(null, newEntry);
+ EntryEvent topologyEvent = new EntryEvent(null, topology);
+ EntryEvent topVerEvent = new EntryEvent(null, topVer);
- WatchEvent evt = new WatchEvent(entryEvent);
+ WatchEvent evt = new WatchEvent(List.of(topologyEvent, topVerEvent),
rev);
- watchListener.onUpdate(evt);
+ topologyWatchListener.onUpdate(evt);
}
private void mockVaultAppliedRevision(long revision) {
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItRaftCommandLeftInLogUntilRestartTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItRaftCommandLeftInLogUntilRestartTest.java
index 5dc30b2fbb..21b4f669bc 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItRaftCommandLeftInLogUntilRestartTest.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItRaftCommandLeftInLogUntilRestartTest.java
@@ -189,8 +189,7 @@ public class ItRaftCommandLeftInLogUntilRestartTest extends
ClusterPerClassInteg
tx.commit();
} finally {
- //TODO: IGNITE-18324 Nothing do in the rollback invocation when a
transaction is committed.
- //tx.rollback();
+ tx.rollback();
}
stopNodes();
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index e7cef89955..5ada992a94 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -27,7 +27,7 @@ import static java.util.stream.Collectors.toList;
import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.dataNodes;
import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.extractZoneId;
import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.getZoneById;
-import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zoneDataNodesPrefix;
+import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zoneDataNodesKey;
import static
org.apache.ignite.internal.schema.SchemaManager.INITIAL_SCHEMA_VERSION;
import static org.apache.ignite.internal.util.IgniteUtils.inBusyLock;
import static
org.apache.ignite.internal.util.IgniteUtils.shutdownAndAwaitTermination;
@@ -432,7 +432,7 @@ public class TableManager extends Producer<TableEvent,
TableEventParameters> imp
distributionZonesConfiguration.distributionZones().any().replicas().listen(this::onUpdateReplicas);
// TODO: IGNITE-18694 - Recovery for the case when zones watch
listener processed event but assignments were not updated.
- metaStorageMgr.registerPrefixWatch(zoneDataNodesPrefix(),
distributionZonesDataNodesListener);
+ metaStorageMgr.registerExactWatch(zoneDataNodesKey(),
distributionZonesDataNodesListener);
metaStorageMgr.registerPrefixWatch(ByteArray.fromString(PENDING_ASSIGNMENTS_PREFIX),
pendingAssignmentsRebalanceListener);
metaStorageMgr.registerPrefixWatch(ByteArray.fromString(STABLE_ASSIGNMENTS_PREFIX),
stableAssignmentsRebalanceListener);
diff --git
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerDistributionZonesTest.java
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerDistributionZonesTest.java
index bd95f013a7..4c0eb192d5 100644
---
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerDistributionZonesTest.java
+++
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerDistributionZonesTest.java
@@ -40,6 +40,7 @@ import static org.mockito.Mockito.when;
import java.io.Serializable;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -144,14 +145,16 @@ public class TableManagerDistributionZonesTest extends
IgniteAbstractTest {
MetaStorageManager metaStorageManager = mock(MetaStorageManager.class);
doAnswer(invocation -> {
- WatchListener listener = invocation.getArgument(1);
+ ByteArray key = invocation.getArgument(0);
- if (watchListener == null) {
- watchListener = listener;
+ WatchListener watchListener = invocation.getArgument(1);
+
+ if (Arrays.equals(key.bytes(), zoneDataNodesKey().bytes())) {
+ this.watchListener = watchListener;
}
return null;
- }).when(metaStorageManager).registerPrefixWatch(any(), any());
+ }).when(metaStorageManager).registerExactWatch(any(), any());
tablesConfiguration = mock(TablesConfiguration.class);