This is an automated email from the ASF dual-hosted git repository. amashenkov pushed a commit to branch ignite-18171 in repository https://gitbox.apache.org/repos/asf/ignite-3.git
commit e50ee81159804616f5c60069a6f3ad5e7ebe2152 Author: amashenkov <[email protected]> AuthorDate: Wed Nov 30 19:51:41 2022 +0300 wip. --- .../internal/cluster/ItNodeStartStopTest.java | 354 ++++++++------------- 1 file changed, 140 insertions(+), 214 deletions(-) diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/cluster/ItNodeStartStopTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/cluster/ItNodeStartStopTest.java index 425baa80ff..899c2da771 100644 --- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/cluster/ItNodeStartStopTest.java +++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/cluster/ItNodeStartStopTest.java @@ -23,16 +23,15 @@ import static org.apache.ignite.internal.testframework.IgniteTestUtils.await; import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully; import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertTrue; import io.micronaut.configuration.picocli.MicronautFactory; import io.micronaut.context.ApplicationContext; import io.micronaut.context.env.Environment; -import java.io.IOException; import java.io.PrintWriter; import java.io.StringWriter; import java.lang.invoke.MethodHandles; -import java.lang.reflect.Method; import java.nio.file.Path; import java.util.ArrayList; import java.util.Collection; @@ -41,17 +40,13 @@ import java.util.HashSet; import java.util.LinkedHashSet; import java.util.List; import java.util.Map; -import java.util.Optional; import java.util.Set; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.function.BiPredicate; import java.util.function.Predicate; import java.util.regex.Pattern; -import java.util.stream.Collectors; -import java.util.stream.Stream; import org.apache.ignite.Ignite; import org.apache.ignite.IgnitionManager; import org.apache.ignite.internal.app.EnvironmentDefaultValueProvider; @@ -64,7 +59,6 @@ import org.apache.ignite.internal.sql.engine.QueryProperty; import org.apache.ignite.internal.sql.engine.property.PropertiesHolder; import org.apache.ignite.internal.sql.engine.session.SessionId; import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest; -import org.apache.ignite.internal.testframework.IgniteTestUtils; import org.apache.ignite.internal.testframework.WithSystemProperty; import org.apache.ignite.internal.testframework.WorkDirectory; import org.apache.ignite.internal.testframework.WorkDirectoryExtension; @@ -74,17 +68,12 @@ import org.apache.ignite.tx.Transaction; import org.apache.ignite.tx.TransactionException; import org.hamcrest.Matchers; import org.hamcrest.text.IsEmptyString; -import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.Assumptions; import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.DynamicContainer; -import org.junit.jupiter.api.DynamicNode; -import org.junit.jupiter.api.DynamicTest; -import org.junit.jupiter.api.TestFactory; -import org.junit.jupiter.api.TestInfo; import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.MethodSource; import org.mockito.Mockito; import picocli.CommandLine; @@ -97,9 +86,7 @@ import picocli.CommandLine; @ExtendWith(WorkDirectoryExtension.class) @WithSystemProperty(key = "org.jline.terminal.dumb", value = "true") public class ItNodeStartStopTest extends BaseIgniteAbstractTest { - public static final int NODE_JOIN_WAIT_TIMEOUT = 500; - public static final Runnable NOOP = () -> { - }; + public static final int NODE_JOIN_WAIT_TIMEOUT = 5_000; /** Work directory. */ @WorkDirectory @@ -154,28 +141,24 @@ public class ItNodeStartStopTest extends BaseIgniteAbstractTest { + " \"netClusterNodes\": [ " + connectionAddr + " ]\n" + " }\n" + " }\n" - + "}" - ); - - private static final String NEW_NODE_CONFIG = "{\n" - + " \"network\": {\n" - + " \"port\":3348,\n" - + " \"nodeFinder\":{\n" - + " \"netClusterNodes\": [ " + connectionAddr + " ]\n" - + " }\n" - + " }\n" - + "}"; - - // TODO: Drop labels and make node names meaningful, when distribution zones will be implemented. - // Labels are used for better understanding node role. - // Node names uses for partition affinity calculation, we can't guarantee a node with name "DATA_NODE" will own a partition. - // So, static meaningless node names are used, which are mapped to labels. - private static final Map<String, String> nodeLabels = Map.of( - CMG_NODE, "C", - METASTORAGE_NODE, "M", - DATA_NODE, "D", - DATA_NODE_2, "D2", - NEW_NODE, "N" + + "}", + "newNode", "{\n" + + " \"network\": {\n" + + " \"port\":3348,\n" + + " \"nodeFinder\":{\n" + + " \"netClusterNodes\": [ " + connectionAddr + " ]\n" + + " }\n" + + " }\n" + + "}"); + + // TODO: Change Map -> Set. + // Map is used as node names uses for partition affinity calculation, + // but we can't guarantee a node with name "DATA_NODE" will own a partition. + private static final Map<String, String> gridNodes = Map.of( + "C", CMG_NODE, + "M", METASTORAGE_NODE, + "D", DATA_NODE, + "D2", DATA_NODE_2 ); /** Cluster nodes. */ @@ -184,8 +167,6 @@ public class ItNodeStartStopTest extends BaseIgniteAbstractTest { /** Runs after each test sequence. */ @BeforeEach public void before() throws Exception { - assert clusterNodes.isEmpty(); - for (String name : nodesCfg.keySet()) { IgniteUtils.deleteIfExists(WORK_DIR.resolve(name)); } @@ -193,7 +174,7 @@ public class ItNodeStartStopTest extends BaseIgniteAbstractTest { List<CompletableFuture<Ignite>> futures = new ArrayList<>(); // Start nodes. - nodesCfg.forEach((k, v) -> futures.add(IgnitionManager.start(k, v, WORK_DIR.resolve(k)))); + gridNodes.values().forEach(name -> futures.add(IgnitionManager.start(name, nodesCfg.get(name), WORK_DIR.resolve(name)))); // Init cluster. //TODO: Use dedicated node for metastorage. @@ -225,7 +206,7 @@ public class ItNodeStartStopTest extends BaseIgniteAbstractTest { /** Runs after each test sequence. */ @AfterEach - public void afterEach() throws IOException { + public void afterEach() { log.info("Stop all nodes."); clusterNodes.keySet().forEach(IgnitionManager::stop); @@ -238,8 +219,8 @@ public class ItNodeStartStopTest extends BaseIgniteAbstractTest { /** Filter out duplicates and invalid grids. */ private static BiPredicate<String, Set<String>> nodeFilter() { - return (nodeName, grid) -> (!grid.isEmpty() || CMG_NODE.equals(nodeName)) // CMG node always starts first. - && (!DATA_NODE_2.equals(nodeName) || grid.contains(DATA_NODE)); // Data nodes are interchangeable. + return (name, grid) -> (!grid.isEmpty() || "C".equals(name)) // CMG node always starts first. + && (!"D2".equals(name) || grid.contains("D")); // Data nodes are interchangeable. } /** @@ -247,101 +228,118 @@ public class ItNodeStartStopTest extends BaseIgniteAbstractTest { * * @return JUnit tests. */ - @TestFactory - public Stream<DynamicNode> gridStartupTestFactory() { + public static Object[] generateSequence() { return GridGenerator.generateStartupSequences( - nodesCfg.keySet(), - nodeFilter() - ).stream() - .map(nodes -> { - List<DynamicNode> scenarioSteps = new ArrayList<>(); - List<String> startedNodes = new ArrayList<>(); - - for (int i = 0; i < nodes.size(); i++) { - String nodeName = nodes.get(i); - startedNodes.add(nodeName); - - boolean first = (i == 0); - boolean last = (i >= (nodes.size() - 1)); - - Runnable setup = () -> { - assert !first || clusterNodes.isEmpty(); - - startNode(nodeName); - }; - - Runnable tearDown = last ? () -> stopCluster(nodes) : NOOP; - - scenarioSteps.add(DynamicContainer.dynamicContainer( - "Started " + clusterNodesToString(startedNodes), - List.of( - createDynamicTest("checkNewNodeJoin", setup, this::checkNewNodeJoin, NOOP), - createDynamicTest("checkDDL", NOOP, this::checkDDL, NOOP), - createDynamicTest("checkROTransaction", NOOP, this::checkROTransacton, NOOP), -// createTest("validateRWTransaction", NOOP, NOOP, this::validateRWTransaction), - createDynamicTest("checkImplicitRWTransaction", NOOP, this::checkImplicitRWTransaction, tearDown) - ) - )); - } - - return DynamicContainer.dynamicContainer("Start sequence " + clusterNodesToString(nodes), scenarioSteps); - }); + gridNodes.keySet(), + nodeFilter() + ).toArray(Object[]::new); + } + + @ParameterizedTest(name = "Node order=" + ParameterizedTest.ARGUMENTS_PLACEHOLDER) + @MethodSource("generateSequence") + public void checkNodeJoin(List<String> nodeNames) { + for (String name : nodeNames) { + log.info("Adding node to grid: label=" + name + ", name=" + gridNodes.get(name)); + + startNode(gridNodes.get(name)); + + checkNewNodeJoin(); + } + } + + @ParameterizedTest(name = "Node order=" + ParameterizedTest.ARGUMENTS_PLACEHOLDER) + @MethodSource("generateSequence") + public void checkDDL(List<String> nodeNames) { + for (String name : nodeNames) { + log.info("Adding node to grid: label=" + name + ", name=" + gridNodes.get(name)); + + startNode(gridNodes.get(name)); + + Ignite node = getInitializedNode(); + + if (node == null) { + continue; + } + + checkDDL(node); + } + } + + + @ParameterizedTest(name = "Node order=" + ParameterizedTest.ARGUMENTS_PLACEHOLDER) + @MethodSource("generateSequence") + public void checkROTransaction(List<String> nodeNames) { + for (String name : nodeNames) { + log.info("Adding node to grid: label=" + name + ", name=" + gridNodes.get(name)); + + startNode(gridNodes.get(name)); + + Ignite node = getInitializedNode(); + + if (node == null) { + continue; + } + + checkROTransacton(node); + } + } + + @ParameterizedTest(name = "Node order=" + ParameterizedTest.ARGUMENTS_PLACEHOLDER) + @MethodSource("generateSequence") + public void checkRWTransaction(List<String> nodes) { + for (String name : nodes) { + log.info("Adding node to grid: label=" + name + ", name=" + gridNodes.get(name)); + + startNode(gridNodes.get(name)); + + Ignite node = getInitializedNode(); + + if (node == null) { + continue; + } + + checkRWTransaction(node); + } } private void checkNewNodeJoin() { try { - CompletableFuture<Ignite> fut = IgnitionManager.start(NEW_NODE, NEW_NODE_CONFIG, WORK_DIR.resolve(NEW_NODE)); + CompletableFuture<Ignite> fut = startNode(NEW_NODE); if (!clusterNodes.containsKey(CMG_NODE)) { assertThrowsWithCause(() -> fut.get(NODE_JOIN_WAIT_TIMEOUT, TimeUnit.MILLISECONDS), TimeoutException.class); - assertTrue(validateNodeEnterTopology("physical", NEW_NODE)); + assertTrue(topologyContainsNode("physical", NEW_NODE)); // CMG holds logical topology state. - assertThrowsWithCause(() -> validateNodeEnterTopology("logical", NEW_NODE), IgniteException.class); + assertThrowsWithCause(() -> topologyContainsNode("logical", NEW_NODE), IgniteException.class); return; } else if (!clusterNodes.containsKey(METASTORAGE_NODE)) { // Node future can't complete as some components requires Metastorage on start. assertThrowsWithCause(() -> fut.get(NODE_JOIN_WAIT_TIMEOUT, TimeUnit.MILLISECONDS), TimeoutException.class); - assertTrue(validateNodeEnterTopology("physical", NEW_NODE)); + assertTrue(topologyContainsNode("physical", NEW_NODE)); assertFalse( - validateNodeEnterTopology("logical", NEW_NODE)); //TODO: Is Metastore required to promote node to logical topology? + topologyContainsNode("logical", + NEW_NODE)); //TODO: Is Metastore required to promote node to logical topology? return; } assertThat(fut, willCompleteSuccessfully()); - assertTrue(validateNodeEnterTopology("physical", ((IgniteImpl) fut.join()).id())); - assertTrue(validateNodeEnterTopology("logical", ((IgniteImpl) fut.join()).id())); + assertTrue(topologyContainsNode("physical", ((IgniteImpl) fut.join()).id())); + assertTrue(topologyContainsNode("logical", ((IgniteImpl) fut.join()).id())); } finally { IgnitionManager.stop(NEW_NODE); } } - private static boolean validateNodeEnterTopology(String topologyType, String nodeId) { - StringWriter out = new StringWriter(); - StringWriter err = new StringWriter(); - - new CommandLine(TopLevelCliCommand.class, new MicronautFactory(ApplicationContext.run(Environment.TEST))) - .setDefaultValueProvider(new EnvironmentDefaultValueProvider()) - .setOut(new PrintWriter(out, true)) - .setErr(new PrintWriter(err, true)) - .execute("cluster", "topology", topologyType, "--cluster-endpoint-url", NODE_URL); - - assertThat(err.toString(), IsEmptyString.emptyString()); - - return Pattern.compile("\\b" + nodeId + "\\b").matcher(out.toString()).find(); - } - - public void checkDDL() { + public void checkDDL(Ignite node) { String createTableCommand = "CREATE TABLE tempTbl (id INT PRIMARY KEY, val INT) WITH partitions = 1"; String dropTableCommand = "DROP TABLE IF EXISTS tempTbl"; - Ignite node = getNode(); - try { sql(node, null, createTableCommand); } finally { @@ -349,8 +347,7 @@ public class ItNodeStartStopTest extends BaseIgniteAbstractTest { } } - public void checkROTransacton() { - Ignite node = getNode(); + public void checkROTransacton(Ignite node) { Transaction roTx = node.transactions().readOnly().begin(); try { @@ -376,9 +373,7 @@ public class ItNodeStartStopTest extends BaseIgniteAbstractTest { } } - public void checkImplicitRWTransaction() { - Ignite node = getNode(); - + public void checkImplicitRWTransaction(Ignite node) { // TODO: Bound table distribution zone to data nodes and uncomment. // if (!clusterNodes.containsKey(DATA_NODE) || !clusterNodes.containsKey(DATA_NODE_2)) { if (clusterNodes.size() <= 2 || !clusterNodes.containsKey(DATA_NODE)) { @@ -404,11 +399,9 @@ public class ItNodeStartStopTest extends BaseIgniteAbstractTest { } } - public void validateRWTransaction() { - Ignite node = getNode(); - - if (!clusterNodes.containsKey(DATA_NODE) || !clusterNodes.containsKey(DATA_NODE_2)) { - Transaction tx = node.transactions().readOnly().begin(); + public void checkRWTransaction(Ignite node) { + if (!clusterNodes.containsKey(DATA_NODE)) { + Transaction tx = node.transactions().begin(); try { assertThrowsWithCause( () -> sql(node, tx, "INSERT INTO tbl1 VALUES (2, -2)"), @@ -422,7 +415,7 @@ public class ItNodeStartStopTest extends BaseIgniteAbstractTest { return; } - Transaction tx = node.transactions().readOnly().begin(); + Transaction tx = node.transactions().begin(); try { sql(node, tx, "INSERT INTO tbl1 VALUES (2, 2)"); @@ -435,87 +428,49 @@ public class ItNodeStartStopTest extends BaseIgniteAbstractTest { } } - /** Get cluster node. */ - private @NotNull Ignite getNode() { - try { - CompletableFuture<Ignite> nodeFut; -// if (clusterNodes.containsKey(DATA_NODE)) { -// nodeFut = clusterNodes.get(DATA_NODE); -// } else if (clusterNodes.containsKey(DATA_NODE_2)) { -// nodeFut = clusterNodes.get(DATA_NODE_2); -// } else { -// } - nodeFut = clusterNodes.values().iterator().next(); - - if (clusterNodes.containsKey(METASTORAGE_NODE)) - return nodeFut.join(); - - return nodeFut.get(NODE_JOIN_WAIT_TIMEOUT, TimeUnit.MILLISECONDS); - } catch (TimeoutException ex) { - Assumptions.assumeTrue(clusterNodes.containsKey(METASTORAGE_NODE), - "No Ignite object instances available, because node initialization requires Metastorage."); - - IgniteTestUtils.sneakyThrow(ex); - } catch (Throwable th) { - IgniteTestUtils.sneakyThrow(th); - } - - return null; - } + private static boolean topologyContainsNode(String topologyType, String nodeId) { + StringWriter out = new StringWriter(); + StringWriter err = new StringWriter(); - /** - * Creates JUnit test node. - * - * @param testName Test name. - * @param setUpRunnable SetUp action. - * @param testRunnable Test action. - * @param tearDownRunnable TearDown action. - * @return JUnit test node. - */ - private DynamicTest createDynamicTest(String testName, Runnable setUpRunnable, Runnable testRunnable, Runnable tearDownRunnable) { - return DynamicTest.dynamicTest(testName, () -> { - TestInfoImpl info = new TestInfoImpl(testName); - try { - setupBase(info, WORK_DIR); - setUpRunnable.run(); + new CommandLine(TopLevelCliCommand.class, new MicronautFactory(ApplicationContext.run(Environment.TEST))) + .setDefaultValueProvider(new EnvironmentDefaultValueProvider()) + .setOut(new PrintWriter(out, true)) + .setErr(new PrintWriter(err, true)) + .execute("cluster", "topology", topologyType, "--cluster-endpoint-url", NODE_URL); - testRunnable.run(); - } finally { - tearDownRunnable.run(); - tearDownBase(info); - } - }); - } + assertThat(err.toString(), IsEmptyString.emptyString()); - private static String clusterNodesToString(Collection<String> nodes) { - return '[' + nodes.stream().map(nodeLabels::get).collect(Collectors.joining(", ")) + ']'; + return Pattern.compile("\\b" + nodeId + "\\b").matcher(out.toString()).find(); } - private void startNode(String nodeName) { - String nodeConfig = nodesCfg.get(nodeName); + /** Get cluster node or {@code null} if no node ready. */ + private @Nullable Ignite getInitializedNode() { + assert !clusterNodes.isEmpty(); - CompletableFuture<Ignite> node = IgnitionManager.start(nodeName, nodeConfig, WORK_DIR.resolve(nodeName)); + CompletableFuture<Ignite> nodeFut = clusterNodes.values().iterator().next(); - clusterNodes.put(nodeName, node); - } + if (!clusterNodes.containsKey(METASTORAGE_NODE)) { + assertThrowsWithCause(() -> nodeFut.get(NODE_JOIN_WAIT_TIMEOUT, TimeUnit.MILLISECONDS), TimeoutException.class); - private void stopNode(String nodeName) { - Future<?> rmv = clusterNodes.remove(nodeName); + clusterNodes.forEach((k, v) -> assertNull(v.getNow(null), k)); - assert rmv != null; + return null; + } - IgnitionManager.stop(nodeName); + return nodeFut.join(); } - private void stopCluster(List<String> nodes) { - for (int i = nodes.size() - 1; i >= 0; i--) { - stopNode(nodes.get(i)); - } + private CompletableFuture<Ignite> startNode(String nodeName) { + String nodeConfig = nodesCfg.get(nodeName); - assert clusterNodes.isEmpty(); + CompletableFuture<Ignite> fut = IgnitionManager.start(nodeName, nodeConfig, WORK_DIR.resolve(nodeName)); + + clusterNodes.put(nodeName, fut); + + return fut; } - protected List<List<Object>> sql(Ignite node, @Nullable Transaction tx, String sql, Object... args) { + protected static List<List<Object>> sql(Ignite node, @Nullable Transaction tx, String sql, Object... args) { var queryEngine = ((IgniteImpl) node).queryEngine(); SessionId sessionId = queryEngine.createSession(5_000, PropertiesHolder.fromMap( @@ -598,33 +553,4 @@ public class ItNodeStartStopTest extends BaseIgniteAbstractTest { return seenBefore.add(new HashSet<>(s)); // Copy mutable collection. } } - - /** Test info implementation for dynamic tests. */ - static class TestInfoImpl implements TestInfo { - private final String name; - - TestInfoImpl(String name) { - this.name = name; - } - - @Override - public String getDisplayName() { - return name; - } - - @Override - public Set<String> getTags() { - return Set.of(); - } - - @Override - public Optional<Class<?>> getTestClass() { - return Optional.of(ItNodeStartStopTest.class); - } - - @Override - public Optional<Method> getTestMethod() { - return Optional.empty(); - } - } }
