This is an automated email from the ASF dual-hosted git repository.

nizhikov pushed a commit to branch ignite-ducktape
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/ignite-ducktape by this push:
     new bd26242  IGNITE-13489 Clients log in and out of the topology (#8362)
bd26242 is described below

commit bd2624291a5b1c06e48b37671e0e7f45db605f2a
Author: SwirMix <[email protected]>
AuthorDate: Fri Oct 23 14:19:34 2020 +0300

    IGNITE-13489 Clients log in and out of the topology (#8362)
---
 .../start_stop_client/IgniteCachePutClient.java    |  63 ++++++++
 .../tests/ignitetest/services/ignite_app.py        |  51 ++++---
 .../tests/ignitetest/tests/client_in_out_test.py   | 168 +++++++++++++++++++++
 .../tests/ignitetest/tests/suites/fast_suite.yml   |   4 +
 4 files changed, 268 insertions(+), 18 deletions(-)

diff --git a/modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/tests/start_stop_client/IgniteCachePutClient.java b/modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/tests/start_stop_client/IgniteCachePutClient.java
new file mode 100644
index 0000000..b0ecd63
--- /dev/null
+++ b/modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/tests/start_stop_client/IgniteCachePutClient.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.ducktest.tests.start_stop_client;
+
+import java.util.Optional;
+import java.util.UUID;
+import com.fasterxml.jackson.databind.JsonNode;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.internal.ducktest.utils.IgniteAwareApplication;
+
+/**
+ * Java client. Tx put operation
+ */
+public class IgniteCachePutClient extends IgniteAwareApplication {
+    /** {@inheritDoc} */
+    @Override protected void run(JsonNode jsonNode) throws Exception {
+        String cacheName = jsonNode.get("cacheName").asText();
+
+        long pacing = Optional.ofNullable(jsonNode.get("pacing"))
+                .map(JsonNode::asLong)
+                .orElse(0l);
+
+        log.info("Test props:" +
+                " cacheName=" + cacheName +
+                " pacing=" + pacing);
+
+        IgniteCache<UUID, UUID> cache = ignite.getOrCreateCache(cacheName);
+        log.info("Node name: " + ignite.name() + " starting cache operations.");
+
+        markInitialized();
+
+        while (!terminated()) {
+            UUID uuid = UUID.randomUUID();
+
+            long startTime = System.nanoTime();
+
+            cache.put(uuid, uuid);
+
+            long resultTime = System.nanoTime() - startTime;
+
+            log.info("Success put, latency: " + resultTime + "ns.");
+
+            Thread.sleep(pacing);
+        }
+
+        markFinished();
+    }
+}
diff --git a/modules/ducktests/tests/ignitetest/services/ignite_app.py b/modules/ducktests/tests/ignitetest/services/ignite_app.py
index 8fbf035..fa5cc5d 100644
--- a/modules/ducktests/tests/ignitetest/services/ignite_app.py
+++ b/modules/ducktests/tests/ignitetest/services/ignite_app.py
@@ -34,9 +34,9 @@ class IgniteApplicationService(IgniteAwareService):
     SERVICE_JAVA_CLASS_NAME = "org.apache.ignite.internal.ducktest.utils.IgniteAwareApplicationService"
 
     # pylint: disable=R0913
-    def __init__(self, context, config, java_class_name, params="", timeout_sec=60, modules=None,
+    def __init__(self, context, config, java_class_name, num_nodes=1, params="", timeout_sec=60, modules=None,
                  servicejava_class_name=SERVICE_JAVA_CLASS_NAME, jvm_opts=None, start_ignite=True):
-        super().__init__(context, config, 1, modules=modules, servicejava_class_name=servicejava_class_name,
+        super().__init__(context, config, num_nodes, modules=modules, servicejava_class_name=servicejava_class_name,
                          java_class_name=java_class_name, params=params, jvm_opts=jvm_opts, start_ignite=start_ignite)
 
         self.servicejava_class_name = servicejava_class_name
@@ -55,27 +55,41 @@ class IgniteApplicationService(IgniteAwareService):
 
     def stop_async(self, clean_shutdown=True):
         """
+        Stop in async way.
+        """
+        for node in self.nodes:
+            self.stop_node(node=node, clean_shutdown=clean_shutdown)
+
+    # pylint: disable=W0221
+    def stop_node(self, node, clean_shutdown=True):
+        """
         Stops node in async way.
         """
-        self.logger.info("%s Stopping node %s" % (self.__class__.__name__, str(self.nodes[0].account)))
-        self.nodes[0].account.kill_java_processes(self.servicejava_class_name, clean_shutdown=clean_shutdown,
-                                                  allow_fail=True)
+        self.logger.info("%s Stopping node %s" % (self.__class__.__name__, str(node.account)))
+        node.account.kill_java_processes(self.servicejava_class_name, clean_shutdown=clean_shutdown,
+                                         allow_fail=True)
 
     def await_stopped(self, timeout_sec=10):
         """
         Awaits node stop finish.
         """
-        stopped = self.wait_node(self.nodes[0], timeout_sec=timeout_sec)
-        assert stopped, "Node %s: did not stop within the specified timeout of %s seconds" % \
-                        (str(self.nodes[0].account), str(timeout_sec))
+        for node in self.nodes:
+            stopped = self.wait_node(node, timeout_sec=timeout_sec)
+            assert stopped, "Node %s: did not stop within the specified timeout of %s seconds" % \
+                            (str(node.account), str(timeout_sec))
 
         self.__check_status("IGNITE_APPLICATION_FINISHED", timeout=timeout_sec)
 
     # pylint: disable=W0221
-    def stop_node(self, node, clean_shutdown=True, timeout_sec=10):
-        assert node == self.nodes[0]
-        self.stop_async(clean_shutdown)
-        self.await_stopped(timeout_sec)
+    def stop(self, clean_shutdown=True, timeout_sec=10):
+        """
+        Stop services.
+        """
+        if clean_shutdown:
+            self.stop_async(clean_shutdown)
+            self.await_stopped(timeout_sec)
+        else:
+            self.stop_async(clean_shutdown)
 
     def __check_status(self, desired, timeout=1):
         self.await_event("%s\\|IGNITE_APPLICATION_BROKEN" % desired, timeout, from_the_beginning=True)
@@ -110,7 +124,8 @@ class IgniteApplicationService(IgniteAwareService):
         """
         results = self.extract_results(name)
 
-        assert len(results) <= 1, f"Expected exactly one result occurence, {len(results)} found."
+        assert len(results) == len(self.nodes), f"Expected exactly {len(self.nodes)} occurence," \
+                                                f" but found {len(results)}."
 
         return results[0] if results else ""
 
@@ -121,10 +136,10 @@ class IgniteApplicationService(IgniteAwareService):
         """
         res = []
 
-        output = self.nodes[0].account.ssh_capture(
-            "grep '%s' %s" % (name + "->", self.STDOUT_STDERR_CAPTURE), allow_fail=False)
-
-        for line in output:
-            res.append(re.search("%s(.*)%s" % (name + "->", "<-"), line).group(1))
+        for node in self.nodes:
+            output = node.account.ssh_capture(
+                "grep '%s' %s" % (name + "->", self.STDOUT_STDERR_CAPTURE), allow_fail=False)
+            for line in output:
+                res.append(re.search("%s(.*)%s" % (name + "->", "<-"), line).group(1))
 
         return res
diff --git a/modules/ducktests/tests/ignitetest/tests/client_in_out_test.py b/modules/ducktests/tests/ignitetest/tests/client_in_out_test.py
new file mode 100644
index 0000000..aab699a
--- /dev/null
+++ b/modules/ducktests/tests/ignitetest/tests/client_in_out_test.py
@@ -0,0 +1,168 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+This module contains client tests
+"""
+import time
+
+from ducktape.mark.resource import cluster
+
+from ducktape.mark import parametrize
+from ignitetest.services.ignite import IgniteService
+from ignitetest.services.ignite_app import IgniteApplicationService
+from ignitetest.services.utils.control_utility import ControlUtility
+from ignitetest.services.utils.ignite_configuration.cache import CacheConfiguration
+from ignitetest.services.utils.ignite_configuration import IgniteConfiguration
+from ignitetest.utils import ignite_versions
+from ignitetest.utils.ignite_test import IgniteTest
+from ignitetest.utils.version import DEV_BRANCH, V_2_8_1, IgniteVersion
+
+
+# pylint: disable=W0223
+class ClientTest(IgniteTest):
+    """
+    cluster - cluster size
+    CACHE_NAME - name of the cache to create for the test.
+    PACING - the frequency of the operation on clients (ms).
+    JAVA_CLIENT_CLASS_NAME - running classname.
+    client_work_time - clients working time (s).
+    iteration_count - the number of iterations of starting and stopping client nodes (s).
+    static_clients - the number of permanently employed clients.
+    temp_client - number of clients who come log in and out.
+    """
+
+    CACHE_NAME = "simple-tx-cache"
+    PACING = 10
+    JAVA_CLIENT_CLASS_NAME = "org.apache.ignite.internal.ducktest.tests.start_stop_client.IgniteCachePutClient"
+
+    @ignite_versions(str(V_2_8_1), str(DEV_BRANCH))
+    @cluster(num_nodes=7)
+    @parametrize(num_nodes=7,
+                 static_clients=2,
+                 temp_client=3,
+                 iteration_count=3,
+                 client_work_time=30)
+    # pylint: disable=R0913
+    def test_ignite_start_stop_nodes(self, ignite_version,
+                                     num_nodes, static_clients, temp_client, iteration_count, client_work_time):
+        """
+        Start and stop clients node test without kill java process.
+        Check topology.
+        """
+        self.ignite_start_stop(ignite_version, True, num_nodes, static_clients,
+                               temp_client, iteration_count, client_work_time)
+
+    @ignite_versions(str(V_2_8_1), str(DEV_BRANCH))
+    @cluster(num_nodes=7)
+    @parametrize(num_nodes=7,
+                 static_clients=2,
+                 temp_client=3,
+                 iteration_count=3,
+                 client_work_time=30)
+    # pylint: disable=R0913
+    def test_ignite_kill_start_nodes(self, ignite_version,
+                                     num_nodes, static_clients, temp_client, iteration_count, client_work_time):
+        """
+        Start and kill client nodes, Check topology
+        """
+        self.ignite_start_stop(ignite_version, False, num_nodes, static_clients,
+                               temp_client, iteration_count, client_work_time)
+
+    # pylint: disable=R0914
+    # pylint: disable=R0913
+    def ignite_start_stop(self, ignite_version, correct_stop_temp_node,
+                          nodes_num, static_clients_num, temp_client, iteration_count, client_work_time):
+        """
+        Test for starting and stopping fat clients.
+        """
+
+        servers_count = nodes_num - static_clients_num - temp_client
+
+        current_top_v = servers_count
+        # Topology version after test.
+        fin_top_ver = servers_count + (2 * static_clients_num) + (2 * iteration_count * temp_client)
+
+        server_cfg = IgniteConfiguration(
+            version=IgniteVersion(ignite_version),
+            caches=[CacheConfiguration(name=self.CACHE_NAME, backups=1, atomicity_mode='TRANSACTIONAL')]
+        )
+
+        ignite = IgniteService(self.test_context, server_cfg, num_nodes=servers_count)
+        control_utility = ControlUtility(ignite, self.test_context)
+
+        client_cfg = server_cfg._replace(client_mode=True)
+
+        static_clients = IgniteApplicationService(
+            self.test_context,
+            client_cfg,
+            java_class_name=self.JAVA_CLIENT_CLASS_NAME,
+            num_nodes=static_clients_num,
+            params={"cacheName": self.CACHE_NAME,
+                    "pacing": self.PACING})
+
+        temp_clients = IgniteApplicationService(
+            self.test_context,
+            client_cfg,
+            java_class_name=self.JAVA_CLIENT_CLASS_NAME,
+            num_nodes=temp_client,
+            params={"cacheName": self.CACHE_NAME,
+                    "pacing": self.PACING})
+
+        ignite.start()
+
+        static_clients.start()
+
+        current_top_v += static_clients_num
+        check_topology(control_utility, current_top_v)
+
+        # Start / stop temp_clients node. Check cluster.
+        for i in range(iteration_count):
+            self.logger.debug(f'Starting iteration: {i}.')
+
+            temp_clients.start()
+            current_top_v += temp_client
+
+            await_event(static_clients, f'ver={current_top_v}, locNode=')
+            check_topology(control_utility, current_top_v)
+
+            await_event(temp_clients, f'clients={static_clients_num + temp_client}')
+
+            time.sleep(client_work_time)
+            temp_clients.stop(correct_stop_temp_node)
+
+            current_top_v += temp_client
+
+        await_event(static_clients, f'ver={current_top_v}, locNode=')
+        static_clients.stop()
+
+        check_topology(control_utility, fin_top_ver)
+
+
+def await_event(service: IgniteApplicationService, message):
+    """
+    :param service: target service for wait
+    :param message: message
+    """
+    service.await_event(message, timeout_sec=80, from_the_beginning=True)
+
+
+def check_topology(control_utility: ControlUtility, fin_top_ver: int):
+    """
+    Check current topology version.
+    """
+    top_ver = control_utility.cluster_state().topology_version
+    assert top_ver == fin_top_ver, f'Cluster current topology version={top_ver}, ' \
+                                   f'expected topology version={fin_top_ver}.'
diff --git a/modules/ducktests/tests/ignitetest/tests/suites/fast_suite.yml b/modules/ducktests/tests/ignitetest/tests/suites/fast_suite.yml
index 698c1d8..714a9af 100644
--- a/modules/ducktests/tests/ignitetest/tests/suites/fast_suite.yml
+++ b/modules/ducktests/tests/ignitetest/tests/suites/fast_suite.yml
@@ -27,3 +27,7 @@ cellular_affinity:
 
 rebalance:
   - ../add_node_rebalance_test.py
+
+clients:
+  - ../client_in_out_test.py
+

Reply via email to