This is an automated email from the ASF dual-hosted git repository.
sanpwc pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 30ec21459a IGNITE-18088 Trigger rebalance on zone.dataNodes change.
(#1572)
30ec21459a is described below
commit 30ec21459ad179646060ed6ff32f4aa346c8a8aa
Author: Sergey Uttsel <[email protected]>
AuthorDate: Fri Feb 3 14:51:37 2023 +0300
IGNITE-18088 Trigger rebalance on zone.dataNodes change. (#1572)
---
.../ignite/internal/affinity/AffinityUtils.java | 26 +-
.../affinity/RendezvousAffinityFunction.java | 109 +++--
.../internal/affinity/AffinityServiceTest.java | 14 +-
.../affinity/RendezvousAffinityFunctionTest.java | 37 +-
.../distributionzones/DistributionZonesUtil.java | 33 +-
.../ignite/internal/rebalance/ItRebalanceTest.java | 246 +++++++++++
modules/table/build.gradle | 3 +
.../distributed/ItTxDistributedTestSingleNode.java | 2 +-
.../internal/table/distributed/TableManager.java | 148 ++++++-
.../ignite/internal/utils/RebalanceUtil.java | 13 +-
.../TableManagerDistributionZonesTest.java | 462 +++++++++++++++++++++
.../table/distributed/TableManagerTest.java | 8 +-
12 files changed, 965 insertions(+), 136 deletions(-)
diff --git
a/modules/affinity/src/main/java/org/apache/ignite/internal/affinity/AffinityUtils.java
b/modules/affinity/src/main/java/org/apache/ignite/internal/affinity/AffinityUtils.java
index ec15cedc02..cf64ba7d6e 100644
---
a/modules/affinity/src/main/java/org/apache/ignite/internal/affinity/AffinityUtils.java
+++
b/modules/affinity/src/main/java/org/apache/ignite/internal/affinity/AffinityUtils.java
@@ -25,7 +25,6 @@ import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
-import org.apache.ignite.network.ClusterNode;
/**
* Stateless affinity utils that produces helper methods for an affinity
assignments calculation.
@@ -34,13 +33,14 @@ public class AffinityUtils {
/**
* Calculates affinity assignments.
*
+ * @param dataNodes Data nodes.
* @param partitions Partitions count.
* @param replicas Replicas count.
* @return List assignments by partition.
*/
- public static List<Set<Assignment>>
calculateAssignments(Collection<ClusterNode> baselineNodes, int partitions, int
replicas) {
- List<Set<ClusterNode>> affinityNodes =
RendezvousAffinityFunction.assignPartitions(
- baselineNodes,
+ public static List<Set<Assignment>>
calculateAssignments(Collection<String> dataNodes, int partitions, int
replicas) {
+ List<Set<String>> affinityNodes =
RendezvousAffinityFunction.assignPartitions(
+ dataNodes,
partitions,
replicas,
false,
@@ -48,21 +48,21 @@ public class AffinityUtils {
HashSet::new
);
- return
affinityNodes.stream().map(AffinityUtils::clusterNodesToAssignments).collect(toList());
+ return
affinityNodes.stream().map(AffinityUtils::dataNodesToAssignments).collect(toList());
}
/**
* Calculates affinity assignments for a single partition.
*
- * @param baselineNodes Nodes.
+ * @param dataNodes Data nodes.
* @param partition Partition id.
* @param replicas Replicas count.
- * @return List of assignments.
+ * @return Set of assignments.
*/
- public static Set<Assignment>
calculateAssignmentForPartition(Collection<ClusterNode> baselineNodes, int
partition, int replicas) {
- Set<ClusterNode> affinityNodes =
RendezvousAffinityFunction.assignPartition(
+ public static Set<Assignment>
calculateAssignmentForPartition(Collection<String> dataNodes, int partition,
int replicas) {
+ Set<String> affinityNodes = RendezvousAffinityFunction.assignPartition(
partition,
- new ArrayList<>(baselineNodes),
+ new ArrayList<>(dataNodes),
replicas,
null,
false,
@@ -70,10 +70,10 @@ public class AffinityUtils {
HashSet::new
);
- return clusterNodesToAssignments(affinityNodes);
+ return dataNodesToAssignments(affinityNodes);
}
- private static Set<Assignment>
clusterNodesToAssignments(Collection<ClusterNode> nodes) {
- return nodes.stream().map(node ->
Assignment.forPeer(node.name())).collect(toSet());
+ private static Set<Assignment> dataNodesToAssignments(Collection<String>
nodes) {
+ return nodes.stream().map(Assignment::forPeer).collect(toSet());
}
}
diff --git
a/modules/affinity/src/main/java/org/apache/ignite/internal/affinity/RendezvousAffinityFunction.java
b/modules/affinity/src/main/java/org/apache/ignite/internal/affinity/RendezvousAffinityFunction.java
index 91adae62fd..0cc1adf545 100644
---
a/modules/affinity/src/main/java/org/apache/ignite/internal/affinity/RendezvousAffinityFunction.java
+++
b/modules/affinity/src/main/java/org/apache/ignite/internal/affinity/RendezvousAffinityFunction.java
@@ -33,7 +33,6 @@ import java.util.function.IntFunction;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.lang.IgniteBiTuple;
-import org.apache.ignite.network.ClusterNode;
/**
* Affinity function for partitioned table based on Highest Random Weight
algorithm. This function supports the following configuration:
@@ -59,7 +58,7 @@ public class RendezvousAffinityFunction {
private static final IgniteLogger LOG =
Loggers.forClass(RendezvousAffinityFunction.class);
/** Comparator. */
- private static final Comparator<IgniteBiTuple<Long, ClusterNode>>
COMPARATOR = new HashComparator();
+ private static final Comparator<IgniteBiTuple<Long, String>> COMPARATOR =
new HashComparator();
/** Maximum number of partitions. */
public static final int MAX_PARTITIONS_COUNT = 65000;
@@ -67,16 +66,6 @@ public class RendezvousAffinityFunction {
/** Exclude neighbors warning. */
private static boolean exclNeighborsWarn;
- /**
- * Resolves node hash.
- *
- * @param node Cluster node;
- * @return Node hash.
- */
- public static Object resolveNodeHash(ClusterNode node) {
- return node.name();
- }
-
/**
* Returns collection of nodes for specified partition.
*
@@ -89,13 +78,13 @@ public class RendezvousAffinityFunction {
* @param aggregator Function that creates a collection for the
partition assignments.
* @return Assignment.
*/
- public static <T extends Collection<ClusterNode>> T assignPartition(
+ public static <T extends Collection<String>> T assignPartition(
int part,
- List<ClusterNode> nodes,
+ List<String> nodes,
int replicas,
- Map<String, Collection<ClusterNode>> neighborhoodCache,
+ Map<String, Collection<String>> neighborhoodCache,
boolean exclNeighbors,
- BiPredicate<ClusterNode, T> nodeFilter,
+ BiPredicate<String, T> nodeFilter,
IntFunction<T> aggregator
) {
if (nodes.size() <= 1) {
@@ -106,58 +95,56 @@ public class RendezvousAffinityFunction {
return res;
}
- IgniteBiTuple<Long, ClusterNode>[] hashArr =
- (IgniteBiTuple<Long, ClusterNode>[]) new
IgniteBiTuple[nodes.size()];
+ IgniteBiTuple<Long, String>[] hashArr =
+ (IgniteBiTuple<Long, String>[]) new
IgniteBiTuple[nodes.size()];
for (int i = 0; i < nodes.size(); i++) {
- ClusterNode node = nodes.get(i);
-
- Object nodeHash = resolveNodeHash(node);
+ String node = nodes.get(i);
- long hash = hash(nodeHash.hashCode(), part);
+ long hash = hash(node.hashCode(), part);
hashArr[i] = new IgniteBiTuple<>(hash, node);
}
final int effectiveReplicas = replicas == Integer.MAX_VALUE ?
nodes.size() : Math.min(replicas, nodes.size());
- Iterable<ClusterNode> sortedNodes = new
LazyLinearSortedContainer(hashArr, effectiveReplicas);
+ Iterable<String> sortedNodes = new LazyLinearSortedContainer(hashArr,
effectiveReplicas);
// REPLICATED cache case
if (replicas == Integer.MAX_VALUE) {
return replicatedAssign(nodes, sortedNodes, aggregator);
}
- Iterator<ClusterNode> it = sortedNodes.iterator();
+ Iterator<String> it = sortedNodes.iterator();
T res = aggregator.apply(effectiveReplicas);
- Collection<ClusterNode> allNeighbors = new HashSet<>();
+ Collection<String> allNeighbors = new HashSet<>();
- ClusterNode first = it.next();
+ String first = it.next();
res.add(first);
if (exclNeighbors) {
- allNeighbors.addAll(neighborhoodCache.get(first.id()));
+ allNeighbors.addAll(neighborhoodCache.get(first));
}
// Select another replicas.
if (replicas > 1) {
while (it.hasNext() && res.size() < effectiveReplicas) {
- ClusterNode node = it.next();
+ String node = it.next();
if (exclNeighbors) {
if (!allNeighbors.contains(node)) {
res.add(node);
- allNeighbors.addAll(neighborhoodCache.get(node.id()));
+ allNeighbors.addAll(neighborhoodCache.get(node));
}
} else if (nodeFilter == null || nodeFilter.test(node, res)) {
res.add(node);
if (exclNeighbors) {
- allNeighbors.addAll(neighborhoodCache.get(node.id()));
+ allNeighbors.addAll(neighborhoodCache.get(node));
}
}
}
@@ -170,7 +157,7 @@ public class RendezvousAffinityFunction {
it.next();
while (it.hasNext() && res.size() < effectiveReplicas) {
- ClusterNode node = it.next();
+ String node = it.next();
if (!res.contains(node)) {
res.add(node);
@@ -198,15 +185,15 @@ public class RendezvousAffinityFunction {
* @param aggregator Function that creates a collection for the partition
assignments.
* @return Assignment.
*/
- private static <T extends Collection<ClusterNode>> T
replicatedAssign(List<ClusterNode> nodes,
- Iterable<ClusterNode> sortedNodes, IntFunction<T> aggregator) {
- ClusterNode first = sortedNodes.iterator().next();
+ private static <T extends Collection<String>> T
replicatedAssign(List<String> nodes,
+ Iterable<String> sortedNodes, IntFunction<T> aggregator) {
+ String first = sortedNodes.iterator().next();
T res = aggregator.apply(nodes.size());
res.add(first);
- for (ClusterNode n : nodes) {
+ for (String n : nodes) {
if (!n.equals(first)) {
res.add(n);
}
@@ -250,12 +237,12 @@ public class RendezvousAffinityFunction {
* @param nodeFilter Filter for nodes.
* @return List nodes by partition.
*/
- public static List<List<ClusterNode>> assignPartitions(
- Collection<ClusterNode> currentTopologySnapshot,
+ public static List<List<String>> assignPartitions(
+ Collection<String> currentTopologySnapshot,
int partitions,
int replicas,
boolean exclNeighbors,
- BiPredicate<ClusterNode, List<ClusterNode>> nodeFilter
+ BiPredicate<String, List<String>> nodeFilter
) {
return assignPartitions(currentTopologySnapshot, partitions, replicas,
exclNeighbors, nodeFilter, ArrayList::new);
}
@@ -271,12 +258,12 @@ public class RendezvousAffinityFunction {
* @param aggregator Function that creates a collection for
the partition assignments.
* @return List nodes by partition.
*/
- public static <T extends Collection<ClusterNode>> List<T> assignPartitions(
- Collection<ClusterNode> currentTopologySnapshot,
+ public static <T extends Collection<String>> List<T> assignPartitions(
+ Collection<String> currentTopologySnapshot,
int partitions,
int replicas,
boolean exclNeighbors,
- BiPredicate<ClusterNode, T> nodeFilter,
+ BiPredicate<String, T> nodeFilter,
IntFunction<T> aggregator
) {
assert partitions <= MAX_PARTITIONS_COUNT : "partitions <= " +
MAX_PARTITIONS_COUNT;
@@ -285,9 +272,9 @@ public class RendezvousAffinityFunction {
List<T> assignments = new ArrayList<>(partitions);
- Map<String, Collection<ClusterNode>> neighborhoodCache = exclNeighbors
? neighbors(currentTopologySnapshot) : null;
+ Map<String, Collection<String>> neighborhoodCache = exclNeighbors ?
neighbors(currentTopologySnapshot) : null;
- List<ClusterNode> nodes = new ArrayList<>(currentTopologySnapshot);
+ List<String> nodes = new ArrayList<>(currentTopologySnapshot);
for (int i = 0; i < partitions; i++) {
T partAssignment = assignPartition(i, nodes, replicas,
neighborhoodCache, exclNeighbors, nodeFilter, aggregator);
@@ -304,15 +291,15 @@ public class RendezvousAffinityFunction {
* @param topSnapshot Topology snapshot.
* @return Neighbors map.
*/
- public static Map<String, Collection<ClusterNode>>
neighbors(Collection<ClusterNode> topSnapshot) {
- Map<String, Collection<ClusterNode>> macMap = new
HashMap<>(topSnapshot.size(), 1.0f);
+ public static Map<String, Collection<String>> neighbors(Collection<String>
topSnapshot) {
+ Map<String, Collection<String>> macMap = new
HashMap<>(topSnapshot.size(), 1.0f);
// Group by mac addresses.
- for (ClusterNode node : topSnapshot) {
+ for (String node : topSnapshot) {
String macs = String.valueOf(node.hashCode());
//node.attribute(IgniteNodeAttributes.ATTR_MACS);
- Collection<ClusterNode> nodes = macMap.get(macs);
+ Collection<String> nodes = macMap.get(macs);
if (nodes == null) {
macMap.put(macs, nodes = new HashSet<>());
@@ -321,11 +308,11 @@ public class RendezvousAffinityFunction {
nodes.add(node);
}
- Map<String, Collection<ClusterNode>> neighbors = new
HashMap<>(topSnapshot.size(), 1.0f);
+ Map<String, Collection<String>> neighbors = new
HashMap<>(topSnapshot.size(), 1.0f);
- for (Collection<ClusterNode> group : macMap.values()) {
- for (ClusterNode node : group) {
- neighbors.put(node.id(), group);
+ for (Collection<String> group : macMap.values()) {
+ for (String node : group) {
+ neighbors.put(node, group);
}
}
@@ -335,24 +322,24 @@ public class RendezvousAffinityFunction {
/**
* Hash comparator.
*/
- private static class HashComparator implements
Comparator<IgniteBiTuple<Long, ClusterNode>>, Serializable {
+ private static class HashComparator implements
Comparator<IgniteBiTuple<Long, String>>, Serializable {
/** Serial version uid. */
private static final long serialVersionUID = 0L;
/** {@inheritDoc} */
@Override
- public int compare(IgniteBiTuple<Long, ClusterNode> o1,
IgniteBiTuple<Long, ClusterNode> o2) {
+ public int compare(IgniteBiTuple<Long, String> o1, IgniteBiTuple<Long,
String> o2) {
return o1.get1() < o2.get1() ? -1 : o1.get1() > o2.get1() ? 1 :
- o1.get2().name().compareTo(o2.get2().name());
+ o1.get2().compareTo(o2.get2());
}
}
/**
* Sorts the initial array with linear sort algorithm array.
*/
- private static class LazyLinearSortedContainer implements
Iterable<ClusterNode> {
+ private static class LazyLinearSortedContainer implements Iterable<String>
{
/** Initial node-hash array. */
- private final IgniteBiTuple<Long, ClusterNode>[] arr;
+ private final IgniteBiTuple<Long, String>[] arr;
/** Count of the sorted elements. */
private int sorted;
@@ -363,7 +350,7 @@ public class RendezvousAffinityFunction {
* @param arr Node / partition hash list.
* @param needFirstSortedCnt Estimate count of elements to return by
iterator.
*/
- LazyLinearSortedContainer(IgniteBiTuple<Long, ClusterNode>[] arr, int
needFirstSortedCnt) {
+ LazyLinearSortedContainer(IgniteBiTuple<Long, String>[] arr, int
needFirstSortedCnt) {
this.arr = arr;
if (needFirstSortedCnt > (int) Math.log(arr.length)) {
@@ -375,14 +362,14 @@ public class RendezvousAffinityFunction {
/** {@inheritDoc} */
@Override
- public Iterator<ClusterNode> iterator() {
+ public Iterator<String> iterator() {
return new SortIterator();
}
/**
* Sorting iterator.
*/
- private class SortIterator implements Iterator<ClusterNode> {
+ private class SortIterator implements Iterator<String> {
/** Index of the first unsorted element. */
private int cur;
@@ -394,7 +381,7 @@ public class RendezvousAffinityFunction {
/** {@inheritDoc} */
@Override
- public ClusterNode next() {
+ public String next() {
if (!hasNext()) {
throw new NoSuchElementException();
}
@@ -403,7 +390,7 @@ public class RendezvousAffinityFunction {
return arr[cur++].get2();
}
- IgniteBiTuple<Long, ClusterNode> min = arr[cur];
+ IgniteBiTuple<Long, String> min = arr[cur];
int minIdx = cur;
diff --git
a/modules/affinity/src/test/java/org/apache/ignite/internal/affinity/AffinityServiceTest.java
b/modules/affinity/src/test/java/org/apache/ignite/internal/affinity/AffinityServiceTest.java
index 5310ab1dcc..0d0012d4bf 100644
---
a/modules/affinity/src/test/java/org/apache/ignite/internal/affinity/AffinityServiceTest.java
+++
b/modules/affinity/src/test/java/org/apache/ignite/internal/affinity/AffinityServiceTest.java
@@ -23,9 +23,6 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Set;
-import java.util.UUID;
-import org.apache.ignite.network.ClusterNode;
-import org.apache.ignite.network.NetworkAddress;
import org.junit.jupiter.api.Test;
/**
@@ -36,16 +33,7 @@ public class AffinityServiceTest {
@Test
public void testCalculatedAssignmentHappyPath() {
List<Set<Assignment>> assignments = AffinityUtils.calculateAssignments(
- Arrays.asList(
- new ClusterNode(
- UUID.randomUUID().toString(), "node0",
- new NetworkAddress("localhost", 8080)
- ),
- new ClusterNode(
- UUID.randomUUID().toString(), "node1",
- new NetworkAddress("localhost", 8081)
- )
- ),
+ Arrays.asList("node0", "node1"),
10,
3
);
diff --git
a/modules/affinity/src/test/java/org/apache/ignite/internal/affinity/RendezvousAffinityFunctionTest.java
b/modules/affinity/src/test/java/org/apache/ignite/internal/affinity/RendezvousAffinityFunctionTest.java
index 6866d9a0fb..187bfecd89 100644
---
a/modules/affinity/src/test/java/org/apache/ignite/internal/affinity/RendezvousAffinityFunctionTest.java
+++
b/modules/affinity/src/test/java/org/apache/ignite/internal/affinity/RendezvousAffinityFunctionTest.java
@@ -27,7 +27,6 @@ import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
-import java.util.UUID;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
@@ -35,7 +34,6 @@ import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.util.ByteUtils;
import org.apache.ignite.network.ClusterNode;
-import org.apache.ignite.network.NetworkAddress;
import org.jetbrains.annotations.NotNull;
import org.junit.jupiter.api.Test;
@@ -51,32 +49,32 @@ public class RendezvousAffinityFunctionTest {
@Test
public void testPartitionDistribution() {
- int nodes = 50;
+ int nodeCount = 50;
int parts = 10_000;
int replicas = 4;
- List<ClusterNode> clusterNodes = prepareNetworkTopology(nodes);
+ List<String> nodes = prepareNetworkTopology(nodeCount);
- assertTrue(parts > nodes, "Partitions should be more that nodes");
+ assertTrue(parts > nodeCount, "Partitions should be more than nodes");
- int ideal = (parts * replicas) / nodes;
+ int ideal = (parts * replicas) / nodeCount;
- List<List<ClusterNode>> assignment =
RendezvousAffinityFunction.assignPartitions(
- clusterNodes,
+ List<List<String>> assignment =
RendezvousAffinityFunction.assignPartitions(
+ nodes,
parts,
replicas,
false,
null
);
- HashMap<ClusterNode, ArrayList<Integer>> assignmentByNode = new
HashMap<>(nodes);
+ HashMap<String, ArrayList<Integer>> assignmentByNode = new
HashMap<>(nodeCount);
int part = 0;
- for (List<ClusterNode> partNodes : assignment) {
- for (ClusterNode node : partNodes) {
+ for (List<String> partNodes : assignment) {
+ for (String node : partNodes) {
ArrayList<Integer> nodeParts = assignmentByNode.get(node);
if (nodeParts == null) {
@@ -89,7 +87,7 @@ public class RendezvousAffinityFunctionTest {
part++;
}
- for (ClusterNode node : clusterNodes) {
+ for (String node : nodes) {
ArrayList<Integer> nodeParts = assignmentByNode.get(node);
assertNotNull(nodeParts);
@@ -104,29 +102,26 @@ public class RendezvousAffinityFunctionTest {
}
@NotNull
- private List<ClusterNode> prepareNetworkTopology(int nodes) {
- var addr = new NetworkAddress("127.0.0.1", 121212);
-
+ private List<String> prepareNetworkTopology(int nodes) {
return IntStream.range(0, nodes)
.mapToObj(i -> "Node " + i)
- .map(name -> new ClusterNode(UUID.randomUUID().toString(),
name, addr))
.collect(Collectors.toUnmodifiableList());
}
@Test
public void serializeAssignment() {
- int nodes = 50;
+ int nodeCount = 50;
int parts = 10_000;
int replicas = 4;
- List<ClusterNode> clusterNodes = prepareNetworkTopology(nodes);
+ List<String> nodes = prepareNetworkTopology(nodeCount);
- assertTrue(parts > nodes, "Partitions should be more that nodes");
+ assertTrue(parts > nodeCount, "Partitions should be more than nodes");
- List<List<ClusterNode>> assignment =
RendezvousAffinityFunction.assignPartitions(
- clusterNodes,
+ List<List<String>> assignment =
RendezvousAffinityFunction.assignPartitions(
+ nodes,
parts,
replicas,
false,
diff --git
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZonesUtil.java
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZonesUtil.java
index ea1d8e6177..ac9cb9905d 100644
---
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZonesUtil.java
+++
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZonesUtil.java
@@ -25,6 +25,7 @@ import static
org.apache.ignite.internal.metastorage.dsl.Operations.ops;
import static org.apache.ignite.internal.metastorage.dsl.Operations.put;
import static org.apache.ignite.internal.metastorage.dsl.Operations.remove;
+import java.nio.charset.StandardCharsets;
import java.util.Set;
import org.apache.ignite.internal.metastorage.dsl.CompoundCondition;
import org.apache.ignite.internal.metastorage.dsl.Update;
@@ -34,7 +35,7 @@ import org.apache.ignite.lang.ByteArray;
/**
* Util class for Distribution Zones flow.
*/
-class DistributionZonesUtil {
+public class DistributionZonesUtil {
/** Key prefix for zone's data nodes. */
private static final String DISTRIBUTION_ZONE_DATA_NODES_PREFIX =
"distributionZone.dataNodes.";
@@ -60,11 +61,37 @@ class DistributionZonesUtil {
private static final ByteArray
DISTRIBUTION_ZONES_LOGICAL_TOPOLOGY_VERSION_KEY =
new ByteArray(DISTRIBUTION_ZONES_LOGICAL_TOPOLOGY_VERSION);
- /** ByteArray representation of {@link
DistributionZonesUtil#DISTRIBUTION_ZONE_DATA_NODES_PREFIX}. */
- static ByteArray zoneDataNodesKey(int zoneId) {
+ /**
+ * ByteArray key of the given zone's data nodes, i.e.
{@link DistributionZonesUtil#DISTRIBUTION_ZONE_DATA_NODES_PREFIX} followed by the zone id.
+ *
+ * @param zoneId Zone id.
+ * @return ByteArray representation.
+ */
+ public static ByteArray zoneDataNodesKey(int zoneId) {
return new ByteArray(DISTRIBUTION_ZONE_DATA_NODES_PREFIX + zoneId);
}
+ /**
+ * ByteArray representation of {@link
DistributionZonesUtil#DISTRIBUTION_ZONE_DATA_NODES_PREFIX}.
+ *
+ * @return ByteArray representation.
+ */
+ public static ByteArray zoneDataNodesPrefix() {
+ return new ByteArray(DISTRIBUTION_ZONE_DATA_NODES_PREFIX);
+ }
+
+ /**
+ * Extract zone id from a distribution zone data nodes key.
+ *
+ * @param key Key.
+ * @return Zone id.
+ */
+ public static int extractZoneId(byte[] key) {
+ var strKey = new String(key, StandardCharsets.UTF_8);
+
+ return
Integer.parseInt(strKey.substring(DISTRIBUTION_ZONE_DATA_NODES_PREFIX.length()));
+ }
+
/**
* The key needed for processing an event about zone's creation and
deletion.
* With this key we can be sure that event was triggered only once.
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceTest.java
new file mode 100644
index 0000000000..665cac7338
--- /dev/null
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceTest.java
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.rebalance;
+
+import static java.util.stream.Collectors.toList;
+import static
org.apache.ignite.internal.Cluster.NodeKnockout.PARTITION_NETWORK;
+import static org.apache.ignite.internal.SessionUtils.executeUpdate;
+import static
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
+
+import java.nio.file.Path;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.function.BooleanSupplier;
+import java.util.stream.Collectors;
+import org.apache.ignite.internal.Cluster;
+import org.apache.ignite.internal.affinity.Assignment;
+import org.apache.ignite.internal.app.IgniteImpl;
+import org.apache.ignite.internal.hlc.HybridClockImpl;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.raft.RaftNodeId;
+import
org.apache.ignite.internal.replicator.exception.ReplicaUnavailableException;
+import org.apache.ignite.internal.schema.BinaryRowEx;
+import
org.apache.ignite.internal.schema.configuration.ExtendedTableConfiguration;
+import org.apache.ignite.internal.schema.configuration.TablesConfiguration;
+import org.apache.ignite.internal.schema.marshaller.TupleMarshallerImpl;
+import org.apache.ignite.internal.table.TableImpl;
+import
org.apache.ignite.internal.table.distributed.replicator.TablePartitionId;
+import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
+import org.apache.ignite.internal.testframework.WorkDirectory;
+import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
+import org.apache.ignite.internal.util.ByteUtils;
+import org.apache.ignite.table.Tuple;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInfo;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+/**
+ * Test suite for the rebalance.
+ */
+@ExtendWith(WorkDirectoryExtension.class)
+public class ItRebalanceTest extends BaseIgniteAbstractTest {
+ private static final IgniteLogger LOG =
Loggers.forClass(ItRebalanceTest.class);
+
+ @WorkDirectory
+ private Path workDir;
+
+ private Cluster cluster;
+
+ @BeforeEach
+ void createCluster(TestInfo testInfo) {
+ cluster = new Cluster(testInfo, workDir);
+ }
+
+ @AfterEach
+ void shutdownCluster() {
+ cluster.shutdown();
+ }
+
+ /**
+ * The test checks that data is rebalanced after a node with a replica leaves
and then rejoins the cluster.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ @Disabled("https://issues.apache.org/jira/browse/IGNITE-18692")
+ void assignmentsChangingOnNodeLeaveNodeJoin() throws Exception {
+ cluster.startAndInit(4);
+
+ // Creates a table with 1 partition and 3 replicas.
+ createTestTable();
+
+ assertTrue(waitAssignments(List.of(
+ Set.of(0, 1, 2),
+ Set.of(0, 1, 2),
+ Set.of(0, 1, 2),
+ Set.of(0, 1, 2)
+ )));
+
+ TableImpl table = (TableImpl) cluster.node(0).tables().table("TEST");
+
+ BinaryRowEx row = new
TupleMarshallerImpl(table.schemaView()).marshal(Tuple.create().set("id",
1).set("value", "value1"));
+ BinaryRowEx key = new
TupleMarshallerImpl(table.schemaView()).marshal(Tuple.create().set("id", 1));
+
+ assertNull(table.internalTable().get(key, new HybridClockImpl().now(),
cluster.node(0).node()).get());
+ assertNull(table.internalTable().get(key, new HybridClockImpl().now(),
cluster.node(1).node()).get());
+ assertNull(table.internalTable().get(key, new HybridClockImpl().now(),
cluster.node(2).node()).get());
+
+ table.internalTable().insert(row, null).get();
+
+ assertNotNull(table.internalTable().get(key, new
HybridClockImpl().now(), cluster.node(0).node()).get());
+ assertNotNull(table.internalTable().get(key, new
HybridClockImpl().now(), cluster.node(1).node()).get());
+ assertNotNull(table.internalTable().get(key, new
HybridClockImpl().now(), cluster.node(2).node()).get());
+
+ try {
+ table.internalTable().get(key, new HybridClockImpl().now(),
cluster.node(3).node()).get();
+
+ fail();
+ } catch (Exception e) {
+ assertInstanceOf(ExecutionException.class, e);
+
+ assertInstanceOf(ReplicaUnavailableException.class, e.getCause());
+ }
+
+ cluster.knockOutNode(2, PARTITION_NETWORK);
+
+ assertTrue(waitAssignments(List.of(
+ Set.of(0, 1, 3),
+ Set.of(0, 1, 3),
+ Set.of(0, 1, 2),
+ Set.of(0, 1, 3)
+ )));
+
+ assertNotNull(table.internalTable().get(key, new
HybridClockImpl().now(), cluster.node(0).node()).get());
+ assertNotNull(table.internalTable().get(key, new
HybridClockImpl().now(), cluster.node(1).node()).get());
+ assertNotNull(table.internalTable().get(key, new
HybridClockImpl().now(), cluster.node(3).node()).get());
+
+ cluster.reanimateNode(2, PARTITION_NETWORK);
+
+ assertTrue(waitAssignments(List.of(
+ Set.of(0, 1, 2),
+ Set.of(0, 1, 2),
+ Set.of(0, 1, 2),
+ Set.of(0, 1, 2)
+ )));
+
+ assertNotNull(table.internalTable().get(key, new
HybridClockImpl().now(), cluster.node(0).node()).get());
+ assertNotNull(table.internalTable().get(key, new
HybridClockImpl().now(), cluster.node(1).node()).get());
+ assertNotNull(table.internalTable().get(key, new
HybridClockImpl().now(), cluster.node(2).node()).get());
+
+ try {
+ table.internalTable().get(key, new HybridClockImpl().now(),
cluster.node(3).node()).get();
+
+ fail();
+ } catch (Exception e) {
+ assertInstanceOf(ExecutionException.class, e);
+
+ assertInstanceOf(ReplicaUnavailableException.class, e.getCause());
+ }
+ }
+
+ /**
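+ * Waits until the assignments seen by each node match the expected ones.
+ *
+ * @param nodes Expected assignments: element {@code i} holds the indexes of the nodes expected in the assignments seen by node {@code i}.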
+ * @return {@code true} if the expected and actual assignments are the
same.
+ * @throws InterruptedException If interrupted.
+ */
+ private boolean waitAssignments(List<Set<Integer>> nodes) throws
InterruptedException {
+ return waitForCondition(() -> {
+ for (int i = 0; i < nodes.size(); i++) {
+ Set<Integer> expectedAssignments = nodes.get(i);
+
+ ExtendedTableConfiguration table =
+ (ExtendedTableConfiguration) cluster.node(i)
+
.clusterConfiguration().getConfiguration(TablesConfiguration.KEY).tables().get("TEST");
+
+ byte[] assignmentsBytes = table.assignments().value();
+
+ Set<String> assignments;
+
+ if (assignmentsBytes != null) {
+ assignments = ((List<Set<Assignment>>)
ByteUtils.fromBytes(assignmentsBytes)).get(0)
+ .stream().map(assignment ->
assignment.consistentId()).collect(Collectors.toSet());
+ } else {
+ assignments = Collections.emptySet();
+ }
+
+ LOG.info("Assignments for node " + i + ": " + assignments);
+
+ if (!(expectedAssignments.size() == assignments.size())
+ || !expectedAssignments.stream().allMatch(node ->
assignments.contains(cluster.node(node).name()))) {
+ return false;
+ }
+ }
+
+ return true;
+ },
+ 5000);
+ }
+
+ private void createTestTable() throws InterruptedException {
+ String sql1 = "create zone test_zone with "
+ + "data_nodes_auto_adjust_scale_up=0, "
+ + "data_nodes_auto_adjust_scale_down=0";
+ String sql2 = "create table test (id int primary key, value
varchar(20))"
+ + " with partitions=1, replicas=3, primary_zone='TEST_ZONE'";
+
+ cluster.doInSession(0, session -> {
+ executeUpdate(sql1, session);
+ executeUpdate(sql2, session);
+ });
+
+ waitForTableToStart();
+ }
+
+ private void waitForTableToStart() throws InterruptedException {
+ // TODO: IGNITE-18203 - remove this wait because when a table creation
query is executed, the table must be fully ready.
+
+ BooleanSupplier tableStarted = () -> {
+ int numberOfStartedRaftNodes = cluster.runningNodes()
+ .map(ItRebalanceTest::tablePartitionIds)
+ .mapToInt(List::size)
+ .sum();
+ return numberOfStartedRaftNodes == 3;
+ };
+
+ assertTrue(waitForCondition(tableStarted, 10_000), "Did not see all
table RAFT nodes started");
+ }
+
+ /**
+ * Returns the IDs of all table partitions that exist on the given node.
+ */
+ private static List<TablePartitionId> tablePartitionIds(IgniteImpl node) {
+ return node.raftManager().localNodes().stream()
+ .map(RaftNodeId::groupId)
+ .filter(TablePartitionId.class::isInstance)
+ .map(TablePartitionId.class::cast)
+ .collect(toList());
+ }
+}
diff --git a/modules/table/build.gradle b/modules/table/build.gradle
index e2d59f5bc3..54a04a315e 100644
--- a/modules/table/build.gradle
+++ b/modules/table/build.gradle
@@ -59,6 +59,7 @@ dependencies {
testImplementation(testFixtures(project(':ignite-configuration')))
testImplementation(testFixtures(project(':ignite-transactions')))
testImplementation(testFixtures(project(':ignite-storage-api')))
+ testImplementation(testFixtures(project(':ignite-metastorage')))
testImplementation libs.mockito.core
testImplementation libs.mockito.inline
testImplementation libs.mockito.junit
@@ -80,6 +81,7 @@ dependencies {
testFixturesImplementation(testFixtures(project(':ignite-core')))
testFixturesImplementation(testFixtures(project(':ignite-storage-api')))
testFixturesImplementation(testFixtures(project(':ignite-transactions')))
+
testFixturesImplementation(testFixtures(project(':ignite-cluster-management')))
testFixturesImplementation libs.jetbrains.annotations
testFixturesImplementation libs.fastutil.core
testFixturesImplementation libs.mockito.core
@@ -96,6 +98,7 @@ dependencies {
integrationTestImplementation(testFixtures(project(':ignite-raft')))
integrationTestImplementation(testFixtures(project(':ignite-storage-api')))
integrationTestImplementation(testFixtures(project(':ignite-transactions')))
+
integrationTestImplementation(testFixtures(project(':ignite-cluster-management')))
integrationTestImplementation libs.fastutil.core
}
diff --git
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNode.java
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNode.java
index 80e0f957ca..7a368f6021 100644
---
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNode.java
+++
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNode.java
@@ -371,7 +371,7 @@ public class ItTxDistributedTestSingleNode extends
TxAbstractTest {
*/
private Int2ObjectOpenHashMap<RaftGroupService> startTable(UUID tblId,
SchemaDescriptor schemaDescriptor) throws Exception {
List<Set<Assignment>> calculatedAssignments =
AffinityUtils.calculateAssignments(
- cluster.stream().map(node ->
node.topologyService().localMember()).collect(toList()),
+ cluster.stream().map(node ->
node.topologyService().localMember().name()).collect(toList()),
1,
replicas()
);
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index d1a483094f..6bfa418150 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -24,6 +24,8 @@ import static
java.util.concurrent.CompletableFuture.failedFuture;
import static java.util.concurrent.CompletableFuture.runAsync;
import static java.util.stream.Collectors.toList;
import static
org.apache.ignite.internal.configuration.util.ConfigurationUtil.getByInternalId;
+import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.extractZoneId;
+import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zoneDataNodesPrefix;
import static
org.apache.ignite.internal.schema.SchemaManager.INITIAL_SCHEMA_VERSION;
import static org.apache.ignite.internal.util.IgniteUtils.inBusyLock;
import static
org.apache.ignite.internal.util.IgniteUtils.shutdownAndAwaitTermination;
@@ -70,6 +72,7 @@ import java.util.function.IntSupplier;
import java.util.stream.Stream;
import org.apache.ignite.configuration.ConfigurationChangeException;
import org.apache.ignite.configuration.ConfigurationProperty;
+import org.apache.ignite.configuration.NamedConfigurationTree;
import
org.apache.ignite.configuration.notifications.ConfigurationNamedListListener;
import
org.apache.ignite.configuration.notifications.ConfigurationNotificationEvent;
import org.apache.ignite.internal.affinity.AffinityUtils;
@@ -293,6 +296,18 @@ public class TableManager extends Producer<TableEvent,
TableEventParameters> imp
private static final TableMessagesFactory TABLE_MESSAGES_FACTORY = new
TableMessagesFactory();
+ /** Meta storage listener for changes in the distribution zones data
nodes. */
+ private final WatchListener distributionZonesDataNodesListener;
+
+ /** Meta storage listener for pending assignments. */
+ private final WatchListener pendingAssignmentsRebalanceListener;
+
+ /** Meta storage listener for stable assignments. */
+ private final WatchListener stableAssignmentsRebalanceListener;
+
+ /** Meta storage listener for switch reduce assignments. */
+ private final WatchListener assignmentsSwitchRebalanceListener;
+
/**
* Creates a new table manager.
*
@@ -427,6 +442,14 @@ public class TableManager extends Producer<TableEvent,
TableEventParameters> imp
new LinkedBlockingQueue<>(),
NamedThreadFactory.create(nodeName, "incoming-raft-snapshot",
LOG)
);
+
+ distributionZonesDataNodesListener =
createDistributionZonesDataNodesListener();
+
+ pendingAssignmentsRebalanceListener =
createPendingAssignmentsRebalanceListener();
+
+ stableAssignmentsRebalanceListener =
createStableAssignmentsRebalanceListener();
+
+ assignmentsSwitchRebalanceListener =
createAssignmentsSwitchRebalanceListener();
}
/** {@inheritDoc} */
@@ -434,7 +457,12 @@ public class TableManager extends Producer<TableEvent,
TableEventParameters> imp
public void start() {
tablesCfg.tables().any().replicas().listen(this::onUpdateReplicas);
- registerRebalanceListeners();
+ // TODO: IGNITE-18694 - Recovery for the case when zones watch
listener processed event but assignments were not updated.
+ metaStorageMgr.registerPrefixWatch(zoneDataNodesPrefix(),
distributionZonesDataNodesListener);
+
+
metaStorageMgr.registerPrefixWatch(ByteArray.fromString(PENDING_ASSIGNMENTS_PREFIX),
pendingAssignmentsRebalanceListener);
+
metaStorageMgr.registerPrefixWatch(ByteArray.fromString(STABLE_ASSIGNMENTS_PREFIX),
stableAssignmentsRebalanceListener);
+
metaStorageMgr.registerPrefixWatch(ByteArray.fromString(ASSIGNMENTS_SWITCH_REDUCE_PREFIX),
assignmentsSwitchRebalanceListener);
((ExtendedTableConfiguration)
tablesCfg.tables().any()).assignments().listen(this::onUpdateAssignments);
@@ -606,7 +634,8 @@ public class TableManager extends Producer<TableEvent,
TableEventParameters> imp
for (int i = 0; i < partCnt; i++) {
TablePartitionId replicaGrpId = new
TablePartitionId(((ExtendedTableConfiguration) tblCfg).id().value(), i);
- futures[i] =
updatePendingAssignmentsKeys(tblCfg.name().value(), replicaGrpId,
baselineMgr.nodes(), newReplicas,
+ futures[i] =
updatePendingAssignmentsKeys(tblCfg.name().value(), replicaGrpId,
+
baselineMgr.nodes().stream().map(ClusterNode::name).collect(toList()),
newReplicas,
replicasCtx.storageRevision(), metaStorageMgr, i);
}
@@ -953,6 +982,12 @@ public class TableManager extends Producer<TableEvent,
TableEventParameters> imp
return;
}
+ metaStorageMgr.unregisterWatch(distributionZonesDataNodesListener);
+
+ metaStorageMgr.unregisterWatch(pendingAssignmentsRebalanceListener);
+ metaStorageMgr.unregisterWatch(stableAssignmentsRebalanceListener);
+ metaStorageMgr.unregisterWatch(assignmentsSwitchRebalanceListener);
+
busyLock.block();
Map<UUID, TableImpl> tables = tablesByIdVv.latest();
@@ -1248,7 +1283,11 @@ public class TableManager extends Producer<TableEvent,
TableEventParameters> imp
}
private Set<Assignment> calculateAssignments(TableConfiguration tableCfg,
int partNum) {
- return
AffinityUtils.calculateAssignmentForPartition(baselineMgr.nodes(), partNum,
tableCfg.value().replicas());
+ return AffinityUtils.calculateAssignmentForPartition(
+
baselineMgr.nodes().stream().map(ClusterNode::name).collect(toList()),
+ partNum,
+ tableCfg.value().replicas()
+ );
}
/**
@@ -1308,7 +1347,7 @@ public class TableManager extends Producer<TableEvent,
TableEventParameters> imp
// Affinity assignments calculation.
extConfCh.changeAssignments(ByteUtils.toBytes(AffinityUtils.calculateAssignments(
- baselineMgr.nodes(),
+
baselineMgr.nodes().stream().map(ClusterNode::name).collect(toList()),
tableChange.partitions(),
tableChange.replicas())));
});
@@ -1789,10 +1828,79 @@ public class TableManager extends Producer<TableEvent,
TableEventParameters> imp
}
/**
- * Register the new meta storage listener for changes in the
rebalance-specific keys.
+ * Creates meta storage listener for distribution zones data nodes updates.
+ *
+ * @return The watch listener.
+ */
+    private WatchListener createDistributionZonesDataNodesListener() {
+        return new WatchListener() {
+            /**
+             * Recalculates the assignments of every partition of every table that belongs to the zone
+             * whose data nodes entry has changed, and writes them through {@code updatePendingAssignmentsKeys}.
+             *
+             * @param evt Watch event carrying the new data nodes entry of a zone.
+             * @throws IgniteInternalException Wrapping {@link NodeStoppingException} if the manager is stopping.
+             */
+            @Override
+            public void onUpdate(WatchEvent evt) {
+                if (!busyLock.enterBusy()) {
+                    throw new IgniteInternalException(new NodeStoppingException());
+                }
+
+                try {
+                    byte[] dataNodesBytes = evt.entryEvent().newEntry().value();
+
+                    if (dataNodesBytes == null) {
+                        // The zone was removed, so its data nodes were removed as well: nothing to rebalance.
+                        return;
+                    }
+
+                    // Values that are the same for the whole event are read once, not once per partition.
+                    int zoneId = extractZoneId(evt.entryEvent().newEntry().key());
+                    long revision = evt.entryEvent().newEntry().revision();
+                    Set<String> dataNodes = ByteUtils.fromBytes(dataNodesBytes);
+
+                    NamedConfigurationTree<TableConfiguration, TableView, TableChange> tables = tablesCfg.tables();
+
+                    for (int i = 0; i < tables.value().size(); i++) {
+                        TableView tableView = tables.value().get(i);
+
+                        // Only tables bound to the changed zone are affected.
+                        if (tableView.zoneId() != zoneId) {
+                            continue;
+                        }
+
+                        // The table ID does not depend on the partition, so it is resolved once per table.
+                        UUID tableId = ((ExtendedTableConfiguration) tables.get(tableView.name())).id().value();
+
+                        for (int part = 0; part < tableView.partitions(); part++) {
+                            // Effectively final copy of the loop variable for the lambda below.
+                            int partId = part;
+
+                            updatePendingAssignmentsKeys(
+                                    tableView.name(),
+                                    new TablePartitionId(tableId, part),
+                                    dataNodes,
+                                    tableView.replicas(),
+                                    revision,
+                                    metaStorageMgr,
+                                    part
+                            ).exceptionally(e -> {
+                                // A failure for one partition is logged and must not affect the other partitions.
+                                LOG.error(
+                                        "Exception on updating assignments for [table={}, partition={}]", e, tableView.name(), partId
+                                );
+
+                                return null;
+                            });
+                        }
+                    }
+                } finally {
+                    busyLock.leaveBusy();
+                }
+            }
+
+            /** Logs the watch failure; the error is not propagated any further. */
+            @Override
+            public void onError(Throwable e) {
+                LOG.warn("Unable to process data nodes event", e);
+            }
+        };
+    }
+
+ /**
+ * Creates meta storage listener for pending assignments updates.
+ *
+ * @return The watch listener.
*/
- private void registerRebalanceListeners() {
-
metaStorageMgr.registerPrefixWatch(ByteArray.fromString(PENDING_ASSIGNMENTS_PREFIX),
new WatchListener() {
+ private WatchListener createPendingAssignmentsRebalanceListener() {
+ return new WatchListener() {
@Override
public void onUpdate(WatchEvent evt) {
if (!busyLock.enterBusy()) {
@@ -1960,9 +2068,16 @@ public class TableManager extends Producer<TableEvent,
TableEventParameters> imp
public void onError(Throwable e) {
LOG.warn("Unable to process pending assignments event", e);
}
- });
+ };
+ }
-
metaStorageMgr.registerPrefixWatch(ByteArray.fromString(STABLE_ASSIGNMENTS_PREFIX),
new WatchListener() {
+ /**
+ * Creates meta storage listener for stable assignments updates.
+ *
+ * @return The watch listener.
+ */
+ private WatchListener createStableAssignmentsRebalanceListener() {
+ return new WatchListener() {
@Override
public void onUpdate(WatchEvent evt) {
handleChangeStableAssignmentEvent(evt);
@@ -1972,9 +2087,16 @@ public class TableManager extends Producer<TableEvent,
TableEventParameters> imp
public void onError(Throwable e) {
LOG.warn("Unable to process stable assignments event", e);
}
- });
+ };
+ }
-
metaStorageMgr.registerPrefixWatch(ByteArray.fromString(ASSIGNMENTS_SWITCH_REDUCE_PREFIX),
new WatchListener() {
+ /**
+ * Creates meta storage listener for switch reduce assignments updates.
+ *
+ * @return The watch listener.
+ */
+ private WatchListener createAssignmentsSwitchRebalanceListener() {
+ return new WatchListener() {
@Override
public void onUpdate(WatchEvent evt) {
byte[] key = evt.entryEvent().newEntry().key();
@@ -1990,7 +2112,7 @@ public class TableManager extends Producer<TableEvent,
TableEventParameters> imp
RebalanceUtil.handleReduceChanged(
metaStorageMgr,
- baselineMgr.nodes(),
+
baselineMgr.nodes().stream().map(ClusterNode::name).collect(toList()),
tblCfg.value().replicas(),
partitionNumber,
replicaGrpId,
@@ -2002,7 +2124,7 @@ public class TableManager extends Producer<TableEvent,
TableEventParameters> imp
public void onError(Throwable e) {
LOG.warn("Unable to process switch reduce event", e);
}
- });
+ };
}
private PartitionMover createPartitionMover(InternalTable internalTable,
int partId) {
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/utils/RebalanceUtil.java
b/modules/table/src/main/java/org/apache/ignite/internal/utils/RebalanceUtil.java
index 0b0b5f36c4..a3dea5ffa2 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/utils/RebalanceUtil.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/utils/RebalanceUtil.java
@@ -46,7 +46,6 @@ import org.apache.ignite.internal.metastorage.dsl.Operations;
import
org.apache.ignite.internal.table.distributed.replicator.TablePartitionId;
import org.apache.ignite.internal.util.ByteUtils;
import org.apache.ignite.lang.ByteArray;
-import org.apache.ignite.network.ClusterNode;
import org.jetbrains.annotations.NotNull;
/**
@@ -82,14 +81,14 @@ public class RebalanceUtil {
*
* @param tableName Table name.
* @param partId Unique identifier of a partition.
- * @param baselineNodes Nodes in baseline.
+ * @param dataNodes Data nodes.
* @param replicas Number of replicas for a table.
* @param revision Revision of Meta Storage that is specific for the
assignment update.
* @param metaStorageMgr Meta Storage manager.
* @return Future representing result of updating keys in {@code
metaStorageMgr}
*/
public static @NotNull CompletableFuture<Void>
updatePendingAssignmentsKeys(
- String tableName, TablePartitionId partId, Collection<ClusterNode>
baselineNodes,
+ String tableName, TablePartitionId partId, Collection<String>
dataNodes,
int replicas, long revision, MetaStorageManager metaStorageMgr,
int partNum) {
ByteArray partChangeTriggerKey = partChangeTriggerKey(partId);
@@ -99,7 +98,7 @@ public class RebalanceUtil {
ByteArray partAssignmentsStableKey = stablePartAssignmentsKey(partId);
- Set<Assignment> partAssignments =
AffinityUtils.calculateAssignmentForPartition(baselineNodes, partNum, replicas);
+ Set<Assignment> partAssignments =
AffinityUtils.calculateAssignmentForPartition(dataNodes, partNum, replicas);
byte[] partAssignmentsBytes = ByteUtils.toBytes(partAssignments);
@@ -341,14 +340,14 @@ public class RebalanceUtil {
* If there is rebalancing in progress, then new assignments will be
applied when rebalance finishes.
*
* @param metaStorageMgr MetaStorage manager.
- * @param baselineNodes Baseline nodes.
+ * @param dataNodes Data nodes.
* @param replicas Replicas count.
* @param partNum Number of the partition.
* @param partId Partition's raft group id.
* @param event Assignments switch reduce change event.
* @return Completable future that signifies the completion of this
operation.
*/
- public static CompletableFuture<Void>
handleReduceChanged(MetaStorageManager metaStorageMgr, Collection<ClusterNode>
baselineNodes,
+ public static CompletableFuture<Void>
handleReduceChanged(MetaStorageManager metaStorageMgr, Collection<String>
dataNodes,
int replicas, int partNum, TablePartitionId partId, WatchEvent
event) {
Entry entry = event.entryEvent().newEntry();
byte[] eventData = entry.value();
@@ -359,7 +358,7 @@ public class RebalanceUtil {
return CompletableFuture.completedFuture(null);
}
- Set<Assignment> assignments =
AffinityUtils.calculateAssignmentForPartition(baselineNodes, partNum, replicas);
+ Set<Assignment> assignments =
AffinityUtils.calculateAssignmentForPartition(dataNodes, partNum, replicas);
ByteArray pendingKey = pendingPartAssignmentsKey(partId);
diff --git
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerDistributionZonesTest.java
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerDistributionZonesTest.java
new file mode 100644
index 0000000000..1645d99ebe
--- /dev/null
+++
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerDistributionZonesTest.java
@@ -0,0 +1,462 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed;
+
+import static java.util.Collections.emptySet;
+import static
org.apache.ignite.configuration.annotation.ConfigurationType.DISTRIBUTED;
+import static
org.apache.ignite.internal.affinity.AffinityUtils.calculateAssignmentForPartition;
+import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zoneDataNodesKey;
+import static org.apache.ignite.internal.util.ByteUtils.fromBytes;
+import static org.apache.ignite.internal.util.ByteUtils.toBytes;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.lenient;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.timeout;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import org.apache.ignite.configuration.ConfigurationValue;
+import org.apache.ignite.configuration.NamedConfigurationTree;
+import org.apache.ignite.configuration.NamedListView;
+import org.apache.ignite.internal.affinity.Assignment;
+import org.apache.ignite.internal.configuration.ConfigurationManager;
+import
org.apache.ignite.internal.configuration.storage.TestConfigurationStorage;
+import
org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
+import
org.apache.ignite.internal.distributionzones.configuration.DistributionZonesConfiguration;
+import org.apache.ignite.internal.metastorage.Entry;
+import org.apache.ignite.internal.metastorage.EntryEvent;
+import org.apache.ignite.internal.metastorage.MetaStorageManager;
+import org.apache.ignite.internal.metastorage.WatchEvent;
+import org.apache.ignite.internal.metastorage.WatchListener;
+import
org.apache.ignite.internal.metastorage.command.MetaStorageCommandsFactory;
+import org.apache.ignite.internal.metastorage.command.MultiInvokeCommand;
+import org.apache.ignite.internal.metastorage.dsl.Iif;
+import org.apache.ignite.internal.metastorage.impl.EntryImpl;
+import
org.apache.ignite.internal.metastorage.server.SimpleInMemoryKeyValueStorage;
+import org.apache.ignite.internal.metastorage.server.raft.MetaStorageListener;
+import org.apache.ignite.internal.raft.Command;
+import org.apache.ignite.internal.raft.WriteCommand;
+import org.apache.ignite.internal.raft.service.CommandClosure;
+import org.apache.ignite.internal.raft.service.RaftGroupService;
+import org.apache.ignite.internal.schema.SchemaManager;
+import
org.apache.ignite.internal.schema.configuration.ExtendedTableConfiguration;
+import org.apache.ignite.internal.schema.configuration.TableChange;
+import org.apache.ignite.internal.schema.configuration.TableConfiguration;
+import org.apache.ignite.internal.schema.configuration.TableView;
+import org.apache.ignite.internal.schema.configuration.TablesConfiguration;
+import
org.apache.ignite.internal.table.distributed.raft.snapshot.outgoing.OutgoingSnapshotsManager;
+import
org.apache.ignite.internal.table.distributed.replicator.TablePartitionId;
+import org.apache.ignite.internal.testframework.IgniteAbstractTest;
+import org.apache.ignite.internal.utils.RebalanceUtil;
+import org.apache.ignite.lang.ByteArray;
+import org.apache.ignite.lang.IgniteBiTuple;
+import org.apache.ignite.lang.IgniteInternalException;
+import org.apache.ignite.network.ClusterService;
+import org.apache.ignite.network.MessagingService;
+import org.apache.ignite.network.TopologyService;
+import org.jetbrains.annotations.Nullable;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+import org.mockito.junit.jupiter.MockitoSettings;
+import org.mockito.quality.Strictness;
+
+/**
+ * Tests the distribution zone watch listener in {@link TableManager}.
+ */
+@ExtendWith({MockitoExtension.class, ConfigurationExtension.class})
+@MockitoSettings(strictness = Strictness.LENIENT)
+public class TableManagerDistributionZonesTest extends IgniteAbstractTest {
+ private SimpleInMemoryKeyValueStorage keyValueStorage;
+
+ private ConfigurationManager clusterCfgMgr;
+
+ @Mock()
+ private ClusterService clusterService;
+
+ private TablesConfiguration tablesConfiguration;
+
+ private WatchListener watchListener;
+
+ private TableManager tableManager;
+
+ /**
+ * Builds a {@link TableManager} on top of a mocked {@link MetaStorageManager} whose {@code invoke} calls are executed
+ * against a spied in-memory key-value storage, and captures the first watch listener registered by the manager.
+ */
+ @BeforeEach
+ public void setUp() {
+ clusterCfgMgr = new ConfigurationManager(
+ List.of(DistributionZonesConfiguration.KEY),
+ Set.of(),
+ new TestConfigurationStorage(DISTRIBUTED),
+ List.of(),
+ List.of()
+ );
+
+ MetaStorageManager metaStorageManager = mock(MetaStorageManager.class);
+
+ // Only the FIRST listener passed to registerPrefixWatch() is captured. The tests feed it with data nodes events,
+ // so this relies on TableManager.start() registering the data nodes watch before the rebalance watches.
+ doAnswer(invocation -> {
+ WatchListener listener = invocation.getArgument(1);
+
+ if (watchListener == null) {
+ watchListener = listener;
+ }
+
+ return null;
+ }).when(metaStorageManager).registerPrefixWatch(any(), any());
+
+ tablesConfiguration = mock(TablesConfiguration.class);
+
+ clusterCfgMgr.start();
+
+ // Source of the command indexes handed to the meta storage listener.
+ AtomicLong raftIndex = new AtomicLong();
+
+ // Spied so that tests can verify how many times invoke() reached the storage.
+ keyValueStorage = spy(new SimpleInMemoryKeyValueStorage("test"));
+
+ MetaStorageListener metaStorageListener = new
MetaStorageListener(keyValueStorage);
+
+ RaftGroupService metaStorageService = mock(RaftGroupService.class);
+
+ // Delegate directly to listener.
+ lenient().doAnswer(
+ invocationClose -> {
+ Command cmd = invocationClose.getArgument(0);
+
+ long commandIndex = raftIndex.incrementAndGet();
+
+ CompletableFuture<Serializable> res = new
CompletableFuture<>();
+
+ CommandClosure<WriteCommand> clo = new CommandClosure<>() {
+ /** {@inheritDoc} */
+ @Override
+ public long index() {
+ return commandIndex;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public WriteCommand command() {
+ return (WriteCommand) cmd;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public void result(@Nullable Serializable r) {
+ // A Throwable result fails the future, anything else completes it normally.
+ if (r instanceof Throwable) {
+ res.completeExceptionally((Throwable) r);
+ } else {
+ res.complete(r);
+ }
+ }
+ };
+
+ try {
+ metaStorageListener.onWrite(List.of(clo).iterator());
+ } catch (Throwable e) {
+ res.completeExceptionally(new
IgniteInternalException(e));
+ }
+
+ return res;
+ }
+ ).when(metaStorageService).run(any());
+
+ MetaStorageCommandsFactory commandsFactory = new
MetaStorageCommandsFactory();
+
+ // MetaStorageManager.invoke(Iif) is wrapped into a MultiInvokeCommand and sent through the stubbed RAFT service above.
+ lenient().doAnswer(invocationClose -> {
+ Iif iif = invocationClose.getArgument(0);
+
+ MultiInvokeCommand multiInvokeCommand =
commandsFactory.multiInvokeCommand().iif(iif).build();
+
+ return metaStorageService.run(multiInvokeCommand);
+ }).when(metaStorageManager).invoke(any());
+
+ when(clusterService.messagingService()).thenAnswer(invocation -> {
+ MessagingService ret = mock(MessagingService.class);
+
+ return ret;
+ });
+
+ // Collaborators passed as null are not stubbed by this test class.
+ tableManager = new TableManager(
+ "node1",
+ (x) -> {},
+ tablesConfiguration,
+ clusterService,
+ null,
+ null,
+ null,
+ null,
+ null,
+ mock(TopologyService.class),
+ null,
+ null,
+ null,
+ metaStorageManager,
+ mock(SchemaManager.class),
+ null,
+ null,
+ mock(OutgoingSnapshotsManager.class)
+ );
+ }
+
+ /**
+ * Checks that a data nodes update of zone 1 writes pending assignments for all partitions of the tables of zone 1
+ * and leaves the tables of zone 0 without pending assignments.
+ */
+ @Test
+ void dataNodesTriggersAssignmentsChanging() {
+ // mockTable(tableNum, partNum, zoneId): tables 0-1 belong to zone 0, tables 2-5 belong to zone 1.
+ IgniteBiTuple<TableView, ExtendedTableConfiguration> table0 =
mockTable(0, 1, 0);
+ IgniteBiTuple<TableView, ExtendedTableConfiguration> table1 =
mockTable(1, 2, 0);
+ IgniteBiTuple<TableView, ExtendedTableConfiguration> table2 =
mockTable(2, 1, 1);
+ IgniteBiTuple<TableView, ExtendedTableConfiguration> table3 =
mockTable(3, 2, 1);
+ IgniteBiTuple<TableView, ExtendedTableConfiguration> table4 =
mockTable(4, 1, 1);
+ IgniteBiTuple<TableView, ExtendedTableConfiguration> table5 =
mockTable(5, 2, 1);
+
+ List<IgniteBiTuple<TableView, ExtendedTableConfiguration>>
mockedTables =
+ List.of(table0, table1, table2, table3, table4, table5);
+
+ mockTables(mockedTables);
+
+ tableManager.start();
+
+ Set<String> nodes = Set.of("node0", "node1", "node2");
+
+ watchListenerOnUpdate(1, nodes, 1);
+
+ // Zone 0 is intentionally absent: checkAssignments() then expects no assignments for its tables.
+ Map<Integer, Set<String>> zoneNodes = new HashMap<>();
+
+ zoneNodes.put(1, nodes);
+
+ checkAssignments(mockedTables, zoneNodes,
RebalanceUtil::pendingPartAssignmentsKey);
+
+ // Zone 1 has 1 + 2 + 1 + 2 = 6 partitions, one storage invoke per partition.
+ verify(keyValueStorage, timeout(1000).times(6)).invoke(any());
+ }
+
+ /**
+ * Checks two consecutive data nodes updates of the same zone: the first one (revision 1) is expected under the pending
+ * assignments key, the second one (revision 2, different nodes) is expected under the planned assignments key.
+ */
+ @Test
+ void sequentialAssignmentsChanging() {
+ IgniteBiTuple<TableView, ExtendedTableConfiguration> table =
mockTable(0, 1, 1);
+
+ List<IgniteBiTuple<TableView, ExtendedTableConfiguration>>
mockedTables = List.of(table);
+
+ mockTables(mockedTables);
+
+ tableManager.start();
+
+ Set<String> nodes = Set.of("node0", "node1", "node2");
+
+ watchListenerOnUpdate(1, nodes, 1);
+
+ Map<Integer, Set<String>> zoneNodes = new HashMap<>();
+
+ zoneNodes.put(1, nodes);
+
+ checkAssignments(mockedTables, zoneNodes,
RebalanceUtil::pendingPartAssignmentsKey);
+
+ verify(keyValueStorage, timeout(1000).times(1)).invoke(any());
+
+ // Second update with a newer revision and a disjoint set of nodes.
+ nodes = Set.of("node3", "node4", "node5");
+
+ watchListenerOnUpdate(1, nodes, 2);
+
+ zoneNodes.clear();
+ zoneNodes.put(1, nodes);
+
+ checkAssignments(mockedTables, zoneNodes,
RebalanceUtil::plannedPartAssignmentsKey);
+
+ // The invocation count is cumulative over the whole test.
+ verify(keyValueStorage, timeout(1000).times(2)).invoke(any());
+ }
+
+ /**
+ * Checks the sequence "entry without value, three nodes, empty node set": the event without a value does not reach the
+ * storage, the three nodes are expected under the pending key and the empty set is expected under the planned key.
+ */
+ @Test
+ void sequentialEmptyAssignmentsChanging() {
+ IgniteBiTuple<TableView, ExtendedTableConfiguration> table =
mockTable(0, 1, 1);
+
+ List<IgniteBiTuple<TableView, ExtendedTableConfiguration>>
mockedTables = List.of(table);
+
+ mockTables(mockedTables);
+
+ tableManager.start();
+
+ // null nodes produce an entry with a null value; the times(1) verification below shows it triggers no invoke.
+ watchListenerOnUpdate(1, null, 1);
+
+ Set<String> nodes = Set.of("node0", "node1", "node2");
+
+ watchListenerOnUpdate(1, nodes, 2);
+
+ Map<Integer, Set<String>> zoneNodes = new HashMap<>();
+
+ zoneNodes.put(1, nodes);
+
+ checkAssignments(mockedTables, zoneNodes,
RebalanceUtil::pendingPartAssignmentsKey);
+
+ verify(keyValueStorage, timeout(1000).times(1)).invoke(any());
+
+ // An empty (but non-null) data nodes set is still processed.
+ nodes = emptySet();
+
+ watchListenerOnUpdate(1, nodes, 3);
+
+ zoneNodes.clear();
+ zoneNodes.put(1, nodes);
+
+ checkAssignments(mockedTables, zoneNodes,
RebalanceUtil::plannedPartAssignmentsKey);
+
+ verify(keyValueStorage, timeout(1000).times(2)).invoke(any());
+ }
+
+ /**
+ * Checks that a second data nodes event carrying the same revision as the first one changes nothing: the pending
+ * assignments still correspond to the first node set and no planned assignments are written.
+ */
+ @Test
+ void staleDataNodesEvent() {
+ IgniteBiTuple<TableView, ExtendedTableConfiguration> table =
mockTable(0, 1, 1);
+
+ List<IgniteBiTuple<TableView, ExtendedTableConfiguration>>
mockedTables = List.of(table);
+
+ mockTables(mockedTables);
+
+ tableManager.start();
+
+ Set<String> nodes = Set.of("node0", "node1", "node2");
+
+ watchListenerOnUpdate(1, nodes, 1);
+
+ Map<Integer, Set<String>> zoneNodes = new HashMap<>();
+
+ zoneNodes.put(1, nodes);
+
+ checkAssignments(mockedTables, zoneNodes,
RebalanceUtil::pendingPartAssignmentsKey);
+
+ verify(keyValueStorage, timeout(1000).times(1)).invoke(any());
+
+ Set<String> nodes2 = Set.of("node3", "node4", "node5");
+
+ // Same revision (1) as the first event.
+ watchListenerOnUpdate(1, nodes2, 1);
+
+ // zoneNodes still holds the first node set: the pending assignments must not have changed.
+ checkAssignments(mockedTables, zoneNodes,
RebalanceUtil::pendingPartAssignmentsKey);
+
+ // Partition 0 of the table created by mockTable(0, ...), whose ID is new UUID(0, 0).
+ TablePartitionId partId = new TablePartitionId(new UUID(0, 0), 0);
+
+
assertNull(keyValueStorage.get(RebalanceUtil.plannedPartAssignmentsKey(partId).bytes()).value());
+
+ // The storage is still invoked for the second event, but the checks above show that no key was changed.
+ verify(keyValueStorage, timeout(1000).times(2)).invoke(any());
+ }
+
+    /**
+     * Verifies the assignments stored in the key-value storage for every partition of the given tables.
+     *
+     * <p>For a table whose zone is present in {@code zoneNodes}, the stored assignments must be exactly the ones calculated
+     * from the zone's nodes. For a table whose zone is absent from {@code zoneNodes}, no value must be stored.
+     *
+     * @param mockedTables Tables created by {@code mockTable}; the table at index {@code i} has the ID {@code new UUID(0, i)}.
+     * @param zoneNodes Expected data nodes by zone ID.
+     * @param assignmentFunction Maps a partition ID to the storage key to check (for example, the pending or the planned key).
+     */
+    private void checkAssignments(
+            List<IgniteBiTuple<TableView, ExtendedTableConfiguration>> mockedTables,
+            Map<Integer, Set<String>> zoneNodes,
+            Function<TablePartitionId, ByteArray> assignmentFunction
+    ) {
+        for (int i = 0; i < mockedTables.size(); i++) {
+            TableView tableView = mockedTables.get(i).get1();
+
+            // The zone's nodes are the same for all partitions of the table.
+            Set<String> expectedNodes = zoneNodes.get(tableView.zoneId());
+
+            for (int j = 0; j < tableView.partitions(); j++) {
+                TablePartitionId partId = new TablePartitionId(new UUID(0, i), j);
+
+                byte[] actualAssignmentsBytes = keyValueStorage.get(assignmentFunction.apply(partId).bytes()).value();
+
+                String partitionInfo = "[table=" + tableView.name() + ", partition=" + j + ']';
+
+                if (expectedNodes == null) {
+                    assertNull(actualAssignmentsBytes, "Unexpected assignments for " + partitionInfo);
+
+                    continue;
+                }
+
+                assertNotNull(actualAssignmentsBytes, "Missing assignments for " + partitionInfo);
+
+                Set<String> expectedAssignments = calculateAssignmentForPartition(expectedNodes, j, tableView.replicas())
+                        .stream()
+                        .map(Assignment::consistentId)
+                        .collect(Collectors.toSet());
+
+                // fromBytes() is generic, so the target type alone is enough and no cast is needed.
+                Set<Assignment> storedAssignments = fromBytes(actualAssignmentsBytes);
+
+                Set<String> actualAssignments = storedAssignments.stream()
+                        .map(Assignment::consistentId)
+                        .collect(Collectors.toSet());
+
+                // Set equality covers both "same size" and "same elements" and reports both sets on failure.
+                assertEquals(expectedAssignments, actualAssignments, "Wrong assignments for " + partitionInfo);
+            }
+        }
+    }
+
+ /**
+ * Delivers a data nodes update for the given zone to the watch listener captured in {@code setUp()}.
+ *
+ * @param zoneId Zone ID used to build the data nodes key.
+ * @param nodes New data nodes, or {@code null} to deliver an entry without a value.
+ * @param rev Revision of the delivered entry.
+ */
+ private void watchListenerOnUpdate(int zoneId, Set<String> nodes, long
rev) {
+ byte[] newLogicalTopology;
+
+ if (nodes != null) {
+ newLogicalTopology = toBytes(nodes);
+ } else {
+ newLogicalTopology = null;
+ }
+
+ // NOTE(review): the last constructor argument (1) is presumably the update counter - confirm against EntryImpl.
+ Entry newEntry = new EntryImpl(zoneDataNodesKey(zoneId).bytes(),
newLogicalTopology, rev, 1);
+
+ // The old entry is not needed by the listener under test, hence null.
+ EntryEvent entryEvent = new EntryEvent(null, newEntry);
+
+ WatchEvent evt = new WatchEvent(entryEvent);
+
+ watchListener.onUpdate(evt);
+ }
+
+ /**
+ * Creates a mocked table view together with a mocked table configuration.
+ *
+ * <p>The table is named {@code "table" + tableNum}, has the ID {@code new UUID(0, tableNum)} and always has 1 replica.
+ *
+ * @param tableNum Table number, used for the table name and the table ID.
+ * @param partNum Number of partitions reported by the view.
+ * @param zoneId ID of the zone reported by the view.
+ * @return Tuple of the mocked view and the mocked configuration.
+ */
+ private IgniteBiTuple<TableView, ExtendedTableConfiguration> mockTable(int
tableNum, int partNum, int zoneId) {
+ TableView tableView = mock(TableView.class);
+ when(tableView.zoneId()).thenReturn(zoneId);
+ when(tableView.name()).thenReturn("table" + tableNum);
+ when(tableView.replicas()).thenReturn(1);
+ when(tableView.partitions()).thenReturn(partNum);
+
+ // Only id() is stubbed on the configuration mock.
+ ExtendedTableConfiguration tableCfg =
mock(ExtendedTableConfiguration.class);
+ ConfigurationValue valueId = mock(ConfigurationValue.class);
+ when(valueId.value()).thenReturn(new UUID(0, tableNum));
+ when(tableCfg.id()).thenReturn(valueId);
+
+ return new IgniteBiTuple<>(tableView, tableCfg);
+
+ }
+
+ /**
+ * Makes {@code tablesConfiguration.tables()} return a mocked named list that exposes the given tables
+ * by index (view) and by name (configuration).
+ *
+ * @param mockedTables Tables created by {@code mockTable}.
+ */
+ private void mockTables(List<IgniteBiTuple<TableView,
ExtendedTableConfiguration>> mockedTables) {
+ NamedListView<TableView> valueList = mock(NamedListView.class);
+
+ when(valueList.namedListKeys()).thenReturn(new ArrayList<>());
+
+ NamedConfigurationTree<TableConfiguration, TableView, TableChange>
tables = mock(NamedConfigurationTree.class);
+
+ when(valueList.size()).thenReturn(mockedTables.size());
+ when(tables.value()).thenReturn(valueList);
+
+ for (int i = 0; i < mockedTables.size(); i++) {
+ IgniteBiTuple<TableView, ExtendedTableConfiguration> mockedTable =
mockedTables.get(i);
+
+ TableView tableView = mockedTable.get1();
+
+ // View lookup by index and configuration lookup by name.
+ when(valueList.get(i)).thenReturn(tableView);
+
+ when(tables.get(tableView.name())).thenReturn(mockedTable.get2());
+ }
+
+ // TableManager.start() registers listeners on tables().any().replicas() and tables().any().assignments().
+ ExtendedTableConfiguration tableCfg =
mock(ExtendedTableConfiguration.class);
+ when(tables.any()).thenReturn(tableCfg);
+ when(tableCfg.replicas()).thenReturn(mock(ConfigurationValue.class));
+
when(tableCfg.assignments()).thenReturn(mock(ConfigurationValue.class));
+
+ when(tablesConfiguration.tables()).thenReturn(tables);
+ }
+}
diff --git
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
index a4520d5f1c..64e5fa0708 100644
---
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
+++
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
@@ -255,6 +255,8 @@ public class TableManagerTest extends IgniteAbstractTest {
public void testPreconfiguredTable() throws Exception {
when(rm.startRaftGroupService(any(), any())).thenAnswer(mock ->
completedFuture(mock(RaftGroupService.class)));
+ mockMetastore();
+
TableManager tableManager = createTableManager(tblManagerFut, false);
tblManagerFut.complete(tableManager);
@@ -264,8 +266,6 @@ public class TableManagerTest extends IgniteAbstractTest {
SchemaBuilders.column("val",
ColumnType.INT64).asNullable(true).build()
).withPrimaryKey("key").build();
- mockMetastore();
-
tblsCfg.tables().change(tablesChange -> {
tablesChange.create(scmTbl.name(), tableChange -> {
(SchemaConfigurationConverter.convert(scmTbl, tableChange))
@@ -650,6 +650,8 @@ public class TableManagerTest extends IgniteAbstractTest {
.thenReturn(assignment);
}
+ mockMetastore();
+
TableManager tableManager = createTableManager(tblManagerFut, true);
final int tablesBeforeCreation = tableManager.tables().size();
@@ -687,8 +689,6 @@ public class TableManagerTest extends IgniteAbstractTest {
.changePartitions(PARTITIONS)
);
- mockMetastore();
-
assertTrue(createTblLatch.await(10, TimeUnit.SECONDS));
TableImpl tbl2 = (TableImpl) tbl2Fut.get();