This is an automated email from the ASF dual-hosted git repository.
sdanilov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 794e272970 IGNITE-19986 Increase stability of inMemoryNodeRestartNotLeader in ItIgniteInMemoryNodeRestartTest (#2323)
794e272970 is described below
commit 794e272970f583ef591027ffbc01b754cc8aa488
Author: Roman Puchkovskiy <[email protected]>
AuthorDate: Mon Jul 17 13:11:10 2023 +0400
IGNITE-19986 Increase stability of inMemoryNodeRestartNotLeader in ItIgniteInMemoryNodeRestartTest (#2323)
---
.../app/ItIgniteInMemoryNodeRestartTest.java | 35 ++++++++++++++++++----
1 file changed, 30 insertions(+), 5 deletions(-)
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteInMemoryNodeRestartTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteInMemoryNodeRestartTest.java
index 192592d2ce..a18cae7913 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteInMemoryNodeRestartTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteInMemoryNodeRestartTest.java
@@ -18,6 +18,7 @@
package org.apache.ignite.internal.runner.app;
import static
org.apache.ignite.internal.testframework.IgniteTestUtils.testNodeName;
+import static
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
@@ -42,11 +43,12 @@ import org.apache.ignite.internal.raft.Loza;
import org.apache.ignite.internal.raft.service.LeaderWithTerm;
import org.apache.ignite.internal.raft.service.RaftGroupService;
import org.apache.ignite.internal.replicator.TablePartitionId;
+import org.apache.ignite.internal.storage.RowId;
import org.apache.ignite.internal.table.TableImpl;
import org.apache.ignite.internal.table.distributed.storage.InternalTableImpl;
-import org.apache.ignite.internal.testframework.IgniteTestUtils;
import org.apache.ignite.internal.testframework.TestIgnitionManager;
import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.sql.Session;
import org.apache.ignite.table.Table;
import org.apache.ignite.table.Tuple;
@@ -192,7 +194,7 @@ public class ItIgniteInMemoryNodeRestartTest extends BaseIgniteRestartTest {
InternalTableImpl internalTable = (InternalTableImpl)
restartingTable.internalTable();
// Check that it restarts.
- assertTrue(IgniteTestUtils.waitForCondition(
+ assertTrue(waitForCondition(
() -> {
boolean raftNodeStarted =
loza.localNodes().stream().anyMatch(nodeId -> {
if (nodeId.groupId() instanceof TablePartitionId) {
@@ -244,7 +246,7 @@ public class ItIgniteInMemoryNodeRestartTest extends BaseIgniteRestartTest {
Loza loza = restartingNode.raftManager();
// Check that it restarts.
- assertTrue(IgniteTestUtils.waitForCondition(
+ assertTrue(waitForCondition(
() -> loza.localNodes().stream().anyMatch(nodeId -> {
if (nodeId.groupId() instanceof TablePartitionId) {
return ((TablePartitionId) nodeId.groupId()).tableId()
== table.tableId();
@@ -286,7 +288,7 @@ public class ItIgniteInMemoryNodeRestartTest extends BaseIgniteRestartTest {
for (int i = 0; i < 3; i++) {
Loza loza = ignite(i).raftManager();
- assertTrue(IgniteTestUtils.waitForCondition(
+ assertTrue(waitForCondition(
() -> loza.localNodes().stream().anyMatch(nodeId -> {
if (nodeId.groupId() instanceof TablePartitionId) {
return ((TablePartitionId)
nodeId.groupId()).tableId() == table.tableId();
@@ -325,7 +327,7 @@ public class ItIgniteInMemoryNodeRestartTest extends BaseIgniteRestartTest {
* @param replicas Replica factor.
* @param partitions Partitions count.
*/
- private static void createTableWithData(Ignite ignite, String name, int
replicas, int partitions) {
+ private static void createTableWithData(Ignite ignite, String name, int
replicas, int partitions) throws InterruptedException {
try (Session session = ignite.sql().createSession()) {
session.execute(null, String.format("CREATE ZONE IF NOT EXISTS
ZONE_%s ENGINE aimem WITH REPLICAS=%d, PARTITIONS=%d",
name, replicas, partitions));
@@ -342,6 +344,29 @@ public class ItIgniteInMemoryNodeRestartTest extends BaseIgniteRestartTest {
var table = (TableImpl) ignite.tables().table(name);
assertThat(table.internalTable().storage().isVolatile(), is(true));
+
+ waitTillTableDataPropagatesToAllNodes(name, partitions);
+ }
+
+ private static void waitTillTableDataPropagatesToAllNodes(String name, int
partitions) throws InterruptedException {
+ assertTrue(
+ waitForCondition(() -> tableHasDataOnAllIgnites(name,
partitions), TimeUnit.SECONDS.toMillis(10)),
+ "Did not see tuples propagate to all Ignites in time"
+ );
+ }
+
+ private static boolean tableHasDataOnAllIgnites(String name, int
partitions) {
+ return CLUSTER_NODES.stream()
+ .allMatch(igniteNode -> tableHasAnyData((TableImpl)
igniteNode.tables().table(name), partitions));
+ }
+
+ private static boolean tableHasAnyData(TableImpl nodeTable, int
partitions) {
+ return IntStream.range(0, partitions)
+ .mapToObj(partition -> new IgniteBiTuple<>(
+ partition,
nodeTable.internalTable().storage().getMvPartition(partition)
+ ))
+ .filter(pair -> pair.get2() != null)
+ .anyMatch(pair ->
pair.get2().closestRowId(RowId.lowestRowId(pair.get1())) != null);
}
private static IgniteImpl ignite(int idx) {