This is an automated email from the ASF dual-hosted git repository.

amashenkov pushed a commit to branch ignite-18171
in repository https://gitbox.apache.org/repos/asf/ignite-3.git

commit 7386aafb1828dfb0ac02f110738e1ef690f09df0
Author: amashenkov <[email protected]>
AuthorDate: Mon Nov 28 17:09:15 2022 +0300

    wip.
---
 modules/runner/build.gradle                        |   1 +
 .../ignite/internal/ItNodeStartStopTest.java       | 313 +++++++++++++++++----
 2 files changed, 252 insertions(+), 62 deletions(-)

diff --git a/modules/runner/build.gradle b/modules/runner/build.gradle
index 1fca5393a7..3cb7eb51d9 100644
--- a/modules/runner/build.gradle
+++ b/modules/runner/build.gradle
@@ -85,6 +85,7 @@ dependencies {
     integrationTestImplementation project(':ignite-page-memory')
     integrationTestImplementation project(':ignite-raft-client')
     integrationTestImplementation project(':ignite-client')
+    integrationTestImplementation project(':ignite-cli')
     integrationTestImplementation(testFixtures(project(':ignite-core')))
     
integrationTestImplementation(testFixtures(project(':ignite-configuration')))
     integrationTestImplementation(testFixtures(project(':ignite-schema')))
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/ItNodeStartStopTest.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/ItNodeStartStopTest.java
index 415c6bce1f..377059b8e2 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/ItNodeStartStopTest.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/ItNodeStartStopTest.java
@@ -17,9 +17,20 @@
 
 package org.apache.ignite.internal;
 
+import static 
org.apache.ignite.internal.sql.engine.util.CursorUtils.getAllFromCursor;
+import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.assertThrowsWithCause;
+import static org.apache.ignite.internal.testframework.IgniteTestUtils.await;
 import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
 import static org.hamcrest.MatcherAssert.assertThat;
-
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import io.micronaut.configuration.picocli.MicronautFactory;
+import io.micronaut.context.ApplicationContext;
+import io.micronaut.context.env.Environment;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.lang.invoke.MethodHandles;
 import java.lang.reflect.Method;
 import java.nio.file.Path;
 import java.util.ArrayList;
@@ -29,18 +40,40 @@ import java.util.HashSet;
 import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
 import java.util.TreeSet;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.function.BiPredicate;
 import java.util.function.Predicate;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
 import java.util.stream.Stream;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgnitionManager;
+import org.apache.ignite.internal.app.EnvironmentDefaultValueProvider;
+import org.apache.ignite.internal.app.IgniteImpl;
+import org.apache.ignite.internal.cli.commands.TopLevelCliCommand;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.replicator.ReplicaManager;
+import org.apache.ignite.internal.sql.engine.QueryContext;
+import org.apache.ignite.internal.sql.engine.QueryProperty;
+import org.apache.ignite.internal.sql.engine.property.PropertiesHolder;
+import org.apache.ignite.internal.sql.engine.session.SessionId;
 import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
+import org.apache.ignite.internal.testframework.WithSystemProperty;
 import org.apache.ignite.internal.testframework.WorkDirectory;
 import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.lang.IgniteException;
+import org.apache.ignite.tx.Transaction;
+import org.apache.ignite.tx.TransactionException;
+import org.hamcrest.Matchers;
+import org.hamcrest.text.IsEmptyString;
+import org.jetbrains.annotations.Nullable;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.Assumptions;
 import org.junit.jupiter.api.BeforeEach;
@@ -50,30 +83,41 @@ import org.junit.jupiter.api.DynamicTest;
 import org.junit.jupiter.api.TestFactory;
 import org.junit.jupiter.api.TestInfo;
 import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mockito;
+import picocli.CommandLine;
 
 /**
  * Test node start/stop in different scenarios and validate grid components 
behavior depending on availability/absence of quorums.
  */
+//TODO: Fix expected messages in assertThrows
+// TODO: Create 2 distribution zones, one spanning a single node and one spanning 
both data nodes, and tables in these zones.
 @ExtendWith(WorkDirectoryExtension.class)
+@WithSystemProperty(key = "org.jline.terminal.dumb", value = "true")
 public class ItNodeStartStopTest extends BaseIgniteAbstractTest {
+    public static final int NODE_JOIN_WAIT_TIMEOUT = 5;
     /** Work directory. */
     @WorkDirectory
     private static Path WORK_DIR;
 
     private static final String connectionAddr = "\"localhost:3344\", 
\"localhost:3345\", \"localhost:3346\"";
 
+    /** Correct Ignite cluster URL. */
+    protected static final String NODE_URL = "http://localhost:10300";;
+
     /** Cluster management group node name. */
-    private static final String CMG_NODE = "C";
+    private static final String CMG_NODE = "node1";
     /** MetaStorage group node name. */
-    private static final String METASTORAGE_NODE = "M";
-    /** Data node 1. */
-    private static final String DATA_NODE = "D";
-    /** Data node 2. */
-    private static final String DATA_NODE_2 = "D2";
+    private static final String METASTORAGE_NODE = "node3";
+    /** Data node 1 name. */
+    private static final String DATA_NODE = "node2"; // Partition leader.
+    /** Data node 2 name. */
+    private static final String DATA_NODE_2 = "node4";
+    /** New node name. */
+    private static final String NEW_NODE = "newNode";
 
     /** Nodes configurations. */
     private static final Map<String, String> nodesCfg = Map.of(
-            CMG_NODE, "{\n"
+            "node1", "{\n"
                     + "  \"network\": {\n"
                     + "    \"port\":3344,\n"
                     + "    \"nodeFinder\":{\n"
@@ -81,7 +125,7 @@ public class ItNodeStartStopTest extends 
BaseIgniteAbstractTest {
                     + "    }\n"
                     + "  }\n"
                     + "}",
-            METASTORAGE_NODE, "{\n"
+            "node2", "{\n"
                     + "  \"network\": {\n"
                     + "    \"port\":3345,\n"
                     + "    \"nodeFinder\":{\n"
@@ -89,7 +133,7 @@ public class ItNodeStartStopTest extends 
BaseIgniteAbstractTest {
                     + "    }\n"
                     + "  }\n"
                     + "}",
-            DATA_NODE, "{\n"
+            "node3", "{\n"
                     + "  \"network\": {\n"
                     + "    \"port\":3346,\n"
                     + "    \"nodeFinder\":{\n"
@@ -97,7 +141,7 @@ public class ItNodeStartStopTest extends 
BaseIgniteAbstractTest {
                     + "    }\n"
                     + "  }\n"
                     + "}",
-            DATA_NODE_2, "{\n"
+            "node4", "{\n"
                     + "  \"network\": {\n"
                     + "    \"port\":3347,\n"
                     + "    \"nodeFinder\":{\n"
@@ -107,29 +151,61 @@ public class ItNodeStartStopTest extends 
BaseIgniteAbstractTest {
                     + "}"
     );
 
+    private static final String NEW_NODE_CONFIG = "{\n"
+            + "  \"network\": {\n"
+            + "    \"port\":3348,\n"
+            + "    \"nodeFinder\":{\n"
+            + "      \"netClusterNodes\": [ " + connectionAddr + " ]\n"
+            + "    }\n"
+            + "  }\n"
+            + "}";
+
+    // TODO: Drop labels and make node names meaningful once distribution 
zones are implemented.
+    // Labels make each node's role easier to understand.
+    // Node names are used for partition affinity calculation, so we can't guarantee 
that a node named "DATA_NODE" will own a partition.
+    // Therefore, fixed meaningless node names are used and mapped to labels.
+    private static final Map<String, String> nodeLabels = Map.of(
+            CMG_NODE, "C",
+            METASTORAGE_NODE, "M",
+            DATA_NODE, "D",
+            DATA_NODE_2, "D2",
+            NEW_NODE, "N"
+    );
+
     /** Cluster nodes. */
     private final Map<String, Ignite> clusterNodes = new HashMap<>();
 
-
     /** Runs after each test sequence. */
     @BeforeEach
-    public void beforeAll() {
-        log.info("Init cluster.");
-
+    public void beforeEach() throws Exception {
         List<CompletableFuture<Ignite>> futures = new ArrayList<>();
 
+        // Start nodes.
         nodesCfg.forEach((k, v) -> futures.add(IgnitionManager.start(k, v, 
WORK_DIR.resolve(k))));
 
-        //TODO: Fix metastore group
+        // Init cluster.
+        //TODO: Use dedicated node for metastorage.
         IgnitionManager.init(CMG_NODE, List.of(CMG_NODE /* METASTORAGE_NODE 
*/), List.of(CMG_NODE), "cluster");
 
-        // TODO: Create distribution zones: spans both nodes, spans a single 
node.
-        // TODO: Create tables in these distribution zone + add data.
-
         for (CompletableFuture<Ignite> future : futures) {
             assertThat(future, willCompleteSuccessfully());
         }
 
+        // Create tables.
+        IgniteImpl node = (IgniteImpl) futures.get(0).join();
+        sql(node, null, "CREATE TABLE tbl1 (id INT PRIMARY KEY, val INT) WITH 
partitions = 1");
+
+        for (CompletableFuture<Ignite> f : futures) {
+            ReplicaManager replicaMgr = (ReplicaManager) 
MethodHandles.privateLookupIn(IgniteImpl.class, MethodHandles.lookup())
+                    .findVarHandle(IgniteImpl.class, "replicaMgr", 
ReplicaManager.class)
+                    .get(f.get());
+
+            assertTrue(DATA_NODE.equals(f.get().name()) ^ 
replicaMgr.startedGroups().isEmpty());
+        }
+
+        sql(node, null, "INSERT INTO tbl1(id, val) VALUES (1,1)");
+
+        // Shutdown cluster.
         for (int i = futures.size() - 1; i >= 0; i--) {
             IgnitionManager.stop(futures.get(i).join().name());
         }
@@ -137,11 +213,15 @@ public class ItNodeStartStopTest extends 
BaseIgniteAbstractTest {
 
     /** Runs after each test sequence. */
     @AfterEach
-    public void after() {
+    public void afterEach() throws IOException {
         log.info("Stop all nodes.");
 
         clusterNodes.keySet().forEach(IgnitionManager::stop);
         clusterNodes.clear();
+
+        for (String name : nodesCfg.keySet()) {
+            IgniteUtils.deleteIfExists(WORK_DIR.resolve(name));
+        }
     }
 
     /** Filter out duplicates and invalid grids. */
@@ -168,10 +248,15 @@ public class ItNodeStartStopTest extends 
BaseIgniteAbstractTest {
                     for (int i = 0; i < nodes.size(); i++) {
                         String nodeName = nodes.get(i);
                         boolean last = (i == nodes.size() - 1);
+                        boolean first = (i == 0);
 
                         tests.add(createTest(
-                                "Start " + nodeName,
-                                () -> startNode(nodeName),
+                                "Start " + nodeLabels.get(nodeName),
+                                () -> {
+                                    assert !first || clusterNodes.isEmpty();
+
+                                    startNode(nodeName);
+                                },
                                 this::checkNodeStartupSequence,
                                 () -> {
                                     if (last) {
@@ -181,7 +266,8 @@ public class ItNodeStartStopTest extends 
BaseIgniteAbstractTest {
                         ));
                     }
 
-                    return DynamicContainer.dynamicContainer("Start sequence " 
+ String.join("-", nodes), tests);
+                    return DynamicContainer.dynamicContainer("Start sequence " 
+
+                            
nodes.stream().map(nodeLabels::get).collect(Collectors.joining("-")), tests);
                 });
     }
 
@@ -192,12 +278,16 @@ public class ItNodeStartStopTest extends 
BaseIgniteAbstractTest {
      * @see #checkNodeRestart() ()
      */
     @TestFactory
-    public Stream<DynamicNode> nodeRestartTestFactory() {
+    public Stream<? extends DynamicNode> nodeRestartTestFactory() {
         return GridGenerator.generateGrids(
                         nodesCfg.keySet(),
                         nodeFilter() // Data nodes are interchangeable.
                 ).stream()
                 .map(nodes -> {
+                    if (nodes.size() != 3) {
+                        return null; //TODO: remove this
+                    }
+
                     ArrayList<DynamicNode> tests = new ArrayList<>();
                     for (int i = 0; i < nodes.size(); i++) {
                         String nodeName = nodes.get(i);
@@ -205,9 +295,11 @@ public class ItNodeStartStopTest extends 
BaseIgniteAbstractTest {
                         boolean first = i == 0;
 
                         tests.add(createTest(
-                                "Stop " + nodeName,
+                                "Stop " + nodeLabels.get(nodeName),
                                 () -> {
                                     if (first) {
+                                        assert clusterNodes.isEmpty();
+
                                         startNodes(nodes);
                                     }
                                     stopNode(nodeName);
@@ -218,7 +310,7 @@ public class ItNodeStartStopTest extends 
BaseIgniteAbstractTest {
                         ));
 
                         tests.add(createTest(
-                                "Start " + nodeName,
+                                "Start " + nodeLabels.get(nodeName),
                                 () -> startNode(nodeName),
                                 this::checkNodeRestart,
                                 () -> {
@@ -229,8 +321,10 @@ public class ItNodeStartStopTest extends 
BaseIgniteAbstractTest {
                         ));
                     }
 
-                    return DynamicContainer.dynamicContainer("Grid [" + 
String.join(", ", nodes) + ']', tests);
-                });
+                    return DynamicContainer.dynamicContainer("Grid [" +
+                            
nodes.stream().map(nodeLabels::get).collect(Collectors.joining(", ")) + ']', 
tests);
+                })
+                .filter(Objects::nonNull);
     }
 
     public void checkNodeStartupSequence() {
@@ -257,81 +351,158 @@ public class ItNodeStartStopTest extends 
BaseIgniteAbstractTest {
     }
 
     private void validateNodeJoin() {
-        if (!clusterNodes.containsKey(CMG_NODE)) {
-            //TODO: add node and check it can't join.
-            return;
-        }
+        try {
+            CompletableFuture<Ignite> fut = IgnitionManager.start(NEW_NODE, 
NEW_NODE_CONFIG, WORK_DIR.resolve(NEW_NODE));
 
-        if (!clusterNodes.containsKey(METASTORAGE_NODE)) {
-            //TODO: add node and check it joins, but logical topology is 
unchanged.
-        } else {
-            //TODO: add node and check it joins and updates logical topology.
+            if (!clusterNodes.containsKey(CMG_NODE)) {
+                // Only the CMG promotes a new node to the logical topology.
+                assertThrowsWithCause(() -> fut.get(NODE_JOIN_WAIT_TIMEOUT, 
TimeUnit.SECONDS), TimeoutException.class);
+
+                assertTrue(validateTopologyHasNode("physical", NEW_NODE));
+                assertThrowsWithCause(() -> validateTopologyHasNode("logical", 
NEW_NODE), IgniteException.class);
+
+                return;
+            }
+
+            assertThat(fut, willCompleteSuccessfully());
+
+            assertTrue(validateTopologyHasNode("physical", ((IgniteImpl) 
fut.join()).id()));
+            assertTrue(validateTopologyHasNode("logical", ((IgniteImpl) 
fut.join()).id()));
+        } finally {
+            IgnitionManager.stop(NEW_NODE);
         }
     }
 
+    private static boolean validateTopologyHasNode(String topologyType, String 
nodeId) {
+        StringWriter out = new StringWriter();
+        StringWriter err = new StringWriter();
+
+        new CommandLine(TopLevelCliCommand.class, new 
MicronautFactory(ApplicationContext.run(Environment.TEST)))
+                .setDefaultValueProvider(new EnvironmentDefaultValueProvider())
+                .setOut(new PrintWriter(out, true))
+                .setErr(new PrintWriter(err, true))
+                .execute("cluster", "topology", topologyType, 
"--cluster-endpoint-url", NODE_URL);
+
+        assertThat(err.toString(), IsEmptyString.emptyString());
+
+        return Pattern.compile("\\b" + nodeId + 
"\\b").matcher(out.toString()).find();
+    }
+
     private void validateDistributionZone() {
         if (!clusterNodes.containsKey(METASTORAGE_NODE)) {
             //TODO: creating distribution zone fails.
             return;
         }
-
         if (!clusterNodes.containsKey(DATA_NODE) && 
!clusterNodes.containsKey(DATA_NODE_2)) {
             //TODO: creating distribution zone fails.
             return;
         }
 
         try {
-            if (clusterNodes.containsKey(DATA_NODE) && 
!clusterNodes.containsKey(DATA_NODE_2)) {
-                //TODO: creating distribution zone that spans unavailable node 
will fails.
+            if (!clusterNodes.containsKey(DATA_NODE_2)) {
                 //TODO: creating distribution zone that spans on the DATA_NODE 
only will success.
+                //TODO: creating a distribution zone that spans DATA_NODE and 
DATA_NODE_2 will fail.
                 return;
             }
-
-            //TODO: creating distribution zone that spans unavailable node 
will fails.
+            //TODO: creating distribution zones will succeed.
         } finally {
-            //TODO: drop distribution zone.
+            //TODO: drop distribution zones.
         }
     }
 
     private void validateDDL() {
-        if (!clusterNodes.containsKey(METASTORAGE_NODE)) {
-            //TODO: create table and check it fails.
-            return;
-        }
+        Ignite node = getNode();
+        String createTableCommand = "CREATE TABLE tempTbl (id INT PRIMARY KEY, 
val INT) WITH partitions = 1";
+        String dropTableCommand = "DROP TABLE IF EXISTS tempTbl";
 
-        if (!clusterNodes.containsKey(DATA_NODE) && 
!clusterNodes.containsKey(DATA_NODE_2)) {
-            //TODO: create table and check it fails because distribution zone 
must have quorum.
-            return;
+        try {
+            if (!clusterNodes.containsKey(METASTORAGE_NODE)) {
+                assertThrowsWithCause(() -> sql(node, null, 
createTableCommand), IgniteException.class);
+                return;
+            }
+
+            sql(node, null, createTableCommand);
+        } finally {
+            sql(node, null, dropTableCommand);
         }
+    }
+
+    public void validateROTransaction() {
+        Ignite node = getNode();
+        Transaction roTx = node.transactions().readOnly().begin();
 
         try {
-            if (clusterNodes.containsKey(DATA_NODE) && 
!clusterNodes.containsKey(DATA_NODE_2)) {
-                //TODO: creating table in distribution zone that spans all 
data nodes will fails.
-                //TODO: creating table in distribution zone that spans 
DATA_NODE only will success.
+            if (!clusterNodes.containsKey(DATA_NODE) && 
!clusterNodes.containsKey(DATA_NODE_2)) {
+                assertThrowsWithCause(() -> sql(node, roTx, "SELECT * FROM 
tbl1"), IgniteException.class);
+                return;
+            } else if (clusterNodes.containsKey(DATA_NODE)) {
+                // Use a fake transaction with a timestamp from the past.
+                Transaction tx0 = Mockito.spy(roTx);
+                Mockito.when(tx0.readTimestamp()).thenReturn(new 
HybridTimestamp(1L, 0));
+
+                assertThrowsWithCause(() -> sql(node, tx0, "SELECT * FROM 
tbl1"), IgniteException.class);
+
+                return;
             }
 
-            //TODO: creating table will success.
+            assertThrowsWithCause(() -> sql(node, roTx, "SELECT * FROM tbl1"), 
IgniteException.class);
         } finally {
-            //TODO: drop table.
+            roTx.rollback();
         }
     }
 
-    public void validateROTransaction() {
-        if (!clusterNodes.containsKey(DATA_NODE) && 
!clusterNodes.containsKey(DATA_NODE_2)) {
-            //TODO: table RO transaction will fails.
+    public void validateImplicitRWTransaction() {
+        Ignite node = getNode();
+
+        if (!clusterNodes.containsKey(DATA_NODE) || 
!clusterNodes.containsKey(DATA_NODE_2)) {
+            assertThrowsWithCause(
+                    () -> sql(node, null, "INSERT INTO tbl1 VALUES (5, 5)"),
+                    TransactionException.class,
+                    "Failed to get the primary replica");
+
             return;
         }
 
-        //TODO: table RO transaction will success.
+        sql(node, null, "INSERT INTO tbl1 VALUES (2, 2)");
+
+        assertThat(sql(node, null, "SELECT * FROM tbl1").size(), 
Matchers.equalTo(2));
+
+        sql(node, null, "DELETE FROM tbl1 WHERE tbl1.id = 2");
     }
 
     public void validateRWTransaction() {
-        if (clusterNodes.containsKey(DATA_NODE) && 
clusterNodes.containsKey(DATA_NODE_2)) {
-            //TODO: table RW transaction will success.
-            return;
+        Ignite node = getNode();
+
+        if (!clusterNodes.containsKey(DATA_NODE) || 
!clusterNodes.containsKey(DATA_NODE_2)) {
+            Transaction tx = node.transactions().readOnly().begin();
+            try {
+                assertThrowsWithCause(
+                        () -> sql(node, tx, "INSERT INTO tbl1 VALUES (5, 5)"),
+                        TransactionException.class,
+                        "Failed to get the primary replica");
+
+                return;
+            } finally {
+                tx.rollback();
+            }
         }
 
-        //TODO: table RO transaction will fails.
+        Transaction tx = node.transactions().readOnly().begin();
+        try {
+            sql(node, tx, "INSERT INTO tbl1 VALUES (2, 2)");
+
+            tx.commit();
+
+            assertThat(sql(node, null, "SELECT * FROM tbl1").size(), 
Matchers.equalTo(2));
+        } finally {
+            tx.rollback();
+            sql(node, null, "DELETE FROM tbl1 WHERE tbl1.id = 2");
+        }
+    }
+
+    /** Returns the first node in {@code clusterNodes} iteration order. */
+    private Ignite getNode() {
+        return clusterNodes.values().iterator().next();
     }
 
     /**
@@ -385,6 +556,24 @@ public class ItNodeStartStopTest extends 
BaseIgniteAbstractTest {
         }
     }
 
+    protected List<List<Object>> sql(Ignite node, @Nullable Transaction tx, 
String sql, Object... args) {
+        var queryEngine = ((IgniteImpl) node).queryEngine();
+
+        SessionId sessionId = queryEngine.createSession(15_000, 
PropertiesHolder.fromMap(
+                Map.of(QueryProperty.DEFAULT_SCHEMA, "PUBLIC")
+        ));
+
+        try {
+            var context = tx != null ? QueryContext.of(tx) : QueryContext.of();
+
+            return getAllFromCursor(
+                    await(queryEngine.querySingleAsync(sessionId, context, 
sql, args))
+            );
+        } finally {
+            queryEngine.closeSession(sessionId);
+        }
+    }
+
     /**
      * Grids configurations generator.
      */

Reply via email to