This is an automated email from the ASF dual-hosted git repository. amashenkov pushed a commit to branch ignite-18171 in repository https://gitbox.apache.org/repos/asf/ignite-3.git
commit 7386aafb1828dfb0ac02f110738e1ef690f09df0 Author: amashenkov <[email protected]> AuthorDate: Mon Nov 28 17:09:15 2022 +0300 wip. --- modules/runner/build.gradle | 1 + .../ignite/internal/ItNodeStartStopTest.java | 313 +++++++++++++++++---- 2 files changed, 252 insertions(+), 62 deletions(-) diff --git a/modules/runner/build.gradle b/modules/runner/build.gradle index 1fca5393a7..3cb7eb51d9 100644 --- a/modules/runner/build.gradle +++ b/modules/runner/build.gradle @@ -85,6 +85,7 @@ dependencies { integrationTestImplementation project(':ignite-page-memory') integrationTestImplementation project(':ignite-raft-client') integrationTestImplementation project(':ignite-client') + integrationTestImplementation project(':ignite-cli') integrationTestImplementation(testFixtures(project(':ignite-core'))) integrationTestImplementation(testFixtures(project(':ignite-configuration'))) integrationTestImplementation(testFixtures(project(':ignite-schema'))) diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/ItNodeStartStopTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/ItNodeStartStopTest.java index 415c6bce1f..377059b8e2 100644 --- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/ItNodeStartStopTest.java +++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/ItNodeStartStopTest.java @@ -17,9 +17,20 @@ package org.apache.ignite.internal; +import static org.apache.ignite.internal.sql.engine.util.CursorUtils.getAllFromCursor; +import static org.apache.ignite.internal.testframework.IgniteTestUtils.assertThrowsWithCause; +import static org.apache.ignite.internal.testframework.IgniteTestUtils.await; import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully; import static org.hamcrest.MatcherAssert.assertThat; - +import static org.junit.jupiter.api.Assertions.assertTrue; + +import io.micronaut.configuration.picocli.MicronautFactory; +import io.micronaut.context.ApplicationContext; +import io.micronaut.context.env.Environment; +import java.io.IOException; +import java.io.PrintWriter; +import java.io.StringWriter; +import java.lang.invoke.MethodHandles; import java.lang.reflect.Method; import java.nio.file.Path; import java.util.ArrayList; @@ -29,18 +40,40 @@ import java.util.HashSet; import java.util.LinkedHashSet; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Optional; import java.util.Set; import java.util.TreeSet; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.function.BiPredicate; import java.util.function.Predicate; +import java.util.regex.Pattern; +import java.util.stream.Collectors; import java.util.stream.Stream; import org.apache.ignite.Ignite; import org.apache.ignite.IgnitionManager; +import org.apache.ignite.internal.app.EnvironmentDefaultValueProvider; +import org.apache.ignite.internal.app.IgniteImpl; +import org.apache.ignite.internal.cli.commands.TopLevelCliCommand; +import org.apache.ignite.internal.hlc.HybridTimestamp; +import org.apache.ignite.internal.replicator.ReplicaManager; +import org.apache.ignite.internal.sql.engine.QueryContext; +import org.apache.ignite.internal.sql.engine.QueryProperty; +import org.apache.ignite.internal.sql.engine.property.PropertiesHolder; +import org.apache.ignite.internal.sql.engine.session.SessionId; import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest; +import org.apache.ignite.internal.testframework.WithSystemProperty; import org.apache.ignite.internal.testframework.WorkDirectory; import org.apache.ignite.internal.testframework.WorkDirectoryExtension; +import org.apache.ignite.internal.util.IgniteUtils; +import org.apache.ignite.lang.IgniteException; +import org.apache.ignite.tx.Transaction; +import org.apache.ignite.tx.TransactionException; +import org.hamcrest.Matchers; +import org.hamcrest.text.IsEmptyString; +import org.jetbrains.annotations.Nullable; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Assumptions; import org.junit.jupiter.api.BeforeEach; @@ -50,30 +83,41 @@ import org.junit.jupiter.api.DynamicTest; import org.junit.jupiter.api.TestFactory; import org.junit.jupiter.api.TestInfo; import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mockito; +import picocli.CommandLine; /** * Test node start/stop in different scenarios and validate grid components behavior depending on availability/absence of quorums. */ +//TODO: Fix expected messages in assertThrows +// TODO: Create 2 distribution zones, which spans a single node and both data nodes, and a tables in these zones. @ExtendWith(WorkDirectoryExtension.class) +@WithSystemProperty(key = "org.jline.terminal.dumb", value = "true") public class ItNodeStartStopTest extends BaseIgniteAbstractTest { + public static final int NODE_JOIN_WAIT_TIMEOUT = 5; /** Work directory. */ @WorkDirectory private static Path WORK_DIR; private static final String connectionAddr = "\"localhost:3344\", \"localhost:3345\", \"localhost:3346\""; + /** Correct ignite cluster url. */ + protected static final String NODE_URL = "http://localhost:10300"; + /** Cluster management group node name. */ - private static final String CMG_NODE = "C"; + private static final String CMG_NODE = "node1"; /** MetaStorage group node name. */ - private static final String METASTORAGE_NODE = "M"; - /** Data node 1. */ - private static final String DATA_NODE = "D"; - /** Data node 2. */ - private static final String DATA_NODE_2 = "D2"; + private static final String METASTORAGE_NODE = "node3"; + /** Data node 1 name. */ + private static final String DATA_NODE = "node2"; // Partition leader. + /** Data node 2 name. */ + private static final String DATA_NODE_2 = "node4"; + /** New node name. */ + private static final String NEW_NODE = "newNode"; /** Nodes configurations. */ private static final Map<String, String> nodesCfg = Map.of( - CMG_NODE, "{\n" + "node1", "{\n" + " \"network\": {\n" + " \"port\":3344,\n" + " \"nodeFinder\":{\n" @@ -81,7 +125,7 @@ public class ItNodeStartStopTest extends BaseIgniteAbstractTest { + " }\n" + " }\n" + "}", - METASTORAGE_NODE, "{\n" + "node2", "{\n" + " \"network\": {\n" + " \"port\":3345,\n" + " \"nodeFinder\":{\n" @@ -89,7 +133,7 @@ public class ItNodeStartStopTest extends BaseIgniteAbstractTest { + " }\n" + " }\n" + "}", - DATA_NODE, "{\n" + "node3", "{\n" + " \"network\": {\n" + " \"port\":3346,\n" + " \"nodeFinder\":{\n" @@ -97,7 +141,7 @@ public class ItNodeStartStopTest extends BaseIgniteAbstractTest { + " }\n" + " }\n" + "}", - DATA_NODE_2, "{\n" + "node4", "{\n" + " \"network\": {\n" + " \"port\":3347,\n" + " \"nodeFinder\":{\n" @@ -107,29 +151,61 @@ public class ItNodeStartStopTest extends BaseIgniteAbstractTest { + "}" ); + private static final String NEW_NODE_CONFIG = "{\n" + + " \"network\": {\n" + + " \"port\":3348,\n" + + " \"nodeFinder\":{\n" + + " \"netClusterNodes\": [ " + connectionAddr + " ]\n" + + " }\n" + + " }\n" + + "}"; + + // TODO: Drop labels and make node names meaningful, when distribution zones will be implemented. + // Labels are used for better understanding node role. + // Node names uses for partition affinity calculation, we can't guarantee a node with name "DATA_NODE" will own a partition. + // So, static meaningless node names are used, which are mapped to labels. + private static final Map<String, String> nodeLabels = Map.of( + CMG_NODE, "C", + METASTORAGE_NODE, "M", + DATA_NODE, "D", + DATA_NODE_2, "D2", + NEW_NODE, "N" + ); + /** Cluster nodes. */ private final Map<String, Ignite> clusterNodes = new HashMap<>(); - /** Runs after each test sequence. */ @BeforeEach - public void beforeAll() { - log.info("Init cluster."); - + public void beforeEach() throws Exception { List<CompletableFuture<Ignite>> futures = new ArrayList<>(); + // Start nodes. nodesCfg.forEach((k, v) -> futures.add(IgnitionManager.start(k, v, WORK_DIR.resolve(k)))); - //TODO: Fix metastore group + // Init cluster. + //TODO: Use dedicated node for metastorage. IgnitionManager.init(CMG_NODE, List.of(CMG_NODE /* METASTORAGE_NODE */), List.of(CMG_NODE), "cluster"); - // TODO: Create distribution zones: spans both nodes, spans a single node. - // TODO: Create tables in these distribution zone + add data. - for (CompletableFuture<Ignite> future : futures) { assertThat(future, willCompleteSuccessfully()); } + // Create tables. + IgniteImpl node = (IgniteImpl) futures.get(0).join(); + sql(node, null, "CREATE TABLE tbl1 (id INT PRIMARY KEY, val INT) WITH partitions = 1"); + + for (CompletableFuture<Ignite> f : futures) { + ReplicaManager replicaMgr = (ReplicaManager) MethodHandles.privateLookupIn(IgniteImpl.class, MethodHandles.lookup()) + .findVarHandle(IgniteImpl.class, "replicaMgr", ReplicaManager.class) + .get(f.get()); + + assertTrue(DATA_NODE.equals(f.get().name()) ^ replicaMgr.startedGroups().isEmpty()); + } + + sql(node, null, "INSERT INTO tbl1(id, val) VALUES (1,1)"); + + // Shutdown cluster. for (int i = futures.size() - 1; i >= 0; i--) { IgnitionManager.stop(futures.get(i).join().name()); } @@ -137,11 +213,15 @@ public class ItNodeStartStopTest extends BaseIgniteAbstractTest { /** Runs after each test sequence. */ @AfterEach - public void after() { + public void afterEach() throws IOException { log.info("Stop all nodes."); clusterNodes.keySet().forEach(IgnitionManager::stop); clusterNodes.clear(); + + for (String name : nodesCfg.keySet()) { + IgniteUtils.deleteIfExists(WORK_DIR.resolve(name)); + } } /** Filter out duplicates and invalid grids. */ @@ -168,10 +248,15 @@ public class ItNodeStartStopTest extends BaseIgniteAbstractTest { for (int i = 0; i < nodes.size(); i++) { String nodeName = nodes.get(i); boolean last = (i == nodes.size() - 1); + boolean first = (i == 0); tests.add(createTest( - "Start " + nodeName, - () -> startNode(nodeName), + "Start " + nodeLabels.get(nodeName), + () -> { + assert !first || clusterNodes.isEmpty(); + + startNode(nodeName); + }, this::checkNodeStartupSequence, () -> { if (last) { @@ -181,7 +266,8 @@ public class ItNodeStartStopTest extends BaseIgniteAbstractTest { )); } - return DynamicContainer.dynamicContainer("Start sequence " + String.join("-", nodes), tests); + return DynamicContainer.dynamicContainer("Start sequence " + + nodes.stream().map(nodeLabels::get).collect(Collectors.joining("-")), tests); }); } @@ -192,12 +278,16 @@ public class ItNodeStartStopTest extends BaseIgniteAbstractTest { * @see #checkNodeRestart() () */ @TestFactory - public Stream<DynamicNode> nodeRestartTestFactory() { + public Stream<? extends DynamicNode> nodeRestartTestFactory() { return GridGenerator.generateGrids( nodesCfg.keySet(), nodeFilter() // Data nodes are interchangeable. ).stream() .map(nodes -> { + if (nodes.size() != 3) { + return null; //TODO: remove this + } + ArrayList<DynamicNode> tests = new ArrayList<>(); for (int i = 0; i < nodes.size(); i++) { String nodeName = nodes.get(i); @@ -205,9 +295,11 @@ public class ItNodeStartStopTest extends BaseIgniteAbstractTest { boolean first = i == 0; tests.add(createTest( - "Stop " + nodeName, + "Stop " + nodeLabels.get(nodeName), () -> { if (first) { + assert clusterNodes.isEmpty(); + startNodes(nodes); } stopNode(nodeName); @@ -218,7 +310,7 @@ public class ItNodeStartStopTest extends BaseIgniteAbstractTest { )); tests.add(createTest( - "Start " + nodeName, + "Start " + nodeLabels.get(nodeName), () -> startNode(nodeName), this::checkNodeRestart, () -> { @@ -229,8 +321,10 @@ public class ItNodeStartStopTest extends BaseIgniteAbstractTest { )); } - return DynamicContainer.dynamicContainer("Grid [" + String.join(", ", nodes) + ']', tests); - }); + return DynamicContainer.dynamicContainer("Grid [" + + nodes.stream().map(nodeLabels::get).collect(Collectors.joining(", ")) + ']', tests); + }) + .filter(Objects::nonNull); } public void checkNodeStartupSequence() { @@ -257,81 +351,158 @@ public class ItNodeStartStopTest extends BaseIgniteAbstractTest { } private void validateNodeJoin() { - if (!clusterNodes.containsKey(CMG_NODE)) { - //TODO: add node and check it can't join. - return; - } + try { + CompletableFuture<Ignite> fut = IgnitionManager.start(NEW_NODE, NEW_NODE_CONFIG, WORK_DIR.resolve(NEW_NODE)); - if (!clusterNodes.containsKey(METASTORAGE_NODE)) { - //TODO: add node and check it joins, but logical topology is unchanged. - } else { - //TODO: add node and check it joins and updates logical topology. + if (!clusterNodes.containsKey(CMG_NODE)) { + // Only CMG promotes a new node to logical topology + assertThrowsWithCause(() -> fut.get(NODE_JOIN_WAIT_TIMEOUT, TimeUnit.SECONDS), TimeoutException.class); + + assertTrue(validateTopologyHasNode("physical", NEW_NODE)); + assertThrowsWithCause(() -> validateTopologyHasNode("logical", NEW_NODE), IgniteException.class); + + return; + } + + assertThat(fut, willCompleteSuccessfully()); + + assertTrue(validateTopologyHasNode("physical", ((IgniteImpl) fut.join()).id())); + assertTrue(validateTopologyHasNode("logical", ((IgniteImpl) fut.join()).id())); + } finally { + IgnitionManager.stop(NEW_NODE); } } + private static boolean validateTopologyHasNode(String topologyType, String nodeId) { + StringWriter out = new StringWriter(); + StringWriter err = new StringWriter(); + + new CommandLine(TopLevelCliCommand.class, new MicronautFactory(ApplicationContext.run(Environment.TEST))) + .setDefaultValueProvider(new EnvironmentDefaultValueProvider()) + .setOut(new PrintWriter(out, true)) + .setErr(new PrintWriter(err, true)) + .execute("cluster", "topology", topologyType, "--cluster-endpoint-url", NODE_URL); + + assertThat(err.toString(), IsEmptyString.emptyString()); + + return Pattern.compile("\\b" + nodeId + "\\b").matcher(out.toString()).find(); + } + private void validateDistributionZone() { if (!clusterNodes.containsKey(METASTORAGE_NODE)) { //TODO: creating distribution zone fails. return; } - if (!clusterNodes.containsKey(DATA_NODE) && !clusterNodes.containsKey(DATA_NODE_2)) { //TODO: creating distribution zone fails. return; } try { - if (clusterNodes.containsKey(DATA_NODE) && !clusterNodes.containsKey(DATA_NODE_2)) { - //TODO: creating distribution zone that spans unavailable node will fails. + if (!clusterNodes.containsKey(DATA_NODE_2)) { //TODO: creating distribution zone that spans on the DATA_NODE only will success. + //TODO: creating distribution zone that spans DATA_NODE and DATA_NODE2 will fail. return; } - - //TODO: creating distribution zone that spans unavailable node will fails. + //TODO: creating distribution zones will success. } finally { - //TODO: drop distribution zone. + //TODO: drop distribution zones. } } private void validateDDL() { - if (!clusterNodes.containsKey(METASTORAGE_NODE)) { - //TODO: create table and check it fails. - return; - } + Ignite node = getNode(); + String createTableCommand = "CREATE TABLE tempTbl (id INT PRIMARY KEY, val INT) WITH partitions = 1"; + String dropTableCommand = "DROP TABLE IF EXISTS tempTbl"; - if (!clusterNodes.containsKey(DATA_NODE) && !clusterNodes.containsKey(DATA_NODE_2)) { - //TODO: create table and check it fails because distribution zone must have quorum. - return; + try { + if (!clusterNodes.containsKey(METASTORAGE_NODE)) { + assertThrowsWithCause(() -> sql(node, null, createTableCommand), IgniteException.class); + return; + } + + sql(node, null, createTableCommand); + } finally { + sql(node, null, dropTableCommand); } + } + + public void validateROTransaction() { + Ignite node = getNode(); + Transaction roTx = node.transactions().readOnly().begin(); try { - if (clusterNodes.containsKey(DATA_NODE) && !clusterNodes.containsKey(DATA_NODE_2)) { - //TODO: creating table in distribution zone that spans all data nodes will fails. - //TODO: creating table in distribution zone that spans DATA_NODE only will success. + if (!clusterNodes.containsKey(DATA_NODE) && !clusterNodes.containsKey(DATA_NODE_2)) { + assertThrowsWithCause(() -> sql(node, roTx, "SELECT * FROM tbl1"), IgniteException.class); + return; + } else if (clusterNodes.containsKey(DATA_NODE)) { + // Use fake transaction with a timestamp from the past. + Transaction tx0 = Mockito.spy(roTx); + Mockito.when(tx0.readTimestamp()).thenReturn(new HybridTimestamp(1L, 0)); + + assertThrowsWithCause(() -> sql(node, tx0, "SELECT * FROM tbl1"), IgniteException.class); + + return; } - //TODO: creating table will success. + assertThrowsWithCause(() -> sql(node, roTx, "SELECT * FROM tbl1"), IgniteException.class); } finally { - //TODO: drop table. + roTx.rollback(); } } - public void validateROTransaction() { - if (!clusterNodes.containsKey(DATA_NODE) && !clusterNodes.containsKey(DATA_NODE_2)) { - //TODO: table RO transaction will fails. + public void validateImplicitRWTransaction() { + Ignite node = getNode(); + + if (!clusterNodes.containsKey(DATA_NODE) || !clusterNodes.containsKey(DATA_NODE_2)) { + assertThrowsWithCause( + () -> sql(node, null, "INSERT INTO tbl1 VALUES (5, 5)"), + TransactionException.class, + "Failed to get the primary replica"); + return; } - //TODO: table RO transaction will success. + sql(node, null, "INSERT INTO tbl1 VALUES (2, 2)"); + + assertThat(sql(node, null, "SELECT * FROM tbl1").size(), Matchers.equalTo(2)); + + sql(node, null, "DELETE FROM tbl1 WHERE tbl1.id = 2"); } public void validateRWTransaction() { - if (clusterNodes.containsKey(DATA_NODE) && clusterNodes.containsKey(DATA_NODE_2)) { - //TODO: table RW transaction will success. - return; + Ignite node = getNode(); + + if (!clusterNodes.containsKey(DATA_NODE) || !clusterNodes.containsKey(DATA_NODE_2)) { + Transaction tx = node.transactions().readOnly().begin(); + try { + assertThrowsWithCause( + () -> sql(node, tx, "INSERT INTO tbl1 VALUES (5, 5)"), + TransactionException.class, + "Failed to get the primary replica"); + + return; + } finally { + tx.rollback(); + } } - //TODO: table RO transaction will fails. + Transaction tx = node.transactions().readOnly().begin(); + try { + sql(node, tx, "INSERT INTO tbl1 VALUES (2, 2)"); + + tx.commit(); + + assertThat(sql(node, null, "SELECT * FROM tbl1").size(), Matchers.equalTo(2)); + } finally { + tx.rollback(); + sql(node, null, "DELETE FROM tbl1 WHERE tbl1.id = 2"); + } + } + + /** Get cluster node. */ + private Ignite getNode() { + return clusterNodes.values().iterator().next(); } /** @@ -385,6 +556,24 @@ public class ItNodeStartStopTest extends BaseIgniteAbstractTest { } } + protected List<List<Object>> sql(Ignite node, @Nullable Transaction tx, String sql, Object... args) { + var queryEngine = ((IgniteImpl) node).queryEngine(); + + SessionId sessionId = queryEngine.createSession(15_000, PropertiesHolder.fromMap( + Map.of(QueryProperty.DEFAULT_SCHEMA, "PUBLIC") + )); + + try { + var context = tx != null ? QueryContext.of(tx) : QueryContext.of(); + + return getAllFromCursor( + await(queryEngine.querySingleAsync(sessionId, context, sql, args)) + ); + } finally { + queryEngine.closeSession(sessionId); + } + } + /** * Grids configurations generator. */
