This is an automated email from the ASF dual-hosted git repository.
sk0x50 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new bcb20bd11e IGNITE-18737 Fixed updating of partition assignments when
new assignments equal to table configuration assignments and stable assignments
are empty. Fixes #1655
bcb20bd11e is described below
commit bcb20bd11ec9bf112b502f3a0fb18a5e696e864a
Author: Sergey Uttsel <[email protected]>
AuthorDate: Wed Feb 22 09:31:09 2023 +0200
IGNITE-18737 Fixed updating of partition assignments when new assignments
equal to table configuration assignments and stable assignments are empty.
Fixes #1655
Signed-off-by: Slava Koptilin <[email protected]>
---
.../runner/app/ItIgniteNodeRestartTest.java | 47 +-
.../internal/table/distributed/TableManager.java | 23 +-
.../ignite/internal/utils/RebalanceUtil.java | 74 ++-
.../TableManagerDistributionZonesTest.java | 22 +
.../utils/RebalanceUtilUpdateAssignmentsTest.java | 541 +++++++++++++++++++++
5 files changed, 652 insertions(+), 55 deletions(-)
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
index 71e53c799e..340052a38a 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
@@ -23,7 +23,6 @@ import static
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCo
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
import static
org.apache.ignite.utils.ClusterServiceTestUtils.defaultSerializationRegistry;
import static org.hamcrest.MatcherAssert.assertThat;
-import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
@@ -128,7 +127,6 @@ import org.junit.jupiter.api.extension.ExtendWith;
*/
@WithSystemProperty(key = CONFIGURATION_CATCH_UP_DIFFERENCE_PROPERTY, value =
"0")
@ExtendWith(ConfigurationExtension.class)
-@Disabled("https://issues.apache.org/jira/browse/IGNITE-18737")
public class ItIgniteNodeRestartTest extends IgniteAbstractTest {
/** Default node port. */
private static final int DEFAULT_NODE_PORT = 3344;
@@ -668,46 +666,11 @@ public class ItIgniteNodeRestartTest extends
IgniteAbstractTest {
assertEquals(newPort, nodePort);
}
- /**
- * Checks that the only one non-default property overwrites after another
configuration is passed on the node restart.
- */
- @Test
- public void twoCustomPropertiesTest() {
- String startCfg = "network: {\n"
- + " port:3344,\n"
- + " nodeFinder: {netClusterNodes:[ \"localhost:3344\" ]}\n"
- + "}";
-
- IgniteImpl ignite = startNode(0, startCfg);
-
- assertEquals(
- 3344,
-
ignite.nodeConfiguration().getConfiguration(NetworkConfiguration.KEY).port().value()
- );
-
- assertArrayEquals(
- new String[]{"localhost:3344"},
-
ignite.nodeConfiguration().getConfiguration(NetworkConfiguration.KEY).nodeFinder().netClusterNodes().value()
- );
-
- stopNode(0);
-
- ignite = startNode(0, "network.nodeFinder.netClusterNodes=[
\"localhost:3344\", \"localhost:3343\" ]");
-
- assertEquals(
- 3344,
-
ignite.nodeConfiguration().getConfiguration(NetworkConfiguration.KEY).port().value()
- );
-
- assertArrayEquals(
- new String[]{"localhost:3344", "localhost:3343"},
-
ignite.nodeConfiguration().getConfiguration(NetworkConfiguration.KEY).nodeFinder().netClusterNodes().value()
- );
- }
-
/**
* Restarts the node which stores some data.
*/
+ @Disabled("IGNITE-18203 The test goes to deadlock in cluster restart,
because indexes are required to apply RAFT commands on restart, "
+ + "but the table have not started yet.")
@Test
public void nodeWithDataTest() throws InterruptedException {
IgniteImpl ignite = startNode(0);
@@ -724,6 +687,8 @@ public class ItIgniteNodeRestartTest extends
IgniteAbstractTest {
/**
* Starts two nodes and checks that the data are storing through restarts.
Nodes restart in the same order when they started at first.
*/
+ @Disabled("IGNITE-18203 The test goes to deadlock in cluster restart,
because indexes are required to apply RAFT commands on restart, "
+ + "but the table have not started yet.")
@Test
public void testTwoNodesRestartDirect() throws InterruptedException {
twoNodesRestart(true);
@@ -732,6 +697,8 @@ public class ItIgniteNodeRestartTest extends
IgniteAbstractTest {
/**
* Starts two nodes and checks that the data are storing through restarts.
Nodes restart in reverse order when they started at first.
*/
+ @Disabled("IGNITE-18203 The test goes to deadlock in cluster restart,
because indexes are required to apply RAFT commands on restart, "
+ + "but the table have not started yet.")
@Test
public void testTwoNodesRestartReverse() throws InterruptedException {
twoNodesRestart(false);
@@ -870,6 +837,8 @@ public class ItIgniteNodeRestartTest extends
IgniteAbstractTest {
/**
* Checks that a cluster is able to restart when some changes were made in
configuration.
*/
+ @Disabled("IGNITE-18203 The test goes to deadlock in cluster restart,
because indexes are required to apply RAFT commands on restart, "
+ + "but the table have not started yet.")
@Test
public void testRestartDiffConfig() throws InterruptedException {
List<IgniteImpl> ignites = startNodes(2);
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index 3a176604b9..22ba3f815e 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -632,12 +632,16 @@ public class TableManager extends Producer<TableEvent,
TableEventParameters> imp
CompletableFuture<?>[] futures = new
CompletableFuture<?>[partCnt];
+ byte[] assignmentsBytes = ((ExtendedTableConfiguration)
tblCfg).assignments().value();
+
+ List<Set<Assignment>> tableAssignments =
ByteUtils.fromBytes(assignmentsBytes);
+
for (int i = 0; i < partCnt; i++) {
TablePartitionId replicaGrpId = new
TablePartitionId(((ExtendedTableConfiguration) tblCfg).id().value(), i);
futures[i] =
updatePendingAssignmentsKeys(tblCfg.name().value(), replicaGrpId,
baselineMgr.nodes().stream().map(ClusterNode::name).collect(toList()),
newReplicas,
- replicasCtx.storageRevision(), metaStorageMgr, i);
+ replicasCtx.storageRevision(), metaStorageMgr, i,
tableAssignments.get(i));
}
return allOf(futures);
@@ -983,14 +987,14 @@ public class TableManager extends Producer<TableEvent,
TableEventParameters> imp
return;
}
+ busyLock.block();
+
metaStorageMgr.unregisterWatch(distributionZonesDataNodesListener);
metaStorageMgr.unregisterWatch(pendingAssignmentsRebalanceListener);
metaStorageMgr.unregisterWatch(stableAssignmentsRebalanceListener);
metaStorageMgr.unregisterWatch(assignmentsSwitchRebalanceListener);
- busyLock.block();
-
Map<UUID, TableImpl> tables = tablesByIdVv.latest();
cleanUpTablesResources(tables);
@@ -1863,19 +1867,26 @@ public class TableManager extends Producer<TableEvent,
TableEventParameters> imp
if (zoneId == tableZoneId) {
TableConfiguration tableCfg =
tables.get(tableView.name());
+ byte[] assignmentsBytes =
((ExtendedTableConfiguration) tableCfg).assignments().value();
+
+ List<Set<Assignment>> tableAssignments =
ByteUtils.fromBytes(assignmentsBytes);
+
for (int part = 0; part < tableView.partitions();
part++) {
UUID tableId = ((ExtendedTableConfiguration)
tableCfg).id().value();
TablePartitionId replicaGrpId = new
TablePartitionId(tableId, part);
+ int replicas = tableView.replicas();
+
int partId = part;
updatePendingAssignmentsKeys(
- tableView.name(), replicaGrpId,
dataNodes, tableView.replicas(),
-
evt.entryEvent().newEntry().revision(), metaStorageMgr, part
+ tableView.name(), replicaGrpId,
dataNodes, replicas,
+
evt.entryEvent().newEntry().revision(), metaStorageMgr, part,
tableAssignments.get(part)
).exceptionally(e -> {
LOG.error(
- "Exception on updating assignments
for [table={}, partition={}]", e, tableView.name(), partId
+ "Exception on updating assignments
for [table={}, partition={}]", e, tableView.name(),
+ partId
);
return null;
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/utils/RebalanceUtil.java
b/modules/table/src/main/java/org/apache/ignite/internal/utils/RebalanceUtil.java
index a3dea5ffa2..8d9d3953dc 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/utils/RebalanceUtil.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/utils/RebalanceUtil.java
@@ -18,6 +18,7 @@
package org.apache.ignite.internal.utils;
import static org.apache.ignite.internal.metastorage.dsl.Conditions.and;
+import static org.apache.ignite.internal.metastorage.dsl.Conditions.exists;
import static org.apache.ignite.internal.metastorage.dsl.Conditions.notExists;
import static org.apache.ignite.internal.metastorage.dsl.Conditions.or;
import static org.apache.ignite.internal.metastorage.dsl.Conditions.revision;
@@ -41,6 +42,7 @@ import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.metastorage.Entry;
import org.apache.ignite.internal.metastorage.MetaStorageManager;
import org.apache.ignite.internal.metastorage.WatchEvent;
+import org.apache.ignite.internal.metastorage.dsl.Condition;
import org.apache.ignite.internal.metastorage.dsl.Iif;
import org.apache.ignite.internal.metastorage.dsl.Operations;
import
org.apache.ignite.internal.table.distributed.replicator.TablePartitionId;
@@ -69,12 +71,22 @@ public class RebalanceUtil {
/** Return code of metastore multi-invoke which identifies,
* that planned key was removed, because current rebalance is already have
the same target.
*/
- private static final int PLANNED_KEY_REMOVED = 2;
+ private static final int PLANNED_KEY_REMOVED_EQUALS_PENDING = 2;
+
+ /** Return code of metastore multi-invoke which identifies,
+ * that planned key was removed, because current assignment is empty.
+ */
+ private static final int PLANNED_KEY_REMOVED_EMPTY_PENDING = 3;
+
+ /** Return code of metastore multi-invoke which identifies,
+ * that assignments do not need to be updated.
+ */
+ private static final int ASSIGNMENT_NOT_UPDATED = 4;
/** Return code of metastore multi-invoke which identifies,
* that this trigger event was already processed by another node and must
be skipped.
*/
- private static final int OUTDATED_UPDATE_RECEIVED = 3;
+ private static final int OUTDATED_UPDATE_RECEIVED = 5;
/**
* Update keys that related to rebalance algorithm in Meta Storage. Keys
are specific for partition.
@@ -85,11 +97,13 @@ public class RebalanceUtil {
* @param replicas Number of replicas for a table.
* @param revision Revision of Meta Storage that is specific for the
assignment update.
* @param metaStorageMgr Meta Storage manager.
+ * @param partNum Partition id.
+ * @param tableCfgPartAssignments Table configuration assignments.
* @return Future representing result of updating keys in {@code
metaStorageMgr}
*/
public static @NotNull CompletableFuture<Void>
updatePendingAssignmentsKeys(
String tableName, TablePartitionId partId, Collection<String>
dataNodes,
- int replicas, long revision, MetaStorageManager metaStorageMgr,
int partNum) {
+ int replicas, long revision, MetaStorageManager metaStorageMgr,
int partNum, Set<Assignment> tableCfgPartAssignments) {
ByteArray partChangeTriggerKey = partChangeTriggerKey(partId);
ByteArray partAssignmentsPendingKey =
pendingPartAssignmentsKey(partId);
@@ -100,32 +114,59 @@ public class RebalanceUtil {
Set<Assignment> partAssignments =
AffinityUtils.calculateAssignmentForPartition(dataNodes, partNum, replicas);
+ boolean isNewAssignments =
!tableCfgPartAssignments.equals(partAssignments);
+
byte[] partAssignmentsBytes = ByteUtils.toBytes(partAssignments);
// if empty(partition.change.trigger.revision) ||
partition.change.trigger.revision < event.revision:
- // if empty(partition.assignments.pending) &&
partition.assignments.stable != calcPartAssighments():
+ // if empty(partition.assignments.pending)
+ // && ((isNewAssignments &&
empty(partition.assignments.stable))
+ // || (partition.assignments.stable !=
calcPartAssighments() && !empty(partition.assignments.stable))):
// partition.assignments.pending = calcPartAssignments()
// partition.change.trigger.revision = event.revision
// else:
- // if partition.assignments.pending != calcPartAssignments
+ // if partition.assignments.pending != calcPartAssignments
&& !empty(partition.assignments.pending)
// partition.assignments.planned = calcPartAssignments()
// partition.change.trigger.revision = event.revision
- // else
+ // else if partition.assignments.pending ==
calcPartAssignments
+ // remove(partition.assignments.planned)
+ // message after the metastorage invoke:
+ // "Remove planned key because current pending key has
the same value."
+ // else if empty(partition.assignments.pending)
// remove(partition.assignments.planned)
+ // message after the metastorage invoke:
+ // "Remove planned key because pending is empty and
calculated assignments are equal to current assignments."
// else:
// skip
+
+ Condition newAssignmentsCondition;
+
+ if (isNewAssignments) {
+ newAssignmentsCondition = or(
+ notExists(partAssignmentsStableKey),
+
and(value(partAssignmentsStableKey).ne(partAssignmentsBytes),
exists(partAssignmentsStableKey))
+ );
+ } else {
+ newAssignmentsCondition =
and(value(partAssignmentsStableKey).ne(partAssignmentsBytes),
exists(partAssignmentsStableKey));
+ }
+
var iif = iif(or(notExists(partChangeTriggerKey),
value(partChangeTriggerKey).lt(ByteUtils.longToBytes(revision))),
- iif(and(notExists(partAssignmentsPendingKey),
value(partAssignmentsStableKey).ne(partAssignmentsBytes)),
+ iif(and(notExists(partAssignmentsPendingKey),
newAssignmentsCondition),
ops(
put(partAssignmentsPendingKey,
partAssignmentsBytes),
put(partChangeTriggerKey,
ByteUtils.longToBytes(revision))
).yield(PENDING_KEY_UPDATED),
-
iif(value(partAssignmentsPendingKey).ne(partAssignmentsBytes),
+
iif(and(value(partAssignmentsPendingKey).ne(partAssignmentsBytes),
exists(partAssignmentsPendingKey)),
ops(
put(partAssignmentsPlannedKey,
partAssignmentsBytes),
put(partChangeTriggerKey,
ByteUtils.longToBytes(revision))
).yield(PLANNED_KEY_UPDATED),
-
ops(remove(partAssignmentsPlannedKey)).yield(PLANNED_KEY_REMOVED))),
+
iif(value(partAssignmentsPendingKey).eq(partAssignmentsBytes),
+
ops(remove(partAssignmentsPlannedKey)).yield(PLANNED_KEY_REMOVED_EQUALS_PENDING),
+
iif(notExists(partAssignmentsPendingKey),
+
ops(remove(partAssignmentsPlannedKey)).yield(PLANNED_KEY_REMOVED_EMPTY_PENDING),
+
ops().yield(ASSIGNMENT_NOT_UPDATED))
+ ))),
ops().yield(OUTDATED_UPDATE_RECEIVED));
return metaStorageMgr.invoke(iif).thenAccept(sr -> {
@@ -143,11 +184,24 @@ public class RebalanceUtil {
partAssignmentsPlannedKey, partNum, tableName,
ByteUtils.fromBytes(partAssignmentsBytes));
break;
- case PLANNED_KEY_REMOVED:
+ case PLANNED_KEY_REMOVED_EQUALS_PENDING:
LOG.info(
"Remove planned key because current pending key
has the same value [key={}, partition={}, table={}, val={}]",
partAssignmentsPlannedKey.toString(), partNum,
tableName, ByteUtils.fromBytes(partAssignmentsBytes));
+ break;
+ case PLANNED_KEY_REMOVED_EMPTY_PENDING:
+ LOG.info(
+ "Remove planned key because pending is empty and
calculated assignments are equal to current assignments "
+ + "[key={}, partition={}, table={},
val={}]",
+ partAssignmentsPlannedKey.toString(), partNum,
tableName, ByteUtils.fromBytes(partAssignmentsBytes));
+
+ break;
+ case ASSIGNMENT_NOT_UPDATED:
+ LOG.debug(
+ "Assignments are not updated [key={},
partition={}, table={}, val={}]",
+ partAssignmentsPlannedKey.toString(), partNum,
tableName, ByteUtils.fromBytes(partAssignmentsBytes));
+
break;
case OUTDATED_UPDATE_RECEIVED:
LOG.debug(
diff --git
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerDistributionZonesTest.java
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerDistributionZonesTest.java
index fba4182dce..64e3fed14a 100644
---
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerDistributionZonesTest.java
+++
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerDistributionZonesTest.java
@@ -88,6 +88,7 @@ import org.apache.ignite.network.ClusterService;
import org.apache.ignite.network.MessagingService;
import org.apache.ignite.network.TopologyService;
import org.jetbrains.annotations.Nullable;
+import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
@@ -230,6 +231,15 @@ public class TableManagerDistributionZonesTest extends
IgniteAbstractTest {
);
}
+ @AfterEach
+ public void tearDown() throws Exception {
+ clusterCfgMgr.stop();
+
+ keyValueStorage.close();
+
+ tableManager.stop();
+ }
+
@Test
void dataNodesTriggersAssignmentsChanging() {
IgniteBiTuple<TableView, ExtendedTableConfiguration> table0 =
mockTable(0, 1, 0);
@@ -429,6 +439,18 @@ public class TableManagerDistributionZonesTest extends
IgniteAbstractTest {
when(valueId.value()).thenReturn(new UUID(0, tableNum));
when(tableCfg.id()).thenReturn(valueId);
+ List<Set<Assignment>> tableAssignments = new ArrayList<>();
+
+ for (int i = 0; i < partNum; i++) {
+ tableAssignments.add(Set.of(Assignment.forPeer("fakeAssignment")));
+ }
+
+ ConfigurationValue assignmentValue = mock(ConfigurationValue.class);
+
+ when(assignmentValue.value()).thenReturn(toBytes(tableAssignments));
+
+ when(tableCfg.assignments()).thenReturn(assignmentValue);
+
return new IgniteBiTuple<>(tableView, tableCfg);
}
diff --git
a/modules/table/src/test/java/org/apache/ignite/internal/utils/RebalanceUtilUpdateAssignmentsTest.java
b/modules/table/src/test/java/org/apache/ignite/internal/utils/RebalanceUtilUpdateAssignmentsTest.java
new file mode 100644
index 0000000000..a9f62a449c
--- /dev/null
+++
b/modules/table/src/test/java/org/apache/ignite/internal/utils/RebalanceUtilUpdateAssignmentsTest.java
@@ -0,0 +1,541 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.utils;
+
+import static java.util.stream.Collectors.toSet;
+import static
org.apache.ignite.configuration.annotation.ConfigurationType.DISTRIBUTED;
+import static
org.apache.ignite.internal.affinity.AffinityUtils.calculateAssignmentForPartition;
+import static org.apache.ignite.internal.util.ByteUtils.toBytes;
+import static
org.apache.ignite.internal.utils.RebalanceUtil.pendingPartAssignmentsKey;
+import static
org.apache.ignite.internal.utils.RebalanceUtil.plannedPartAssignmentsKey;
+import static
org.apache.ignite.internal.utils.RebalanceUtil.stablePartAssignmentsKey;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.lenient;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
+
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.IntStream;
+import org.apache.ignite.internal.affinity.Assignment;
+import org.apache.ignite.internal.configuration.ConfigurationManager;
+import
org.apache.ignite.internal.configuration.storage.TestConfigurationStorage;
+import
org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
+import
org.apache.ignite.internal.distributionzones.configuration.DistributionZonesConfiguration;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.metastorage.MetaStorageManager;
+import
org.apache.ignite.internal.metastorage.command.MetaStorageCommandsFactory;
+import org.apache.ignite.internal.metastorage.command.MultiInvokeCommand;
+import org.apache.ignite.internal.metastorage.dsl.Iif;
+import
org.apache.ignite.internal.metastorage.server.SimpleInMemoryKeyValueStorage;
+import org.apache.ignite.internal.metastorage.server.raft.MetaStorageListener;
+import org.apache.ignite.internal.raft.Command;
+import org.apache.ignite.internal.raft.WriteCommand;
+import org.apache.ignite.internal.raft.service.CommandClosure;
+import org.apache.ignite.internal.raft.service.RaftGroupService;
+import
org.apache.ignite.internal.table.distributed.replicator.TablePartitionId;
+import org.apache.ignite.internal.testframework.IgniteAbstractTest;
+import org.apache.ignite.internal.util.ByteUtils;
+import org.apache.ignite.lang.IgniteInternalException;
+import org.apache.ignite.network.ClusterService;
+import org.apache.ignite.network.MessagingService;
+import org.jetbrains.annotations.Nullable;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.junit.jupiter.MockitoExtension;
+import org.mockito.junit.jupiter.MockitoSettings;
+import org.mockito.quality.Strictness;
+
+/**
+ * Tests for updating assignment in the meta storage.
+ */
+@ExtendWith({MockitoExtension.class, ConfigurationExtension.class})
+@MockitoSettings(strictness = Strictness.LENIENT)
+public class RebalanceUtilUpdateAssignmentsTest extends IgniteAbstractTest {
+ private static final IgniteLogger LOG =
Loggers.forClass(RebalanceUtilUpdateAssignmentsTest.class);
+
+ private SimpleInMemoryKeyValueStorage keyValueStorage;
+
+ private ConfigurationManager clusterCfgMgr;
+
+ private ClusterService clusterService;
+
+ private MetaStorageManager metaStorageManager;
+
+ private static final int partNum = 2;
+ private static final int replicas = 2;
+
+ private static final Set<String> nodes1 = IntStream.of(5).mapToObj(i ->
"nodes1_" + i).collect(toSet());
+ private static final Set<String> nodes2 = IntStream.of(5).mapToObj(i ->
"nodes2_" + i).collect(toSet());
+ private static final Set<String> nodes3 = IntStream.of(5).mapToObj(i ->
"nodes3_" + i).collect(toSet());
+ private static final Set<String> nodes4 = IntStream.of(5).mapToObj(i ->
"nodes4_" + i).collect(toSet());
+
+ private static final Set<Assignment> assignments1 =
calculateAssignmentForPartition(nodes1, partNum, replicas);
+ private static final Set<Assignment> assignments2 =
calculateAssignmentForPartition(nodes2, partNum, replicas);
+ private static final Set<Assignment> assignments3 =
calculateAssignmentForPartition(nodes3, partNum, replicas);
+ private static final Set<Assignment> assignments4 =
calculateAssignmentForPartition(nodes4, partNum, replicas);
+
+ @BeforeEach
+ public void setUp() {
+ clusterCfgMgr = new ConfigurationManager(
+ List.of(DistributionZonesConfiguration.KEY),
+ Set.of(),
+ new TestConfigurationStorage(DISTRIBUTED),
+ List.of(),
+ List.of()
+ );
+
+ clusterService = mock(ClusterService.class);
+
+ metaStorageManager = mock(MetaStorageManager.class);
+
+ clusterCfgMgr.start();
+
+ AtomicLong raftIndex = new AtomicLong();
+
+ keyValueStorage = spy(new SimpleInMemoryKeyValueStorage("test"));
+
+ MetaStorageListener metaStorageListener = new
MetaStorageListener(keyValueStorage);
+
+ RaftGroupService metaStorageService = mock(RaftGroupService.class);
+
+ // Delegate directly to listener.
+ lenient().doAnswer(
+ invocationClose -> {
+ Command cmd = invocationClose.getArgument(0);
+
+ long commandIndex = raftIndex.incrementAndGet();
+
+ CompletableFuture<Serializable> res = new
CompletableFuture<>();
+
+ CommandClosure<WriteCommand> clo = new CommandClosure<>() {
+ /** {@inheritDoc} */
+ @Override
+ public long index() {
+ return commandIndex;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public WriteCommand command() {
+ return (WriteCommand) cmd;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public void result(@Nullable Serializable r) {
+ if (r instanceof Throwable) {
+ res.completeExceptionally((Throwable) r);
+ } else {
+ res.complete(r);
+ }
+ }
+ };
+
+ try {
+ metaStorageListener.onWrite(List.of(clo).iterator());
+ } catch (Throwable e) {
+ res.completeExceptionally(new
IgniteInternalException(e));
+ }
+
+ return res;
+ }
+ ).when(metaStorageService).run(any());
+
+ MetaStorageCommandsFactory commandsFactory = new
MetaStorageCommandsFactory();
+
+ lenient().doAnswer(invocationClose -> {
+ Iif iif = invocationClose.getArgument(0);
+
+ MultiInvokeCommand multiInvokeCommand =
commandsFactory.multiInvokeCommand().iif(iif).build();
+
+ return metaStorageService.run(multiInvokeCommand);
+ }).when(metaStorageManager).invoke(any());
+
+ when(clusterService.messagingService()).thenAnswer(invocation -> {
+ MessagingService ret = mock(MessagingService.class);
+
+ return ret;
+ });
+ }
+
+ @AfterEach
+ public void tearDown() throws Exception {
+ clusterCfgMgr.stop();
+
+ keyValueStorage.close();
+ }
+
+ /**
+ * Nodes for new assignments calculating: nodes1.
+ * The table configuration assignments: assignments2.
+ * Current assignments in the metastorage: stable=null, pending=null,
planned=null.
+ * Expected assignments in the metastorage after updating: stable=null,
pending=assignments1, planned=null.
+ */
+ @Test
+ void test1() {
+ test(
+ nodes1, assignments2,
+ null, null, null,
+ null, assignments1, null
+ );
+ }
+
+ /**
+ * Nodes for new assignments calculating: nodes1.
+ * The table configuration assignments: assignments1.
+ * Current assignments in the metastorage: stable=null, pending=null,
planned=null.
+ * Expected assignments in the metastorage after updating: stable=null,
pending=null, planned=null.
+ */
+ @Test
+ void test2() {
+ test(
+ nodes1, assignments1,
+ null, null, null,
+ null, null, null
+ );
+ }
+
+ /**
+ * Nodes for new assignments calculating: nodes1.
+ * The table configuration assignments: assignments2.
+ * Current assignments in the metastorage: stable=null,
pending=assignments3, planned=null.
+ * Expected assignments in the metastorage after updating: stable=null,
pending=assignments3, planned=assignments1.
+ */
+ @Test
+ void test3() {
+ test(
+ nodes1, assignments2,
+ null, assignments3, null,
+ null, assignments3, assignments1
+ );
+ }
+
+ /**
+ * Nodes for new assignments calculating: nodes1.
+ * The table configuration assignments: assignments1.
+ * Current assignments in the metastorage: stable=null,
pending=assignments3, planned=null.
+ * Expected assignments in the metastorage after updating: stable=null,
pending=assignments3, planned=assignments1.
+ */
+ @Test
+ void test4() {
+ test(
+ nodes1, assignments1,
+ null, assignments3, null,
+ null, assignments3, assignments1
+ );
+ }
+
+ /**
+ * Nodes for new assignments calculating: nodes1.
+ * The table configuration assignments: assignments2.
+ * Current assignments in the metastorage: stable=assignments3,
pending=null, planned=null.
+ * Expected assignments in the metastorage after updating:
stable=assignments3, pending=assignments1, planned=null.
+ */
+ @Test
+ void test5() {
+ test(
+ nodes1, assignments2,
+ assignments3, null, null,
+ assignments3, assignments1, null
+ );
+ }
+
+ /**
+ * Nodes for new assignments calculating: nodes1.
+ * The table configuration assignments: assignments1.
+ * Current assignments in the metastorage: stable=assignments3,
pending=null, planned=null.
+ * Expected assignments in the metastorage after updating:
stable=assignments3, pending=assignments1, planned=null.
+ */
+ @Test
+ void test6() {
+ test(
+ nodes1, assignments1,
+ assignments3, null, null,
+ assignments3, assignments1, null
+ );
+ }
+
+ /**
+ * Nodes for new assignments calculating: nodes1.
+ * The table configuration assignments: assignments2.
+ * Current assignments in the metastorage: stable=assignments1,
pending=null, planned=null.
+ * Expected assignments in the metastorage after updating:
stable=assignments1, pending=null, planned=null.
+ */
+ @Test
+ void test7() {
+ test(
+ nodes1, assignments2,
+ assignments1, null, null,
+ assignments1, null, null
+ );
+ }
+
+ /**
+ * Nodes for new assignments calculating: nodes1.
+ * The table configuration assignments: assignments1.
+ * Current assignments in the metastorage: stable=assignments1,
pending=null, planned=null.
+ * Expected assignments in the metastorage after updating:
stable=assignments1, pending=null, planned=null.
+ */
+ @Test
+ void test8() {
+ test(
+ nodes1, assignments1,
+ assignments1, null, null,
+ assignments1, null, null
+ );
+ }
+
+ /**
+ * Nodes for new assignments calculating: nodes1.
+ * The table configuration assignments: assignments2.
+ * Current assignments in the metastorage: stable=assignments2,
pending=null, planned=null.
+ * Expected assignments in the metastorage after updating:
stable=assignments2, pending=assignments1, planned=null.
+ */
+ @Test
+ void test9() {
+ test(
+ nodes1, assignments2,
+ assignments2, null, null,
+ assignments2, assignments1, null
+ );
+ }
+
+ /**
+ * Nodes for new assignments calculating: nodes1.
+ * The table configuration assignments: assignments2.
+ * Current assignments in the metastorage: stable=assignments4,
pending=assignments3, planned=null.
+ * Expected assignments in the metastorage after updating:
stable=assignments4, pending=assignments3, planned=assignments1.
+ */
+ @Test
+ void test10() {
+ test(
+ nodes1, assignments2,
+ assignments4, assignments3, null,
+ assignments4, assignments3, assignments1
+ );
+ }
+
+ /**
+ * Nodes for new assignments calculating: nodes1.
+ * The table configuration assignments: assignments1.
+ * Current assignments in the metastorage: stable=assignments3,
pending=assignments2, planned=null.
+ * Expected assignments in the metastorage after updating:
stable=assignments3, pending=assignments2, planned=assignments1.
+ */
+ @Test
+ void test11() {
+ test(
+ nodes1, assignments1,
+ assignments3, assignments2, null,
+ assignments3, assignments2, assignments1
+ );
+ }
+
+ /**
+ * Nodes for new assignments calculating: nodes1.
+ * The table configuration assignments: assignments2.
+ * Current assignments in the metastorage: stable=assignments1,
pending=assignments3, planned=null.
+ * Expected assignments in the metastorage after updating:
stable=assignments1, pending=assignments3, planned=assignments1.
+ */
+ @Test
+ void test12() {
+ test(
+ nodes1, assignments2,
+ assignments1, assignments3, null,
+ assignments1, assignments3, assignments1
+ );
+ }
+
+ /**
+ * Nodes for new assignments calculating: nodes1.
+ * The table configuration assignments: assignments2.
+ * Current assignments in the metastorage: stable=assignments2,
pending=assignments3, planned=null.
+ * Expected assignments in the metastorage after updating:
stable=assignments2, pending=assignments3, planned=assignments1.
+ */
+ @Test
+ void test13() {
+ test(
+ nodes1, assignments2,
+ assignments2, assignments3, null,
+ assignments2, assignments3, assignments1
+ );
+ }
+
+ /**
+ * Nodes for new assignments calculating: nodes1.
+ * The table configuration assignments: assignments1.
+ * Current assignments in the metastorage: stable=assignments1,
pending=assignments2, planned=null.
+ * Expected assignments in the metastorage after updating:
stable=assignments1, pending=assignments2, planned=assignments1.
+ */
+ @Test
+ void test14() {
+ test(
+ nodes1, assignments1,
+ assignments1, assignments2, null,
+ assignments1, assignments2, assignments1
+ );
+ }
+
+ /**
+ * Nodes for new assignments calculating: nodes1.
+ * The table configuration assignments: assignments1.
+ * Current assignments in the metastorage: stable=assignments1,
pending=assignments2, planned=assignments3.
+ * Expected assignments in the metastorage after updating:
stable=assignments1, pending=assignments2, planned=assignments1.
+ */
+ @Test
+ void test15() {
+ test(
+ nodes1, assignments1,
+ assignments1, assignments2, assignments3,
+ assignments1, assignments2, assignments1
+ );
+ }
+
+ /**
+ * Nodes for new assignments calculating: nodes1.
+ * The table configuration assignments: assignments4.
+ * Current assignments in the metastorage: stable=assignments1,
pending=assignments2, planned=assignments1.
+ * Expected assignments in the metastorage after updating:
stable=assignments1, pending=assignments2, planned=assignments1.
+ */
+ @Test
+ void test16() {
+ test(
+ nodes1, assignments4,
+ assignments1, assignments2, assignments1,
+ assignments1, assignments2, assignments1
+ );
+ }
+
+ /**
+ * Nodes for new assignments calculating: nodes2.
+ * The table configuration assignments: assignments2.
+ * Current assignments in the metastorage: stable=assignments1,
pending=assignments2, planned=assignments1.
+ * Expected assignments in the metastorage after updating:
stable=assignments1, pending=assignments2, planned=null.
+ */
+ @Test
+ void test17() {
+ test(
+ nodes2, assignments2,
+ assignments1, assignments2, assignments1,
+ assignments1, assignments2, null
+ );
+ }
+
+ /**
+ * Nodes for new assignments calculating: nodes2.
+ * The table configuration assignments: assignments4.
+ * Current assignments in the metastorage: stable=assignments1,
pending=assignments2, planned=assignments1.
+ * Expected assignments in the metastorage after updating:
stable=assignments1, pending=assignments2, planned=null.
+ */
+ @Test
+ void test18() {
+ test(
+ nodes2, assignments4,
+ assignments1, assignments2, assignments1,
+ assignments1, assignments2, null
+ );
+ }
+
+ private void test(
+ Collection<String> nodesForNewAssignments,
+ Set<Assignment> tableCfgAssignments,
+ Set<Assignment> currentStableAssignments,
+ Set<Assignment> currentPendingAssignments,
+ Set<Assignment> currentPlannedAssignments,
+ Set<Assignment> expectedStableAssignments,
+ Set<Assignment> expectedPendingAssignments,
+ Set<Assignment> expectedPlannedAssignments
+ ) {
+ TablePartitionId tablePartitionId = new
TablePartitionId(UUID.randomUUID(), 1);
+
+ if (currentStableAssignments != null) {
+
keyValueStorage.put(stablePartAssignmentsKey(tablePartitionId).bytes(),
toBytes(currentStableAssignments));
+ }
+
+ if (currentPendingAssignments != null) {
+
keyValueStorage.put(pendingPartAssignmentsKey(tablePartitionId).bytes(),
toBytes(currentPendingAssignments));
+ }
+
+ if (currentPlannedAssignments != null) {
+
keyValueStorage.put(plannedPartAssignmentsKey(tablePartitionId).bytes(),
toBytes(currentPlannedAssignments));
+ }
+
+ RebalanceUtil.updatePendingAssignmentsKeys(
+ "table1", tablePartitionId, nodesForNewAssignments,
+ replicas, 1, metaStorageManager, partNum, tableCfgAssignments
+ );
+
+ byte[] actualStableBytes =
keyValueStorage.get(stablePartAssignmentsKey(tablePartitionId).bytes()).value();
+ Set<Assignment> actualStableAssignments = null;
+
+ if (actualStableBytes != null) {
+ actualStableAssignments = ByteUtils.fromBytes(actualStableBytes);
+ }
+
+ byte[] actualPendingBytes =
keyValueStorage.get(pendingPartAssignmentsKey(tablePartitionId).bytes()).value();
+ Set<Assignment> actualPendingAssignments = null;
+
+ if (actualPendingBytes != null) {
+ actualPendingAssignments = ByteUtils.fromBytes(actualPendingBytes);
+ }
+
+ byte[] actualPlannedBytes =
keyValueStorage.get(plannedPartAssignmentsKey(tablePartitionId).bytes()).value();
+ Set<Assignment> actualPlannedAssignments = null;
+
+ if (actualPlannedBytes != null) {
+ actualPlannedAssignments = ByteUtils.fromBytes(actualPlannedBytes);
+ }
+
+ LOG.info("stableAssignments " + actualStableAssignments);
+ LOG.info("pendingAssignments " + actualPendingAssignments);
+ LOG.info("plannedAssignments " + actualPlannedAssignments);
+
+ if (expectedStableAssignments != null) {
+ assertNotNull(actualStableBytes);
+ assertEquals(actualStableAssignments, expectedStableAssignments);
+ } else {
+ assertNull(actualStableBytes);
+ }
+
+ if (expectedPendingAssignments != null) {
+ assertNotNull(actualPendingBytes);
+ assertEquals(actualPendingAssignments, expectedPendingAssignments);
+ } else {
+ assertNull(actualPendingBytes);
+ }
+
+ if (expectedPlannedAssignments != null) {
+ assertNotNull(actualPlannedBytes);
+ assertEquals(actualPlannedAssignments, expectedPlannedAssignments);
+ } else {
+ assertNull(actualPlannedBytes);
+ }
+ }
+}