This is an automated email from the ASF dual-hosted git repository.
sk0x50 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new b560f12406 IGNITE-19602 API and tests for causality data nodes in
DistributionZoneManager (#2170)
b560f12406 is described below
commit b560f124065a1f3d93ffa2fc17632de9ee4b26a6
Author: Sergey Uttsel <[email protected]>
AuthorDate: Mon Jun 19 10:42:10 2023 +0300
IGNITE-19602 API and tests for causality data nodes in
DistributionZoneManager (#2170)
---
.../distributionzones/DistributionZoneManager.java | 15 +
.../DistributionZoneCausalityDataNodesTest.java | 1094 ++++++++++++++++++++
2 files changed, 1109 insertions(+)
diff --git
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java
index 9666848b01..2be2641924 100644
---
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java
+++
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java
@@ -650,6 +650,21 @@ public class DistributionZoneManager implements
IgniteComponent {
});
}
+ /**
+ * Asynchronously gets data nodes of the zone using causality token.
+ *
+ * <p>The returned future can be completed with {@link
DistributionZoneNotFoundException} if the zone with the provided {@code zoneId}
+ * does not exist.
+ *
+ * @param causalityToken Causality token.
+ * @param zoneId Zone id.
+ * @return The future which will be completed with data nodes for the
zoneId or with exception.
+ */
+ // TODO: Will be implemented in IGNITE-19506.
+ public CompletableFuture<Set<String>> dataNodes(long causalityToken, int
zoneId) {
+ return null;
+ }
+
/**
* Creates configuration listener for updates of scale up value.
*
diff --git
a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneCausalityDataNodesTest.java
b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneCausalityDataNodesTest.java
new file mode 100644
index 0000000000..3d44dabca2
--- /dev/null
+++
b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneCausalityDataNodesTest.java
@@ -0,0 +1,1094 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.distributionzones;
+
+import static java.util.Collections.emptySet;
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static java.util.stream.Collectors.toSet;
+import static
org.apache.ignite.internal.cluster.management.topology.LogicalTopologyImpl.LOGICAL_TOPOLOGY_KEY;
+import static
org.apache.ignite.internal.distributionzones.DistributionZoneManager.DEFAULT_ZONE_ID;
+import static
org.apache.ignite.internal.distributionzones.DistributionZoneManager.DEFAULT_ZONE_NAME;
+import static
org.apache.ignite.internal.distributionzones.DistributionZoneManager.IMMEDIATE_TIMER_VALUE;
+import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.extractZoneId;
+import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zoneDataNodesKey;
+import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zonesDataNodesPrefix;
+import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zonesLogicalTopologyKey;
+import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zonesLogicalTopologyPrefix;
+import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zonesLogicalTopologyVersionKey;
+import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
+import static org.apache.ignite.internal.util.ByteUtils.fromBytes;
+import static org.apache.ignite.internal.util.IgniteUtils.startsWith;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+import java.util.Arrays;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.ignite.configuration.notifications.ConfigurationListener;
+import
org.apache.ignite.configuration.notifications.ConfigurationNamedListListener;
+import
org.apache.ignite.configuration.notifications.ConfigurationNotificationEvent;
+import org.apache.ignite.internal.cluster.management.topology.api.LogicalNode;
+import
org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologySnapshot;
+import
org.apache.ignite.internal.distributionzones.DistributionZoneConfigurationParameters.Builder;
+import
org.apache.ignite.internal.distributionzones.configuration.DistributionZoneView;
+import
org.apache.ignite.internal.distributionzones.exception.DistributionZoneNotFoundException;
+import org.apache.ignite.internal.metastorage.Entry;
+import org.apache.ignite.internal.metastorage.EntryEvent;
+import org.apache.ignite.internal.metastorage.WatchEvent;
+import org.apache.ignite.internal.metastorage.WatchListener;
+import org.apache.ignite.internal.util.ByteUtils;
+import org.apache.ignite.lang.IgniteBiTuple;
+import org.apache.ignite.lang.NodeStoppingException;
+import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.network.NetworkAddress;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Tests for causality data nodes updating in {@link DistributionZoneManager}.
+ */
+@Disabled("https://issues.apache.org/jira/browse/IGNITE-19506")
+public class DistributionZoneCausalityDataNodesTest extends
BaseDistributionZoneManagerTest {
+ private static final String ZONE_NAME_1 = "zone1";
+
+ private static final String ZONE_NAME_2 = "zone2";
+
+ private static final int ZONE_ID_1 = 1;
+
+ private static final int ZONE_ID_2 = 2;
+
+ private static final LogicalNode NODE_0 =
+ new LogicalNode("node_id_0", "node_name_0", new
NetworkAddress("localhost", 123));
+
+ private static final LogicalNode NODE_1 =
+ new LogicalNode("node_id_1", "node_name_1", new
NetworkAddress("localhost", 123));
+
+ private static final LogicalNode NODE_2 =
+ new LogicalNode("node_id_2", "node_name_2", new
NetworkAddress("localhost", 123));
+
+ /**
+ * Contains futures that is completed when the topology watch listener
receive the event with expected logical topology.
+ * Mapping of node names -> future with event revision.
+ */
+ private final ConcurrentHashMap<Set<String>, CompletableFuture<Long>>
topologyRevisions = new ConcurrentHashMap<>();
+
+ /**
+ * Contains futures that is completed when the data nodes watch listener
receive the event with expected zone id and data nodes.
+ * Mapping of zone id and node names -> future with event revision.
+ */
+ private final ConcurrentHashMap<IgniteBiTuple<Integer, Set<String>>,
CompletableFuture<Long>> zoneDataNodesRevisions =
+ new ConcurrentHashMap<>();
+
+ /**
+ * Contains futures that is completed when the scale up update listener
receive the event with expected zone id.
+ * Mapping of zone id -> future with event revision.
+ */
+ private final ConcurrentHashMap<Integer, CompletableFuture<Long>>
zoneScaleUpRevisions = new ConcurrentHashMap<>();
+
+ /**
+ * Contains futures that is completed when the scale down update listener
receive the event with expected zone id.
+ * Mapping of zone id -> future with event revision.
+ */
+ private final ConcurrentHashMap<Integer, CompletableFuture<Long>>
zoneScaleDownRevisions = new ConcurrentHashMap<>();
+
+ /**
+ * Contains futures that is completed when the zone configuration listener
receive the zone creation event with expected zone id.
+ * Mapping of zone id -> future with event revision.
+ */
+ private final ConcurrentHashMap<Integer, CompletableFuture<Long>>
createZoneRevisions = new ConcurrentHashMap<>();
+
+ /**
+ * Contains futures that is completed when the zone configuration listener
receive the zone dropping event with expected zone id.
+ * Mapping of zone id -> future with event revision.
+ */
+ private final ConcurrentHashMap<Integer, CompletableFuture<Long>>
dropZoneRevisions = new ConcurrentHashMap<>();
+
+ @BeforeEach
+ void beforeEach() throws NodeStoppingException {
+ metaStorageManager.registerPrefixWatch(zonesLogicalTopologyPrefix(),
createMetastorageTopologyListener());
+
+ metaStorageManager.registerPrefixWatch(zonesDataNodesPrefix(),
createMetastorageDataNodesListener());
+
+ ZonesConfigurationListener zonesConfigurationListener = new
ZonesConfigurationListener();
+
+
zonesConfiguration.distributionZones().listenElements(zonesConfigurationListener);
+
zonesConfiguration.distributionZones().any().dataNodesAutoAdjustScaleUp().listen(onUpdateScaleUp());
+
zonesConfiguration.distributionZones().any().dataNodesAutoAdjustScaleDown().listen(onUpdateScaleDown());
+
+
zonesConfiguration.defaultDistributionZone().listen(zonesConfigurationListener);
+
zonesConfiguration.defaultDistributionZone().dataNodesAutoAdjustScaleUp().listen(onUpdateScaleUp());
+
zonesConfiguration.defaultDistributionZone().dataNodesAutoAdjustScaleDown().listen(onUpdateScaleDown());
+
+ distributionZoneManager.start();
+
+ metaStorageManager.deployWatches();
+ }
+
+ /**
+ * Tests data nodes updating on a topology leap.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ void topologyLeapUpdate() throws Exception {
+ // Prerequisite.
+
+ // Create the zone with immediate timers.
+ distributionZoneManager.createZone(
+ new
DistributionZoneConfigurationParameters.Builder(ZONE_NAME_1)
+
.dataNodesAutoAdjustScaleUp(IMMEDIATE_TIMER_VALUE)
+
.dataNodesAutoAdjustScaleDown(IMMEDIATE_TIMER_VALUE)
+ .build()
+ )
+ .get(3, SECONDS);
+
+ // Create the zone with not immediate timers.
+ distributionZoneManager.createZone(
+ new
DistributionZoneConfigurationParameters.Builder(ZONE_NAME_2)
+ .dataNodesAutoAdjustScaleUp(1)
+ .dataNodesAutoAdjustScaleDown(1)
+ .build()
+ )
+ .get(3, SECONDS);
+
+ // Create logical topology with NODE_0 and NODE_1.
+ topology.putNode(NODE_0);
+
+ Set<LogicalNode> twoNodes1 = Set.of(NODE_0, NODE_1);
+ Set<String> twoNodesNames1 = Set.of(NODE_0.name(), NODE_1.name());
+
+ CompletableFuture<Long> dataNodesUpdateRevision =
getZoneDataNodesRevision(ZONE_ID_2, twoNodes1);
+
+ // Check that data nodes value of both zone is NODE_0 and NODE_1.
+ long topologyRevision1 =
putNodeInLogicalTopologyAndGetRevision(NODE_1, twoNodes1);
+
+ CompletableFuture<Set<String>> dataNodesFut0 =
distributionZoneManager.dataNodes(topologyRevision1, ZONE_ID_1);
+ assertThat(dataNodesFut0, willBe(twoNodesNames1));
+
+ long dataNodesRevisionZone = dataNodesUpdateRevision.get(3, SECONDS);
+
+ CompletableFuture<Set<String>> dataNodesFut1 =
distributionZoneManager.dataNodes(dataNodesRevisionZone, ZONE_ID_2);
+ assertThat(dataNodesFut1, willBe(twoNodesNames1));
+
+ // Test steps.
+
+ // Change logical topology. NODE_1 is left. NODE_2 is added.
+ Set<LogicalNode> twoNodes2 = Set.of(NODE_0, NODE_2);
+ Set<String> twoNodesNames2 = Set.of(NODE_0.name(), NODE_2.name());
+
+ dataNodesUpdateRevision = getZoneDataNodesRevision(ZONE_ID_2,
twoNodes2);
+
+ long topologyRevision2 = fireTopologyLeapAndGetRevision(twoNodes2);
+
+ // Check that data nodes value of the zone with immediate timers with
the topology update revision is NODE_0 and NODE_2.
+ CompletableFuture<Set<String>> dataNodesFut3 =
distributionZoneManager.dataNodes(topologyRevision2, ZONE_ID_1);
+ assertThat(dataNodesFut3, willBe(twoNodesNames2));
+
+ // Check that data nodes value of the zone with not immediate timers
with the topology update revision is NODE_0 and NODE_1.
+ CompletableFuture<Set<String>> dataNodesFut4 =
distributionZoneManager.dataNodes(topologyRevision2, ZONE_ID_2);
+ assertThat(dataNodesFut4, willBe(twoNodesNames1));
+
+ // Check that data nodes value of the zone with not immediate timers
with the data nodes update revision is NODE_0 and NODE_2.
+ dataNodesRevisionZone = dataNodesUpdateRevision.get(3, SECONDS);
+ CompletableFuture<Set<String>> dataNodesFut5 =
distributionZoneManager.dataNodes(dataNodesRevisionZone, ZONE_ID_2);
+ assertThat(dataNodesFut5, willBe(twoNodesNames2));
+ }
+
+ /**
+ * Tests data nodes updating on a topology leap with not immediate scale
up and immediate scale down.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ void topologyLeapUpdateScaleUpNotImmediateAndScaleDownImmediate() throws
Exception {
+ // Prerequisite.
+
+ // Create the zone with immediate timers.
+ distributionZoneManager.createZone(
+ new
DistributionZoneConfigurationParameters.Builder(ZONE_NAME_1)
+
.dataNodesAutoAdjustScaleUp(IMMEDIATE_TIMER_VALUE)
+
.dataNodesAutoAdjustScaleDown(IMMEDIATE_TIMER_VALUE)
+ .build()
+ )
+ .get(3, SECONDS);
+
+ // Alter the zone with immediate timers.
+ distributionZoneManager.alterZone(DEFAULT_ZONE_NAME,
+ new
DistributionZoneConfigurationParameters.Builder(DEFAULT_ZONE_NAME)
+
.dataNodesAutoAdjustScaleUp(IMMEDIATE_TIMER_VALUE)
+
.dataNodesAutoAdjustScaleDown(IMMEDIATE_TIMER_VALUE)
+ .build()
+ )
+ .get(3, SECONDS);
+
+ // Create logical topology with NODE_0 and NODE_1.
+ topology.putNode(NODE_0);
+
+ Set<LogicalNode> twoNodes1 = Set.of(NODE_0, NODE_1);
+ Set<String> twoNodesNames1 = Set.of(NODE_0.name(), NODE_1.name());
+
+ // Check that data nodes value of both zone is NODE_0 and NODE_1.
+ long topologyRevision1 =
putNodeInLogicalTopologyAndGetRevision(NODE_1, twoNodes1);
+
+ CompletableFuture<Set<String>> dataNodesFut0 =
distributionZoneManager.dataNodes(topologyRevision1, DEFAULT_ZONE_ID);
+ assertThat(dataNodesFut0, willBe(twoNodesNames1));
+
+ CompletableFuture<Set<String>> dataNodesFut1 =
distributionZoneManager.dataNodes(topologyRevision1, ZONE_ID_1);
+ assertThat(dataNodesFut1, willBe(twoNodesNames1));
+
+ // Alter zones with not immediate scale up timer.
+ distributionZoneManager.alterZone(ZONE_NAME_1,
+ new
DistributionZoneConfigurationParameters.Builder(ZONE_NAME_1)
+ .dataNodesAutoAdjustScaleUp(1)
+
.dataNodesAutoAdjustScaleDown(IMMEDIATE_TIMER_VALUE)
+ .build()
+ )
+ .get(3, SECONDS);
+
+ distributionZoneManager.alterZone(DEFAULT_ZONE_NAME,
+ new
DistributionZoneConfigurationParameters.Builder(DEFAULT_ZONE_NAME)
+ .dataNodesAutoAdjustScaleUp(1)
+
.dataNodesAutoAdjustScaleDown(IMMEDIATE_TIMER_VALUE)
+ .build()
+ )
+ .get(3, SECONDS);
+
+ // Test steps.
+
+ // Change logical topology. NODE_1 is left. NODE_2 is added.
+ Set<LogicalNode> twoNodes2 = Set.of(NODE_0, NODE_2);
+ Set<String> twoNodesNames2 = Set.of(NODE_0.name(), NODE_2.name());
+
+ CompletableFuture<Long> dataNodesUpdateRevision0 =
getZoneDataNodesRevision(DEFAULT_ZONE_ID, twoNodes2);
+ CompletableFuture<Long> dataNodesUpdateRevision1 =
getZoneDataNodesRevision(ZONE_ID_1, twoNodes2);
+
+ long topologyRevision2 = fireTopologyLeapAndGetRevision(twoNodes2);
+
+ // Check that data nodes value of zones is NODE_0 because scale up
timer has not fired yet.
+ Set<String> oneNodeNames = Set.of(NODE_0.name());
+
+ CompletableFuture<Set<String>> dataNodesFut2 =
distributionZoneManager.dataNodes(topologyRevision2, DEFAULT_ZONE_ID);
+ assertThat(dataNodesFut2, willBe(oneNodeNames));
+
+ CompletableFuture<Set<String>> dataNodesFut3 =
distributionZoneManager.dataNodes(topologyRevision2, ZONE_ID_1);
+ assertThat(dataNodesFut3, willBe(oneNodeNames));
+
+ // Check that data nodes value of zones is NODE_0 and NODE_2.
+ long dataNodesRevisionZone0 = dataNodesUpdateRevision0.get(3, SECONDS);
+ CompletableFuture<Set<String>> dataNodesFut4 =
distributionZoneManager.dataNodes(dataNodesRevisionZone0, DEFAULT_ZONE_ID);
+ assertThat(dataNodesFut4, willBe(twoNodesNames2));
+
+ long dataNodesRevisionZone1 = dataNodesUpdateRevision1.get(3, SECONDS);
+ CompletableFuture<Set<String>> dataNodesFut5 =
distributionZoneManager.dataNodes(dataNodesRevisionZone1, ZONE_ID_1);
+ assertThat(dataNodesFut5, willBe(twoNodesNames2));
+ }
+
+ /**
+ * Tests data nodes updating on a topology leap with immediate scale up
and not immediate scale down.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ void topologyLeapUpdateScaleUpImmediateAndScaleDownNotImmediate() throws
Exception {
+ // Prerequisite.
+
+ // Create the zone with not immediate scale down timer.
+ distributionZoneManager.createZone(
+ new
DistributionZoneConfigurationParameters.Builder(ZONE_NAME_1)
+
.dataNodesAutoAdjustScaleUp(IMMEDIATE_TIMER_VALUE)
+ .dataNodesAutoAdjustScaleDown(1)
+ .build()
+ )
+ .get(3, SECONDS);
+
+ // Alter the zone with not immediate scale down timer.
+ distributionZoneManager.alterZone(DEFAULT_ZONE_NAME,
+ new
DistributionZoneConfigurationParameters.Builder(DEFAULT_ZONE_NAME)
+
.dataNodesAutoAdjustScaleUp(IMMEDIATE_TIMER_VALUE)
+ .dataNodesAutoAdjustScaleDown(1)
+ .build()
+ )
+ .get(3, SECONDS);
+
+ // Create logical topology with NODE_0 and NODE_1.
+ topology.putNode(NODE_0);
+
+ Set<LogicalNode> twoNodes1 = Set.of(NODE_0, NODE_1);
+ Set<String> twoNodesNames1 = Set.of(NODE_0.name(), NODE_1.name());
+
+ // Check that data nodes value of both zone is NODE_0 and NODE_1.
+ long topologyRevision1 =
putNodeInLogicalTopologyAndGetRevision(NODE_1, twoNodes1);
+
+ CompletableFuture<Set<String>> dataNodesFut0 =
distributionZoneManager.dataNodes(topologyRevision1, DEFAULT_ZONE_ID);
+ assertThat(dataNodesFut0, willBe(twoNodesNames1));
+
+ CompletableFuture<Set<String>> dataNodesFut1 =
distributionZoneManager.dataNodes(topologyRevision1, ZONE_ID_1);
+ assertThat(dataNodesFut1, willBe(twoNodesNames1));
+
+ // Test steps.
+
+ // Change logical topology. NODE_1 is left. NODE_2 is added.
+ Set<LogicalNode> twoNodes2 = Set.of(NODE_0, NODE_2);
+ Set<String> twoNodesNames2 = Set.of(NODE_0.name(), NODE_2.name());
+
+ CompletableFuture<Long> dataNodesUpdateRevision0 =
getZoneDataNodesRevision(DEFAULT_ZONE_ID, twoNodes2);
+ CompletableFuture<Long> dataNodesUpdateRevision1 =
getZoneDataNodesRevision(ZONE_ID_1, twoNodes2);
+
+ long topologyRevision2 = fireTopologyLeapAndGetRevision(twoNodes2);
+
+ // Check that data nodes value of zones is NODE_0, NODE_1 and NODE_2
because scale down timer has not fired yet.
+ Set<LogicalNode> threeNodes = Set.of(NODE_0, NODE_1, NODE_2);
+ Set<String> threeNodesNames = Set.of(NODE_0.name(), NODE_1.name(),
NODE_2.name());
+
+ CompletableFuture<Set<String>> dataNodesFut2 =
distributionZoneManager.dataNodes(topologyRevision2, DEFAULT_ZONE_ID);
+ assertThat(dataNodesFut2, willBe(threeNodesNames));
+
+ CompletableFuture<Set<String>> dataNodesFut3 =
distributionZoneManager.dataNodes(topologyRevision2, ZONE_ID_1);
+ assertThat(dataNodesFut3, willBe(threeNodesNames));
+
+ // Check that data nodes value of zones is NODE_0 and NODE_2.
+ long dataNodesRevisionZone0 = dataNodesUpdateRevision0.get(3, SECONDS);
+ CompletableFuture<Set<String>> dataNodesFut4 =
distributionZoneManager.dataNodes(dataNodesRevisionZone0, DEFAULT_ZONE_ID);
+ assertThat(dataNodesFut4, willBe(twoNodesNames2));
+
+ long dataNodesRevisionZone1 = dataNodesUpdateRevision1.get(3, SECONDS);
+ CompletableFuture<Set<String>> dataNodesFut5 =
distributionZoneManager.dataNodes(dataNodesRevisionZone1, ZONE_ID_1);
+ assertThat(dataNodesFut5, willBe(twoNodesNames2));
+ }
+
+ /**
+ * Tests data nodes updating on a scale up changing.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ void dataNodesUpdatedAfterScaleUpChanged() throws Exception {
+ // Prerequisite.
+
+ // Create the zone with immediate timers.
+ distributionZoneManager.createZone(
+ new
DistributionZoneConfigurationParameters.Builder(ZONE_NAME_1)
+
.dataNodesAutoAdjustScaleUp(IMMEDIATE_TIMER_VALUE)
+
.dataNodesAutoAdjustScaleDown(IMMEDIATE_TIMER_VALUE)
+ .build()
+ )
+ .get(3, SECONDS);
+
+ // Create logical topology with NODE_0.
+ Set<LogicalNode> oneNode = Set.of(NODE_0);
+ Set<String> oneNodeName = Set.of(NODE_0.name());
+
+ long topologyRevision1 =
putNodeInLogicalTopologyAndGetRevision(NODE_0, oneNode);
+
+ // Check that data nodes value of the the zone is NODE_0.
+ CompletableFuture<Set<String>> dataNodesFut1 =
distributionZoneManager.dataNodes(topologyRevision1, ZONE_ID_1);
+ assertThat(dataNodesFut1, willBe(oneNodeName));
+
+ // Changes a scale up timer to not immediate.
+ distributionZoneManager.alterZone(
+ ZONE_NAME_1,
+ new
DistributionZoneConfigurationParameters.Builder(ZONE_NAME_1)
+ .dataNodesAutoAdjustScaleUp(10000)
+
.dataNodesAutoAdjustScaleDown(IMMEDIATE_TIMER_VALUE)
+ .build()
+ )
+ .get(3, SECONDS);
+
+ // Test steps.
+
+ // Change logical topology. NODE_1 is added.
+ Set<LogicalNode> twoNodes = Set.of(NODE_0, NODE_1);
+ Set<String> twoNodesNames = Set.of(NODE_0.name(), NODE_1.name());
+
+ long topologyRevision2 =
putNodeInLogicalTopologyAndGetRevision(NODE_1, twoNodes);
+
+ // Check that data nodes value of the zone with the topology update
revision is NODE_0 because scale up timer has not fired yet.
+ CompletableFuture<Set<String>> dataNodesFut2 =
distributionZoneManager.dataNodes(topologyRevision2, ZONE_ID_1);
+ assertThat(dataNodesFut2, willBe(oneNode));
+
+ // Change scale up value to immediate.
+ long scaleUpRevision = alterZoneScaleUpAndGetRevision(ZONE_NAME_1,
IMMEDIATE_TIMER_VALUE);
+
+ // Check that data nodes value of the zone with the scale up update
revision is NODE_0 and NODE_1.
+ CompletableFuture<Set<String>> dataNodesFut3 =
distributionZoneManager.dataNodes(scaleUpRevision, ZONE_ID_1);
+ assertThat(dataNodesFut3, willBe(twoNodesNames));
+ }
+
+ /**
+ * Tests data nodes updating on a scale down changing.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ void dataNodesUpdatedAfterScaleDownChanged() throws Exception {
+ // Prerequisite.
+
+ // Create the zone with immediate scale up timer and not immediate
scale down timer.
+ distributionZoneManager.createZone(
+ new
DistributionZoneConfigurationParameters.Builder(ZONE_NAME_1)
+
.dataNodesAutoAdjustScaleUp(IMMEDIATE_TIMER_VALUE)
+ .dataNodesAutoAdjustScaleDown(10000)
+ .build()
+ )
+ .get(3, SECONDS);
+
+ // Create logical topology with NODE_0 and NODE_1.
+ topology.putNode(NODE_0);
+
+ Set<LogicalNode> twoNodes = Set.of(NODE_0, NODE_1);
+ Set<String> twoNodesNames = Set.of(NODE_0.name(), NODE_1.name());
+
+ long topologyRevision1 =
putNodeInLogicalTopologyAndGetRevision(NODE_1, twoNodes);
+
+ // Check that data nodes value of the the zone is NODE_0 and NODE_1.
+ CompletableFuture<Set<String>> dataNodesFut1 =
distributionZoneManager.dataNodes(topologyRevision1, ZONE_ID_1);
+ assertThat(dataNodesFut1, willBe(twoNodesNames));
+
+ // Test steps.
+
+ // Change logical topology. NODE_1 is left.
+ Set<LogicalNode> oneNode = Set.of(NODE_0);
+ Set<String> oneNodeName = Set.of(NODE_0.name());
+
+ long topologyRevision2 =
removeNodeInLogicalTopologyAndGetRevision(Set.of(NODE_1), oneNode);
+
+ // Check that data nodes value of the zone with the topology update
revision is NODE_0 and NODE_1
+ // because scale down timer has not fired yet.
+ CompletableFuture<Set<String>> dataNodesFut2 =
distributionZoneManager.dataNodes(topologyRevision2, ZONE_ID_1);
+ assertThat(dataNodesFut2, willBe(twoNodesNames));
+
+ // Change scale down value to immediate.
+ long scaleDownRevision = alterZoneScaleDownAndGetRevision(ZONE_NAME_1,
IMMEDIATE_TIMER_VALUE);
+
+ // Check that data nodes value of the zone with the scale down update
revision is NODE_0.
+ CompletableFuture<Set<String>> dataNodesFut3 =
distributionZoneManager.dataNodes(scaleDownRevision, ZONE_ID_1);
+ assertThat(dataNodesFut3, willBe(oneNodeName));
+ }
+
+ /**
+ * Tests data nodes dropping when a scale up task is scheduled.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ void scheduleScaleUpTaskThenDropZone() throws Exception {
+ // Prerequisite.
+
+ // Create the zone with immediate timers.
+ distributionZoneManager.createZone(
+ new
DistributionZoneConfigurationParameters.Builder(ZONE_NAME_1)
+
.dataNodesAutoAdjustScaleUp(IMMEDIATE_TIMER_VALUE)
+
.dataNodesAutoAdjustScaleDown(IMMEDIATE_TIMER_VALUE)
+ .build()
+ )
+ .get(3, SECONDS);
+
+ // Alter the zone with immediate timers.
+ distributionZoneManager.alterZone(DEFAULT_ZONE_NAME,
+ new
DistributionZoneConfigurationParameters.Builder(DEFAULT_ZONE_NAME)
+
.dataNodesAutoAdjustScaleUp(IMMEDIATE_TIMER_VALUE)
+
.dataNodesAutoAdjustScaleDown(IMMEDIATE_TIMER_VALUE)
+ .build()
+ )
+ .get(3, SECONDS);
+
+ // Create logical topology with NODE_0.
+ topology.putNode(NODE_0);
+
+ Set<LogicalNode> oneNode = Set.of(NODE_0);
+ Set<String> oneNodeName = Set.of(NODE_0.name());
+
+ // Check that data nodes value of both zone is NODE_0.
+ long topologyRevision1 =
putNodeInLogicalTopologyAndGetRevision(NODE_0, Set.of(NODE_0));
+
+ CompletableFuture<Set<String>> dataNodesFut0 =
distributionZoneManager.dataNodes(topologyRevision1, DEFAULT_ZONE_ID);
+ assertThat(dataNodesFut0, willBe(oneNodeName));
+
+ CompletableFuture<Set<String>> dataNodesFut1 =
distributionZoneManager.dataNodes(topologyRevision1, ZONE_ID_1);
+ assertThat(dataNodesFut1, willBe(oneNodeName));
+
+ // Alter the zones with not immediate scale up timer.
+ distributionZoneManager.alterZone(ZONE_NAME_1,
+ new
DistributionZoneConfigurationParameters.Builder(ZONE_NAME_1)
+ .dataNodesAutoAdjustScaleUp(10000)
+
.dataNodesAutoAdjustScaleDown(IMMEDIATE_TIMER_VALUE)
+ .build()
+ )
+ .get(3, SECONDS);
+
+ // Alter the zone with not immediate scale up timer.
+ distributionZoneManager.alterZone(DEFAULT_ZONE_NAME,
+ new
DistributionZoneConfigurationParameters.Builder(DEFAULT_ZONE_NAME)
+ .dataNodesAutoAdjustScaleUp(10000)
+
.dataNodesAutoAdjustScaleDown(IMMEDIATE_TIMER_VALUE)
+ .build()
+ )
+ .get(3, SECONDS);
+
+ // Test steps.
+
+ // Change logical topology. NODE_1 is added.
+ Set<LogicalNode> twoNodes = Set.of(NODE_0, NODE_1);
+ Set<String> twoNodesNames = Set.of(NODE_0.name(), NODE_1.name());
+
+ long topologyRevision2 =
putNodeInLogicalTopologyAndGetRevision(NODE_1, twoNodes);
+
+ long dropRevision0 = dropZoneAndGetRevision(DEFAULT_ZONE_NAME);
+ long dropRevision1 = dropZoneAndGetRevision(ZONE_NAME_1);
+
+ // Check that data nodes value of the zone with the topology update
revision is NODE_0 because scale up timer has not fired.
+ CompletableFuture<Set<String>> dataNodesFut2 =
distributionZoneManager.dataNodes(topologyRevision2, DEFAULT_ZONE_ID);
+ assertThat(dataNodesFut2, willBe(NODE_0));
+
+ CompletableFuture<Set<String>> dataNodesFut3 =
distributionZoneManager.dataNodes(topologyRevision2, ZONE_ID_1);
+ assertThat(dataNodesFut3, willBe(NODE_0));
+
+ // Check that zones is removed and attempt to get data nodes throws an
exception.
+ CompletableFuture<Set<String>> dataNodesFut4 =
distributionZoneManager.dataNodes(dropRevision0, DEFAULT_ZONE_ID);
+ assertThrows(DistributionZoneNotFoundException.class, () ->
dataNodesFut4.get(3, SECONDS));
+
+ CompletableFuture<Set<String>> dataNodesFut5 =
distributionZoneManager.dataNodes(dropRevision1, ZONE_ID_1);
+ assertThrows(DistributionZoneNotFoundException.class, () ->
dataNodesFut5.get(3, SECONDS));
+ }
+
+ /**
+ * Tests data nodes dropping when a scale down task is scheduled.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ void scheduleScaleDownTaskThenDropZone() throws Exception {
+ // Prerequisite.
+
+ // Create the zone with immediate scale up timer and not immediate
scale down timer.
+ distributionZoneManager.createZone(
+ new
DistributionZoneConfigurationParameters.Builder(ZONE_NAME_1)
+
.dataNodesAutoAdjustScaleUp(IMMEDIATE_TIMER_VALUE)
+ .dataNodesAutoAdjustScaleDown(10000)
+ .build()
+ )
+ .get(3, SECONDS);
+
+ // Create logical topology with NODE_0 and NODE_1.
+ topology.putNode(NODE_0);
+
+ Set<LogicalNode> twoNodes = Set.of(NODE_0, NODE_1);
+ Set<String> twoNodesNames = Set.of(NODE_0.name(), NODE_1.name());
+
+ long topologyRevision1 =
putNodeInLogicalTopologyAndGetRevision(NODE_1, twoNodes);
+
+ // Check that data nodes value of the the zone is NODE_0 and NODE_1.
+ CompletableFuture<Set<String>> dataNodesFut1 =
distributionZoneManager.dataNodes(topologyRevision1, ZONE_ID_1);
+ assertThat(dataNodesFut1, willBe(twoNodesNames));
+
+ // Test steps.
+
+ // Change logical topology. NODE_1 is removed.
+ Set<LogicalNode> oneNode = Set.of(NODE_0);
+ Set<String> oneNodeName = Set.of(NODE_0.name());
+
+ long topologyRevision2 =
removeNodeInLogicalTopologyAndGetRevision(Set.of(NODE_1), oneNode);
+
+ long dropRevision0 = dropZoneAndGetRevision(DEFAULT_ZONE_NAME);
+ long dropRevision1 = dropZoneAndGetRevision(ZONE_NAME_1);
+
+ // Check that data nodes value of the zone with the topology update
revision is NODE_0 and NODE_1
+ // because scale down timer has not fired.
+ CompletableFuture<Set<String>> dataNodesFut2 =
distributionZoneManager.dataNodes(topologyRevision2, DEFAULT_ZONE_ID);
+ assertThat(dataNodesFut2, willBe(twoNodesNames));
+
+ CompletableFuture<Set<String>> dataNodesFut3 =
distributionZoneManager.dataNodes(topologyRevision2, ZONE_ID_1);
+ assertThat(dataNodesFut3, willBe(twoNodesNames));
+
+ // Check that zones is removed and attempt to get data nodes throws an
exception.
+ CompletableFuture<Set<String>> dataNodesFut4 =
distributionZoneManager.dataNodes(dropRevision0, DEFAULT_ZONE_ID);
+ assertThrows(DistributionZoneNotFoundException.class, () ->
dataNodesFut4.get(3, SECONDS));
+
+ CompletableFuture<Set<String>> dataNodesFut5 =
distributionZoneManager.dataNodes(dropRevision1, ZONE_ID_1);
+ assertThrows(DistributionZoneNotFoundException.class, () ->
dataNodesFut5.get(3, SECONDS));
+ }
+
+ /**
+ * Tests data nodes obtaining with revision before a zone creation and
after a zone dropping.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ void createThenDropZone() throws Exception {
+ // Prerequisite.
+
+ // Create logical topology with NODE_0 and NODE_1.
+ topology.putNode(NODE_0);
+ topology.putNode(NODE_1);
+
+ Set<LogicalNode> oneNode = Set.of(NODE_0);
+ Set<String> oneNodeName = Set.of(NODE_0.name());
+
+ putNodeInLogicalTopologyAndGetRevision(NODE_0, oneNode);
+
+ Set<String> twoNodesNames = Set.of(NODE_0.name(), NODE_1.name());
+
+ // Test steps.
+
+ // Create a zone.
+ long createZoneRevision = createZoneAndGetRevision(ZONE_NAME_1,
ZONE_ID_1, IMMEDIATE_TIMER_VALUE, IMMEDIATE_TIMER_VALUE);
+
+ // Check that data nodes value of the zone with the revision lower
than the create zone revision is absent.
+ CompletableFuture<Set<String>> dataNodesFut1 =
distributionZoneManager.dataNodes(createZoneRevision - 1, ZONE_ID_1);
+ assertThrows(DistributionZoneNotFoundException.class, () ->
dataNodesFut1.get(3, SECONDS));
+
+ // Check that data nodes value of the zone with the create zone
revision is NODE_0 and NODE_1.
+ CompletableFuture<Set<String>> dataNodesFut2 =
distributionZoneManager.dataNodes(createZoneRevision, ZONE_ID_1);
+ assertThat(dataNodesFut2, willBe(twoNodesNames));
+
+ // Drop the zone.
+ long dropZoneRevision = dropZoneAndGetRevision(ZONE_NAME_1);
+
+ // Check that data nodes value of the zone with the drop zone revision
is absent.
+ CompletableFuture<Set<String>> dataNodesFut3 =
distributionZoneManager.dataNodes(dropZoneRevision, ZONE_ID_1);
+ assertThrows(DistributionZoneNotFoundException.class, () ->
dataNodesFut3.get(3, SECONDS));
+ }
+
+ /**
+ * Tests data nodes obtaining with wrong parameters throw an exception.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ void validationTest() {
+ CompletableFuture<Set<String>> dataNodesFut1 =
distributionZoneManager.dataNodes(0, DEFAULT_ZONE_ID);
+ assertThrows(AssertionError.class, () -> dataNodesFut1.get(3,
SECONDS));
+
+ CompletableFuture<Set<String>> dataNodesFut2 =
distributionZoneManager.dataNodes(-1, DEFAULT_ZONE_ID);
+ assertThrows(AssertionError.class, () -> dataNodesFut2.get(3,
SECONDS));
+
+ CompletableFuture<Set<String>> dataNodesFut3 =
distributionZoneManager.dataNodes(1, -1);
+ assertThrows(AssertionError.class, () -> dataNodesFut3.get(3,
SECONDS));
+ }
+
+ /**
+ * Tests data nodes changing when topology is changed.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ void simpleTopologyChanges() throws Exception {
+ // Prerequisite.
+
+ // Create zones with immediate timers.
+ distributionZoneManager.alterZone(DEFAULT_ZONE_NAME,
+ new
DistributionZoneConfigurationParameters.Builder(DEFAULT_ZONE_NAME)
+
.dataNodesAutoAdjustScaleUp(IMMEDIATE_TIMER_VALUE)
+
.dataNodesAutoAdjustScaleDown(IMMEDIATE_TIMER_VALUE)
+ .build()
+ )
+ .get(3, SECONDS);
+
+ distributionZoneManager.alterZone(ZONE_NAME_1,
+ new
DistributionZoneConfigurationParameters.Builder(ZONE_NAME_1)
+
.dataNodesAutoAdjustScaleUp(IMMEDIATE_TIMER_VALUE)
+
.dataNodesAutoAdjustScaleDown(IMMEDIATE_TIMER_VALUE)
+ .build()
+ )
+ .get(3, SECONDS);
+
+ // Test steps.
+
+ Set<LogicalNode> oneNode = Set.of(NODE_0);
+ Set<String> oneNodeName = Set.of(NODE_0.name());
+
+ Set<LogicalNode> twoNodes = Set.of(NODE_0, NODE_1);
+ Set<String> twoNodesNames = Set.of(NODE_0.name(), NODE_1.name());
+
+ Set<LogicalNode> threeNodes = Set.of(NODE_0, NODE_1, NODE_2);
+ Set<String> threeNodeNames = Set.of(NODE_0.name(), NODE_1.name(),
NODE_2.name());
+
+ // Change logical topology. NODE_0 is added.
+ long topologyRevision0 =
putNodeInLogicalTopologyAndGetRevision(NODE_0, oneNode);
+
+ // Change logical topology. NODE_1 is added.
+ long topologyRevision1 =
putNodeInLogicalTopologyAndGetRevision(NODE_1, twoNodes);
+
+ CompletableFuture<Set<String>> dataNodesFut1 =
distributionZoneManager.dataNodes(topologyRevision0, DEFAULT_ZONE_ID);
+ assertThat(dataNodesFut1, willBe(oneNodeName));
+ CompletableFuture<Set<String>> dataNodesFut2 =
distributionZoneManager.dataNodes(topologyRevision0, ZONE_ID_1);
+ assertThat(dataNodesFut2, willBe(oneNodeName));
+
+ CompletableFuture<Set<String>> dataNodesFut3 =
distributionZoneManager.dataNodes(topologyRevision0 + 1, DEFAULT_ZONE_ID);
+ assertThat(dataNodesFut3, willBe(oneNodeName));
+ CompletableFuture<Set<String>> dataNodesFut4 =
distributionZoneManager.dataNodes(topologyRevision0 + 1, ZONE_ID_1);
+ assertThat(dataNodesFut4, willBe(oneNodeName));
+
+ CompletableFuture<Set<String>> dataNodesFut5 =
distributionZoneManager.dataNodes(topologyRevision1, DEFAULT_ZONE_ID);
+ assertThat(dataNodesFut5, willBe(twoNodesNames));
+ CompletableFuture<Set<String>> dataNodesFut6 =
distributionZoneManager.dataNodes(topologyRevision1, ZONE_ID_1);
+ assertThat(dataNodesFut6, willBe(twoNodesNames));
+
+ Set<LogicalNode> twoNodes1 = Set.of(NODE_0, NODE_2);
+ Set<String> twoNodesNames1 = Set.of(NODE_0.name(), NODE_2.name());
+
+ // Change logical topology. NODE_2 is added.
+ long topologyRevision2 =
putNodeInLogicalTopologyAndGetRevision(NODE_2, twoNodes1);
+
+ // Change logical topology. NODE_1 is left.
+ long topologyRevision3 =
removeNodeInLogicalTopologyAndGetRevision(Set.of(NODE_1), twoNodes1);
+
+ CompletableFuture<Set<String>> dataNodesFut7 =
distributionZoneManager.dataNodes(topologyRevision2, DEFAULT_ZONE_ID);
+ assertThat(dataNodesFut7, willBe(threeNodeNames));
+ CompletableFuture<Set<String>> dataNodesFut8 =
distributionZoneManager.dataNodes(topologyRevision2, ZONE_ID_1);
+ assertThat(dataNodesFut8, willBe(threeNodeNames));
+
+ CompletableFuture<Set<String>> dataNodesFut9 =
distributionZoneManager.dataNodes(topologyRevision3, DEFAULT_ZONE_ID);
+ assertThat(dataNodesFut9, willBe(twoNodesNames1));
+ CompletableFuture<Set<String>> dataNodesFut10 =
distributionZoneManager.dataNodes(topologyRevision3, ZONE_ID_1);
+ assertThat(dataNodesFut10, willBe(twoNodesNames1));
+ }
+
+ /**
+ * Puts a given node as a part of the logical topology and return revision
of a topology watch listener event.
+ *
+ * @param node Node to put.
+ * @param expectedTopology Expected topology for future completing.
+ * @return Revision.
+ * @throws Exception If failed.
+ */
+ private long putNodeInLogicalTopologyAndGetRevision(
+ LogicalNode node,
+ Set<LogicalNode> expectedTopology
+ ) throws Exception {
+ Set<String> nodeNames =
expectedTopology.stream().map(ClusterNode::name).collect(toSet());
+
+ CompletableFuture<Long> revisionFut = new CompletableFuture<>();
+
+ topologyRevisions.put(nodeNames, revisionFut);
+
+ topology.putNode(node);
+
+ return revisionFut.get(3, SECONDS);
+ }
+
+ /**
+ * Removes given nodes from the logical topology and return revision of a
topology watch listener event.
+ *
+ * @param nodes Nodes to remove.
+ * @param expectedTopology Expected topology for future completing.
+ * @return Revision.
+ * @throws Exception If failed.
+ */
+ private long removeNodeInLogicalTopologyAndGetRevision(
+ Set<LogicalNode> nodes,
+ Set<LogicalNode> expectedTopology
+ ) throws Exception {
+ Set<String> nodeNames =
expectedTopology.stream().map(ClusterNode::name).collect(toSet());
+
+ CompletableFuture<Long> revisionFut = new CompletableFuture<>();
+
+ topologyRevisions.put(nodeNames, revisionFut);
+
+ topology.removeNodes(nodes);
+
+ return revisionFut.get(3, SECONDS);
+ }
+
+ /**
+ * Changes data nodes in logical topology and return revision of a
topology watch listener event.
+ *
+ * @param nodes Nodes to remove.
+ * @return Revision.
+ * @throws Exception If failed.
+ */
+ private long fireTopologyLeapAndGetRevision(Set<LogicalNode> nodes) throws
Exception {
+ Set<String> nodeNames =
nodes.stream().map(ClusterNode::name).collect(toSet());
+
+ CompletableFuture<Long> revisionFut = new CompletableFuture<>();
+
+ topologyRevisions.put(nodeNames, revisionFut);
+
+ long topVer = topology.getLogicalTopology().version() + 1;
+
+ clusterStateStorage.put(LOGICAL_TOPOLOGY_KEY, ByteUtils.toBytes(new
LogicalTopologySnapshot(topVer, nodes)));
+
+ topology.fireTopologyLeap();
+
+ return revisionFut.get(3, SECONDS);
+ }
+
+ /**
+ * Changes a scale up timer value of a zone and return the revision of a
zone update event.
+ *
+ * @param zoneName Zone name.
+ * @param scaleUp New scale up value.
+ * @return Revision.
+ * @throws Exception If failed.
+ */
+ private long alterZoneScaleUpAndGetRevision(String zoneName, int scaleUp)
throws Exception {
+ CompletableFuture<Long> revisionFut = new CompletableFuture<>();
+
+ int zoneId = distributionZoneManager.getZoneId(zoneName);
+
+ zoneScaleUpRevisions.put(zoneId, revisionFut);
+
+ distributionZoneManager.alterZone(zoneName, new Builder(zoneName)
+ .dataNodesAutoAdjustScaleUp(scaleUp).build())
+ .get(3, SECONDS);
+
+ return revisionFut.get(3, SECONDS);
+ }
+
+ /**
+ * Changes a scale down timer value of a zone and return the revision of a
zone update event.
+ *
+ * @param zoneName Zone name.
+ * @param scaleDown New scale down value.
+ * @return Revision.
+ * @throws Exception If failed.
+ */
+ private long alterZoneScaleDownAndGetRevision(String zoneName, int
scaleDown) throws Exception {
+ CompletableFuture<Long> revisionFut = new CompletableFuture<>();
+
+ int zoneId = distributionZoneManager.getZoneId(zoneName);
+
+ zoneScaleDownRevisions.put(zoneId, revisionFut);
+
+ distributionZoneManager.alterZone(zoneName, new Builder(zoneName)
+ .dataNodesAutoAdjustScaleDown(scaleDown).build())
+ .get(3, SECONDS);
+
+ return revisionFut.get(3, SECONDS);
+ }
+
+ /**
+ * Creates a zone and return the revision of a create zone event.
+ *
+ * @param zoneName Zone name.
+ * @param zoneId Zone id.
+ * @param scaleUp Scale up value.
+ * @param scaleDown Scale down value.
+ * @return Revision.
+ * @throws Exception If failed.
+ */
+ private long createZoneAndGetRevision(String zoneName, int zoneId, int
scaleUp, int scaleDown) throws Exception {
+ CompletableFuture<Long> revisionFut = new CompletableFuture<>();
+
+ createZoneRevisions.put(zoneId, revisionFut);
+
+ distributionZoneManager.createZone(
+ new
DistributionZoneConfigurationParameters.Builder(zoneName)
+ .dataNodesAutoAdjustScaleUp(scaleUp)
+ .dataNodesAutoAdjustScaleDown(scaleDown)
+ .build()
+ )
+ .get(3, SECONDS);
+
+ return revisionFut.get(3, SECONDS);
+ }
+
+ /**
+ * Drops a zone and return the revision of a drop zone event.
+ *
+ * @param zoneName Zone name.
+ * @return Revision.
+ * @throws Exception If failed.
+ */
+ private long dropZoneAndGetRevision(String zoneName) throws Exception {
+ CompletableFuture<Long> revisionFut = new CompletableFuture<>();
+
+ int zoneId = distributionZoneManager.getZoneId(zoneName);
+
+ dropZoneRevisions.put(zoneId, revisionFut);
+
+ distributionZoneManager.dropZone(zoneName).get(3, SECONDS);
+
+ return revisionFut.get(3, SECONDS);
+ }
+
+ /**
+ * Returns a future which will be completed when expected data nodes will
be saved to the meta storage.
+ * In order to complete the future need to invoke one of the methods that
change the logical topology.
+ *
+ * @param zoneId Zone id.
+ * @param nodes Expected data nodes.
+ * @return Future with revision.
+ */
+ private CompletableFuture<Long> getZoneDataNodesRevision(int zoneId,
Set<LogicalNode> nodes) {
+ Set<String> nodeNames = nodes.stream().map(node ->
node.name()).collect(toSet());
+
+ CompletableFuture<Long> revisionFut = new CompletableFuture<>();
+
+ return zoneDataNodesRevisions.put(new IgniteBiTuple<>(zoneId,
nodeNames), revisionFut);
+ }
+
+ /**
+ * Creates a configuration listener which completes futures from {@code
zoneScaleUpRevisions}
+ * when receives event with expected zone id.
+ *
+ * @return Configuration listener.
+ */
+ private ConfigurationListener<Integer> onUpdateScaleUp() {
+ return ctx -> {
+ int zoneId = ctx.newValue(DistributionZoneView.class).zoneId();
+
+ if (zoneScaleUpRevisions.containsKey(zoneId)) {
+
zoneScaleUpRevisions.remove(zoneId).complete(ctx.storageRevision());
+ }
+
+ return completedFuture(null);
+ };
+ }
+
+ /**
+ * Creates a configuration listener which completes futures from {@code
zoneScaleDownRevisions}
+ * when receives event with expected zone id.
+ *
+ * @return Configuration listener.
+ */
+ private ConfigurationListener<Integer> onUpdateScaleDown() {
+ return ctx -> {
+ int zoneId = ctx.newValue(DistributionZoneView.class).zoneId();
+
+ if (zoneScaleDownRevisions.containsKey(zoneId)) {
+
zoneScaleDownRevisions.remove(zoneId).complete(ctx.storageRevision());
+ }
+
+ return completedFuture(null);
+ };
+ }
+
+ /**
+ * A configuration listener which completes futures from {@code
createZoneRevisions} and {@code dropZoneRevisions}
+ * when receives event with expected zone id.
+ */
+ private class ZonesConfigurationListener implements
ConfigurationNamedListListener<DistributionZoneView> {
+ @Override
+ public CompletableFuture<?>
onCreate(ConfigurationNotificationEvent<DistributionZoneView> ctx) {
+ int zoneId = ctx.newValue().zoneId();
+
+ if (createZoneRevisions.containsKey(zoneId)) {
+
createZoneRevisions.remove(zoneId).complete(ctx.storageRevision());
+ }
+
+ return completedFuture(null);
+ }
+
+ @Override
+ public CompletableFuture<?>
onDelete(ConfigurationNotificationEvent<DistributionZoneView> ctx) {
+ int zoneId = ctx.oldValue().zoneId();
+
+ if (dropZoneRevisions.containsKey(zoneId)) {
+
dropZoneRevisions.remove(zoneId).complete(ctx.storageRevision());
+ }
+
+ return completedFuture(null);
+ }
+ }
+
+ /**
+ * Creates a topology watch listener which completes futures from {@code
topologyRevisions}
+ * when receives event with expected logical topology.
+ *
+ * @return Watch listener.
+ */
+ private WatchListener createMetastorageTopologyListener() {
+ return new WatchListener() {
+ @Override
+ public CompletableFuture<Void> onUpdate(WatchEvent evt) {
+
+ Set<NodeWithAttributes> newLogicalTopology = null;
+
+ long revision = 0;
+
+ for (EntryEvent event : evt.entryEvents()) {
+ Entry e = event.newEntry();
+
+ if (Arrays.equals(e.key(),
zonesLogicalTopologyVersionKey().bytes())) {
+ revision = e.revision();
+ } else if (Arrays.equals(e.key(),
zonesLogicalTopologyKey().bytes())) {
+ newLogicalTopology = fromBytes(e.value());
+ }
+ }
+
+ Set<String> nodeNames = newLogicalTopology.stream().map(node
-> node.nodeName()).collect(toSet());
+
+ if (topologyRevisions.containsKey(nodeNames)) {
+ topologyRevisions.remove(nodeNames).complete(revision);
+ }
+
+ return completedFuture(null);
+ }
+
+ @Override
+ public void onError(Throwable e) {
+ }
+ };
+ }
+
+ /**
+ * Creates a data nodes watch listener which completes futures from {@code
zoneDataNodesRevisions}
+ * when receives event with expected data nodes.
+ *
+ * @return Watch listener.
+ */
+ private WatchListener createMetastorageDataNodesListener() {
+ return new WatchListener() {
+ @Override
+ public CompletableFuture<Void> onUpdate(WatchEvent evt) {
+
+ int zoneId = 0;
+
+ Set<Node> newDataNodes = null;
+
+ long revision = 0;
+
+ for (EntryEvent event : evt.entryEvents()) {
+ Entry e = event.newEntry();
+
+ if (startsWith(e.key(), zoneDataNodesKey().bytes())) {
+ revision = e.revision();
+
+ zoneId = extractZoneId(e.key());
+
+ byte[] dataNodesBytes = e.value();
+
+ if (dataNodesBytes != null) {
+ newDataNodes =
DistributionZonesUtil.dataNodes(fromBytes(dataNodesBytes));
+ } else {
+ newDataNodes = emptySet();
+ }
+ }
+ }
+
+ Set<String> nodeNames = newDataNodes.stream().map(node ->
node.nodeName()).collect(toSet());
+
+ IgniteBiTuple<Integer, Set<String>> zoneDataNodesKey = new
IgniteBiTuple<>(zoneId, nodeNames);
+
+ if (zoneDataNodesRevisions.containsKey(zoneDataNodesKey)) {
+ zoneDataNodesRevisions.remove(new IgniteBiTuple<>(zoneId,
nodeNames)).complete(revision);
+ }
+
+ return completedFuture(null);
+ }
+
+ @Override
+ public void onError(Throwable e) {
+ }
+ };
+ }
+}