This is an automated email from the ASF dual-hosted git repository.
sanpwc pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new adaa25871c8 IGNITE-26544 Implement new REST POST method for data nodes
recalculation (#6902)
adaa25871c8 is described below
commit adaa25871c8892d326cc566c0325dbbd5e1b0256
Author: Mikhail Efremov <[email protected]>
AuthorDate: Tue Nov 25 20:47:02 2025 +0600
IGNITE-26544 Implement new REST POST method for data nodes recalculation
(#6902)
---
.../internal/catalog/CatalogManagerImpl.java | 4 +-
.../ignite/internal/catalog/CatalogService.java | 14 +-
.../partitions/states/ItPartitionStatesTest.java | 2 +-
.../ignite/client/handler/FakeCatalogService.java | 5 +
modules/distribution-zones/build.gradle | 3 +
.../distributionzones/ItDataNodesManagerTest.java | 87 ++-----
.../distributionzones/DataNodesManager.java | 46 ++--
.../distributionzones/DistributionZoneManager.java | 56 ++++-
.../distributionzones/DistributionZonesUtil.java | 40 +++
.../DistributionZoneNotFoundException.java | 10 +
.../distributionzones/DataNodesManagerTest.java | 87 ++-----
.../distributionzones/DataNodesTestUtil.java | 267 ++++++++++++++++++++
.../DistributionZonesTestUtil.java | 26 ++
.../internal/rest/api/zone/DataNodesApi.java | 102 ++++++++
modules/rest/build.gradle | 3 +
.../rest/cluster/ItDataNodesControllerTest.java | 271 +++++++++++++++++++++
.../recovery/ItDisasterRecoveryControllerTest.java | 4 +-
.../rest/cluster/DataNodesRestFactory.java | 49 ++++
.../internal/rest/zone/DataNodesController.java | 58 +++++
.../org/apache/ignite/internal/app/IgniteImpl.java | 5 +-
.../disaster/DisasterRecoveryManager.java | 28 +--
.../exceptions/ZonesNotFoundException.java | 31 ---
22 files changed, 968 insertions(+), 230 deletions(-)
diff --git
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogManagerImpl.java
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogManagerImpl.java
index fc830aeaa8f..26be9cd28dd 100644
---
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogManagerImpl.java
+++
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogManagerImpl.java
@@ -182,8 +182,8 @@ public class CatalogManagerImpl extends
AbstractEventProducer<CatalogEvent, Cata
}
@Override
- public int latestCatalogVersion() {
- return catalogByVer.lastEntry().getKey();
+ public Catalog latestCatalog() {
+ return catalogByVer.lastEntry().getValue();
}
@Override
diff --git
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogService.java
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogService.java
index 7dc021c1890..458eac7e6e2 100644
---
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogService.java
+++
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogService.java
@@ -75,6 +75,7 @@ public interface CatalogService extends
EventProducer<CatalogEvent, CatalogEvent
* @return The catalog for the specified version.
* @throws CatalogNotFoundException If the catalog of requested version
was not found.
*/
+ // TODO: https://issues.apache.org/jira/browse/IGNITE-26940
Catalog catalog(int catalogVersion);
/**
@@ -126,7 +127,18 @@ public interface CatalogService extends
EventProducer<CatalogEvent, CatalogEvent
* @return The latest registered version of the catalog.
* @see #catalogReadyFuture(int)
*/
- int latestCatalogVersion();
+ default int latestCatalogVersion() {
+ return latestCatalog().version();
+ }
+
+ /**
+ * Returns the latest registered catalog. Effectively returns a catalog
with version from {@link #latestCatalogVersion}. See
+ * the last method's note about its guarantees.
+ *
+ * @return The latest registered catalog
+ */
+ // TODO: https://issues.apache.org/jira/browse/IGNITE-26939
+ Catalog latestCatalog();
/**
* Returns a future, which completes, when catalog of given version will
be available.
diff --git
a/modules/cli/src/integrationTest/java/org/apache/ignite/internal/cli/commands/recovery/partitions/states/ItPartitionStatesTest.java
b/modules/cli/src/integrationTest/java/org/apache/ignite/internal/cli/commands/recovery/partitions/states/ItPartitionStatesTest.java
index 2066895b5d9..278e774d179 100644
---
a/modules/cli/src/integrationTest/java/org/apache/ignite/internal/cli/commands/recovery/partitions/states/ItPartitionStatesTest.java
+++
b/modules/cli/src/integrationTest/java/org/apache/ignite/internal/cli/commands/recovery/partitions/states/ItPartitionStatesTest.java
@@ -166,7 +166,7 @@ public abstract class ItPartitionStatesTest extends
CliIntegrationTest {
PLAIN_OPTION
);
- assertErrOutputContains("Some distribution zones are missing:
[UNKNOWN_ZONE]");
+ assertErrOutputContains("Distribution zones were not found
[zoneNames=[UNKNOWN_ZONE]]");
assertOutputIsEmpty();
}
diff --git
a/modules/client-handler/src/testFixtures/java/org/apache/ignite/client/handler/FakeCatalogService.java
b/modules/client-handler/src/testFixtures/java/org/apache/ignite/client/handler/FakeCatalogService.java
index 64a5b052462..5fef0675553 100644
---
a/modules/client-handler/src/testFixtures/java/org/apache/ignite/client/handler/FakeCatalogService.java
+++
b/modules/client-handler/src/testFixtures/java/org/apache/ignite/client/handler/FakeCatalogService.java
@@ -130,6 +130,11 @@ public class FakeCatalogService implements CatalogService {
return 0;
}
+ @Override
+ public Catalog latestCatalog() {
+ return catalog;
+ }
+
@Override
public CompletableFuture<Void> catalogReadyFuture(int version) {
return nullCompletedFuture();
diff --git a/modules/distribution-zones/build.gradle
b/modules/distribution-zones/build.gradle
index 1d6bc17185e..64aae494e44 100644
--- a/modules/distribution-zones/build.gradle
+++ b/modules/distribution-zones/build.gradle
@@ -55,6 +55,7 @@ dependencies {
testImplementation project(':ignite-system-view-api')
testImplementation project(':ignite-configuration-system')
+ testImplementation libs.awaitility
testImplementation testFixtures(project(':ignite-core'))
testImplementation testFixtures(project(':ignite-configuration'))
testImplementation testFixtures(project(':ignite-configuration-system'))
@@ -70,12 +71,14 @@ dependencies {
testImplementation testFixtures(project(':ignite-failure-handler'))
testImplementation testFixtures(project(':ignite-metrics'))
+ testFixturesImplementation libs.awaitility
testFixturesImplementation project(':ignite-raft-api')
testFixturesImplementation project(':ignite-metastorage')
testFixturesImplementation project(':ignite-schema')
testFixturesImplementation project(':ignite-cluster-management')
testFixturesImplementation project(':ignite-vault')
testFixturesImplementation project(':ignite-catalog')
+ testFixturesImplementation project(':ignite-runner')
testFixturesImplementation testFixtures(project(':ignite-core'))
testFixturesImplementation testFixtures(project(':ignite-metastorage'))
diff --git
a/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/distributionzones/ItDataNodesManagerTest.java
b/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/distributionzones/ItDataNodesManagerTest.java
index 5bf4556f520..ee1a89ae81a 100644
---
a/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/distributionzones/ItDataNodesManagerTest.java
+++
b/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/distributionzones/ItDataNodesManagerTest.java
@@ -18,23 +18,17 @@
package org.apache.ignite.internal.distributionzones;
import static org.apache.ignite.internal.TestWrappers.unwrapIgniteImpl;
-import static
org.apache.ignite.internal.catalog.commands.CatalogUtils.INFINITE_TIMER_VALUE;
-import static org.apache.ignite.internal.lang.IgniteStringFormatter.format;
-import static
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
-import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
-import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willSucceedFast;
-import static org.hamcrest.MatcherAssert.assertThat;
+import static
org.apache.ignite.internal.distributionzones.DataNodesTestUtil.createZoneWithInfiniteTimers;
+import static
org.apache.ignite.internal.distributionzones.DataNodesTestUtil.recalculateZoneDataNodesManuallyAndWaitForDataNodes;
+import static
org.apache.ignite.internal.distributionzones.DataNodesTestUtil.waitForDataNodes;
import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertNotNull;
-import static org.junit.jupiter.api.Assertions.assertTrue;
import java.util.Set;
-import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
import org.apache.ignite.internal.ClusterPerTestIntegrationTest;
import org.apache.ignite.internal.app.IgniteImpl;
-import org.apache.ignite.internal.catalog.CatalogManager;
-import org.apache.ignite.internal.catalog.descriptors.CatalogZoneDescriptor;
-import org.apache.ignite.internal.hlc.ClockService;
+import
org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologyService;
+import org.awaitility.Awaitility;
import org.junit.jupiter.api.Test;
class ItDataNodesManagerTest extends ClusterPerTestIntegrationTest {
@@ -46,10 +40,10 @@ class ItDataNodesManagerTest extends
ClusterPerTestIntegrationTest {
}
@Test
- public void manualDataNodeRecalculationIdempotencyTest() throws
InterruptedException {
+ public void manualDataNodeRecalculationIdempotencyTest() {
IgniteImpl node = unwrapIgniteImpl(node(0));
- createZoneWithInfiniteTimers(node);
+ createZoneWithInfiniteTimers(node, ZONE_NAME);
waitForDataNodes(node, ZONE_NAME, Set.of(node.name()));
@@ -57,75 +51,22 @@ class ItDataNodesManagerTest extends
ClusterPerTestIntegrationTest {
}
@Test
- public void manualDataNodeRecalculationAfterNewNodeAddedTest() throws
InterruptedException {
+ public void manualDataNodeRecalculationAfterNewNodeAddedTest() {
IgniteImpl node = unwrapIgniteImpl(node(0));
- createZoneWithInfiniteTimers(node);
+ createZoneWithInfiniteTimers(node, ZONE_NAME);
waitForDataNodes(node, ZONE_NAME, Set.of(node.name()));
startNode(1);
- assertTrue(waitForCondition(() ->
node.logicalTopologyService().localLogicalTopology().nodes().size() == 2,
1000));
+ LogicalTopologyService logicalTopologyService =
node.logicalTopologyService();
+
+ Awaitility.waitAtMost(2, TimeUnit.SECONDS)
+ .untilAsserted(() -> assertEquals(2,
logicalTopologyService.localLogicalTopology().nodes().size()));
waitForDataNodes(node, ZONE_NAME, Set.of(node.name()));
recalculateZoneDataNodesManuallyAndWaitForDataNodes(node, ZONE_NAME,
Set.of(node.name(), node(1).name()));
}
-
- private static void createZoneWithInfiniteTimers(IgniteImpl node) {
- DistributionZonesTestUtil.createZone(node.catalogManager(), ZONE_NAME,
INFINITE_TIMER_VALUE, INFINITE_TIMER_VALUE, null);
-
- CatalogManager catalogManager = node.catalogManager();
-
- CatalogZoneDescriptor zoneDesc =
catalogManager.catalog(catalogManager.latestCatalogVersion()).zone(ZONE_NAME);
-
- assertNotNull(zoneDesc);
- }
-
- private static void waitForDataNodes(
- IgniteImpl node,
- String zoneName,
- Set<String> expectedNodes
- ) throws InterruptedException {
- CatalogManager catalogManager = node.catalogManager();
-
- ClockService clock = node.clockService();
-
- CatalogZoneDescriptor zone =
catalogManager.activeCatalog(clock.now().longValue()).zone(zoneName);
- int zoneId = zone.id();
-
- DataNodesManager dataNodesManager =
node.distributionZoneManager().dataNodesManager();
-
- boolean success = waitForCondition(() -> {
- CompletableFuture<Set<String>> dataNodesFuture =
dataNodesManager.dataNodes(zoneId, clock.now());
- assertThat(dataNodesFuture, willSucceedFast());
- return dataNodesFuture.join().equals(expectedNodes);
- }, 10_000);
-
- assertTrue(
- success,
- format(
- "Expected {}, but actual {}.",
- expectedNodes,
- dataNodesManager.dataNodes(zoneId, clock.now()).join()
- )
- );
- }
-
- private static void recalculateZoneDataNodesManuallyAndWaitForDataNodes(
- IgniteImpl node,
- String zoneName,
- Set<String> expectedDataNodes
- ) throws InterruptedException {
- CompletableFuture<Set<String>> futureRecalculationResult =
node.distributionZoneManager()
- .dataNodesManager()
- .recalculateDataNodes(zoneName);
-
- assertThat(futureRecalculationResult, willCompleteSuccessfully());
-
- assertEquals(expectedDataNodes, futureRecalculationResult.join());
-
- waitForDataNodes(node, zoneName, expectedDataNodes);
- }
}
diff --git
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DataNodesManager.java
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DataNodesManager.java
index 39984d66f25..354e2c0a699 100644
---
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DataNodesManager.java
+++
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DataNodesManager.java
@@ -572,7 +572,13 @@ public class DataNodesManager {
Set<NodeWithAttributes> filteredDataNodes =
filterDataNodes(logicalTopology, zoneDescriptor);
- return
recalculateAndApplyDataNodesToMetastoreImmediately(zoneDescriptor,
filteredDataNodes, timestamp, dataNodesHistoryContext);
+ return recalculateAndApplyDataNodesToMetastoreImmediately(
+ zoneDescriptor,
+ filteredDataNodes,
+ timestamp,
+ dataNodesHistoryContext,
+ "distribution zone filter change"
+ );
}
/**
@@ -913,12 +919,14 @@ public class DataNodesManager {
}
/**
- * Unlike {@link #dataNodes} this method recalculates the data nodes,
writes it to metastorage and history, and returns them.
+ * Unlike {@link #dataNodes} this method recalculates the data nodes for
given zone and writes them to metastorage.
*
* @param zoneName Zone name.
- * @return Recalculated data nodes for the given zone.
+ * @return A future that completes when the recalculation is done for the given zone.
*/
- public CompletableFuture<Set<String>> recalculateDataNodes(String
zoneName) {
+ public CompletableFuture<Void> recalculateDataNodes(String zoneName) {
+ Objects.requireNonNull(zoneName, "Zone name is required.");
+
int catalogVersion = catalogManager.latestCatalogVersion();
CatalogZoneDescriptor zoneDescriptor =
catalogManager.catalog(catalogVersion).zone(zoneName);
@@ -930,25 +938,7 @@ public class DataNodesManager {
return recalculateDataNodes(zoneDescriptor);
}
- /**
- * Unlike {@link #dataNodes} this method recalculates the data nodes,
writes it to metastorage and history, and returns them.
- *
- * @param zoneId Zone ID.
- * @return Recalculated data nodes for the given zone.
- */
- public CompletableFuture<Set<String>> recalculateDataNodes(int zoneId) {
- int catalogVersion = catalogManager.latestCatalogVersion();
-
- CatalogZoneDescriptor zoneDescriptor =
catalogManager.catalog(catalogVersion).zone(zoneId);
-
- if (zoneDescriptor == null) {
- return failedFuture(new DistributionZoneNotFoundException(zoneId));
- }
-
- return recalculateDataNodes(zoneDescriptor);
- }
-
- private CompletableFuture<Set<String>>
recalculateDataNodes(CatalogZoneDescriptor zoneDescriptor) {
+ private CompletableFuture<Void> recalculateDataNodes(CatalogZoneDescriptor
zoneDescriptor) {
int zoneId = zoneDescriptor.id();
Set<NodeWithAttributes> currentLogicalTopology = topologyNodes();
@@ -962,17 +952,19 @@ public class DataNodesManager {
zoneDescriptor,
filteredDataNodes,
clockService.now(),
- dataNodesHistoryContext
+ dataNodesHistoryContext,
+ "manual data nodes recalculation"
)),
true
- ).thenApply(v -> nodeNames(filteredDataNodes));
+ );
}
private @Nullable DataNodesHistoryMetaStorageOperation
recalculateAndApplyDataNodesToMetastoreImmediately(
CatalogZoneDescriptor zoneDescriptor,
Set<NodeWithAttributes> filteredDataNodes,
HybridTimestamp timestamp,
- DataNodesHistoryContext dataNodesHistoryContext
+ DataNodesHistoryContext dataNodesHistoryContext,
+ String operationName
) {
assert dataNodesHistoryContext != null : "Data nodes history and
timers are missing, zone=" + zoneDescriptor;
@@ -994,7 +986,7 @@ public class DataNodesManager {
clearTimer(zoneScaleUpTimerKey(zoneId)),
clearTimer(zoneScaleDownTimerKey(zoneId))
))
- .operationName("distribution zone filter change")
+ .operationName(operationName)
.currentDataNodesHistory(dataNodesHistory)
.currentTimestamp(timestamp)
.historyEntryTimestamp(timestamp)
diff --git
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java
index 2946700bce7..3f3fb8616e5 100644
---
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java
+++
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java
@@ -33,6 +33,7 @@ import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil
import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.conditionForRecoverableStateChanges;
import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.deserializeLogicalTopologySet;
import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.filterDataNodes;
+import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.filterZonesForOperations;
import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.updateLogicalTopologyAndVersion;
import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.updateLogicalTopologyAndVersionAndClusterId;
import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zonesLastHandledTopology;
@@ -61,6 +62,7 @@ import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
@@ -70,6 +72,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Predicate;
import java.util.function.Supplier;
import org.apache.ignite.internal.catalog.CatalogManager;
+import org.apache.ignite.internal.catalog.descriptors.CatalogObjectDescriptor;
import org.apache.ignite.internal.catalog.descriptors.CatalogZoneDescriptor;
import org.apache.ignite.internal.catalog.events.AlterZoneEventParameters;
import org.apache.ignite.internal.catalog.events.CreateZoneEventParameters;
@@ -339,7 +342,7 @@ public class DistributionZoneManager extends
}
/**
- * Returns data nodes for the given time.
+ * Returns data nodes at the current time.
*
* @param zoneId Zone id.
* @return Data nodes for the current time.
@@ -350,6 +353,27 @@ public class DistributionZoneManager extends
return dataNodes(current, catalogVersion, zoneId);
}
+ /**
+ * Returns data nodes at the current time.
+ *
+ * @param zoneName Zone name.
+ * @return Data nodes at the current time.
+ */
+ public CompletableFuture<Set<String>> currentDataNodes(String zoneName) {
+ Objects.requireNonNull(zoneName, "Zone name is required.");
+
+ HybridTimestamp current = clockService.current();
+ int catalogVersion =
catalogManager.activeCatalogVersion(current.longValue());
+
+ CatalogZoneDescriptor zoneDesc =
catalogManager.catalog(catalogVersion).zone(zoneName);
+
+ if (zoneDesc == null) {
+ throw new DistributionZoneNotFoundException(zoneName);
+ }
+
+ return dataNodes(current, catalogVersion, zoneDesc.id());
+ }
+
/**
* Gets data nodes of the zone using causality token and catalog version.
{@code timestamp} must be agreed
* with the {@code catalogVersion}, meaning that for the provided {@code
timestamp} actual {@code catalogVersion} must be provided.
@@ -386,6 +410,36 @@ public class DistributionZoneManager extends
return dataNodesMap.entrySet().stream().filter(e -> e.getValue() >
0).map(Map.Entry::getKey).collect(toSet());
}
+ /**
+ * Recalculates data nodes for given zones and writes them to metastorage.
+ *
+ * @param zoneNames Zone names set. If it is empty, the recalculation
will be performed against all known zones
+ * at the moment.
+ * @return A future that completes when the recalculation is done for all the given zones.
+ */
+ @SuppressWarnings("rawtypes")
+ public CompletableFuture<Void> recalculateDataNodes(Set<String> zoneNames)
throws DistributionZoneNotFoundException {
+ Collection<CatalogZoneDescriptor> zones =
catalogManager.latestCatalog().zones();
+
+ CompletableFuture[] recalculationFutures =
filterZonesForOperations(zoneNames, zones)
+ .stream()
+ .map(CatalogObjectDescriptor::name)
+ .map(this::recalculateDataNodes)
+ .toArray(CompletableFuture[]::new);
+
+ return allOf(recalculationFutures);
+ }
+
+ /**
+ * Recalculates data nodes for given zone and writes them to metastorage.
+ *
+ * @param zoneName Zone name.
+ * @return A future that completes when the recalculation is done for the given zone.
+ */
+ public CompletableFuture<Void> recalculateDataNodes(String zoneName)
throws DistributionZoneNotFoundException {
+ return dataNodesManager.recalculateDataNodes(zoneName).thenAccept(v ->
{});
+ }
+
private CompletableFuture<Void>
onUpdateScaleUpBusy(AlterZoneEventParameters parameters) {
HybridTimestamp timestamp =
metaStorageManager.timestampByRevisionLocally(parameters.causalityToken());
diff --git
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZonesUtil.java
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZonesUtil.java
index bceb50ffe7e..6f8e90f5202 100644
---
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZonesUtil.java
+++
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZonesUtil.java
@@ -44,11 +44,13 @@ import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ThreadPoolExecutor;
import org.apache.ignite.internal.catalog.commands.StorageProfileParams;
+import org.apache.ignite.internal.catalog.descriptors.CatalogObjectDescriptor;
import
org.apache.ignite.internal.catalog.descriptors.CatalogStorageProfileDescriptor;
import org.apache.ignite.internal.catalog.descriptors.CatalogZoneDescriptor;
import
org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologySnapshot;
import
org.apache.ignite.internal.distributionzones.DataNodesHistory.DataNodesHistorySerializer;
import
org.apache.ignite.internal.distributionzones.DistributionZoneTimer.DistributionZoneTimerSerializer;
+import
org.apache.ignite.internal.distributionzones.exception.DistributionZoneNotFoundException;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.lang.ByteArray;
import org.apache.ignite.internal.metastorage.Entry;
@@ -57,6 +59,7 @@ import org.apache.ignite.internal.metastorage.dsl.Operation;
import org.apache.ignite.internal.metastorage.dsl.Update;
import org.apache.ignite.internal.thread.IgniteThreadFactory;
import org.apache.ignite.internal.thread.StripedScheduledThreadPoolExecutor;
+import org.apache.ignite.internal.util.CollectionUtils;
import org.jetbrains.annotations.Nullable;
import org.jetbrains.annotations.TestOnly;
@@ -559,6 +562,43 @@ public class DistributionZonesUtil {
return
nodes.stream().map(NodeWithAttributes::nodeName).collect(toSet());
}
+ /**
+ * Filters given zone descriptors by given zone names for user operations
over zones. If the zone names set is empty, this method
+ * returns all zones. If the zone names set contains names that aren't
present among the given zone descriptors, then an exception with all
+ * missing zone names will be thrown.
+ *
+ * @param zoneNames Zone names to filter by and that are required for a
user operation. If it is empty, the operation will be applied
+ * for all zones.
+ * @param zones Catalog zone descriptors to filter.
+ * @return Collection of zone descriptors, filtered by zone names, to
apply some user operation to.
+ * @throws DistributionZoneNotFoundException If there are zone
names that aren't present among the given catalog zone
+ * descriptors.
+ */
+ public static Collection<CatalogZoneDescriptor> filterZonesForOperations(
+ Set<String> zoneNames,
+ Collection<CatalogZoneDescriptor> zones
+ ) throws DistributionZoneNotFoundException {
+ if (zoneNames.isEmpty()) {
+ return zones;
+ }
+
+ List<CatalogZoneDescriptor> zoneDescriptors = zones.stream()
+ .filter(catalogZoneDescriptor ->
zoneNames.contains(catalogZoneDescriptor.name()))
+ .collect(toList());
+
+ Set<String> foundZoneNames = zoneDescriptors.stream()
+ .map(CatalogObjectDescriptor::name)
+ .collect(toSet());
+
+ if (!zoneNames.equals(foundZoneNames)) {
+ Set<String> missingZoneNames =
CollectionUtils.difference(zoneNames, foundZoneNames);
+
+ throw new DistributionZoneNotFoundException(missingZoneNames);
+ }
+
+ return zoneDescriptors;
+ }
+
/**
* Class representing data nodes' related values in Meta Storage.
*/
diff --git
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/exception/DistributionZoneNotFoundException.java
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/exception/DistributionZoneNotFoundException.java
index 5e148cf135e..ef3f6dd0ae1 100644
---
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/exception/DistributionZoneNotFoundException.java
+++
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/exception/DistributionZoneNotFoundException.java
@@ -19,6 +19,7 @@ package
org.apache.ignite.internal.distributionzones.exception;
import static
org.apache.ignite.lang.ErrorGroups.DistributionZones.ZONE_NOT_FOUND_ERR;
+import java.util.Collection;
import java.util.UUID;
import org.apache.ignite.internal.lang.IgniteInternalException;
import org.jetbrains.annotations.Nullable;
@@ -47,6 +48,15 @@ public class DistributionZoneNotFoundException extends
IgniteInternalException {
super(ZONE_NOT_FOUND_ERR, "Distribution zone was not found [zoneName="
+ zoneName + ']');
}
+ /**
+ * The constructor.
+ *
+ * @param zoneNames Zone names.
+ */
+ public DistributionZoneNotFoundException(Collection<String> zoneNames) {
+ super(ZONE_NOT_FOUND_ERR, "Distribution zones were not found
[zoneNames=" + zoneNames + ']');
+ }
+
/**
* The constructor.
*
diff --git
a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DataNodesManagerTest.java
b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DataNodesManagerTest.java
index a4b34e9cefe..f71cb4a91d9 100644
---
a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DataNodesManagerTest.java
+++
b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DataNodesManagerTest.java
@@ -26,8 +26,8 @@ import static
org.apache.ignite.internal.catalog.commands.CatalogUtils.IMMEDIATE
import static
org.apache.ignite.internal.catalog.commands.CatalogUtils.INFINITE_TIMER_VALUE;
import static
org.apache.ignite.internal.catalog.descriptors.ConsistencyMode.HIGH_AVAILABILITY;
import static
org.apache.ignite.internal.catalog.descriptors.ConsistencyMode.STRONG_CONSISTENCY;
+import static
org.apache.ignite.internal.distributionzones.DataNodesTestUtil.assertDistributionZoneScaleTimersAreNotScheduled;
import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zoneDataNodesHistoryKey;
-import static
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willSucceedFast;
import static org.apache.ignite.internal.util.ArrayUtils.asList;
@@ -48,12 +48,12 @@ import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.ignite.internal.catalog.CatalogManager;
import org.apache.ignite.internal.catalog.descriptors.CatalogZoneDescriptor;
import org.apache.ignite.internal.catalog.descriptors.ConsistencyMode;
import
org.apache.ignite.internal.distributionzones.DataNodesHistory.DataNodesHistorySerializer;
-import
org.apache.ignite.internal.distributionzones.DataNodesManager.ZoneTimerSchedule;
import
org.apache.ignite.internal.distributionzones.DistributionZoneManager.PartitionResetClosure;
import org.apache.ignite.internal.failure.NoOpFailureManager;
import org.apache.ignite.internal.hlc.ClockService;
@@ -74,6 +74,7 @@ import
org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
import org.apache.ignite.internal.thread.IgniteThreadFactory;
import org.apache.ignite.internal.util.IgniteSpinBusyLock;
import org.apache.ignite.internal.util.IgniteUtils;
+import org.awaitility.Awaitility;
import org.jetbrains.annotations.Nullable;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
@@ -268,7 +269,7 @@ public class DataNodesManagerTest extends
BaseIgniteAbstractTest {
}
@Test
- public void removeNodesScaleDown() throws InterruptedException {
+ public void removeNodesScaleDown() {
removeNodes(Set.of(B));
assertScaleDownNotScheduled(ZONE_NAME_1);
@@ -345,7 +346,7 @@ public class DataNodesManagerTest extends
BaseIgniteAbstractTest {
checkDataNodes(ZONE_NAME_1, clock.now(), nodeNames(C));
// Timers are discarded earlier than scheduled.
- assertTrue(waitForCondition(() -> !scaleUpScheduled(ZONE_NAME_1) &&
!scaleDownScheduled(ZONE_NAME_1), 2000));
+ assertDistributionZoneScaleTimersAreNotScheduled(catalogManager,
dataNodesManager, ZONE_NAME_1);
}
@Test
@@ -417,7 +418,7 @@ public class DataNodesManagerTest extends
BaseIgniteAbstractTest {
dataNodesManager.onTopologyChange(zone, 1, clock.now(), newTopology,
currentTopology);
- assertTrue(waitForCondition(partitionResetTriggered::get, 2000));
+ Awaitility.waitAtMost(2, TimeUnit.SECONDS).untilAsserted(() ->
assertTrue(partitionResetTriggered.get()));
}
@Test
@@ -533,22 +534,8 @@ public class DataNodesManagerTest extends
BaseIgniteAbstractTest {
);
}
- private void waitForDataNodes(String zoneName, Set<String> expectedNodes)
throws InterruptedException {
- CatalogZoneDescriptor zone =
catalogManager.activeCatalog(clock.now().longValue()).zone(zoneName);
- int zoneId = zone.id();
-
- boolean success = waitForCondition(() -> {
- CompletableFuture<Set<String>> dataNodesFuture =
dataNodesManager.dataNodes(zoneId, clock.now());
- assertThat(dataNodesFuture, willSucceedFast());
- return dataNodesFuture.join().equals(expectedNodes);
- }, 10_000);
-
- if (!success) {
- log.info("Expected: " + expectedNodes);
- log.info("Actual: " + dataNodesManager.dataNodes(zoneId,
clock.now()).join());
- }
-
- assertTrue(success);
+ private void waitForDataNodes(String zoneName, Set<String> expectedNodes) {
+ DataNodesTestUtil.waitForDataNodes(catalogManager, dataNodesManager,
clock, zoneName, expectedNodes);
}
private void checkDataNodes(String zoneName, HybridTimestamp timestamp,
Set<String> expectedNodes) {
@@ -560,60 +547,30 @@ public class DataNodesManagerTest extends
BaseIgniteAbstractTest {
assertEquals(expectedNodes, dataNodesFuture.join());
}
- private CatalogZoneDescriptor descriptor(String zoneName) {
- CatalogZoneDescriptor zoneDescriptor =
catalogManager.activeCatalog(clock.now().longValue()).zone(zoneName);
- assertNotNull(zoneDescriptor);
- return zoneDescriptor;
+ private void assertScaleUpScheduledOrDone(String zoneName) {
+ DataNodesTestUtil.assertScaleUpScheduledOrDone(catalogManager,
dataNodesManager, zoneName);
}
- private int zoneId(String zoneName) {
- return descriptor(zoneName).id();
+ private void assertScaleUpNotScheduled(String zoneName) {
+ Awaitility.waitAtMost(1, TimeUnit.SECONDS)
+ .untilAsserted(() ->
assertFalse(dataNodesManager.zoneTimers(zoneId(zoneName)).scaleUp.taskIsScheduled()));
}
- private void assertScaleUpScheduledOrDone(String zoneName) throws
InterruptedException {
- boolean success = waitForCondition(() -> {
- ZoneTimerSchedule schedule =
dataNodesManager.zoneTimers(zoneId(zoneName)).scaleUp;
- return schedule.taskIsScheduled() || schedule.taskIsDone();
- }, 2000);
-
- if (!success) {
- ZoneTimerSchedule schedule =
dataNodesManager.zoneTimers(zoneId(zoneName)).scaleUp;
- log.info("Unsuccessful schedule [taskIsScheduled={},
taskIsCancelled={}, taskIsDone={}]."
- + schedule.taskIsScheduled(), schedule.taskIsCancelled(),
schedule.taskIsDone());
- }
-
- assertTrue(success);
+ private void assertScaleDownScheduledOrDone(String zoneName) {
+ DataNodesTestUtil.assertScaleDownScheduledOrDone(catalogManager,
dataNodesManager, zoneName);
}
- private void assertScaleUpNotScheduled(String zoneName) throws
InterruptedException {
- assertFalse(waitForCondition(() ->
dataNodesManager.zoneTimers(zoneId(zoneName)).scaleUp.taskIsScheduled(), 1000));
- }
-
- private void assertScaleDownScheduledOrDone(String zoneName) throws
InterruptedException {
- boolean success = waitForCondition(() -> {
- ZoneTimerSchedule schedule =
dataNodesManager.zoneTimers(zoneId(zoneName)).scaleDown;
- return schedule.taskIsScheduled() || schedule.taskIsDone();
- }, 2000);
-
- if (!success) {
- ZoneTimerSchedule schedule =
dataNodesManager.zoneTimers(zoneId(zoneName)).scaleDown;
- log.info("Unsuccessful schedule [taskIsScheduled={},
taskIsCancelled={}, taskIsDone={}]."
- + schedule.taskIsScheduled(), schedule.taskIsCancelled(),
schedule.taskIsDone());
- }
-
- assertTrue(success);
- }
-
- private void assertScaleDownNotScheduled(String zoneName) throws
InterruptedException {
- assertFalse(waitForCondition(() ->
dataNodesManager.zoneTimers(zoneId(zoneName)).scaleDown.taskIsScheduled(),
1000));
+ private int zoneId(String zoneName) {
+ return DistributionZonesTestUtil.zoneId(catalogManager, zoneName);
}
- private boolean scaleUpScheduled(String zoneName) {
- return
dataNodesManager.zoneTimers(zoneId(zoneName)).scaleUp.taskIsScheduled();
+ private CatalogZoneDescriptor descriptor(String zoneName) {
+ return DistributionZonesTestUtil.descriptor(catalogManager, zoneName);
}
- private boolean scaleDownScheduled(String zoneName) {
- return
dataNodesManager.zoneTimers(zoneId(zoneName)).scaleDown.taskIsScheduled();
+ private void assertScaleDownNotScheduled(String zoneName) {
+ Awaitility.waitAtMost(1, TimeUnit.SECONDS)
+ .untilAsserted(() ->
assertFalse(dataNodesManager.zoneTimers(zoneId(zoneName)).scaleDown.taskIsScheduled()));
}
private static Set<String> nodeNames(NodeWithAttributes... nodes) {
diff --git
a/modules/distribution-zones/src/testFixtures/java/org/apache/ignite/internal/distributionzones/DataNodesTestUtil.java
b/modules/distribution-zones/src/testFixtures/java/org/apache/ignite/internal/distributionzones/DataNodesTestUtil.java
new file mode 100644
index 00000000000..c87fba6b607
--- /dev/null
+++
b/modules/distribution-zones/src/testFixtures/java/org/apache/ignite/internal/distributionzones/DataNodesTestUtil.java
@@ -0,0 +1,267 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.distributionzones;
+
+import static
org.apache.ignite.internal.catalog.commands.CatalogUtils.INFINITE_TIMER_VALUE;
+import static
org.apache.ignite.internal.distributionzones.DistributionZonesTestUtil.zoneId;
+import static org.apache.ignite.internal.lang.IgniteStringFormatter.format;
+import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willSucceedFast;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
+import org.apache.ignite.internal.app.IgniteImpl;
+import org.apache.ignite.internal.catalog.CatalogManager;
+import org.apache.ignite.internal.catalog.descriptors.CatalogZoneDescriptor;
+import
org.apache.ignite.internal.distributionzones.DataNodesManager.ZoneTimerSchedule;
+import
org.apache.ignite.internal.distributionzones.DataNodesManager.ZoneTimers;
+import org.apache.ignite.internal.hlc.HybridClock;
+import org.awaitility.Awaitility;
+
+/**
+ * Class that contains useful methods for data nodes testing purposes.
+ */
+public final class DataNodesTestUtil {
+ /**
+ * Creates a zone with the given name and both scale up and scale down timers
set to infinite.
+ *
+ * @param node Ignite node.
+ * @param zoneName New zone name.
+ */
+ public static void createZoneWithInfiniteTimers(IgniteImpl node, String
zoneName) {
+ DistributionZonesTestUtil.createZone(
+ node.catalogManager(),
+ zoneName,
+ (Integer) INFINITE_TIMER_VALUE,
+ (Integer) INFINITE_TIMER_VALUE,
+ null
+ );
+
+ CatalogManager catalogManager = node.catalogManager();
+
+ CatalogZoneDescriptor zoneDesc =
catalogManager.catalog(catalogManager.latestCatalogVersion()).zone(zoneName);
+
+ assertNotNull(zoneDesc);
+ }
+
+ /**
+ * Waits until data nodes are recalculated as expected for the given zone.
+ *
+ * @param node Ignite node.
+ * @param zoneName Zone name to check data nodes for.
+ * @param expectedDataNodes Expected data node names to wait for.
+ */
+ public static void waitForDataNodes(
+ IgniteImpl node,
+ String zoneName,
+ Set<String> expectedDataNodes
+ ) {
+ waitForDataNodes(
+ node.catalogManager(),
+ node.distributionZoneManager().dataNodesManager(),
+ node.clock(),
+ zoneName,
+ expectedDataNodes
+ );
+ }
+
+ /**
+ * Waits until data nodes are recalculated as expected for the given zone.
+ *
+ * @param catalogManager Catalog manager.
+ * @param dataNodesManager Data nodes manager.
+ * @param clock Hybrid clock.
+ * @param zoneName Zone name to check data nodes for.
+ * @param expectedDataNodes Expected data node names to wait for.
+ */
+ public static void waitForDataNodes(
+ CatalogManager catalogManager,
+ DataNodesManager dataNodesManager,
+ HybridClock clock,
+ String zoneName,
+ Set<String> expectedDataNodes
+ ) {
+ int zoneId = zoneId(catalogManager, zoneName);
+
+ Awaitility.waitAtMost(10, TimeUnit.SECONDS).untilAsserted(() -> {
+ CompletableFuture<Set<String>> dataNodesFuture =
dataNodesManager.dataNodes(zoneId, clock.now());
+ assertThat(dataNodesFuture, willSucceedFast());
+ assertEquals(expectedDataNodes, dataNodesFuture.join());
+ });
+ }
+
+ /**
+ * Performs manual data nodes recalculation and waits until the recalculated
data nodes are as expected for the given zone.
+ *
+ * @param node Ignite node.
+ * @param zoneName Zone name to check data nodes for.
+ * @param expectedDataNodes Expected data node names to wait for.
+ */
+ public static void recalculateZoneDataNodesManuallyAndWaitForDataNodes(
+ IgniteImpl node,
+ String zoneName,
+ Set<String> expectedDataNodes
+ ) {
+ DistributionZoneManager distributionZoneManager =
node.distributionZoneManager();
+
+ CompletableFuture<Void> futureRecalculationResult =
distributionZoneManager.dataNodesManager()
+ .recalculateDataNodes(zoneName);
+
+ assertThat(futureRecalculationResult, willCompleteSuccessfully());
+
+ CompletableFuture<Set<String>> futureCurrentDataNodes =
distributionZoneManager.currentDataNodes(zoneName);
+
+ assertThat(futureCurrentDataNodes, willCompleteSuccessfully());
+
+ assertEquals(expectedDataNodes, futureCurrentDataNodes.join());
+
+ waitForDataNodes(node, zoneName, expectedDataNodes);
+ }
+
+ /**
+ * Checks that scale up timer for the given zone was set up.
+ *
+ * @param node Ignite node.
+ * @param zoneName Zone name.
+ */
+ public static void assertScaleUpScheduledOrDone(IgniteImpl node, String
zoneName) {
+ assertScaleUpScheduledOrDone(
+ node.catalogManager(),
+ node.distributionZoneManager().dataNodesManager(),
+ zoneName
+ );
+ }
+
+ /**
+ * Checks that scale up timer for the given zone was set up.
+ *
+ * @param catalogManager Catalog manager.
+ * @param dataNodesManager Data nodes manager.
+ * @param zoneName Zone name.
+ */
+ public static void assertScaleUpScheduledOrDone(
+ CatalogManager catalogManager,
+ DataNodesManager dataNodesManager,
+ String zoneName
+ ) {
+ assertDistributionZoneScaleTimerScheduledOrDone(
+ catalogManager,
+ dataNodesManager,
+ zoneName,
+ timers -> timers.scaleUp
+ );
+ }
+
+ /**
+ * Checks that scale down timer for the given zone was set up.
+ *
+ * @param node Ignite node.
+ * @param zoneName Zone name.
+ */
+ public static void assertScaleDownScheduledOrDone(IgniteImpl node, String
zoneName) {
+ assertScaleDownScheduledOrDone(
+ node.catalogManager(),
+ node.distributionZoneManager().dataNodesManager(),
+ zoneName
+ );
+ }
+
+ /**
+ * Checks that scale down timer for the given zone was set up.
+ *
+ * @param catalogManager Catalog manager.
+ * @param dataNodesManager Data nodes manager.
+ * @param zoneName Zone name.
+ */
+ public static void assertScaleDownScheduledOrDone(
+ CatalogManager catalogManager,
+ DataNodesManager dataNodesManager,
+ String zoneName
+ ) {
+ assertDistributionZoneScaleTimerScheduledOrDone(
+ catalogManager,
+ dataNodesManager,
+ zoneName,
+ timers -> timers.scaleDown
+ );
+ }
+
+ private static void assertDistributionZoneScaleTimerScheduledOrDone(
+ CatalogManager catalogManager,
+ DataNodesManager dataNodesManager,
+ String zoneName,
+ Function<ZoneTimers, ZoneTimerSchedule> getScaleTimer
+ ) {
+ Awaitility.waitAtMost(2, TimeUnit.SECONDS).untilAsserted(() -> {
+ ZoneTimerSchedule schedule =
getScaleTimer.apply(dataNodesManager.zoneTimers(zoneId(catalogManager,
zoneName)));
+
+ assertTrue(
+ schedule.taskIsScheduled() || schedule.taskIsDone(),
+ format("Unsuccessful schedule [taskIsScheduled={},
taskIsCancelled={}, taskIsDone={}].",
+ schedule.taskIsScheduled(),
schedule.taskIsCancelled(), schedule.taskIsDone())
+ );
+ });
+ }
+
+ /**
+ * Checks that there are no scheduled scale up/down timers for the given
distribution zone.
+ *
+ * @param node Ignite node.
+ * @param zoneName Zone name.
+ */
+ public static void
assertDistributionZoneScaleTimersAreNotScheduled(IgniteImpl node, String
zoneName) {
+ assertDistributionZoneScaleTimersAreNotScheduled(
+ node.catalogManager(),
+ node.distributionZoneManager().dataNodesManager(),
+ zoneName
+ );
+ }
+
+ /**
+ * Checks that there are no scheduled scale up/down timers for the given
distribution zone.
+ *
+ * @param catalogManager Catalog manager.
+ * @param dataNodesManager Data nodes manager.
+ * @param zoneName Zone name.
+ */
+ public static void assertDistributionZoneScaleTimersAreNotScheduled(
+ CatalogManager catalogManager,
+ DataNodesManager dataNodesManager,
+ String zoneName
+ ) {
+ Awaitility.waitAtMost(1, TimeUnit.SECONDS).untilAsserted(() ->
assertFalse(
+ scaleUpScheduled(catalogManager, dataNodesManager, zoneName)
+ || scaleDownScheduled(catalogManager,
dataNodesManager, zoneName))
+ );
+ }
+
+ private static boolean scaleUpScheduled(CatalogManager catalogManager,
DataNodesManager dataNodesManager, String zoneName) {
+ return dataNodesManager.zoneTimers(zoneId(catalogManager,
zoneName)).scaleUp.taskIsScheduled();
+ }
+
+ private static boolean scaleDownScheduled(CatalogManager catalogManager,
DataNodesManager dataNodesManager, String zoneName) {
+ return dataNodesManager.zoneTimers(zoneId(catalogManager,
zoneName)).scaleDown.taskIsScheduled();
+ }
+}
diff --git
a/modules/distribution-zones/src/testFixtures/java/org/apache/ignite/internal/distributionzones/DistributionZonesTestUtil.java
b/modules/distribution-zones/src/testFixtures/java/org/apache/ignite/internal/distributionzones/DistributionZonesTestUtil.java
index 2edb86d15cb..dd8a493bb67 100644
---
a/modules/distribution-zones/src/testFixtures/java/org/apache/ignite/internal/distributionzones/DistributionZonesTestUtil.java
+++
b/modules/distribution-zones/src/testFixtures/java/org/apache/ignite/internal/distributionzones/DistributionZonesTestUtil.java
@@ -804,4 +804,30 @@ public class DistributionZonesTestUtil {
return zoneId;
}
+
+ /**
+ * Returns catalog descriptor for given zone name.
+ *
+ * @param catalogManager Catalog manager.
+ * @param zoneName Zone name.
+ * @return Catalog descriptor for given zone name.
+ */
+ public static CatalogZoneDescriptor descriptor(CatalogManager
catalogManager, String zoneName) {
+ CatalogZoneDescriptor zoneDescriptor =
catalogManager.latestCatalog().zone(zoneName);
+
+ assertNotNull(zoneDescriptor);
+
+ return zoneDescriptor;
+ }
+
+ /**
+ * Returns identifier of a zone by the given zone name.
+ *
+ * @param catalogManager Catalog manager.
+ * @param zoneName Zone name.
+ * @return Identifier of a zone by the given zone name.
+ */
+ public static int zoneId(CatalogManager catalogManager, String zoneName) {
+ return descriptor(catalogManager, zoneName).id();
+ }
}
diff --git
a/modules/rest-api/src/main/java/org/apache/ignite/internal/rest/api/zone/DataNodesApi.java
b/modules/rest-api/src/main/java/org/apache/ignite/internal/rest/api/zone/DataNodesApi.java
new file mode 100644
index 00000000000..6806433fbef
--- /dev/null
+++
b/modules/rest-api/src/main/java/org/apache/ignite/internal/rest/api/zone/DataNodesApi.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.rest.api.zone;
+
+import io.micronaut.http.annotation.Body;
+import io.micronaut.http.annotation.Controller;
+import io.micronaut.http.annotation.Get;
+import io.micronaut.http.annotation.PathVariable;
+import io.micronaut.http.annotation.Post;
+import io.micronaut.http.annotation.Produces;
+import io.swagger.v3.oas.annotations.Operation;
+import io.swagger.v3.oas.annotations.media.ArraySchema;
+import io.swagger.v3.oas.annotations.media.Content;
+import io.swagger.v3.oas.annotations.media.Schema;
+import io.swagger.v3.oas.annotations.responses.ApiResponse;
+import io.swagger.v3.oas.annotations.tags.Tag;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.rest.api.Problem;
+import org.apache.ignite.internal.rest.constants.MediaType;
+
+/**
+ * Data nodes of distribution zones controller.
+ */
+@Controller("/management/v1/zones")
+@Tag(name = "dataNodes")
+public interface DataNodesApi {
+ @Get("{zoneName}/datanodes")
+ @Operation(
+ operationId = "getDataNodesForZone",
+ description = "Returns current data nodes for zone."
+ )
+ @ApiResponse(
+ responseCode = "200",
+ description = "Data nodes are returned.",
+ content = @Content(
+ mediaType = MediaType.APPLICATION_JSON,
+ array = @ArraySchema(schema = @Schema(implementation =
String.class), uniqueItems = true)
+ )
+ )
+ @ApiResponse(responseCode = "400", description = "Bad request.",
+ content = @Content(mediaType = MediaType.PROBLEM_JSON, schema =
@Schema(implementation = Problem.class)))
+ @ApiResponse(responseCode = "500", description = "Internal error.",
+ content = @Content(mediaType = MediaType.PROBLEM_JSON, schema =
@Schema(implementation = Problem.class)))
+ @Produces({ MediaType.APPLICATION_JSON, MediaType.PROBLEM_JSON })
+ CompletableFuture<Set<String>> getDataNodesForZone(
+ @PathVariable
+ @Schema(description = "Case-sensitive zone name to return data
nodes for.")
+ String zoneName
+ );
+
+ @Post("{zoneName}/datanodes/reset")
+ @Operation(
+ operationId = "resetDataNodesForZone",
+ description = "Recalculates and resets data nodes for zone."
+ )
+ @ApiResponse(responseCode = "200", description = "Data nodes are
recalculated and reset.")
+ @ApiResponse(responseCode = "400", description = "Bad request.",
+ content = @Content(mediaType = MediaType.PROBLEM_JSON, schema =
@Schema(implementation = Problem.class)))
+ @ApiResponse(responseCode = "500", description = "Internal error.",
+ content = @Content(mediaType = MediaType.PROBLEM_JSON, schema =
@Schema(implementation = Problem.class)))
+ @Produces(MediaType.PROBLEM_JSON)
+ CompletableFuture<Void> resetDataNodesForZone(
+ @PathVariable
+ @Schema(description = "Case-sensitive zone name to recalculate
data nodes for.")
+ String zoneName
+ );
+
+ @Post("/datanodes/reset")
+ @Operation(
+ operationId = "resetDataNodesForZones",
+ description = "Recalculates and resets data nodes for given zones."
+ )
+ @ApiResponse(responseCode = "200", description = "Data nodes are
recalculated and reset.")
+ @ApiResponse(responseCode = "400", description = "Bad request.",
+ content = @Content(mediaType = MediaType.PROBLEM_JSON, schema =
@Schema(implementation = Problem.class)))
+ @ApiResponse(responseCode = "500", description = "Internal error.",
+ content = @Content(mediaType = MediaType.PROBLEM_JSON, schema =
@Schema(implementation = Problem.class)))
+ @Produces(MediaType.PROBLEM_JSON)
+ CompletableFuture<Void> resetDataNodesForZones(
+ @Body
+ @Schema(description = "Names specifying zones to recalculate data
nodes for. Case-sensitive, "
+ + "if empty then all zones' data nodes will be
recalculated.")
+ Optional<Set<String>> zoneNames
+ );
+}
diff --git a/modules/rest/build.gradle b/modules/rest/build.gradle
index 544e5d0a2ef..b6084c455f5 100644
--- a/modules/rest/build.gradle
+++ b/modules/rest/build.gradle
@@ -47,6 +47,7 @@ dependencies {
implementation project(':ignite-partition-replicator')
implementation project(':ignite-system-disaster-recovery')
implementation project(':ignite-configuration-storage')
+ implementation project(':ignite-distribution-zones')
implementation libs.jetbrains.annotations
implementation libs.micronaut.inject
implementation libs.micronaut.http.server.netty
@@ -94,6 +95,8 @@ dependencies {
integrationTestImplementation
testFixtures(project(':ignite-configuration'))
integrationTestImplementation testFixtures(project(":ignite-api"))
integrationTestImplementation testFixtures(project(":ignite-rest-api"))
+ integrationTestImplementation
testFixtures(project(":ignite-distribution-zones"))
+
integrationTestImplementation libs.awaitility
integrationTestImplementation libs.micronaut.junit5
integrationTestImplementation libs.micronaut.test
diff --git
a/modules/rest/src/integrationTest/java/org/apache/ignite/internal/rest/cluster/ItDataNodesControllerTest.java
b/modules/rest/src/integrationTest/java/org/apache/ignite/internal/rest/cluster/ItDataNodesControllerTest.java
new file mode 100644
index 00000000000..147d111c33f
--- /dev/null
+++
b/modules/rest/src/integrationTest/java/org/apache/ignite/internal/rest/cluster/ItDataNodesControllerTest.java
@@ -0,0 +1,271 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.rest.cluster;
+
+import static org.apache.ignite.internal.TestWrappers.unwrapIgniteImpl;
+import static
org.apache.ignite.internal.distributionzones.DataNodesTestUtil.assertDistributionZoneScaleTimersAreNotScheduled;
+import static
org.apache.ignite.internal.distributionzones.DataNodesTestUtil.assertScaleDownScheduledOrDone;
+import static
org.apache.ignite.internal.distributionzones.DataNodesTestUtil.assertScaleUpScheduledOrDone;
+import static
org.apache.ignite.internal.distributionzones.DataNodesTestUtil.createZoneWithInfiniteTimers;
+import static
org.apache.ignite.internal.distributionzones.DataNodesTestUtil.waitForDataNodes;
+import static
org.apache.ignite.internal.distributionzones.DistributionZonesTestUtil.alterZone;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.containsString;
+import static org.hamcrest.Matchers.is;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+import io.micronaut.core.type.Argument;
+import io.micronaut.http.HttpRequest;
+import io.micronaut.http.HttpResponse;
+import io.micronaut.http.HttpStatus;
+import io.micronaut.http.client.HttpClient;
+import io.micronaut.http.client.annotation.Client;
+import io.micronaut.http.client.exceptions.HttpClientResponseException;
+import io.micronaut.test.extensions.junit5.annotation.MicronautTest;
+import jakarta.inject.Inject;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.internal.ClusterConfiguration;
+import org.apache.ignite.internal.ClusterPerTestIntegrationTest;
+import org.apache.ignite.internal.app.IgniteImpl;
+import
org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologyService;
+import org.apache.ignite.internal.rest.constants.HttpCode;
+import org.awaitility.Awaitility;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+/** Test for distribution zones data nodes controller. */
+@MicronautTest
+public class ItDataNodesControllerTest extends ClusterPerTestIntegrationTest {
+ private static final String ZONE_NAME = "test_zone";
+
+ private static final String UNKNOWN_ZONE_NAME = "test_zone_unknown";
+
+ private static final String WRONG_CASE_ZONE_NAME = ZONE_NAME.toUpperCase();
+
+ private static final String DATA_NODES_ENDPOINT = "/datanodes";
+
+ private static final String DATA_NODES_RESET_ENDPOINT =
DATA_NODES_ENDPOINT + "/reset";
+
+ private static final String NODE_URL = "http://localhost:" +
ClusterConfiguration.DEFAULT_BASE_HTTP_PORT;
+
+ @Inject
+ @Client(NODE_URL + "/management/v1/zones")
+ private HttpClient client;
+
+ @Override
+ protected int initialNodes() {
+ return 1;
+ }
+
+ @BeforeEach
+ public void setup() {
+ createZoneWithInfiniteTimers(unwrapIgniteImpl(node(0)), ZONE_NAME);
+ }
+
+ @Test
+ public void restDataNodesResetIdempotencyTest() {
+ HttpResponse<String> response =
doRestDataNodesResetForZonesCall(Set.of(ZONE_NAME));
+
+ assertThat(response.getStatus().getCode(), is(HttpCode.OK.code()));
+ }
+
+ @Test
+ public void restDataNodesResetAfterNewNodeAddedTest() {
+ IgniteImpl node0 = unwrapIgniteImpl(node(0));
+
+ assertDistributionZoneScaleTimersAreNotScheduled(node0, ZONE_NAME);
+
+ int veryLongTimer = 100_000_000;
+ alterZone(node0.catalogManager(), ZONE_NAME, veryLongTimer,
veryLongTimer, null);
+
+ Ignite node1 = startNode(1);
+ assertLogicalTopologySizeEqualsTo(node0, 2);
+
+ waitForDataNodes(node0, ZONE_NAME, Set.of(node0.name()));
+
+ assertScaleUpScheduledOrDone(node0, ZONE_NAME);
+
+ HttpResponse<String> response =
doRestDataNodesResetForZonesCall(Set.of(ZONE_NAME));
+ assertThat(response.getStatus().getCode(), is(HttpCode.OK.code()));
+
+ waitForDataNodes(node0, ZONE_NAME, Set.of(node0.name(), node1.name()));
+
+ assertDistributionZoneScaleTimersAreNotScheduled(node0, ZONE_NAME);
+
+ stopNode(1);
+
+ assertScaleDownScheduledOrDone(node0, ZONE_NAME);
+
+ response = doRestDataNodesResetForZonesCall(Set.of(ZONE_NAME));
+ assertThat(response.getStatus().getCode(), is(HttpCode.OK.code()));
+
+ waitForDataNodes(node0, ZONE_NAME, Set.of(node0.name()));
+
+ assertDistributionZoneScaleTimersAreNotScheduled(node0, ZONE_NAME);
+ }
+
+ @Test
+ public void restDataNodesResetWithUnknownZonesTest() {
+ Set<String> unknownZoneNames = Set.of(UNKNOWN_ZONE_NAME);
+
+ assertZonesNotFoundExceptionThrown(unknownZoneNames,
this::doRestDataNodesResetForZonesCall);
+ }
+
+ @Test
+ public void restDataNodesResetWithWrongCaseZoneNameTest() {
+ Set<String> unknownZoneNames = Set.of(WRONG_CASE_ZONE_NAME);
+
+ assertZonesNotFoundExceptionThrown(unknownZoneNames,
this::doRestDataNodesResetForZonesCall);
+ }
+
+ @Test
+ public void restDataNodesResetWithEmptyZoneNamesThatTriggersAllZonesTest()
{
+ IgniteImpl node0 = unwrapIgniteImpl(node(0));
+
+ String secondZoneName = ZONE_NAME + "_2";
+ createZoneWithInfiniteTimers(node0, secondZoneName);
+
+ Ignite node1 = startNode(1);
+ assertLogicalTopologySizeEqualsTo(node0, 2);
+
+ Set<String> expectedOneDataNode = Set.of(node0.name());
+
+ waitForDataNodes(node0, ZONE_NAME, expectedOneDataNode);
+ waitForDataNodes(node0, secondZoneName, expectedOneDataNode);
+
+ HttpResponse<String> response =
doRestDataNodesResetForZonesCall(Set.of());
+ assertThat(response.getStatus().getCode(), is(HttpCode.OK.code()));
+
+ Set<String> expectedTwoDataNodes = Set.of(node0.name(), node1.name());
+
+ waitForDataNodes(node0, ZONE_NAME, expectedTwoDataNodes);
+ waitForDataNodes(node0, secondZoneName, expectedTwoDataNodes);
+ }
+
+ @Test
+ public void restDataNodesResetForZoneTest() {
+ IgniteImpl node0 = unwrapIgniteImpl(node(0));
+
+ Ignite node1 = startNode(1);
+ assertLogicalTopologySizeEqualsTo(node0, 2);
+
+ Set<String> expectedOneDataNode = Set.of(node0.name());
+
+ waitForDataNodes(node0, ZONE_NAME, expectedOneDataNode);
+
+ HttpResponse<String> response =
doRestDataNodesResetForZoneCall(ZONE_NAME);
+ assertThat(response.getStatus().getCode(), is(HttpCode.OK.code()));
+
+ Set<String> expectedTwoDataNodes = Set.of(node0.name(), node1.name());
+
+ waitForDataNodes(node0, ZONE_NAME, expectedTwoDataNodes);
+ }
+
+ @Test
+ public void restDataNodesResetForUnknownZoneTest() {
+ assertZoneNotFoundResponse(UNKNOWN_ZONE_NAME,
this::doRestDataNodesResetForZoneCall);
+ }
+
+ @Test
+ public void restDataNodesResetForWrongCaseZoneNameTest() {
+ assertZoneNotFoundResponse(WRONG_CASE_ZONE_NAME,
this::doRestDataNodesResetForZoneCall);
+ }
+
+ @Test
+ public void restGetDataNodesForZoneTest() {
+ IgniteImpl node0 = unwrapIgniteImpl(node(0));
+
+ Set<String> expectedOneDataNode = Set.of(node0.name());
+
+ waitForDataNodes(node0, ZONE_NAME, expectedOneDataNode);
+
+ HttpResponse<Set<String>> response =
doRestGetDataNodesForZoneCall(ZONE_NAME);
+ assertThat(response.getStatus().getCode(), is(HttpCode.OK.code()));
+ assertThat(response.body(), is(expectedOneDataNode));
+ }
+
+ @Test
+ public void restGetDataNodesForUnknownZoneTest() {
+ assertZoneNotFoundResponse(UNKNOWN_ZONE_NAME,
this::doRestGetDataNodesForZoneCall);
+ }
+
+ @Test
+ public void restGetDataNodesForWrongCaseZoneNameTest() {
+ assertZoneNotFoundResponse(WRONG_CASE_ZONE_NAME,
this::doRestGetDataNodesForZoneCall);
+ }
+
+ private static void assertZoneNotFoundResponse(String zoneName,
Function<String, HttpResponse<?>> httpRequestAction) {
+ HttpClientResponseException ex = assertThrows(
+ HttpClientResponseException.class,
+ () -> httpRequestAction.apply(zoneName)
+ );
+
+ HttpResponse<?> response = ex.getResponse();
+ assertThat(response.code(), is(HttpStatus.BAD_REQUEST.getCode()));
+ assertThat(response.reason(), is(HttpStatus.BAD_REQUEST.getReason()));
+ assertThat(ex.getMessage(), containsString("Distribution zone was not
found [zoneName=" + zoneName + "]"));
+ }
+
+ private static void assertZonesNotFoundExceptionThrown(
+ Set<String> zoneNames,
+ Function<Set<String>, HttpResponse<?>> httpRequestAction
+ ) {
+ HttpClientResponseException ex = assertThrows(
+ HttpClientResponseException.class,
+ () -> httpRequestAction.apply(zoneNames)
+ );
+
+ HttpResponse<?> response = ex.getResponse();
+ assertThat(response.code(), is(HttpStatus.BAD_REQUEST.getCode()));
+ assertThat(response.reason(), is(HttpStatus.BAD_REQUEST.getReason()));
+ assertThat(ex.getMessage(), containsString("Distribution zones were
not found [zoneNames=" + zoneNames + "]"));
+ }
+
+ private static void assertLogicalTopologySizeEqualsTo(IgniteImpl node, int
expectedTopologySize) {
+ LogicalTopologyService logicalTopologyService =
node.logicalTopologyService();
+
+ Awaitility.waitAtMost(1, TimeUnit.SECONDS).untilAsserted(() ->
+ assertEquals(expectedTopologySize,
logicalTopologyService.localLogicalTopology().nodes().size())
+ );
+ }
+
+ private HttpResponse<String> doRestDataNodesResetForZonesCall(Set<String>
zoneNames) {
+ return client
+ .toBlocking()
+ .exchange(HttpRequest.POST(
+ DATA_NODES_RESET_ENDPOINT,
+ zoneNames
+ ));
+ }
+
+ private HttpResponse<String> doRestDataNodesResetForZoneCall(String
zoneName) {
+ return client
+ .toBlocking()
+ .exchange(HttpRequest.POST("/" + zoneName +
DATA_NODES_RESET_ENDPOINT, ""));
+ }
+
+ private HttpResponse<Set<String>> doRestGetDataNodesForZoneCall(String
zoneName) {
+ return client
+ .toBlocking()
+ .exchange(HttpRequest.GET("/" + zoneName +
DATA_NODES_ENDPOINT), Argument.setOf(String.class));
+ }
+}
diff --git
a/modules/rest/src/integrationTest/java/org/apache/ignite/internal/rest/recovery/ItDisasterRecoveryControllerTest.java
b/modules/rest/src/integrationTest/java/org/apache/ignite/internal/rest/recovery/ItDisasterRecoveryControllerTest.java
index ffd4fd6dc0f..bf58f8e928a 100644
---
a/modules/rest/src/integrationTest/java/org/apache/ignite/internal/rest/recovery/ItDisasterRecoveryControllerTest.java
+++
b/modules/rest/src/integrationTest/java/org/apache/ignite/internal/rest/recovery/ItDisasterRecoveryControllerTest.java
@@ -163,7 +163,7 @@ public class ItDisasterRecoveryControllerTest extends
ClusterPerClassIntegration
assertThrowsProblem(
() -> client.toBlocking().exchange(path +
"?zoneNames=no-such-zone"),
- isProblem().withStatus(BAD_REQUEST).withDetail("Some
distribution zones are missing: [no-such-zone]")
+ isProblem().withStatus(BAD_REQUEST).withDetail("Distribution
zones were not found [zoneNames=[no-such-zone]]")
);
}
@@ -330,7 +330,7 @@ public class ItDisasterRecoveryControllerTest extends
ClusterPerClassIntegration
assertThrowsProblem(
() -> client.toBlocking().exchange(path +
"?zoneNames=no-such-zone", GlobalPartitionStatesResponse.class),
- isProblem().withStatus(BAD_REQUEST).withDetail("Some
distribution zones are missing: [no-such-zone]")
+ isProblem().withStatus(BAD_REQUEST).withDetail("Distribution
zones were not found [zoneNames=[no-such-zone]]")
);
}
diff --git
a/modules/rest/src/main/java/org/apache/ignite/internal/rest/cluster/DataNodesRestFactory.java
b/modules/rest/src/main/java/org/apache/ignite/internal/rest/cluster/DataNodesRestFactory.java
new file mode 100644
index 00000000000..b8b778a0f6d
--- /dev/null
+++
b/modules/rest/src/main/java/org/apache/ignite/internal/rest/cluster/DataNodesRestFactory.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.rest.cluster;
+
+import io.micronaut.context.annotation.Bean;
+import io.micronaut.context.annotation.Factory;
+import jakarta.inject.Singleton;
+import org.apache.ignite.internal.distributionzones.DistributionZoneManager;
+import org.apache.ignite.internal.rest.RestFactory;
+import org.apache.ignite.internal.rest.zone.DataNodesController;
+
+/**
+ * Factory that defines beans required for {@link DataNodesController}.
+ *
+ * <p>The only bean is the {@link DistributionZoneManager} passed to the constructor; it is published
+ * with singleton scope by {@link #getDistributionZoneManager()}.
+ *
+ * <p>The reference is kept in a non-final field because {@link #cleanResources()} resets it to
+ * {@code null}; after that call {@link #getDistributionZoneManager()} returns {@code null}.
+ */
+@Factory
+public class DataNodesRestFactory implements RestFactory {
+ private DistributionZoneManager distributionZoneManager;
+
+ /**
+ * Constructor.
+ *
+ * @param distributionZoneManager Distribution zone manager that this factory exposes as a bean.
+ */
+ public DataNodesRestFactory(DistributionZoneManager
distributionZoneManager) {
+ this.distributionZoneManager = distributionZoneManager;
+ }
+
+ @Bean
+ @Singleton
+ public DistributionZoneManager getDistributionZoneManager() {
+ return distributionZoneManager;
+ }
+
+ @Override
+ public void cleanResources() {
+ distributionZoneManager = null;
+ }
+}
diff --git
a/modules/rest/src/main/java/org/apache/ignite/internal/rest/zone/DataNodesController.java
b/modules/rest/src/main/java/org/apache/ignite/internal/rest/zone/DataNodesController.java
new file mode 100644
index 00000000000..86fe7b91f79
--- /dev/null
+++
b/modules/rest/src/main/java/org/apache/ignite/internal/rest/zone/DataNodesController.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.rest.zone;
+
+import io.micronaut.http.annotation.Controller;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.distributionzones.DistributionZoneManager;
+import org.apache.ignite.internal.rest.ResourceHolder;
+import org.apache.ignite.internal.rest.api.zone.DataNodesApi;
+
+/**
+ * REST controller for distribution zone data nodes.
+ *
+ * <p>Serves {@link DataNodesApi} under {@code /management/v1/zones}. Every endpoint returns the future
+ * produced by the {@link DistributionZoneManager} passed to the constructor, without further processing:
+ * <ul>
+ * <li>{@link #getDataNodesForZone(String)} delegates to {@code currentDataNodes(zoneName)};</li>
+ * <li>{@link #resetDataNodesForZone(String)} delegates to {@code recalculateDataNodes(zoneName)};</li>
+ * <li>{@link #resetDataNodesForZones(Optional)} delegates to {@code recalculateDataNodes(zoneNames)},
+ * passing an empty set when no zone names were supplied. NOTE(review): an empty set presumably
+ * means "all zones" - confirm against {@code DistributionZoneManager}.</li>
+ * </ul>
+ *
+ * <p>The manager reference is kept in a non-final field because {@link #cleanResources()} resets it to
+ * {@code null}.
+ */
+@Controller("/management/v1/zones")
+public class DataNodesController implements DataNodesApi, ResourceHolder {
+ private DistributionZoneManager distributionZoneManager;
+
+ public DataNodesController(DistributionZoneManager
distributionZoneManager) {
+ this.distributionZoneManager = distributionZoneManager;
+ }
+
+ @Override
+ public CompletableFuture<Set<String>> getDataNodesForZone(String zoneName)
{
+ return distributionZoneManager.currentDataNodes(zoneName);
+ }
+
+ @Override
+ public CompletableFuture<Void> resetDataNodesForZone(String zoneName) {
+ return distributionZoneManager.recalculateDataNodes(zoneName);
+ }
+
+ @Override
+ public CompletableFuture<Void>
resetDataNodesForZones(Optional<Set<String>> zoneNames) {
+ return
distributionZoneManager.recalculateDataNodes(zoneNames.orElse(Set.of()));
+ }
+
+ @Override
+ public void cleanResources() {
+ distributionZoneManager = null;
+ }
+}
diff --git
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
index 7806a7f985c..26b42810b71 100644
---
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
+++
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
@@ -223,6 +223,7 @@ import org.apache.ignite.internal.rest.RestManager;
import org.apache.ignite.internal.rest.RestManagerFactory;
import
org.apache.ignite.internal.rest.authentication.AuthenticationProviderFactory;
import org.apache.ignite.internal.rest.cluster.ClusterManagementRestFactory;
+import org.apache.ignite.internal.rest.cluster.DataNodesRestFactory;
import org.apache.ignite.internal.rest.compute.ComputeRestFactory;
import org.apache.ignite.internal.rest.configuration.PresentationsFactory;
import org.apache.ignite.internal.rest.configuration.RestConfiguration;
@@ -1423,6 +1424,7 @@ public class IgniteImpl implements Ignite {
Supplier<RestFactory> systemDisasterRecoveryFactory = () -> new
SystemDisasterRecoveryFactory(systemDisasterRecoveryManager);
Supplier<RestFactory> sqlQueryRestFactory = () -> new
SqlQueryRestFactory(sql, killCommandHandler);
Supplier<RestFactory> nodePropertiesRestFactory = () -> new
NodePropertiesFactory(nodeProperties);
+ Supplier<RestFactory> dataNodesRestFactory = () -> new
DataNodesRestFactory(distributionZoneManager);
RestConfiguration restConfiguration =
nodeCfgMgr.configurationRegistry().getConfiguration(RestExtensionConfiguration.KEY).rest();
@@ -1438,7 +1440,8 @@ public class IgniteImpl implements Ignite {
disasterRecoveryFactory,
systemDisasterRecoveryFactory,
sqlQueryRestFactory,
- nodePropertiesRestFactory
+ nodePropertiesRestFactory,
+ dataNodesRestFactory
),
restManager,
restConfiguration
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/DisasterRecoveryManager.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/DisasterRecoveryManager.java
index 9921f0b2ac0..867dde42b8c 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/DisasterRecoveryManager.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/DisasterRecoveryManager.java
@@ -22,7 +22,6 @@ import static java.util.Collections.emptyMap;
import static java.util.concurrent.CompletableFuture.allOf;
import static java.util.concurrent.CompletableFuture.failedFuture;
import static java.util.stream.Collectors.groupingBy;
-import static java.util.stream.Collectors.toList;
import static java.util.stream.Collectors.toMap;
import static java.util.stream.Collectors.toSet;
import static
org.apache.ignite.internal.catalog.events.CatalogEvent.TABLE_CREATE;
@@ -74,6 +73,7 @@ import
org.apache.ignite.internal.catalog.descriptors.CatalogZoneDescriptor;
import org.apache.ignite.internal.catalog.events.CreateTableEventParameters;
import org.apache.ignite.internal.catalog.events.DropTableEventParameters;
import org.apache.ignite.internal.distributionzones.DistributionZoneManager;
+import org.apache.ignite.internal.distributionzones.DistributionZonesUtil;
import org.apache.ignite.internal.distributionzones.NodeWithAttributes;
import
org.apache.ignite.internal.distributionzones.events.HaZoneTopologyUpdateEvent;
import
org.apache.ignite.internal.distributionzones.events.HaZoneTopologyUpdateEventParams;
@@ -132,7 +132,6 @@ import
org.apache.ignite.internal.table.distributed.disaster.exceptions.IllegalN
import
org.apache.ignite.internal.table.distributed.disaster.exceptions.IllegalPartitionIdException;
import
org.apache.ignite.internal.table.distributed.disaster.exceptions.NodesNotFoundException;
import
org.apache.ignite.internal.table.distributed.disaster.exceptions.NotEnoughAliveNodesException;
-import
org.apache.ignite.internal.table.distributed.disaster.exceptions.ZonesNotFoundException;
import org.apache.ignite.internal.util.CollectionUtils;
import org.apache.ignite.internal.util.IgniteSpinBusyLock;
import org.apache.ignite.internal.versioned.VersionedSerialization;
@@ -747,7 +746,7 @@ public class DisasterRecoveryManager implements
IgniteComponent, SystemViewProvi
Function<LocalPartitionStateMessage, T> keyExtractor
) {
return inBusyLock(busyLock, () -> {
- Collection<CatalogZoneDescriptor> zones = filterZones(zoneNames,
catalog.zones());
+ Collection<CatalogZoneDescriptor> zones =
DistributionZonesUtil.filterZonesForOperations(zoneNames, catalog.zones());
checkPartitionsRange(partitionIds, zones);
@@ -1069,29 +1068,6 @@ public class DisasterRecoveryManager implements
IgniteComponent, SystemViewProvi
return nodes;
}
- private static Collection<CatalogZoneDescriptor> filterZones(Set<String>
zoneNames, Collection<CatalogZoneDescriptor> zones)
- throws ZonesNotFoundException {
- if (zoneNames.isEmpty()) {
- return zones;
- }
-
- List<CatalogZoneDescriptor> zoneDescriptors = zones.stream()
- .filter(catalogZoneDescriptor ->
zoneNames.contains(catalogZoneDescriptor.name()))
- .collect(toList());
-
- Set<String> foundZoneNames = zoneDescriptors.stream()
- .map(CatalogObjectDescriptor::name)
- .collect(toSet());
-
- if (!zoneNames.equals(foundZoneNames)) {
- Set<String> missingZoneNames =
CollectionUtils.difference(zoneNames, foundZoneNames);
-
- throw new ZonesNotFoundException(missingZoneNames);
- }
-
- return zoneDescriptors;
- }
-
/**
* Short version of {@link
DisasterRecoveryManager#processNewRequest(DisasterRecoveryRequest, long)}
without revision.
*
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/exceptions/ZonesNotFoundException.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/exceptions/ZonesNotFoundException.java
deleted file mode 100644
index aadfe42e87a..00000000000
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/exceptions/ZonesNotFoundException.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.table.distributed.disaster.exceptions;
-
-import static
org.apache.ignite.lang.ErrorGroups.DistributionZones.ZONE_NOT_FOUND_ERR;
-
-import java.util.Set;
-
-/** Exception is thrown when appropriate zones can`t be found. */
-public class ZonesNotFoundException extends DisasterRecoveryException {
- private static final long serialVersionUID = -8475588176132321568L;
-
- public ZonesNotFoundException(Set<String> missingZoneNames) {
- super(ZONE_NOT_FOUND_ERR, "Some distribution zones are missing: " +
missingZoneNames);
- }
-}