This is an automated email from the ASF dual-hosted git repository.
sanpwc pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new b882c50079 IGNITE-18115 DistributionZoneManager populated with
MetaStorage listeners to logical topology events. (#1426)
b882c50079 is described below
commit b882c5007915ec31f770837ead108fc33bcf8424
Author: Sergey Uttsel <[email protected]>
AuthorDate: Tue Jan 3 11:21:33 2023 +0300
IGNITE-18115 DistributionZoneManager populated with MetaStorage listeners
to logical topology events. (#1426)
---
modules/distribution-zones/build.gradle | 1 +
.../distributionzones/DistributionZoneManager.java | 307 +++++++++++---
.../distributionzones/DistributionZonesUtil.java | 4 +-
.../DistributionZoneNotFoundException.java | 2 +-
...ibutionZoneManagerConfigurationChangesTest.java | 121 +++---
...butionZoneManagerLogicalTopologyEventsTest.java | 11 +-
.../DistributionZoneManagerTest.java | 71 ++--
.../DistributionZoneManagerWatchListenerTest.java | 448 +++++++++++++++++++++
.../org/apache/ignite/internal/app/IgniteImpl.java | 2 +-
9 files changed, 792 insertions(+), 175 deletions(-)
diff --git a/modules/distribution-zones/build.gradle
b/modules/distribution-zones/build.gradle
index c1e9ae2500..58a4cd5f42 100644
--- a/modules/distribution-zones/build.gradle
+++ b/modules/distribution-zones/build.gradle
@@ -33,6 +33,7 @@ dependencies {
implementation project(':ignite-metastorage-client')
implementation project(':ignite-cluster-management')
implementation project(':ignite-metastorage')
+ implementation project(':ignite-vault')
implementation project(':ignite-configuration')
implementation libs.jetbrains.annotations
diff --git
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java
index d8f5117835..347d774350 100644
---
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java
+++
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java
@@ -18,33 +18,39 @@
package org.apache.ignite.internal.distributionzones;
import static java.util.concurrent.CompletableFuture.completedFuture;
+import static java.util.stream.Collectors.toList;
import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.deleteDataNodesKeyAndUpdateTriggerKey;
import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.triggerKeyCondition;
import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.updateDataNodesAndTriggerKey;
import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.updateLogicalTopologyAndVersion;
import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.updateTriggerKey;
+import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zonesLogicalTopologyKey;
import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zonesLogicalTopologyVersionKey;
+import static
org.apache.ignite.internal.metastorage.MetaStorageManager.APPLIED_REV;
import static
org.apache.ignite.internal.metastorage.client.Conditions.notExists;
import static org.apache.ignite.internal.metastorage.client.Conditions.value;
import static org.apache.ignite.internal.metastorage.client.Operations.ops;
+import static org.apache.ignite.internal.util.ByteUtils.bytesToLong;
+import static org.apache.ignite.internal.util.ByteUtils.toBytes;
import static org.apache.ignite.lang.ErrorGroups.Common.NODE_STOPPING_ERR;
-import static org.apache.ignite.lang.ErrorGroups.Common.UNEXPECTED_ERR;
import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
+import org.apache.ignite.configuration.ConfigurationChangeException;
import org.apache.ignite.configuration.NamedListChange;
import
org.apache.ignite.configuration.notifications.ConfigurationNamedListListener;
import
org.apache.ignite.configuration.notifications.ConfigurationNotificationEvent;
-import
org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager;
import
org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologyEventListener;
import
org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologyService;
import
org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologySnapshot;
import
org.apache.ignite.internal.distributionzones.configuration.DistributionZoneChange;
+import
org.apache.ignite.internal.distributionzones.configuration.DistributionZoneConfiguration;
import
org.apache.ignite.internal.distributionzones.configuration.DistributionZoneView;
import
org.apache.ignite.internal.distributionzones.configuration.DistributionZonesConfiguration;
import
org.apache.ignite.internal.distributionzones.exception.DistributionZoneAlreadyExistsException;
@@ -56,14 +62,19 @@ import org.apache.ignite.internal.manager.IgniteComponent;
import org.apache.ignite.internal.metastorage.MetaStorageManager;
import org.apache.ignite.internal.metastorage.client.CompoundCondition;
import org.apache.ignite.internal.metastorage.client.Condition;
+import org.apache.ignite.internal.metastorage.client.Entry;
import org.apache.ignite.internal.metastorage.client.If;
import org.apache.ignite.internal.metastorage.client.Update;
+import org.apache.ignite.internal.metastorage.client.WatchEvent;
+import org.apache.ignite.internal.metastorage.client.WatchListener;
import org.apache.ignite.internal.util.ByteUtils;
import org.apache.ignite.internal.util.IgniteSpinBusyLock;
+import org.apache.ignite.internal.vault.VaultManager;
import org.apache.ignite.lang.IgniteException;
import org.apache.ignite.lang.IgniteInternalException;
import org.apache.ignite.lang.NodeStoppingException;
import org.apache.ignite.network.ClusterNode;
+import org.jetbrains.annotations.NotNull;
/**
* Distribution zones manager.
@@ -78,8 +89,8 @@ public class DistributionZoneManager implements
IgniteComponent {
/** Meta Storage manager. */
private final MetaStorageManager metaStorageManager;
- /* Cluster Management manager. */
- private final ClusterManagementGroupManager cmgManager;
+ /** Vault manager. */
+ private final VaultManager vaultMgr;
/** Busy lock to stop synchronously. */
private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
@@ -108,24 +119,34 @@ public class DistributionZoneManager implements
IgniteComponent {
}
};
+ /** The logical topology on the last watch event.
+ * It's enough to mark this field by volatile because we don't update the
collection after it is assigned to the field.
+ */
+ private volatile Set<String> logicalTopology;
+
+ /** Watch listener id to unregister the watch listener on {@link
DistributionZoneManager#stop()}. */
+ private volatile Long watchListenerId;
+
/**
* Creates a new distribution zone manager.
*
* @param zonesConfiguration Distribution zones configuration.
* @param metaStorageManager Meta Storage manager.
- * @param cmgManager Cluster management group manager.
* @param logicalTopologyService Logical topology service.
+ * @param vaultMgr Vault manager.
*/
public DistributionZoneManager(
DistributionZonesConfiguration zonesConfiguration,
MetaStorageManager metaStorageManager,
- ClusterManagementGroupManager cmgManager,
- LogicalTopologyService logicalTopologyService
+ LogicalTopologyService logicalTopologyService,
+ VaultManager vaultMgr
) {
this.zonesConfiguration = zonesConfiguration;
this.metaStorageManager = metaStorageManager;
- this.cmgManager = cmgManager;
this.logicalTopologyService = logicalTopologyService;
+ this.vaultMgr = vaultMgr;
+
+ logicalTopology = Collections.emptySet();
}
/**
@@ -133,12 +154,16 @@ public class DistributionZoneManager implements
IgniteComponent {
*
* @param distributionZoneCfg Distribution zone configuration.
* @return Future representing pending completion of the operation.
+ * Future can be completed with {@link ConfigurationChangeException}
if a zone with the given name already exists
+ * or {@code distributionZoneCfg} is broken.
+ * @throws NullPointerException if {@code distributionZoneCfg} is {@code
null}.
+ * @throws NodeStoppingException If the node is stopping.
*/
public CompletableFuture<Void>
createZone(DistributionZoneConfigurationParameters distributionZoneCfg) {
Objects.requireNonNull(distributionZoneCfg, "Distribution zone
configuration is null.");
if (!busyLock.enterBusy()) {
- throw new IgniteException(new NodeStoppingException());
+ throw new IgniteException(NODE_STOPPING_ERR, new
NodeStoppingException());
}
try {
@@ -171,8 +196,6 @@ public class DistributionZoneManager implements
IgniteComponent {
});
} catch (IllegalArgumentException e) {
throw new
DistributionZoneAlreadyExistsException(distributionZoneCfg.name(), e);
- } catch (Exception e) {
- throw new IgniteInternalException(UNEXPECTED_ERR,
distributionZoneCfg.name(), e);
}
}));
} finally {
@@ -186,13 +209,17 @@ public class DistributionZoneManager implements
IgniteComponent {
* @param name Distribution zone name.
* @param distributionZoneCfg Distribution zone configuration.
* @return Future representing pending completion of the operation.
+ * Future can be completed with {@link ConfigurationChangeException}
if a zone with the given name already exists or
+ * zone with name for renaming already exists or {@code
distributionZoneCfg} is broken.
+ * @throws NullPointerException if {@code name} or {@code
distributionZoneCfg} is {@code null}.
+ * @throws NodeStoppingException If the node is stopping.
*/
public CompletableFuture<Void> alterZone(String name,
DistributionZoneConfigurationParameters distributionZoneCfg) {
Objects.requireNonNull(name, "Distribution zone name is null.");
Objects.requireNonNull(distributionZoneCfg, "Distribution zone
configuration is null.");
if (!busyLock.enterBusy()) {
- throw new IgniteException(new NodeStoppingException());
+ throw new IgniteException(NODE_STOPPING_ERR, new
NodeStoppingException());
}
try {
@@ -203,8 +230,6 @@ public class DistributionZoneManager implements
IgniteComponent {
renameChange = zonesListChange.rename(name,
distributionZoneCfg.name());
} catch (IllegalArgumentException e) {
throw new DistributionZoneRenameException(name,
distributionZoneCfg.name(), e);
- } catch (Exception e) {
- throw new IgniteInternalException(UNEXPECTED_ERR,
distributionZoneCfg.name(), e);
}
try {
@@ -231,8 +256,6 @@ public class DistributionZoneManager implements
IgniteComponent {
});
} catch (IllegalArgumentException e) {
throw new
DistributionZoneNotFoundException(distributionZoneCfg.name(), e);
- } catch (Exception e) {
- throw new IgniteInternalException(UNEXPECTED_ERR,
distributionZoneCfg.name(), e);
}
}));
} finally {
@@ -245,12 +268,15 @@ public class DistributionZoneManager implements
IgniteComponent {
*
* @param name Distribution zone name.
* @return Future representing pending completion of the operation.
+ * Future can be completed with {@link ConfigurationChangeException}
if a zone with the given name doesn't exist.
+ * @throws NullPointerException if {@code name} is {@code null}.
+ * @throws NodeStoppingException If the node is stopping.
*/
public CompletableFuture<Void> dropZone(String name) {
Objects.requireNonNull(name, "Distribution zone name is null.");
if (!busyLock.enterBusy()) {
- throw new IgniteException(new NodeStoppingException());
+ throw new IgniteException(NODE_STOPPING_ERR, new
NodeStoppingException());
}
try {
@@ -271,11 +297,21 @@ public class DistributionZoneManager implements
IgniteComponent {
/** {@inheritDoc} */
@Override
public void start() {
- zonesConfiguration.distributionZones().listenElements(new
ZonesConfigurationListener());
+ if (!busyLock.enterBusy()) {
+ throw new IgniteException(NODE_STOPPING_ERR, new
NodeStoppingException());
+ }
+
+ try {
+ zonesConfiguration.distributionZones().listenElements(new
ZonesConfigurationListener());
- logicalTopologyService.addEventListener(topologyEventListener);
+ logicalTopologyService.addEventListener(topologyEventListener);
- initMetaStorageKeysOnStart();
+ registerMetaStorageWatchListener()
+ .thenAccept(ignore -> initDataNodesFromVaultManager())
+ .thenAccept(ignore -> initMetaStorageKeysOnStart());
+ } finally {
+ busyLock.leaveBusy();
+ }
}
/** {@inheritDoc} */
@@ -288,6 +324,10 @@ public class DistributionZoneManager implements
IgniteComponent {
busyLock.block();
logicalTopologyService.removeEventListener(topologyEventListener);
+
+ if (watchListenerId != null) {
+ metaStorageManager.unregisterWatch(watchListenerId);
+ }
}
private class ZonesConfigurationListener implements
ConfigurationNamedListListener<DistributionZoneView> {
@@ -328,30 +368,22 @@ public class DistributionZoneManager implements
IgniteComponent {
}
try {
- Set<ClusterNode> logicalTopology;
-
- //TODO temporary code, will be removed in
https://issues.apache.org/jira/browse/IGNITE-18121
- try {
- logicalTopology = cmgManager.logicalTopology().get().nodes();
- } catch (InterruptedException | ExecutionException e) {
- throw new IgniteInternalException(e);
- }
-
- assert !logicalTopology.isEmpty() : "Logical topology cannot be
empty.";
-
// Update data nodes for a zone only if the revision of the event
is newer than value in that trigger key,
// so we do not react on a stale events
CompoundCondition triggerKeyCondition =
triggerKeyCondition(revision);
- Set<String> nodesConsistentIds =
logicalTopology.stream().map(ClusterNode::name).collect(Collectors.toSet());
+ // logicalTopology can be updated concurrently by the watch
listener.
+ Set<String> logicalTopology0 = logicalTopology;
- Update dataNodesAndTriggerKeyUpd =
updateDataNodesAndTriggerKey(zoneId, revision, nodesConsistentIds);
+ byte[] logicalTopologyBytes = toBytes(logicalTopology0);
+
+ Update dataNodesAndTriggerKeyUpd =
updateDataNodesAndTriggerKey(zoneId, revision, logicalTopologyBytes);
If iif = If.iif(triggerKeyCondition, dataNodesAndTriggerKeyUpd,
ops().yield(false));
metaStorageManager.invoke(iif).thenAccept(res -> {
if (res.getAsBoolean()) {
- LOG.debug("Update zones' dataNodes value [zoneId = {},
dataNodes = {}", zoneId, nodesConsistentIds);
+ LOG.debug("Update zones' dataNodes value [zoneId = {},
dataNodes = {}", zoneId, logicalTopology0);
} else {
LOG.debug("Failed to update zones' dataNodes value [zoneId
= {}]", zoneId);
}
@@ -483,47 +515,190 @@ public class DistributionZoneManager implements
IgniteComponent {
}
try {
- long topologyVersionFromCmg = snapshot.version();
+
metaStorageManager.get(zonesLogicalTopologyVersionKey()).thenAccept(topVerEntry
-> {
+ if (!busyLock.enterBusy()) {
+ throw new IgniteInternalException(NODE_STOPPING_ERR,
new NodeStoppingException());
+ }
- byte[] topVerFromMetastorage;
+ try {
+ long topologyVersionFromCmg = snapshot.version();
- try {
- topVerFromMetastorage =
metaStorageManager.get(zonesLogicalTopologyVersionKey()).get().value();
- } catch (InterruptedException | ExecutionException e) {
- throw new IgniteInternalException(UNEXPECTED_ERR, e);
- }
+ byte[] topVerFromMetaStorage = topVerEntry.value();
- if (topVerFromMetastorage == null ||
ByteUtils.bytesToLong(topVerFromMetastorage) < topologyVersionFromCmg) {
- Set<String> topologyFromCmg =
snapshot.nodes().stream().map(ClusterNode::name).collect(Collectors.toSet());
+ if (topVerFromMetaStorage == null ||
bytesToLong(topVerFromMetaStorage) < topologyVersionFromCmg) {
+ Set<String> topologyFromCmg =
snapshot.nodes().stream().map(ClusterNode::name).collect(Collectors.toSet());
- Condition topologyVersionCondition = topVerFromMetastorage
== null ? notExists(zonesLogicalTopologyVersionKey()) :
-
value(zonesLogicalTopologyVersionKey()).eq(topVerFromMetastorage);
-
- If iff = If.iif(topologyVersionCondition,
- updateLogicalTopologyAndVersion(topologyFromCmg,
topologyVersionFromCmg),
- ops().yield(false)
- );
+ Condition topologyVersionCondition =
topVerFromMetaStorage == null
+ ?
notExists(zonesLogicalTopologyVersionKey()) :
+
value(zonesLogicalTopologyVersionKey()).eq(topVerFromMetaStorage);
- metaStorageManager.invoke(iff).thenAccept(res -> {
- if (res.getAsBoolean()) {
- LOG.debug(
- "Distribution zones' logical topology and
version keys were initialised [topology = {}, version = {}]",
- Arrays.toString(topologyFromCmg.toArray()),
- topologyVersionFromCmg
- );
- } else {
- LOG.debug(
- "Failed to initialize distribution zones'
logical topology "
- + "and version keys [topology =
{}, version = {}]",
- Arrays.toString(topologyFromCmg.toArray()),
- topologyVersionFromCmg
+ If iff = If.iif(topologyVersionCondition,
+
updateLogicalTopologyAndVersion(topologyFromCmg, topologyVersionFromCmg),
+ ops().yield(false)
);
+
+ metaStorageManager.invoke(iff).thenAccept(res -> {
+ if (res.getAsBoolean()) {
+ LOG.debug(
+ "Distribution zones' logical
topology and version keys were initialised "
+ + "[topology = {}, version
= {}]",
+
Arrays.toString(topologyFromCmg.toArray()),
+ topologyVersionFromCmg
+ );
+ } else {
+ LOG.debug(
+ "Failed to initialize distribution
zones' logical topology "
+ + "and version keys
[topology = {}, version = {}]",
+
Arrays.toString(topologyFromCmg.toArray()),
+ topologyVersionFromCmg
+ );
+ }
+ });
}
- });
- }
+ } finally {
+ busyLock.leaveBusy();
+ }
+ });
+
} finally {
busyLock.leaveBusy();
}
});
}
+
+ /**
+ * Initialises data nodes of distribution zones in meta storage
+ * from {@link DistributionZonesUtil#zonesLogicalTopologyKey()} in vault.
+ */
+ private void initDataNodesFromVaultManager() {
+ vaultMgr.get(APPLIED_REV)
+ .thenApply(appliedRevision -> appliedRevision == null ? 0L :
bytesToLong(appliedRevision.value()))
+ .thenAccept(vaultAppliedRevision -> {
+ if (!busyLock.enterBusy()) {
+ throw new IgniteInternalException(NODE_STOPPING_ERR,
new NodeStoppingException());
+ }
+
+ try {
+ vaultMgr.get(zonesLogicalTopologyKey())
+ .thenAccept(vaultEntry -> {
+ if (!busyLock.enterBusy()) {
+ throw new
IgniteInternalException(NODE_STOPPING_ERR, new NodeStoppingException());
+ }
+
+ try {
+ if (vaultEntry != null &&
vaultEntry.value() != null) {
+ logicalTopology =
ByteUtils.fromBytes(vaultEntry.value());
+
+
zonesConfiguration.distributionZones().value().namedListKeys()
+ .forEach(zoneName -> {
+ int zoneId =
zonesConfiguration.distributionZones().get(zoneName).zoneId().value();
+
+
saveDataNodesToMetaStorage(zoneId, vaultEntry.value(), vaultAppliedRevision);
+ });
+ }
+ } finally {
+ busyLock.leaveBusy();
+ }
+ });
+ } finally {
+ busyLock.leaveBusy();
+ }
+ });
+ }
+
+ /**
+ * Registers {@link WatchListener} which updates data nodes of
distribution zones on logical topology changing event.
+ *
+ * @return Future representing pending completion of the operation.
+ */
+ private CompletableFuture<?> registerMetaStorageWatchListener() {
+ return metaStorageManager.registerWatch(zonesLogicalTopologyKey(), new
WatchListener() {
+ @Override
+ public boolean onUpdate(@NotNull WatchEvent evt) {
+ if (!busyLock.enterBusy()) {
+ throw new
IgniteInternalException(NODE_STOPPING_ERR, new NodeStoppingException());
+ }
+
+ try {
+ assert evt.single();
+
+ Entry newEntry = evt.entryEvent().newEntry();
+
+ Set<String> newLogicalTopology =
ByteUtils.fromBytes(newEntry.value());
+
+ List<String> removedNodes =
+ logicalTopology.stream().filter(node ->
!newLogicalTopology.contains(node)).collect(toList());
+
+ List<String> addedNodes =
+ newLogicalTopology.stream().filter(node ->
!logicalTopology.contains(node)).collect(toList());
+
+ logicalTopology = newLogicalTopology;
+
+
zonesConfiguration.distributionZones().value().namedListKeys()
+ .forEach(zoneName -> {
+ DistributionZoneConfiguration zoneCfg
= zonesConfiguration.distributionZones().get(zoneName);
+
+ int autoAdjust =
zoneCfg.dataNodesAutoAdjust().value();
+ int autoAdjustScaleDown =
zoneCfg.dataNodesAutoAdjustScaleDown().value();
+ int autoAdjustScaleUp =
zoneCfg.dataNodesAutoAdjustScaleUp().value();
+
+ Integer zoneId =
zoneCfg.zoneId().value();
+
+ if ((!addedNodes.isEmpty() ||
!removedNodes.isEmpty()) && autoAdjust != Integer.MAX_VALUE) {
+ //TODO: IGNITE-18134 Create
scheduler with dataNodesAutoAdjust timer.
+ saveDataNodesToMetaStorage(
+ zoneId, newEntry.value(),
newEntry.revision()
+ );
+ } else {
+ if (!addedNodes.isEmpty() &&
autoAdjustScaleUp != Integer.MAX_VALUE) {
+ //TODO: IGNITE-18121 Create
scale up scheduler with dataNodesAutoAdjustScaleUp timer.
+ saveDataNodesToMetaStorage(
+ zoneId,
newEntry.value(), newEntry.revision()
+ );
+ }
+
+ if (!removedNodes.isEmpty() &&
autoAdjustScaleDown != Integer.MAX_VALUE) {
+ //TODO: IGNITE-18132 Create
scale down scheduler with dataNodesAutoAdjustScaleDown timer.
+ saveDataNodesToMetaStorage(
+ zoneId,
newEntry.value(), newEntry.revision()
+ );
+ }
+ }
+ });
+
+ return true;
+ } finally {
+ busyLock.leaveBusy();
+ }
+ }
+
+ @Override
+ public void onError(@NotNull Throwable e) {
+ LOG.warn("Unable to process logical topology event",
e);
+ }
+ })
+ .thenAccept(id -> watchListenerId = id);
+ }
+
+ /**
+ * Method updates data nodes value for the specified zone,
+ * also sets {@code revision} to the {@link
DistributionZonesUtil#zonesChangeTriggerKey()} if it passes the condition.
+ *
+ * @param zoneId Unique id of a zone
+ * @param dataNodes Data nodes of a zone
+ * @param revision Revision of an event that has triggered this method.
+ */
+ private void saveDataNodesToMetaStorage(int zoneId, byte[] dataNodes, long
revision) {
+ Update dataNodesAndTriggerKeyUpd =
updateDataNodesAndTriggerKey(zoneId, revision, dataNodes);
+
+ var iif = If.iif(triggerKeyCondition(revision),
dataNodesAndTriggerKeyUpd, ops().yield(false));
+
+ metaStorageManager.invoke(iif).thenAccept(res -> {
+ if (res.getAsBoolean()) {
+ LOG.debug("Delete zones' dataNodes key [zoneId = {}", zoneId);
+ } else {
+ LOG.debug("Failed to delete zones' dataNodes key [zoneId =
{}]", zoneId);
+ }
+ });
+ }
}
diff --git
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZonesUtil.java
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZonesUtil.java
index a76902a1c5..c620b8d8f1 100644
---
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZonesUtil.java
+++
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZonesUtil.java
@@ -103,9 +103,9 @@ class DistributionZonesUtil {
* @param logicalTopology Logical topology.
* @return Update command for the meta storage.
*/
- static Update updateDataNodesAndTriggerKey(int zoneId, long revision,
Set<String> logicalTopology) {
+ static Update updateDataNodesAndTriggerKey(int zoneId, long revision,
byte[] logicalTopology) {
return ops(
- put(zoneDataNodesKey(zoneId),
ByteUtils.toBytes(logicalTopology)),
+ put(zoneDataNodesKey(zoneId), logicalTopology),
put(zonesChangeTriggerKey(), ByteUtils.longToBytes(revision))
).yield(true);
}
diff --git
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/exception/DistributionZoneNotFoundException.java
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/exception/DistributionZoneNotFoundException.java
index a12853fbe9..e921e6a8a6 100644
---
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/exception/DistributionZoneNotFoundException.java
+++
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/exception/DistributionZoneNotFoundException.java
@@ -33,7 +33,7 @@ public class DistributionZoneNotFoundException extends
IgniteInternalException {
* @param zoneName Zone name.
*/
public DistributionZoneNotFoundException(String zoneName) {
- this("Distribution zone is not found [zoneName=" + zoneName + ']',
null);
+ this(zoneName, null);
}
/**
diff --git
a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerConfigurationChangesTest.java
b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerConfigurationChangesTest.java
index e8325efc94..df97816310 100644
---
a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerConfigurationChangesTest.java
+++
b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerConfigurationChangesTest.java
@@ -21,8 +21,12 @@ import static
java.util.concurrent.CompletableFuture.completedFuture;
import static
org.apache.ignite.configuration.annotation.ConfigurationType.DISTRIBUTED;
import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zoneDataNodesKey;
import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zonesChangeTriggerKey;
+import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zonesLogicalTopologyKey;
import static
org.apache.ignite.internal.metastorage.client.MetaStorageServiceImpl.toIfInfo;
import static
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
+import static org.apache.ignite.internal.util.ByteUtils.longToBytes;
+import static org.apache.ignite.internal.util.ByteUtils.toBytes;
+import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doNothing;
@@ -40,8 +44,6 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicLong;
-import java.util.stream.Collectors;
-import
org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager;
import
org.apache.ignite.internal.cluster.management.topology.LogicalTopologyServiceImpl;
import
org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologySnapshot;
import org.apache.ignite.internal.configuration.ConfigurationManager;
@@ -61,8 +63,10 @@ import
org.apache.ignite.internal.raft.service.CommandClosure;
import org.apache.ignite.internal.raft.service.RaftGroupService;
import org.apache.ignite.internal.testframework.IgniteAbstractTest;
import org.apache.ignite.internal.util.ByteUtils;
+import org.apache.ignite.internal.vault.VaultEntry;
+import org.apache.ignite.internal.vault.VaultManager;
+import org.apache.ignite.lang.ByteArray;
import org.apache.ignite.lang.IgniteInternalException;
-import org.apache.ignite.network.ClusterNode;
import org.jetbrains.annotations.Nullable;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
@@ -77,8 +81,7 @@ public class DistributionZoneManagerConfigurationChangesTest
extends IgniteAbstr
private static final String NEW_ZONE_NAME = "zone2";
- @Mock
- private ClusterManagementGroupManager cmgManager;
+ private static final Set<String> nodes = Set.of("name1");
private DistributionZoneManager distributionZoneManager;
@@ -89,6 +92,8 @@ public class DistributionZoneManagerConfigurationChangesTest
extends IgniteAbstr
@Mock
private LogicalTopologyServiceImpl logicalTopologyService;
+ private VaultManager vaultMgr;
+
@BeforeEach
public void setUp() {
clusterCfgMgr = new ConfigurationManager(
@@ -104,15 +109,19 @@ public class
DistributionZoneManagerConfigurationChangesTest extends IgniteAbstr
MetaStorageManager metaStorageManager = mock(MetaStorageManager.class);
- cmgManager = mock(ClusterManagementGroupManager.class);
+ when(metaStorageManager.registerWatch(any(ByteArray.class),
any())).thenReturn(completedFuture(null));
logicalTopologyService = mock(LogicalTopologyServiceImpl.class);
+ vaultMgr = mock(VaultManager.class);
+
+ when(vaultMgr.get(any())).thenReturn(completedFuture(null));
+
distributionZoneManager = new DistributionZoneManager(
zonesConfiguration,
metaStorageManager,
- cmgManager,
- logicalTopologyService
+ logicalTopologyService,
+ vaultMgr
);
clusterCfgMgr.start();
@@ -121,6 +130,8 @@ public class
DistributionZoneManagerConfigurationChangesTest extends IgniteAbstr
when(logicalTopologyService.logicalTopologyOnLeader()).thenReturn(completedFuture(new
LogicalTopologySnapshot(1, Set.of())));
+ mockVaultZonesLogicalTopologyKey(nodes);
+
distributionZoneManager.start();
AtomicLong raftIndex = new AtomicLong();
@@ -196,34 +207,23 @@ public class
DistributionZoneManagerConfigurationChangesTest extends IgniteAbstr
@Test
void testDataNodesPropagationAfterZoneCreation() throws Exception {
- Set<ClusterNode> clusterNodes = Set.of(new ClusterNode("1", "name1",
null));
-
- mockCmgLocalNodes(clusterNodes);
+ assertDataNodesForZone(1, null);
distributionZoneManager.createZone(new
DistributionZoneConfigurationParameters.Builder(ZONE_NAME).build()).get();
- assertDataNodesForZone(1, clusterNodes);
+ assertDataNodesForZone(1, nodes);
assertZonesChangeTriggerKey(1);
}
@Test
void testTriggerKeyPropagationAfterZoneUpdate() throws Exception {
- Set<ClusterNode> clusterNodes = Set.of(new ClusterNode("1", "name1",
null));
-
- LogicalTopologySnapshot logicalTopologySnapshot =
mockCmgLocalNodes(clusterNodes);
+
assertNull(keyValueStorage.get(zonesChangeTriggerKey().bytes()).value());
distributionZoneManager.createZone(new
DistributionZoneConfigurationParameters.Builder(ZONE_NAME).build()).get();
assertZonesChangeTriggerKey(1);
- var clusterNodes2 = Set.of(
- new ClusterNode("1", "name1", null),
- new ClusterNode("2", "name2", null)
- );
-
- when(logicalTopologySnapshot.nodes()).thenReturn(clusterNodes2);
-
distributionZoneManager.alterZone(
ZONE_NAME,
new
DistributionZoneConfigurationParameters.Builder(ZONE_NAME).dataNodesAutoAdjust(100).build()
@@ -231,29 +231,23 @@ public class
DistributionZoneManagerConfigurationChangesTest extends IgniteAbstr
assertZonesChangeTriggerKey(2);
- assertDataNodesForZone(1, clusterNodes);
+ assertDataNodesForZone(1, nodes);
}
@Test
void testZoneDeleteRemovesMetaStorageKey() throws Exception {
- Set<ClusterNode> clusterNodes = Set.of(new ClusterNode("1", "name1",
null));
-
- mockCmgLocalNodes(clusterNodes);
-
- distributionZoneManager.createZone(new
DistributionZoneConfigurationParameters.Builder(ZONE_NAME).build());
+ distributionZoneManager.createZone(new
DistributionZoneConfigurationParameters.Builder(ZONE_NAME).build()).get();
- assertDataNodesForZone(1, clusterNodes);
+ assertDataNodesForZone(1, nodes);
- distributionZoneManager.dropZone(ZONE_NAME);
+ distributionZoneManager.dropZone(ZONE_NAME).get();
assertTrue(waitForCondition(() ->
keyValueStorage.get(zoneDataNodesKey(1).bytes()).value() == null, 5000));
}
@Test
void testSeveralZoneCreationsUpdatesTriggerKey() throws Exception {
- Set<ClusterNode> clusterNodes = Set.of(new ClusterNode("1", "name1",
null));
-
- mockCmgLocalNodes(clusterNodes);
+
assertNull(keyValueStorage.get(zonesChangeTriggerKey().bytes()).value());
distributionZoneManager.createZone(new
DistributionZoneConfigurationParameters.Builder(ZONE_NAME).build()).get();
@@ -264,9 +258,7 @@ public class
DistributionZoneManagerConfigurationChangesTest extends IgniteAbstr
@Test
void testSeveralZoneUpdatesUpdatesTriggerKey() throws Exception {
- Set<ClusterNode> clusterNodes = Set.of(new ClusterNode("1", "name1",
null));
-
- mockCmgLocalNodes(clusterNodes);
+
assertNull(keyValueStorage.get(zonesChangeTriggerKey().bytes()).value());
distributionZoneManager.createZone(new
DistributionZoneConfigurationParameters.Builder(ZONE_NAME).build()).get();
@@ -285,11 +277,7 @@ public class
DistributionZoneManagerConfigurationChangesTest extends IgniteAbstr
@Test
void testDataNodesNotPropagatedAfterZoneCreation() throws Exception {
- Set<ClusterNode> clusterNodes = Set.of(new ClusterNode("1", "name1",
null));
-
- mockCmgLocalNodes(clusterNodes);
-
- keyValueStorage.put(zonesChangeTriggerKey().bytes(),
ByteUtils.longToBytes(100));
+ keyValueStorage.put(zonesChangeTriggerKey().bytes(), longToBytes(100));
distributionZoneManager.createZone(new
DistributionZoneConfigurationParameters.Builder(ZONE_NAME).build()).get();
@@ -302,22 +290,11 @@ public class
DistributionZoneManagerConfigurationChangesTest extends IgniteAbstr
@Test
void testTriggerKeyNotPropagatedAfterZoneUpdate() throws Exception {
- Set<ClusterNode> clusterNodes = Set.of(new ClusterNode("1", "name1",
null));
-
- LogicalTopologySnapshot logicalTopologySnapshot =
mockCmgLocalNodes(clusterNodes);
-
distributionZoneManager.createZone(new
DistributionZoneConfigurationParameters.Builder(ZONE_NAME).build()).get();
- assertDataNodesForZone(1, clusterNodes);
-
- var clusterNodes2 = Set.of(
- new ClusterNode("1", "name1", null),
- new ClusterNode("2", "name2", null)
- );
-
- when(logicalTopologySnapshot.nodes()).thenReturn(clusterNodes2);
+ assertDataNodesForZone(1, nodes);
- keyValueStorage.put(zonesChangeTriggerKey().bytes(),
ByteUtils.longToBytes(100));
+ keyValueStorage.put(zonesChangeTriggerKey().bytes(), longToBytes(100));
distributionZoneManager.alterZone(
ZONE_NAME,
@@ -328,44 +305,36 @@ public class
DistributionZoneManagerConfigurationChangesTest extends IgniteAbstr
assertZonesChangeTriggerKey(100);
- assertDataNodesForZone(1, clusterNodes);
+ assertDataNodesForZone(1, nodes);
}
@Test
void testZoneDeleteDoNotRemoveMetaStorageKey() throws Exception {
- Set<ClusterNode> clusterNodes = Set.of(new ClusterNode("1", "name1",
null));
-
- mockCmgLocalNodes(clusterNodes);
-
- distributionZoneManager.createZone(new
DistributionZoneConfigurationParameters.Builder(ZONE_NAME).build());
+ distributionZoneManager.createZone(new
DistributionZoneConfigurationParameters.Builder(ZONE_NAME).build()).get();
- assertDataNodesForZone(1, clusterNodes);
+ assertDataNodesForZone(1, nodes);
- keyValueStorage.put(zonesChangeTriggerKey().bytes(),
ByteUtils.longToBytes(100));
+ keyValueStorage.put(zonesChangeTriggerKey().bytes(), longToBytes(100));
- distributionZoneManager.dropZone(ZONE_NAME);
+ distributionZoneManager.dropZone(ZONE_NAME).get();
verify(keyValueStorage, timeout(1000).times(2)).invoke(any());
- assertDataNodesForZone(1, clusterNodes);
+ assertDataNodesForZone(1, nodes);
}
- private LogicalTopologySnapshot mockCmgLocalNodes(Set<ClusterNode>
clusterNodes) {
- LogicalTopologySnapshot logicalTopologySnapshot =
mock(LogicalTopologySnapshot.class);
-
-
when(cmgManager.logicalTopology()).thenReturn(completedFuture(logicalTopologySnapshot));
-
- when(logicalTopologySnapshot.nodes()).thenReturn(clusterNodes);
+ private void mockVaultZonesLogicalTopologyKey(Set<String> nodes) {
+ byte[] newLogicalTopology = toBytes(nodes);
- return logicalTopologySnapshot;
+ when(vaultMgr.get(zonesLogicalTopologyKey()))
+ .thenReturn(completedFuture(new
VaultEntry(zonesLogicalTopologyKey(), newLogicalTopology)));
}
- private void assertDataNodesForZone(int zoneId, @Nullable Set<ClusterNode>
clusterNodes) throws InterruptedException {
- byte[] nodes = clusterNodes == null
- ? null
- :
ByteUtils.toBytes(clusterNodes.stream().map(ClusterNode::name).collect(Collectors.toSet()));
+ private void assertDataNodesForZone(int zoneId, @Nullable Set<String>
clusterNodes) throws InterruptedException {
+ byte[] nodes = clusterNodes == null ? null : toBytes(clusterNodes);
- assertTrue(waitForCondition(() ->
Arrays.equals(keyValueStorage.get(zoneDataNodesKey(zoneId).bytes()).value(),
nodes), 1000));
+ assertTrue(waitForCondition(() ->
Arrays.equals(keyValueStorage.get(zoneDataNodesKey(zoneId).bytes()).value(),
nodes),
+ 1000));
}
private void assertZonesChangeTriggerKey(int revision) throws
InterruptedException {
diff --git
a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerLogicalTopologyEventsTest.java
b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerLogicalTopologyEventsTest.java
index cc7272217a..e479dea073 100644
---
a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerLogicalTopologyEventsTest.java
+++
b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerLogicalTopologyEventsTest.java
@@ -70,6 +70,7 @@ import org.apache.ignite.internal.raft.WriteCommand;
import org.apache.ignite.internal.raft.service.CommandClosure;
import org.apache.ignite.internal.raft.service.RaftGroupService;
import org.apache.ignite.internal.util.ByteUtils;
+import org.apache.ignite.internal.vault.VaultManager;
import org.apache.ignite.lang.ByteArray;
import org.apache.ignite.lang.IgniteInternalException;
import org.apache.ignite.network.ClusterNode;
@@ -118,11 +119,17 @@ public class
DistributionZoneManagerLogicalTopologyEventsTest {
LogicalTopologyServiceImpl logicalTopologyService = new
LogicalTopologyServiceImpl(topology, cmgManager);
+ VaultManager vaultMgr = mock(VaultManager.class);
+
+ when(vaultMgr.get(any())).thenReturn(completedFuture(null));
+
+ when(metaStorageManager.registerWatch(any(ByteArray.class),
any())).then(invocation -> completedFuture(null));
+
distributionZoneManager = new DistributionZoneManager(
zonesConfiguration,
metaStorageManager,
- cmgManager,
- logicalTopologyService
+ logicalTopologyService,
+ vaultMgr
);
clusterCfgMgr.start();
diff --git
a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerTest.java
b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerTest.java
index 9db029085d..6e5e94f715 100644
---
a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerTest.java
+++
b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerTest.java
@@ -26,12 +26,13 @@ import static org.mockito.Mockito.mock;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import org.apache.ignite.configuration.ConfigurationChangeException;
-import
org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager;
import
org.apache.ignite.internal.cluster.management.topology.LogicalTopologyServiceImpl;
import org.apache.ignite.internal.configuration.ConfigurationRegistry;
import
org.apache.ignite.internal.configuration.storage.TestConfigurationStorage;
+import
org.apache.ignite.internal.distributionzones.DistributionZoneConfigurationParameters.Builder;
import
org.apache.ignite.internal.distributionzones.configuration.DistributionZoneConfiguration;
import
org.apache.ignite.internal.distributionzones.configuration.DistributionZonesConfiguration;
import
org.apache.ignite.internal.distributionzones.exception.DistributionZoneAlreadyExistsException;
@@ -39,6 +40,7 @@ import
org.apache.ignite.internal.distributionzones.exception.DistributionZoneNo
import
org.apache.ignite.internal.distributionzones.exception.DistributionZoneRenameException;
import org.apache.ignite.internal.metastorage.MetaStorageManager;
import org.apache.ignite.internal.testframework.IgniteAbstractTest;
+import org.apache.ignite.internal.vault.VaultManager;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -71,8 +73,8 @@ class DistributionZoneManagerTest extends IgniteAbstractTest {
distributionZoneManager = new DistributionZoneManager(
zonesConfiguration,
mock(MetaStorageManager.class),
- mock(ClusterManagementGroupManager.class),
- mock(LogicalTopologyServiceImpl.class)
+ mock(LogicalTopologyServiceImpl.class),
+ mock(VaultManager.class)
);
}
@@ -156,10 +158,14 @@ class DistributionZoneManagerTest extends
IgniteAbstractTest {
new
DistributionZoneConfigurationParameters.Builder(ZONE_NAME).dataNodesAutoAdjust(100).build()
).get(5, TimeUnit.SECONDS);
+ CompletableFuture<Void> fut;
+
+ fut = distributionZoneManager.createZone(
+ new Builder(ZONE_NAME).dataNodesAutoAdjust(100).build()
+ );
+
try {
- distributionZoneManager.createZone(
- new
DistributionZoneConfigurationParameters.Builder(ZONE_NAME).dataNodesAutoAdjust(100).build()
- ).get(5, TimeUnit.SECONDS);
+ fut.get(5, TimeUnit.SECONDS);
} catch (Exception e0) {
e = e0;
}
@@ -172,8 +178,10 @@ class DistributionZoneManagerTest extends
IgniteAbstractTest {
public void testDropZoneIfNotExists() {
Exception e = null;
+ CompletableFuture<Void> fut =
distributionZoneManager.dropZone(ZONE_NAME);
+
try {
- distributionZoneManager.dropZone(ZONE_NAME).get(5,
TimeUnit.SECONDS);
+ fut.get(5, TimeUnit.SECONDS);
} catch (Exception e0) {
e = e0;
}
@@ -294,9 +302,12 @@ class DistributionZoneManagerTest extends
IgniteAbstractTest {
public void testAlterZoneRename1() {
Exception e = null;
+ CompletableFuture<Void> fut = distributionZoneManager
+ .alterZone(ZONE_NAME, new
DistributionZoneConfigurationParameters.Builder(NEW_ZONE_NAME)
+ .dataNodesAutoAdjust(100).build());
+
try {
- distributionZoneManager.alterZone(ZONE_NAME, new
DistributionZoneConfigurationParameters.Builder(NEW_ZONE_NAME)
- .dataNodesAutoAdjust(100).build()).get(5,
TimeUnit.SECONDS);
+ fut.get(5, TimeUnit.SECONDS);
} catch (Exception e0) {
e = e0;
}
@@ -315,9 +326,11 @@ class DistributionZoneManagerTest extends
IgniteAbstractTest {
distributionZoneManager.createZone(new
DistributionZoneConfigurationParameters.Builder(NEW_ZONE_NAME)
.dataNodesAutoAdjust(100).build()).get(5, TimeUnit.SECONDS);
+ CompletableFuture<Void> fut =
distributionZoneManager.alterZone(ZONE_NAME, new Builder(NEW_ZONE_NAME)
+ .dataNodesAutoAdjust(100).build());
+
try {
- distributionZoneManager.alterZone(ZONE_NAME, new
DistributionZoneConfigurationParameters.Builder(NEW_ZONE_NAME)
- .dataNodesAutoAdjust(100).build()).get(5,
TimeUnit.SECONDS);
+ fut.get(5, TimeUnit.SECONDS);
} catch (Exception e0) {
e = e0;
}
@@ -330,9 +343,11 @@ class DistributionZoneManagerTest extends
IgniteAbstractTest {
public void testAlterZoneIfExists() {
Exception e = null;
+ CompletableFuture<Void> fut =
distributionZoneManager.alterZone(ZONE_NAME, new Builder(ZONE_NAME)
+ .dataNodesAutoAdjust(100).build());
+
try {
- distributionZoneManager.alterZone(ZONE_NAME, new
DistributionZoneConfigurationParameters.Builder(ZONE_NAME)
- .dataNodesAutoAdjust(100).build()).get(5,
TimeUnit.SECONDS);
+ fut.get(5, TimeUnit.SECONDS);
} catch (Exception e0) {
e = e0;
}
@@ -345,9 +360,11 @@ class DistributionZoneManagerTest extends
IgniteAbstractTest {
public void testCreateZoneWithWrongAutoAdjust() {
Exception e = null;
+ CompletableFuture<Void> fut = distributionZoneManager.createZone(new
Builder(ZONE_NAME)
+ .dataNodesAutoAdjust(-10).build());
+
try {
- distributionZoneManager.createZone(new
DistributionZoneConfigurationParameters.Builder(ZONE_NAME)
- .dataNodesAutoAdjust(-10).build()).get(5,
TimeUnit.SECONDS);
+ fut.get(5, TimeUnit.SECONDS);
} catch (Exception e0) {
e = e0;
}
@@ -360,9 +377,11 @@ class DistributionZoneManagerTest extends
IgniteAbstractTest {
public void testCreateZoneWithWrongSeparatedAutoAdjust1() {
Exception e = null;
+ CompletableFuture<Void> fut = distributionZoneManager.createZone(new
Builder(ZONE_NAME)
+
.dataNodesAutoAdjustScaleUp(-100).dataNodesAutoAdjustScaleDown(1).build());
+
try {
- distributionZoneManager.createZone(new
DistributionZoneConfigurationParameters.Builder(ZONE_NAME)
-
.dataNodesAutoAdjustScaleUp(-100).dataNodesAutoAdjustScaleDown(1).build()).get(5,
TimeUnit.SECONDS);
+ fut.get(5, TimeUnit.SECONDS);
} catch (Exception e0) {
e = e0;
}
@@ -375,9 +394,11 @@ class DistributionZoneManagerTest extends
IgniteAbstractTest {
public void testCreateZoneWithWrongSeparatedAutoAdjust2() {
Exception e = null;
+ CompletableFuture<Void> fut = distributionZoneManager.createZone(new
Builder(ZONE_NAME)
+
.dataNodesAutoAdjustScaleUp(1).dataNodesAutoAdjustScaleDown(-100).build());
+
try {
- distributionZoneManager.createZone(new
DistributionZoneConfigurationParameters.Builder(ZONE_NAME)
-
.dataNodesAutoAdjustScaleUp(1).dataNodesAutoAdjustScaleDown(-100).build()).get(5,
TimeUnit.SECONDS);
+ fut.get(5, TimeUnit.SECONDS);
} catch (Exception e0) {
e = e0;
}
@@ -391,7 +412,7 @@ class DistributionZoneManagerTest extends
IgniteAbstractTest {
Exception e = null;
try {
- distributionZoneManager.createZone(null).get(5, TimeUnit.SECONDS);
+ distributionZoneManager.createZone(null);
} catch (Exception e0) {
e = e0;
}
@@ -406,8 +427,7 @@ class DistributionZoneManagerTest extends
IgniteAbstractTest {
Exception e = null;
try {
- distributionZoneManager.alterZone(null, new
DistributionZoneConfigurationParameters.Builder(ZONE_NAME).build())
- .get(5, TimeUnit.SECONDS);
+ distributionZoneManager.alterZone(null, new
Builder(ZONE_NAME).build());
} catch (Exception e0) {
e = e0;
}
@@ -422,13 +442,11 @@ class DistributionZoneManagerTest extends
IgniteAbstractTest {
Exception e = null;
try {
- distributionZoneManager.alterZone(ZONE_NAME, null)
- .get(5, TimeUnit.SECONDS);
+ distributionZoneManager.alterZone(ZONE_NAME, null);
} catch (Exception e0) {
e = e0;
}
- assertTrue(e != null);
assertTrue(e instanceof NullPointerException, e.toString());
assertEquals("Distribution zone configuration is null.",
e.getMessage(), e.toString());
}
@@ -438,8 +456,7 @@ class DistributionZoneManagerTest extends
IgniteAbstractTest {
Exception e = null;
try {
- distributionZoneManager.dropZone(null)
- .get(5, TimeUnit.SECONDS);
+ distributionZoneManager.dropZone(null);
} catch (Exception e0) {
e = e0;
}
diff --git
a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerWatchListenerTest.java
b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerWatchListenerTest.java
new file mode 100644
index 0000000000..3e518b6ced
--- /dev/null
+++
b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerWatchListenerTest.java
@@ -0,0 +1,448 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.distributionzones;
+
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static
org.apache.ignite.configuration.annotation.ConfigurationType.DISTRIBUTED;
+import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zoneDataNodesKey;
+import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zonesChangeTriggerKey;
+import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zonesLogicalTopologyKey;
+import static
org.apache.ignite.internal.metastorage.MetaStorageManager.APPLIED_REV;
+import static
org.apache.ignite.internal.metastorage.client.MetaStorageServiceImpl.toIfInfo;
+import static org.apache.ignite.internal.util.ByteUtils.fromBytes;
+import static org.apache.ignite.internal.util.ByteUtils.longToBytes;
+import static org.apache.ignite.internal.util.ByteUtils.toBytes;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.after;
+import static org.mockito.Mockito.lenient;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.timeout;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.ignite.configuration.ConfigurationValue;
+import org.apache.ignite.configuration.NamedConfigurationTree;
+import org.apache.ignite.configuration.NamedListView;
+import
org.apache.ignite.internal.cluster.management.topology.LogicalTopologyServiceImpl;
+import
org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologySnapshot;
+import org.apache.ignite.internal.configuration.ConfigurationManager;
+import
org.apache.ignite.internal.configuration.storage.TestConfigurationStorage;
+import
org.apache.ignite.internal.distributionzones.configuration.DistributionZoneChange;
+import
org.apache.ignite.internal.distributionzones.configuration.DistributionZoneConfiguration;
+import
org.apache.ignite.internal.distributionzones.configuration.DistributionZoneView;
+import
org.apache.ignite.internal.distributionzones.configuration.DistributionZonesConfiguration;
+import org.apache.ignite.internal.metastorage.MetaStorageManager;
+import org.apache.ignite.internal.metastorage.client.EntryEvent;
+import org.apache.ignite.internal.metastorage.client.EntryImpl;
+import org.apache.ignite.internal.metastorage.client.If;
+import org.apache.ignite.internal.metastorage.client.StatementResult;
+import org.apache.ignite.internal.metastorage.client.WatchEvent;
+import org.apache.ignite.internal.metastorage.client.WatchListener;
+import org.apache.ignite.internal.metastorage.common.StatementResultInfo;
+import
org.apache.ignite.internal.metastorage.common.command.MetaStorageCommandsFactory;
+import
org.apache.ignite.internal.metastorage.common.command.MultiInvokeCommand;
+import org.apache.ignite.internal.metastorage.server.Entry;
+import
org.apache.ignite.internal.metastorage.server.SimpleInMemoryKeyValueStorage;
+import org.apache.ignite.internal.metastorage.server.raft.MetaStorageListener;
+import org.apache.ignite.internal.raft.Command;
+import org.apache.ignite.internal.raft.WriteCommand;
+import org.apache.ignite.internal.raft.service.CommandClosure;
+import org.apache.ignite.internal.raft.service.RaftGroupService;
+import org.apache.ignite.internal.testframework.IgniteAbstractTest;
+import org.apache.ignite.internal.vault.VaultEntry;
+import org.apache.ignite.internal.vault.VaultManager;
+import org.apache.ignite.lang.ByteArray;
+import org.apache.ignite.lang.IgniteInternalException;
+import org.jetbrains.annotations.Nullable;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Tests distribution zones logical topology changes and reaction to that
changes.
+ */
+public class DistributionZoneManagerWatchListenerTest extends
IgniteAbstractTest {
+ private static final String ZONE_NAME_1 = "zone1";
+
+ private VaultManager vaultMgr;
+
+ private DistributionZoneManager distributionZoneManager;
+
+ private SimpleInMemoryKeyValueStorage keyValueStorage;
+
+ private ConfigurationManager clusterCfgMgr;
+
+ private WatchListener watchListener;
+
+ private DistributionZonesConfiguration zonesConfiguration;
+
+ private MetaStorageManager metaStorageManager;
+
+ @BeforeEach
+ public void setUp() {
+ clusterCfgMgr = new ConfigurationManager(
+ List.of(DistributionZonesConfiguration.KEY),
+ Map.of(),
+ new TestConfigurationStorage(DISTRIBUTED),
+ List.of(),
+ List.of()
+ );
+
+ zonesConfiguration = mock(DistributionZonesConfiguration.class);
+
+ metaStorageManager = mock(MetaStorageManager.class);
+
+ LogicalTopologyServiceImpl logicalTopologyService =
mock(LogicalTopologyServiceImpl.class);
+
+ LogicalTopologySnapshot topologySnapshot =
mock(LogicalTopologySnapshot.class);
+
+ when(topologySnapshot.version()).thenReturn(1L);
+ when(topologySnapshot.nodes()).thenReturn(Collections.emptySet());
+
+
when(logicalTopologyService.logicalTopologyOnLeader()).thenReturn(completedFuture(topologySnapshot));
+
+ vaultMgr = mock(VaultManager.class);
+
+ distributionZoneManager = new DistributionZoneManager(
+ zonesConfiguration,
+ metaStorageManager,
+ logicalTopologyService,
+ vaultMgr
+ );
+
+ clusterCfgMgr.start();
+
+ mockVaultAppliedRevision(1);
+
+
when(vaultMgr.get(zonesLogicalTopologyKey())).thenReturn(completedFuture(new
VaultEntry(zonesLogicalTopologyKey(), null)));
+
+ when(metaStorageManager.registerWatch(any(ByteArray.class),
any())).then(invocation -> {
+ watchListener = invocation.getArgument(1);
+
+ return completedFuture(null);
+ });
+
+ mockEmptyZonesList();
+
+ AtomicLong raftIndex = new AtomicLong();
+
+ keyValueStorage = spy(new SimpleInMemoryKeyValueStorage());
+
+ MetaStorageListener metaStorageListener = new
MetaStorageListener(keyValueStorage);
+
+ RaftGroupService metaStorageService = mock(RaftGroupService.class);
+
+ // Delegate directly to listener.
+ lenient().doAnswer(
+ invocationClose -> {
+ Command cmd = invocationClose.getArgument(0);
+
+ long commandIndex = raftIndex.incrementAndGet();
+
+ CompletableFuture<Serializable> res = new
CompletableFuture<>();
+
+ CommandClosure<WriteCommand> clo = new CommandClosure<>() {
+ /** {@inheritDoc} */
+ @Override
+ public long index() {
+ return commandIndex;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public WriteCommand command() {
+ return (WriteCommand) cmd;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public void result(@Nullable Serializable r) {
+ if (r instanceof Throwable) {
+ res.completeExceptionally((Throwable) r);
+ } else {
+ res.complete(r);
+ }
+ }
+ };
+
+ try {
+ metaStorageListener.onWrite(List.of(clo).iterator());
+ } catch (Throwable e) {
+ res.completeExceptionally(new
IgniteInternalException(e));
+ }
+
+ return res;
+ }
+ ).when(metaStorageService).run(any());
+
+ MetaStorageCommandsFactory commandsFactory = new
MetaStorageCommandsFactory();
+
+ lenient().doAnswer(invocationClose -> {
+ If iif = invocationClose.getArgument(0);
+
+ MultiInvokeCommand multiInvokeCommand =
commandsFactory.multiInvokeCommand().iif(toIfInfo(iif,
commandsFactory)).build();
+
+ return metaStorageService.run(multiInvokeCommand).thenApply(bi ->
new StatementResult(((StatementResultInfo) bi).result()));
+ }).when(metaStorageManager).invoke(any());
+ }
+
+ @AfterEach
+ public void tearDown() throws Exception {
+ vaultMgr.stop();
+
+ distributionZoneManager.stop();
+
+ clusterCfgMgr.stop();
+
+ keyValueStorage.close();
+ }
+
+ @Test
+ void testDataNodesUpdatedOnWatchListenerEvent() {
+ mockVaultZonesLogicalTopologyKey(Set.of());
+
+ distributionZoneManager.start();
+
+ //TODO: Add second distribution zone, when
distributionZones.change.trigger per zone will be created.
+ mockZones(mockZoneWithAutoAdjust());
+
+ //first event
+
+ Set<String> nodes = Set.of("node1", "node2");
+
+ watchListenerOnUpdate(nodes, 1);
+
+ verify(keyValueStorage, timeout(1000).times(1)).invoke(any());
+
+ checkDataNodesOfZone(1, nodes);
+
+ //second event
+
+ nodes = Set.of("node1", "node3");
+
+ watchListenerOnUpdate(nodes, 2);
+
+ verify(keyValueStorage, timeout(1000).times(2)).invoke(any());
+
+ checkDataNodesOfZone(1, nodes);
+
+ //third event
+
+ nodes = Collections.emptySet();
+
+ watchListenerOnUpdate(nodes, 3);
+
+ verify(keyValueStorage, timeout(1000).times(3)).invoke(any());
+
+ checkDataNodesOfZone(1, nodes);
+ }
+
+ private void checkDataNodesOfZone(int zoneId, Set<String> nodes) {
+ Entry entry = keyValueStorage.get(zoneDataNodesKey(zoneId).bytes());
+
+ if (nodes == null) {
+ assertNull(entry.value());
+ } else {
+ Set<String> newDataNodes = fromBytes(entry.value());
+
+ assertTrue(newDataNodes.containsAll(nodes));
+ assertEquals(nodes.size(), newDataNodes.size());
+ }
+ }
+
+ @Test
+ void testStaleWatchEvent() {
+ mockVaultZonesLogicalTopologyKey(Set.of());
+
+ distributionZoneManager.start();
+
+ mockZones(mockZoneWithAutoAdjust());
+
+ mockVaultAppliedRevision(1);
+
+ long revision = 100;
+
+ keyValueStorage.put(zonesChangeTriggerKey().bytes(),
longToBytes(revision));
+
+ Set<String> nodes = Set.of("node1", "node2");
+
+ watchListenerOnUpdate(nodes, revision);
+
+ verify(keyValueStorage, timeout(1000).times(1)).invoke(any());
+
+ checkDataNodesOfZone(1, null);
+ }
+
+ @Test
+ void testStaleVaultRevisionOnZoneManagerStart() {
+ mockZones(mockZoneWithAutoAdjust());
+
+ long revision = 100;
+
+ keyValueStorage.put(zonesChangeTriggerKey().bytes(),
longToBytes(revision));
+
+ Set<String> nodes = Set.of("node1", "node2");
+
+ mockVaultZonesLogicalTopologyKey(nodes);
+
+ mockVaultAppliedRevision(revision);
+
+ distributionZoneManager.start();
+
+ verify(metaStorageManager, timeout(1000).times(1)).invoke(any());
+
+ checkDataNodesOfZone(1, null);
+ }
+
+ @Test
+ void testDataNodesUpdatedOnZoneManagerStart() {
+ mockZones(mockZoneWithAutoAdjust());
+
+ mockVaultAppliedRevision(2);
+
+ Set<String> nodes = Set.of("node1", "node2");
+
+ mockVaultZonesLogicalTopologyKey(nodes);
+
+ distributionZoneManager.start();
+
+ verify(keyValueStorage, timeout(1000).times(1)).invoke(any());
+
+ checkDataNodesOfZone(1, nodes);
+ }
+
+ @Test
+ void testLogicalTopologyIsNullOnZoneManagerStart1() {
+ mockZones(mockZoneWithAutoAdjust());
+
+ mockVaultAppliedRevision(2);
+
+ when(vaultMgr.get(zonesLogicalTopologyKey()))
+ .thenReturn(completedFuture(new
VaultEntry(zonesLogicalTopologyKey(), null)));
+
+ distributionZoneManager.start();
+
+ verify(keyValueStorage, after(500).never()).invoke(any());
+
+ Entry entry = keyValueStorage.get(zoneDataNodesKey(1).bytes());
+
+ assertNull(entry.value());
+ }
+
+ private void mockEmptyZonesList() {
+ NamedConfigurationTree<DistributionZoneConfiguration,
DistributionZoneView, DistributionZoneChange> namedConfigurationTree =
+ mock(NamedConfigurationTree.class);
+
when(zonesConfiguration.distributionZones()).thenReturn(namedConfigurationTree);
+
+ NamedListView<DistributionZoneView> namedListView =
mock(NamedListView.class);
+ when(namedConfigurationTree.value()).thenReturn(namedListView);
+
+
when(zonesConfiguration.distributionZones().value().namedListKeys()).thenReturn(Collections.emptyList());
+ }
+
+ private void mockZones(DistributionZoneConfiguration... zones) {
+ List<String> names = new ArrayList<>();
+
+ NamedConfigurationTree<DistributionZoneConfiguration,
DistributionZoneView, DistributionZoneChange> namedConfigurationTree =
+ mock(NamedConfigurationTree.class);
+
when(zonesConfiguration.distributionZones()).thenReturn(namedConfigurationTree);
+
+ NamedListView<DistributionZoneView> namedListView =
mock(NamedListView.class);
+ when(namedConfigurationTree.value()).thenReturn(namedListView);
+
+ for (DistributionZoneConfiguration zone : zones) {
+ names.add(zone.name().value());
+
+
when(namedConfigurationTree.get(zone.name().value())).thenReturn(zone);
+ }
+
+ when(namedListView.namedListKeys()).thenReturn(names);
+ }
+
+ private DistributionZoneConfiguration mockZone(
+ Integer zoneId,
+ String name,
+ Integer dataNodesAutoAdjustTime,
+ Integer dataNodesAutoAdjustScaleUpTime,
+ Integer dataNodesAutoAdjustScaleDownTime
+ ) {
+ DistributionZoneConfiguration distributionZoneConfiguration =
mock(DistributionZoneConfiguration.class);
+
+ ConfigurationValue<String> nameValue = mock(ConfigurationValue.class);
+ when(distributionZoneConfiguration.name()).thenReturn(nameValue);
+ when(nameValue.value()).thenReturn(name);
+
+ ConfigurationValue<Integer> zoneIdValue =
mock(ConfigurationValue.class);
+ when(distributionZoneConfiguration.zoneId()).thenReturn(zoneIdValue);
+ when(zoneIdValue.value()).thenReturn(zoneId);
+
+ ConfigurationValue<Integer> dataNodesAutoAdjust =
mock(ConfigurationValue.class);
+
when(distributionZoneConfiguration.dataNodesAutoAdjust()).thenReturn(dataNodesAutoAdjust);
+ when(dataNodesAutoAdjust.value()).thenReturn(dataNodesAutoAdjustTime);
+
+ ConfigurationValue<Integer> dataNodesAutoAdjustScaleUp =
mock(ConfigurationValue.class);
+
when(distributionZoneConfiguration.dataNodesAutoAdjustScaleUp()).thenReturn(dataNodesAutoAdjustScaleUp);
+
when(dataNodesAutoAdjustScaleUp.value()).thenReturn(dataNodesAutoAdjustScaleUpTime);
+
+ ConfigurationValue<Integer> dataNodesAutoAdjustScaleDown =
mock(ConfigurationValue.class);
+
when(distributionZoneConfiguration.dataNodesAutoAdjustScaleDown()).thenReturn(dataNodesAutoAdjustScaleDown);
+
when(dataNodesAutoAdjustScaleDown.value()).thenReturn(dataNodesAutoAdjustScaleDownTime);
+
+ return distributionZoneConfiguration;
+ }
+
+ private DistributionZoneConfiguration mockZoneWithAutoAdjust() {
+ return mockZone(1, ZONE_NAME_1, 100, Integer.MAX_VALUE,
Integer.MAX_VALUE);
+ }
+
+ private void mockVaultZonesLogicalTopologyKey(Set<String> nodes) {
+ byte[] newLogicalTopology = toBytes(nodes);
+
+ when(vaultMgr.get(zonesLogicalTopologyKey()))
+ .thenReturn(completedFuture(new
VaultEntry(zonesLogicalTopologyKey(), newLogicalTopology)));
+ }
+
+ private void watchListenerOnUpdate(Set<String> nodes, long rev) {
+ byte[] newLogicalTopology = toBytes(nodes);
+
+ org.apache.ignite.internal.metastorage.client.Entry newEntry =
+ new EntryImpl(zonesLogicalTopologyKey(), newLogicalTopology,
rev, 1);
+
+ EntryEvent entryEvent = new EntryEvent(null, newEntry);
+
+ WatchEvent evt = new WatchEvent(entryEvent);
+
+ watchListener.onUpdate(evt);
+ }
+
+ private void mockVaultAppliedRevision(long revision) {
+ when(vaultMgr.get(APPLIED_REV)).thenReturn(completedFuture(new
VaultEntry(APPLIED_REV, longToBytes(revision))));
+ }
+}
diff --git
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
index aa2930acb5..6d5a560e48 100644
---
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
+++
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
@@ -352,7 +352,7 @@ public class IgniteImpl implements Ignite {
DistributionZonesConfiguration zonesConfiguration =
clusterCfgMgr.configurationRegistry()
.getConfiguration(DistributionZonesConfiguration.KEY);
- distributionZoneManager = new
DistributionZoneManager(zonesConfiguration, metaStorageMgr, cmgMgr,
logicalTopologyService);
+ distributionZoneManager = new
DistributionZoneManager(zonesConfiguration, metaStorageMgr,
logicalTopologyService, vaultMgr);
restComponent = createRestComponent(name);