This is an automated email from the ASF dual-hosted git repository.
sanpwc pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new af65a70c4c IGNITE-20053 ItRebalanceDistributedTest fixed (#2433)
af65a70c4c is described below
commit af65a70c4c939ba129adea58740134f865f61255
Author: Alexander Lapin <[email protected]>
AuthorDate: Fri Aug 11 19:02:58 2023 +0300
IGNITE-20053 ItRebalanceDistributedTest fixed (#2433)
---
.../storage/ItRebalanceDistributedTest.java | 42 ++++++++++++++--------
.../distributed/storage/InternalTableImpl.java | 2 +-
2 files changed, 29 insertions(+), 15 deletions(-)
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItRebalanceDistributedTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItRebalanceDistributedTest.java
index 21884dd867..b9357a3f49 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItRebalanceDistributedTest.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItRebalanceDistributedTest.java
@@ -183,7 +183,6 @@ import org.mockito.Mockito;
*/
@ExtendWith(WorkDirectoryExtension.class)
@ExtendWith(ConfigurationExtension.class)
-@Disabled("https://issues.apache.org/jira/browse/IGNITE-20053")
public class ItRebalanceDistributedTest extends BaseIgniteAbstractTest {
/** Ignite logger. */
private static final IgniteLogger LOG =
Loggers.forClass(ItRebalanceDistributedTest.class);
@@ -192,9 +191,11 @@ public class ItRebalanceDistributedTest extends
BaseIgniteAbstractTest {
private static final String ZONE_1_NAME = "zone1";
- public static final int BASE_PORT = 20_000;
+ private static final int BASE_PORT = 20_000;
- public static final String HOST = "localhost";
+ private static final String HOST = "localhost";
+
+ private static final int ASSIGNMENTS_AWAIT_TIMEOUT_MILLIS = 10_000;
@InjectConfiguration
private static RaftConfiguration raftConfiguration;
@@ -247,13 +248,13 @@ public class ItRebalanceDistributedTest extends
BaseIgniteAbstractTest {
nodes.get(0).cmgManager.initCluster(List.of(nodes.get(2).name),
List.of(nodes.get(2).name), "cluster");
+ nodes.stream().forEach(Node::waitWatches);
+
assertThat(
allOf(nodes.get(0).cmgManager.onJoinReady(),
nodes.get(1).cmgManager.onJoinReady(), nodes.get(2).cmgManager.onJoinReady()),
willCompleteSuccessfully()
);
- nodes.stream().forEach(Node::waitWatches);
-
assertTrue(waitForCondition(() ->
nodes.get(0).cmgManager.logicalTopology().join().nodes().size() == 3, 10_000));
}
@@ -265,7 +266,7 @@ public class ItRebalanceDistributedTest extends
BaseIgniteAbstractTest {
}
@Test
- void testOneRebalance() {
+ void testOneRebalance() throws InterruptedException {
createZone(nodes.get(0).distributionZoneManager, ZONE_1_NAME, 1, 1);
TableDefinition schTbl1 = SchemaBuilders.tableBuilder("PUBLIC",
"tbl1").columns(
@@ -279,7 +280,7 @@ public class ItRebalanceDistributedTest extends
BaseIgniteAbstractTest {
tblChanger -> SchemaConfigurationConverter.convert(schTbl1,
tblChanger)
));
- assertEquals(1, getPartitionClusterNodes(0, 0).size());
+ assertTrue(waitForCondition(() -> getPartitionClusterNodes(0,
0).size() == 1, ASSIGNMENTS_AWAIT_TIMEOUT_MILLIS));
await(alterZoneReplicas(nodes.get(0).distributionZoneManager,
ZONE_1_NAME, 2));
@@ -291,7 +292,7 @@ public class ItRebalanceDistributedTest extends
BaseIgniteAbstractTest {
}
@Test
- void testTwoQueuedRebalances() {
+ void testTwoQueuedRebalances() throws InterruptedException {
createZone(nodes.get(0).distributionZoneManager, ZONE_1_NAME, 1, 1);
TableDefinition schTbl1 = SchemaBuilders.tableBuilder("PUBLIC",
"tbl1").columns(
@@ -305,7 +306,7 @@ public class ItRebalanceDistributedTest extends
BaseIgniteAbstractTest {
tblChanger -> SchemaConfigurationConverter.convert(schTbl1,
tblChanger)
));
- assertEquals(1, getPartitionClusterNodes(0, 0).size());
+ assertTrue(waitForCondition(() -> getPartitionClusterNodes(0,
0).size() == 1, ASSIGNMENTS_AWAIT_TIMEOUT_MILLIS));
await(alterZoneReplicas(nodes.get(0).distributionZoneManager,
ZONE_1_NAME, 2));
await(alterZoneReplicas(nodes.get(0).distributionZoneManager,
ZONE_1_NAME, 3));
@@ -331,7 +332,7 @@ public class ItRebalanceDistributedTest extends
BaseIgniteAbstractTest {
ZONE_1_NAME,
tblChanger -> SchemaConfigurationConverter.convert(schTbl1,
tblChanger)));
- assertTrue(waitForCondition(() -> getPartitionClusterNodes(0,
0).size() == 1, 10_000));
+ assertTrue(waitForCondition(() -> getPartitionClusterNodes(0,
0).size() == 1, ASSIGNMENTS_AWAIT_TIMEOUT_MILLIS));
await(alterZoneReplicas(nodes.get(0).distributionZoneManager,
ZONE_1_NAME, 2));
await(alterZoneReplicas(nodes.get(0).distributionZoneManager,
ZONE_1_NAME, 3));
@@ -361,6 +362,8 @@ public class ItRebalanceDistributedTest extends
BaseIgniteAbstractTest {
zoneName,
tblChanger -> SchemaConfigurationConverter.convert(schTbl1,
tblChanger)));
+ waitPartitionAssignmentsSyncedToExpected(0, 2);
+
Set<String> partitionNodesConsistentIds = getPartitionClusterNodes(0,
0).stream()
.map(Assignment::consistentId)
.collect(Collectors.toSet());
@@ -411,7 +414,7 @@ public class ItRebalanceDistributedTest extends
BaseIgniteAbstractTest {
}
@Test
- void testRebalanceRetryWhenCatchupFailed() {
+ void testRebalanceRetryWhenCatchupFailed() throws InterruptedException {
createZone(nodes.get(0).distributionZoneManager, ZONE_1_NAME, 1, 1);
TableDefinition schTbl1 = SchemaBuilders.tableBuilder("PUBLIC",
"tbl1").columns(
@@ -424,7 +427,7 @@ public class ItRebalanceDistributedTest extends
BaseIgniteAbstractTest {
ZONE_1_NAME,
tblChanger -> SchemaConfigurationConverter.convert(schTbl1,
tblChanger)));
- assertEquals(1, getPartitionClusterNodes(0, 0).size());
+ assertTrue(waitForCondition(() -> getPartitionClusterNodes(0,
0).size() == 1, ASSIGNMENTS_AWAIT_TIMEOUT_MILLIS));
await(alterZoneReplicas(nodes.get(0).distributionZoneManager,
ZONE_1_NAME, 1));
@@ -466,12 +469,16 @@ public class ItRebalanceDistributedTest extends
BaseIgniteAbstractTest {
void testDestroyPartitionStoragesOnEvictNode() {
createTableWithOnePartition(TABLE_1_NAME, ZONE_1_NAME, 3, true);
+ waitPartitionAssignmentsSyncedToExpected(0, 3);
+
Set<Assignment> assignmentsBeforeChangeReplicas =
getPartitionClusterNodes(0, 0);
nodes.forEach(node ->
prepareFinishHandleChangeStableAssignmentEventFuture(node, TABLE_1_NAME, 0));
changeTableReplicasForSinglePartition(ZONE_1_NAME, 2);
+ waitPartitionAssignmentsSyncedToExpected(0, 2);
+
Set<Assignment> assignmentsAfterChangeReplicas =
getPartitionClusterNodes(0, 0);
Set<Assignment> evictedAssignments =
getEvictedAssignments(assignmentsBeforeChangeReplicas,
assignmentsAfterChangeReplicas);
@@ -494,9 +501,12 @@ public class ItRebalanceDistributedTest extends
BaseIgniteAbstractTest {
@Test
@UseTestTxStateStorage
@UseRocksMetaStorage
+ @Disabled("https://issues.apache.org/jira/browse/IGNITE-20187")
void testDestroyPartitionStoragesOnRestartEvictedNode(TestInfo testInfo)
throws Exception {
createTableWithOnePartition(TABLE_1_NAME, ZONE_1_NAME, 3, true);
+ waitPartitionAssignmentsSyncedToExpected(0, 3);
+
Set<Assignment> assignmentsBeforeChangeReplicas =
getPartitionClusterNodes(0, 0);
nodes.forEach(node -> {
@@ -507,6 +517,8 @@ public class ItRebalanceDistributedTest extends
BaseIgniteAbstractTest {
changeTableReplicasForSinglePartition(ZONE_1_NAME, 2);
+ waitPartitionAssignmentsSyncedToExpected(0, 2);
+
Assignment evictedAssignment =
first(getEvictedAssignments(assignmentsBeforeChangeReplicas,
getPartitionClusterNodes(0, 0)));
Node evictedNode =
findNodeByConsistentId(evictedAssignment.consistentId());
@@ -879,7 +891,8 @@ public class ItRebalanceDistributedTest extends
BaseIgniteAbstractTest {
vaultManager,
nodeCfgMgr,
clusterService,
- raftManager
+ raftManager,
+ cmgManager
);
firstComponents.forEach(IgniteComponent::start);
@@ -888,7 +901,6 @@ public class ItRebalanceDistributedTest extends
BaseIgniteAbstractTest {
deployWatchesFut = CompletableFuture.supplyAsync(() -> {
List<IgniteComponent> secondComponents = List.of(
- cmgManager,
metaStorageManager,
clusterCfgMgr,
clockWaiter,
@@ -1022,6 +1034,8 @@ public class ItRebalanceDistributedTest extends
BaseIgniteAbstractTest {
willCompleteSuccessfully()
);
+ waitPartitionAssignmentsSyncedToExpected(0, replicas);
+
assertEquals(replicas, getPartitionClusterNodes(0, 0).size());
assertEquals(replicas, getPartitionClusterNodes(1, 0).size());
assertEquals(replicas, getPartitionClusterNodes(2, 0).size());
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
index 792a5202a8..dbd6cc35ca 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
@@ -501,7 +501,7 @@ public class InternalTableImpl implements InternalTable {
return fut.handle((BiFunction<T, Throwable, CompletableFuture<T>>) (r,
e) -> {
if (full) { // Full txn is already finished remotely. Just update
local state.
- // TODO: IGNITE-17638 TestOnly code, let's consider using Txn
state map instead of states.
+ // TODO: IGNITE-20033 TestOnly code, let's consider using Txn
state map instead of states.
txManager.changeState(tx0.id(), PENDING, e == null ? COMMITED
: ABORTED);
return e != null ? failedFuture(wrapReplicationException(e)) :
completedFuture(r);
}