This is an automated email from the ASF dual-hosted git repository.

av pushed a commit to branch ignite-ducktape
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/ignite-ducktape by this push:
     new 41606e9  IGNITE-13703 Check difference between TcpDiscovery and 
ZookeeperDiscovery at Cellular switch (#8454)
41606e9 is described below

commit 41606e95859fdaa098931e0e2ce874ffb3f17a60
Author: Anton Vinogradov <[email protected]>
AuthorDate: Wed Nov 25 15:31:50 2020 +0300

    IGNITE-13703 Check difference between TcpDiscovery and ZookeeperDiscovery 
at Cellular switch (#8454)
---
 .../SingleKeyTxStreamerApplication.java            |   2 +-
 .../ducktests/tests/ignitetest/services/ignite.py  |  42 +-----
 .../tests/ignitetest/services/ignite_app.py        |  60 ++------
 .../ducktests/tests/ignitetest/services/spark.py   |  10 +-
 .../ignitetest/services/utils/ignite_aware.py      |  86 ++++++++++-
 .../tests/ignitetest/services/utils/log_utils.py   |  42 ++++++
 .../tests/ignitetest/services/zk/zookeeper.py      |  11 +-
 .../ignitetest/tests/add_node_rebalance_test.py    |   2 +-
 .../ignitetest/tests/cellular_affinity_test.py     | 166 ++++++++++++++-------
 .../tests/ignitetest/tests/client_test.py          |   7 +-
 .../tests/control_utility/baseline_test.py         |   4 +-
 .../ignitetest/tests/control_utility/tx_test.py    |   4 +-
 .../tests/ignitetest/tests/pme_free_switch_test.py |  42 ++++--
 .../ducktests/tests/ignitetest/tests/self_test.py  |   4 +-
 .../tests/ignitetest/utils/ignite_test.py          |  15 ++
 15 files changed, 327 insertions(+), 170 deletions(-)

diff --git 
a/modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/tests/pme_free_switch_test/SingleKeyTxStreamerApplication.java
 
b/modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/tests/pme_free_switch_test/SingleKeyTxStreamerApplication.java
index bb91df8..c22fd37 100644
--- 
a/modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/tests/pme_free_switch_test/SingleKeyTxStreamerApplication.java
+++ 
b/modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/tests/pme_free_switch_test/SingleKeyTxStreamerApplication.java
@@ -61,7 +61,7 @@ public class SingleKeyTxStreamerApplication extends 
IgniteAwareApplication {
                     maxLatency = latency;
             }
 
-            if (cnt % 1000 == 0)
+            if (cnt % 100 == 0)
                 log.info("APPLICATION_STREAMED " + cnt + " transactions [max=" 
+ maxLatency + "]");
         }
 
diff --git a/modules/ducktests/tests/ignitetest/services/ignite.py 
b/modules/ducktests/tests/ignitetest/services/ignite.py
index a0158ef..e5ec3fa 100644
--- a/modules/ducktests/tests/ignitetest/services/ignite.py
+++ b/modules/ducktests/tests/ignitetest/services/ignite.py
@@ -23,7 +23,6 @@ import signal
 from datetime import datetime
 
 from ducktape.cluster.remoteaccount import RemoteCommandError
-from ducktape.utils.util import wait_until
 
 from ignitetest.services.utils.ignite_aware import IgniteAwareService
 
@@ -36,43 +35,10 @@ class IgniteService(IgniteAwareService):
     HEAP_DUMP_FILE = os.path.join(IgniteAwareService.PERSISTENT_ROOT, 
"ignite-heap.bin")
 
     # pylint: disable=R0913
-    def __init__(self, context, config, num_nodes, jvm_opts=None, 
modules=None):
-        super().__init__(context, config, num_nodes, modules=modules, 
jvm_opts=jvm_opts)
-
-    # pylint: disable=W0221
-    def start(self, timeout_sec=180):
-        super().start()
-
-        self.logger.info("Waiting for Ignite(s) to start...")
-
-        for node in self.nodes:
-            self.await_node_started(node, timeout_sec)
-
-    def await_node_started(self, node, timeout_sec):
-        """
-        Await topology ready event on node start.
-        :param node: Node to wait
-        :param timeout_sec: Number of seconds to wait event.
-        """
-        self.await_event_on_node("Topology snapshot", node, timeout_sec, 
from_the_beginning=True)
-
-        if len(self.pids(node)) == 0:
-            raise Exception("No process ids recorded on node %s" % 
node.account.hostname)
-
-    # pylint: disable=W0221
-    def stop_node(self, node, clean_shutdown=True, timeout_sec=60):
-        pids = self.pids(node)
-        sig = signal.SIGTERM if clean_shutdown else signal.SIGKILL
-
-        for pid in pids:
-            node.account.signal(pid, sig, allow_fail=False)
-
-        try:
-            wait_until(lambda: len(self.pids(node)) == 0, 
timeout_sec=timeout_sec,
-                       err_msg="Ignite node failed to stop in %d seconds" % 
timeout_sec)
-        except Exception:
-            self.thread_dump(node)
-            raise
+    def __init__(self, context, config, num_nodes, jvm_opts=None, 
startup_timeout_sec=60, shutdown_timeout_sec=10,
+                 modules=None):
+        super().__init__(context, config, num_nodes, startup_timeout_sec, 
shutdown_timeout_sec, modules=modules,
+                         jvm_opts=jvm_opts)
 
     def clean_node(self, node):
         node.account.kill_java_processes(self.APP_SERVICE_CLASS, 
clean_shutdown=False, allow_fail=True)
diff --git a/modules/ducktests/tests/ignitetest/services/ignite_app.py 
b/modules/ducktests/tests/ignitetest/services/ignite_app.py
index fa5cc5d..bb93725 100644
--- a/modules/ducktests/tests/ignitetest/services/ignite_app.py
+++ b/modules/ducktests/tests/ignitetest/services/ignite_app.py
@@ -34,62 +34,26 @@ class IgniteApplicationService(IgniteAwareService):
     SERVICE_JAVA_CLASS_NAME = 
"org.apache.ignite.internal.ducktest.utils.IgniteAwareApplicationService"
 
     # pylint: disable=R0913
-    def __init__(self, context, config, java_class_name, num_nodes=1, 
params="", timeout_sec=60, modules=None,
-                 servicejava_class_name=SERVICE_JAVA_CLASS_NAME, 
jvm_opts=None, start_ignite=True):
-        super().__init__(context, config, num_nodes, modules=modules, 
servicejava_class_name=servicejava_class_name,
-                         java_class_name=java_class_name, params=params, 
jvm_opts=jvm_opts, start_ignite=start_ignite)
+    def __init__(self, context, config, java_class_name, num_nodes=1, 
params="", startup_timeout_sec=60,
+                 shutdown_timeout_sec=10, modules=None, 
servicejava_class_name=SERVICE_JAVA_CLASS_NAME, jvm_opts=None,
+                 start_ignite=True):
+        super().__init__(context, config, num_nodes, startup_timeout_sec, 
shutdown_timeout_sec, modules=modules,
+                         servicejava_class_name=servicejava_class_name, 
java_class_name=java_class_name, params=params,
+                         jvm_opts=jvm_opts, start_ignite=start_ignite)
 
         self.servicejava_class_name = servicejava_class_name
         self.java_class_name = java_class_name
-        self.timeout_sec = timeout_sec
         self.params = params
 
-    def start(self):
-        super().start()
+    def await_started(self):
+        super().await_started()
 
-        self.logger.info("Waiting for Ignite aware Application (%s) to 
start..." % self.java_class_name)
+        self.__check_status("IGNITE_APPLICATION_INITIALIZED", 
timeout=self.startup_timeout_sec)
 
-        self.await_event("Topology snapshot", self.timeout_sec, 
from_the_beginning=True)
+    def await_stopped(self):
+        super().await_stopped()
 
-        self.__check_status("IGNITE_APPLICATION_INITIALIZED", 
timeout=self.timeout_sec)
-
-    def stop_async(self, clean_shutdown=True):
-        """
-        Stop in async way.
-        """
-        for node in self.nodes:
-            self.stop_node(node=node, clean_shutdown=clean_shutdown)
-
-    # pylint: disable=W0221
-    def stop_node(self, node, clean_shutdown=True):
-        """
-        Stops node in async way.
-        """
-        self.logger.info("%s Stopping node %s" % (self.__class__.__name__, 
str(node.account)))
-        node.account.kill_java_processes(self.servicejava_class_name, 
clean_shutdown=clean_shutdown,
-                                         allow_fail=True)
-
-    def await_stopped(self, timeout_sec=10):
-        """
-        Awaits node stop finish.
-        """
-        for node in self.nodes:
-            stopped = self.wait_node(node, timeout_sec=timeout_sec)
-            assert stopped, "Node %s: did not stop within the specified 
timeout of %s seconds" % \
-                            (str(node.account), str(timeout_sec))
-
-        self.__check_status("IGNITE_APPLICATION_FINISHED", timeout=timeout_sec)
-
-    # pylint: disable=W0221
-    def stop(self, clean_shutdown=True, timeout_sec=10):
-        """
-        Stop services.
-        """
-        if clean_shutdown:
-            self.stop_async(clean_shutdown)
-            self.await_stopped(timeout_sec)
-        else:
-            self.stop_async(clean_shutdown)
+        self.__check_status("IGNITE_APPLICATION_FINISHED")
 
     def __check_status(self, desired, timeout=1):
         self.await_event("%s\\|IGNITE_APPLICATION_BROKEN" % desired, timeout, 
from_the_beginning=True)
diff --git a/modules/ducktests/tests/ignitetest/services/spark.py 
b/modules/ducktests/tests/ignitetest/services/spark.py
index 16d90e1..77756eb 100644
--- a/modules/ducktests/tests/ignitetest/services/spark.py
+++ b/modules/ducktests/tests/ignitetest/services/spark.py
@@ -21,7 +21,9 @@ import os.path
 
 from ducktape.cluster.remoteaccount import RemoteCommandError
 from ducktape.services.background_thread import BackgroundThreadService
+
 from ignitetest.services.utils.ignite_persistence import PersistenceAware
+from ignitetest.services.utils.log_utils import monitor_log
 
 
 class SparkService(BackgroundThreadService, PersistenceAware):
@@ -91,7 +93,7 @@ class SparkService(BackgroundThreadService, PersistenceAware):
         self.logger.debug("Monitoring - %s" % log_file)
 
         timeout_sec = 30
-        with node.account.monitor_log(log_file) as monitor:
+        with monitor_log(node, log_file) as monitor:
             node.account.ssh(cmd)
             monitor.wait_until(log_msg, timeout_sec=timeout_sec, backoff_sec=5,
                                err_msg="Spark doesn't start at %d seconds" % 
timeout_sec)
@@ -156,3 +158,9 @@ class SparkService(BackgroundThreadService, 
PersistenceAware):
             userID=node.account.user,
             instance=1,
             host=node.account.hostname)
+
+    def kill(self):
+        """
+        Kills the service.
+        """
+        self.stop()
diff --git a/modules/ducktests/tests/ignitetest/services/utils/ignite_aware.py 
b/modules/ducktests/tests/ignitetest/services/utils/ignite_aware.py
index 5045e98..513e0e5 100644
--- a/modules/ducktests/tests/ignitetest/services/utils/ignite_aware.py
+++ b/modules/ducktests/tests/ignitetest/services/utils/ignite_aware.py
@@ -17,6 +17,7 @@
 This module contains the base class to build services aware of Ignite.
 """
 import os
+import signal
 import socket
 import sys
 import time
@@ -31,6 +32,7 @@ from ignitetest.services.utils.concurrent import 
CountDownLatch, AtomicValue
 from ignitetest.services.utils.ignite_persistence import IgnitePersistenceAware
 from ignitetest.services.utils.ignite_spec import resolve_spec
 from ignitetest.services.utils.jmx_utils import ignite_jmx_mixin
+from ignitetest.services.utils.log_utils import monitor_log
 
 
 class IgniteAwareService(BackgroundThreadService, IgnitePersistenceAware, 
metaclass=ABCMeta):
@@ -41,7 +43,7 @@ class IgniteAwareService(BackgroundThreadService, 
IgnitePersistenceAware, metacl
     NETFILTER_STORE_PATH = os.path.join(IgnitePersistenceAware.TEMP_DIR, 
"iptables.bak")
 
     # pylint: disable=R0913
-    def __init__(self, context, config, num_nodes, **kwargs):
+    def __init__(self, context, config, num_nodes, startup_timeout_sec, 
shutdown_timeout_sec, **kwargs):
         """
         **kwargs are params that passed to IgniteSpec
         """
@@ -52,20 +54,95 @@ class IgniteAwareService(BackgroundThreadService, 
IgnitePersistenceAware, metacl
         self.log_level = "DEBUG"
 
         self.config = config
+        self.startup_timeout_sec = startup_timeout_sec
+        self.shutdown_timeout_sec = shutdown_timeout_sec
 
         self.spec = resolve_spec(self, context, config, **kwargs)
 
         self.disconnected_nodes = []
+        self.killed = False
+
+    def start_async(self):
+        """
+        Starts in async way.
+        """
+        super().start()
+
+    def start(self):
+        self.start_async()
+        self.await_started()
+
+    def await_started(self):
+        """
+        Awaits start finished.
+        """
+        self.logger.info("Waiting for IgniteAware(s) to start ...")
+
+        self.await_event("Topology snapshot", self.startup_timeout_sec, 
from_the_beginning=True)
 
     def start_node(self, node):
         self.init_persistent(node)
 
         super().start_node(node)
 
-        wait_until(lambda: len(self.pids(node)) > 0, timeout_sec=10)
+        wait_until(lambda: self.alive(node), timeout_sec=10)
 
         ignite_jmx_mixin(node, self.pids(node))
 
+    def stop_async(self):
+        """
+        Stop in async way.
+        """
+        super().stop()
+
+    def stop(self):
+        if not self.killed:
+            self.stop_async()
+            self.await_stopped()
+        else:
+            self.logger.debug("Skipping node stop since it already killed.")
+
+    def await_stopped(self):
+        """
+        Awaits stop finished.
+        """
+        self.logger.info("Waiting for IgniteAware(s) to stop ...")
+
+        for node in self.nodes:
+            stopped = self.wait_node(node, 
timeout_sec=self.shutdown_timeout_sec)
+            assert stopped, "Node %s's worker thread did not stop in %d 
seconds" % \
+                            (str(node.account), self.shutdown_timeout_sec)
+
+        for node in self.nodes:
+            wait_until(lambda: not self.alive(node), 
timeout_sec=self.shutdown_timeout_sec,
+                       err_msg="Node %s's remote processes failed to stop in 
%d seconds" %
+                               (str(node.account), self.shutdown_timeout_sec))
+
+    def stop_node(self, node):
+        pids = self.pids(node)
+
+        for pid in pids:
+            node.account.signal(pid, signal.SIGTERM, allow_fail=False)
+
+    def kill(self):
+        """
+        Kills nodes.
+        """
+        self.logger.info("Killing IgniteAware(s) ...")
+
+        for node in self.nodes:
+            pids = self.pids(node)
+
+            for pid in pids:
+                node.account.signal(pid, signal.SIGKILL, allow_fail=False)
+
+        for node in self.nodes:
+            wait_until(lambda: not self.alive(node), 
timeout_sec=self.shutdown_timeout_sec,
+                       err_msg="Node %s's remote processes failed to be killed 
in %d seconds" %
+                               (str(node.account), self.shutdown_timeout_sec))
+
+        self.killed = True
+
     def clean(self):
         self.__restore_iptables()
 
@@ -134,10 +211,7 @@ class IgniteAwareService(BackgroundThreadService, 
IgnitePersistenceAware, metacl
         :param backoff_sec: Number of seconds to back off between each failure 
to meet the condition
                 before checking again.
         """
-        with node.account.monitor_log(self.STDOUT_STDERR_CAPTURE) as monitor:
-            if from_the_beginning:
-                monitor.offset = 0
-
+        with monitor_log(node, self.STDOUT_STDERR_CAPTURE, from_the_beginning) 
as monitor:
             monitor.wait_until(evt_message, timeout_sec=timeout_sec, 
backoff_sec=backoff_sec,
                                err_msg="Event [%s] was not triggered on '%s' 
in %d seconds" % (evt_message, node.name,
                                                                                
                timeout_sec))
diff --git a/modules/ducktests/tests/ignitetest/services/utils/log_utils.py 
b/modules/ducktests/tests/ignitetest/services/utils/log_utils.py
new file mode 100644
index 0000000..58cb5e4
--- /dev/null
+++ b/modules/ducktests/tests/ignitetest/services/utils/log_utils.py
@@ -0,0 +1,42 @@
+#  Licensed to the Apache Software Foundation (ASF) under one or more
+#  contributor license agreements.  See the NOTICE file distributed with
+#  this work for additional information regarding copyright ownership.
+#  The ASF licenses this file to You under the Apache License, Version 2.0
+#  (the "License"); you may not use this file except in compliance with
+#  the License.  You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+
+"""
+This module contains log utils.
+"""
+
+from contextlib import contextmanager
+
+from ducktape.cluster.remoteaccount import LogMonitor
+
+
+# pylint: disable=W0703
+@contextmanager
+def monitor_log(node, log, from_the_beginning=False):
+    """
+    Context manager that returns an object that helps you wait for events to
+    occur in a log. This checks the size of the log at the beginning of the
+    block and makes a helper object available with convenience methods for
+    checking or waiting for a pattern to appear in the log. This will commonly
+    be used to start a process, then wait for a log message indicating the
+    process is in a ready state.
+
+    See ``LogMonitor`` for more usage information.
+    """
+    try:
+        offset = 0 if from_the_beginning else int(node.account.ssh_output("wc 
-c %s" % log).split()[0])
+    except Exception:
+        offset = 0
+    yield LogMonitor(node.account, log, offset)
diff --git a/modules/ducktests/tests/ignitetest/services/zk/zookeeper.py 
b/modules/ducktests/tests/ignitetest/services/zk/zookeeper.py
index ca8a7fb..d6dffa4 100644
--- a/modules/ducktests/tests/ignitetest/services/zk/zookeeper.py
+++ b/modules/ducktests/tests/ignitetest/services/zk/zookeeper.py
@@ -22,6 +22,8 @@ import os.path
 from ducktape.services.service import Service
 from ducktape.utils.util import wait_until
 
+from ignitetest.services.utils.log_utils import monitor_log
+
 
 class ZookeeperSettings:
     """
@@ -102,8 +104,7 @@ class ZookeeperService(Service):
         :param node:  Zookeeper service node.
         :param timeout: Wait timeout.
         """
-        with node.account.monitor_log(self.LOG_FILE) as monitor:
-            monitor.offset = 0
+        with monitor_log(node, self.LOG_FILE, from_the_beginning=True) as 
monitor:
             monitor.wait_until(
                 "LEADER ELECTION TOOK",
                 timeout_sec=timeout,
@@ -150,3 +151,9 @@ class ZookeeperService(Service):
                              (self.__class__.__name__, node.account))
         node.account.kill_process("zookeeper", clean_shutdown=False, 
allow_fail=True)
         node.account.ssh("rm -rf %s %s %s" % (self.CONFIG_ROOT, self.DATA_DIR, 
self.LOG_FILE), allow_fail=False)
+
+    def kill(self):
+        """
+        Kills the service.
+        """
+        self.stop()
diff --git 
a/modules/ducktests/tests/ignitetest/tests/add_node_rebalance_test.py 
b/modules/ducktests/tests/ignitetest/tests/add_node_rebalance_test.py
index 1a1ee41..5511805 100644
--- a/modules/ducktests/tests/ignitetest/tests/add_node_rebalance_test.py
+++ b/modules/ducktests/tests/ignitetest/tests/add_node_rebalance_test.py
@@ -55,7 +55,7 @@ class AddNodeRebalanceTest(IgniteTest):
         IgniteApplicationService(self.test_context, config=app_config,
                                  
java_class_name="org.apache.ignite.internal.ducktest.tests.DataGenerationApplication",
                                  params={"cacheName": "test-cache", "range": 
self.DATA_AMOUNT},
-                                 timeout_sec=self.PRELOAD_TIMEOUT).run()
+                                 
startup_timeout_sec=self.PRELOAD_TIMEOUT).run()
 
         ignite = IgniteService(self.test_context, 
node_config._replace(discovery_spi=from_ignite_cluster(ignites)),
                                num_nodes=1)
diff --git a/modules/ducktests/tests/ignitetest/tests/cellular_affinity_test.py 
b/modules/ducktests/tests/ignitetest/tests/cellular_affinity_test.py
index 381c5d2..cb56546 100644
--- a/modules/ducktests/tests/ignitetest/tests/cellular_affinity_test.py
+++ b/modules/ducktests/tests/ignitetest/tests/cellular_affinity_test.py
@@ -16,6 +16,7 @@
 """
 This module contains Cellular Affinity tests.
 """
+import math
 from enum import IntEnum
 
 from ducktape.mark import matrix
@@ -25,11 +26,13 @@ from ignitetest.services.ignite import IgniteService
 from ignitetest.services.ignite_app import IgniteApplicationService
 from ignitetest.services.utils.control_utility import ControlUtility
 from ignitetest.services.utils.ignite_configuration import 
IgniteConfiguration, IgniteClientConfiguration
-from ignitetest.services.utils.ignite_configuration.discovery import 
from_ignite_cluster
+from ignitetest.services.utils.ignite_configuration.discovery import 
from_ignite_cluster, from_zookeeper_cluster, \
+    TcpDiscoverySpi
+from ignitetest.services.zk.zookeeper import ZookeeperSettings, 
ZookeeperService
 from ignitetest.utils import ignite_versions, version_if, cluster
+from ignitetest.utils.enum import constructible
 from ignitetest.utils.ignite_test import IgniteTest
 from ignitetest.utils.version import DEV_BRANCH, IgniteVersion, LATEST_2_8
-from ignitetest.utils.enum import constructible
 
 
 @constructible
@@ -39,7 +42,16 @@ class StopType(IntEnum):
     """
     SIGTERM = 0
     SIGKILL = 1
-    DISCONNECT = 2
+    DROP_NETWORK = 2
+
+
+@constructible
+class DiscoreryType(IntEnum):
+    """
+    Discovery type.
+    """
+    ZooKeeper = 0
+    TCP = 1
 
 
 # pylint: disable=W0223
@@ -47,7 +59,8 @@ class CellularAffinity(IgniteTest):
     """
     Tests Cellular Affinity scenarios.
     """
-    NUM_NODES = 3
+    NODES_PER_CELL = 3
+    ZOOKEPER_CLUSTER_SIZE = 3
 
     ATTRIBUTE = "CELL"
 
@@ -69,7 +82,7 @@ class CellularAffinity(IgniteTest):
                             </bean>
                         </property>
                         <property name="name" value="{{ cacheName }}"/>
-                        <property name="backups" value="{{ nodes }}"/>
+                        <property name="backups" value="{{ backups }}"/>
                         <property name="atomicityMode" value="TRANSACTIONAL"/>
                     </bean>
                 </list>
@@ -82,11 +95,12 @@ class CellularAffinity(IgniteTest):
         :return: Configuration properties.
         """
         return Template(CellularAffinity.CONFIG_TEMPLATE) \
-            .render(nodes=CellularAffinity.NUM_NODES,  # bigger than cell 
capacity (to handle single cell useless test)
-                    attr=CellularAffinity.ATTRIBUTE,
-                    cacheName=CellularAffinity.CACHE_NAME)
+            .render(
+            backups=CellularAffinity.NODES_PER_CELL,  # bigger than cell 
capacity (to handle single cell useless test)
+            attr=CellularAffinity.ATTRIBUTE,
+            cacheName=CellularAffinity.CACHE_NAME)
 
-    @cluster(num_nodes=NUM_NODES * 3 + 1)
+    @cluster(num_nodes=NODES_PER_CELL * 3 + 1)
     @version_if(lambda version: version >= DEV_BRANCH)
     @ignite_versions(str(DEV_BRANCH))
     def test_distribution(self, ignite_version):
@@ -94,9 +108,15 @@ class CellularAffinity(IgniteTest):
         Tests Cellular Affinity scenario (partition distribution).
         """
         cell1 = self.start_cell(ignite_version, ['-D' + 
CellularAffinity.ATTRIBUTE + '=1'])
-        self.start_cell(ignite_version, ['-D' + CellularAffinity.ATTRIBUTE + 
'=2'], joined_cluster=cell1)
-        self.start_cell(ignite_version, ['-D' + CellularAffinity.ATTRIBUTE + 
'=XXX', '-DRANDOM=42'],
-                        joined_cluster=cell1)
+
+        discovery_spi = from_ignite_cluster(cell1)
+
+        cell2 = self.start_cell(ignite_version, ['-D' + 
CellularAffinity.ATTRIBUTE + '=2'], discovery_spi)
+        cell3 = self.start_cell(ignite_version, ['-D' + 
CellularAffinity.ATTRIBUTE + '=XXX', '-DRANDOM=42'],
+                                discovery_spi)
+
+        for cell in [cell1, cell2, cell3]:
+            cell.await_started()
 
         ControlUtility(cell1, self.test_context).activate()
 
@@ -106,42 +126,82 @@ class CellularAffinity(IgniteTest):
             
java_class_name="org.apache.ignite.internal.ducktest.tests.cellular_affinity_test.DistributionChecker",
             params={"cacheName": CellularAffinity.CACHE_NAME,
                     "attr": CellularAffinity.ATTRIBUTE,
-                    "nodesPerCell": self.NUM_NODES})
+                    "nodesPerCell": self.NODES_PER_CELL})
 
         checker.run()
 
+    # pylint: disable=R0912
     # pylint: disable=R0914
-    @cluster(num_nodes=NUM_NODES * (3 + 1))
+    # pylint: disable=no-member
+    @cluster(num_nodes=2 * (NODES_PER_CELL + 1) + 3)  # cell_cnt * 
(srv_per_cell + cell_streamer) + zookeper_cluster
     @ignite_versions(str(DEV_BRANCH), str(LATEST_2_8))
-    @matrix(stop_type=[
-        StopType.DISCONNECT,
-        StopType.SIGKILL,
-        StopType.SIGTERM])
-    def test_latency(self, ignite_version, stop_type):
+    @matrix(stop_type=[StopType.DROP_NETWORK, StopType.SIGKILL, 
StopType.SIGTERM],
+            discovery_type=[DiscoreryType.ZooKeeper, DiscoreryType.TCP])
+    def test_latency(self, ignite_version, stop_type, discovery_type):
         """
         Tests Cellular switch tx latency.
         """
+        cluster_size = len(self.test_context.cluster)
+
+        cells_amount = math.floor((cluster_size - self.ZOOKEPER_CLUSTER_SIZE) 
/ (self.NODES_PER_CELL + 1))
+
+        assert cells_amount >= 2
+
+        self.test_context.logger.info(
+            "Cells amount calculated as %d at cluster with %d nodes in total" 
% (cells_amount, cluster_size))
+
         data = {}
 
-        cell1, prepared_tx_loader1 = 
self.start_cell_with_prepared_txs(ignite_version, "C1")
-        _, prepared_tx_loader2 = 
self.start_cell_with_prepared_txs(ignite_version, "C2", joined_cluster=cell1)
-        _, prepared_tx_loader3 = 
self.start_cell_with_prepared_txs(ignite_version, "C3", joined_cluster=cell1)
+        discovery_spi = None
+
+        modules = []
+
+        d_type = DiscoreryType.construct_from(discovery_type)
 
-        loaders = [prepared_tx_loader1, prepared_tx_loader2, 
prepared_tx_loader3]
+        if d_type is DiscoreryType.ZooKeeper:
+            zk_settings = ZookeeperSettings()
+            zk_quorum = ZookeeperService(self.test_context, 
self.ZOOKEPER_CLUSTER_SIZE, settings=zk_settings)
+            zk_quorum.start()
 
-        failed_loader = prepared_tx_loader3
+            modules.append('zookeeper')
 
-        tx_streamer1 = self.start_tx_streamer(ignite_version, "C1", 
joined_cluster=cell1)
-        tx_streamer2 = self.start_tx_streamer(ignite_version, "C2", 
joined_cluster=cell1)
-        tx_streamer3 = self.start_tx_streamer(ignite_version, "C3", 
joined_cluster=cell1)
+            discovery_spi = from_zookeeper_cluster(zk_quorum)
 
-        streamers = [tx_streamer1, tx_streamer2, tx_streamer3]
+        cell0, prepared_tx_loader1 = 
self.start_cell_with_prepared_txs(ignite_version, "C0", discovery_spi, modules)
+
+        if d_type is DiscoreryType.TCP:
+            discovery_spi = from_ignite_cluster(cell0)
+
+        assert discovery_spi is not None
+
+        loaders = [prepared_tx_loader1]
+        nodes = [cell0]
+
+        for cell in range(1, cells_amount):
+            node, prepared_tx_loader = \
+                self.start_cell_with_prepared_txs(ignite_version, "C%d" % 
cell, discovery_spi, modules)
+
+            loaders.append(prepared_tx_loader)
+            nodes.append(node)
+
+        failed_loader = loaders[1]
+
+        for node in [*nodes, *loaders]:
+            node.await_started()
+
+        streamers = []
+
+        for cell in range(0, cells_amount):
+            streamers.append(self.start_tx_streamer(ignite_version, "C%d" % 
cell, discovery_spi, modules))
 
         for streamer in streamers:  # starts tx streaming with latency record 
(with some warmup).
-            streamer.start()
+            streamer.start_async()
 
-        ControlUtility(cell1, 
self.test_context).disable_baseline_auto_adjust()  # baseline set.
-        ControlUtility(cell1, self.test_context).activate()
+        for streamer in streamers:
+            streamer.await_started()
+
+        ControlUtility(cell0, 
self.test_context).disable_baseline_auto_adjust()  # baseline set.
+        ControlUtility(cell0, self.test_context).activate()
 
         for loader in loaders:
             loader.await_event("ALL_TRANSACTIONS_PREPARED", 180, 
from_the_beginning=True)
@@ -150,13 +210,12 @@ class CellularAffinity(IgniteTest):
             streamer.await_event("WARMUP_FINISHED", 180, 
from_the_beginning=True)
 
         # node left with prepared txs.
-        # pylint: disable=no-member
         with StopType.construct_from(stop_type) as s_type:
             if s_type is StopType.SIGTERM:
                 failed_loader.stop_async()
             elif s_type is StopType.SIGKILL:
-                failed_loader.stop(clean_shutdown=False)
-            elif s_type is StopType.DISCONNECT:
+                failed_loader.kill()
+            elif s_type is StopType.DROP_NETWORK:
                 failed_loader.drop_network()
 
         for streamer in streamers:
@@ -176,64 +235,63 @@ class CellularAffinity(IgniteTest):
 
             cell = streamer.params["cell"]
 
-            data["[%s cell %s]" % ("alive" if cell is not 
failed_loader.params["cell"] else "broken", cell)] = \
+            data["[%s cell %s]" % ("alive" if cell != 
failed_loader.params["cell"] else "broken", cell)] = \
                 "worst_latency=%s, tx_streamed=%s, measure_duration=%s" % (
                     streamer.extract_result("WORST_LATENCY"), 
streamer.extract_result("STREAMED"),
                     streamer.extract_result("MEASURE_DURATION"))
 
         return data
 
-    def start_tx_streamer(self, version, cell, joined_cluster):
+    def start_tx_streamer(self, version, cell, discovery_spi, modules):
         """
         Starts transaction streamer.
         """
         return IgniteApplicationService(
             self.test_context,
             IgniteClientConfiguration(version=IgniteVersion(version), 
properties=self.properties(),
-                                      
discovery_spi=from_ignite_cluster(joined_cluster)),
+                                      discovery_spi=discovery_spi),
             
java_class_name="org.apache.ignite.internal.ducktest.tests.cellular_affinity_test.CellularTxStreamer",
             params={"cacheName": CellularAffinity.CACHE_NAME,
                     "attr": CellularAffinity.ATTRIBUTE,
                     "cell": cell,
                     "warmup": 10000},
-            timeout_sec=180)
+            modules=modules, startup_timeout_sec=180)
 
-    def start_cell_with_prepared_txs(self, version, cell_id, 
joined_cluster=None):
+    def start_cell_with_prepared_txs(self, version, cell_id, discovery_spi, 
modules):
         """
         Starts cell with prepared transactions.
         """
-        nodes = self.start_cell(version, ['-D' + CellularAffinity.ATTRIBUTE + 
'=' + cell_id],
-                                CellularAffinity.NUM_NODES - 1, joined_cluster)
+        nodes = self.start_cell(version, ['-D' + CellularAffinity.ATTRIBUTE + 
'=' + cell_id], discovery_spi, modules,
+                                CellularAffinity.NODES_PER_CELL - 1)
 
         prepared_tx_streamer = IgniteApplicationService(  # last server node 
at the cell.
             self.test_context,
             IgniteConfiguration(version=IgniteVersion(version), 
properties=self.properties(),
-                                discovery_spi=from_ignite_cluster(nodes)),  # 
Server node.
+                                discovery_spi=from_ignite_cluster(nodes) if 
discovery_spi is None else discovery_spi),
             
java_class_name="org.apache.ignite.internal.ducktest.tests.cellular_affinity_test."
                             "CellularPreparedTxStreamer",
             params={"cacheName": CellularAffinity.CACHE_NAME,
                     "attr": CellularAffinity.ATTRIBUTE,
                     "cell": cell_id,
                     "txCnt": CellularAffinity.PREPARED_TX_CNT},
-            jvm_opts=['-D' + CellularAffinity.ATTRIBUTE + '=' + cell_id],
-            timeout_sec=180)
+            jvm_opts=['-D' + CellularAffinity.ATTRIBUTE + '=' + cell_id], 
modules=modules, startup_timeout_sec=180)
 
-        prepared_tx_streamer.start()  # starts last server node and creates 
prepared txs on it.
+        prepared_tx_streamer.start_async()  # starts last server node and 
creates prepared txs on it.
 
         return nodes, prepared_tx_streamer
 
-    def start_cell(self, version, jvm_opts, nodes_cnt=NUM_NODES, 
joined_cluster=None):
+    # pylint: disable=R0913
+    def start_cell(self, version, jvm_opts, discovery_spi=None, modules=None, 
nodes_cnt=NODES_PER_CELL):
         """
         Starts cell.
         """
-        config = IgniteConfiguration(version=IgniteVersion(version), 
properties=self.properties(),
-                                     cluster_state="INACTIVE")
-
-        if joined_cluster:
-            config = 
config._replace(discovery_spi=from_ignite_cluster(joined_cluster))
-
-        ignites = IgniteService(self.test_context, config, 
num_nodes=nodes_cnt, jvm_opts=jvm_opts)
+        ignites = IgniteService(
+            self.test_context,
+            IgniteConfiguration(version=IgniteVersion(version), 
properties=self.properties(),
+                                cluster_state="INACTIVE",
+                                discovery_spi=TcpDiscoverySpi() if 
discovery_spi is None else discovery_spi),
+            num_nodes=nodes_cnt, modules=modules, jvm_opts=jvm_opts, 
startup_timeout_sec=180)
 
-        ignites.start()
+        ignites.start_async()
 
         return ignites
diff --git a/modules/ducktests/tests/ignitetest/tests/client_test.py 
b/modules/ducktests/tests/ignitetest/tests/client_test.py
index c6ee4ed..2da41d5 100644
--- a/modules/ducktests/tests/ignitetest/tests/client_test.py
+++ b/modules/ducktests/tests/ignitetest/tests/client_test.py
@@ -74,7 +74,7 @@ class ClientTest(IgniteTest):
 
     # pylint: disable=R0914
     # pylint: disable=R0913
-    def ignite_start_stop(self, ignite_version, correct_stop_temp_node, 
nodes_num, static_clients_num, temp_client,
+    def ignite_start_stop(self, ignite_version, graceful_shutdown, nodes_num, 
static_clients_num, temp_client,
                           iteration_count, client_work_time):
         """
         Test for starting and stopping fat clients.
@@ -128,7 +128,10 @@ class ClientTest(IgniteTest):
 
             time.sleep(client_work_time)
 
-            temp_clients.stop(correct_stop_temp_node)
+            if graceful_shutdown:
+                temp_clients.stop()
+            else:
+                temp_clients.kill()
 
             current_top_v += temp_client
 
diff --git 
a/modules/ducktests/tests/ignitetest/tests/control_utility/baseline_test.py 
b/modules/ducktests/tests/ignitetest/tests/control_utility/baseline_test.py
index 710e317..6a38f7c 100644
--- a/modules/ducktests/tests/ignitetest/tests/control_utility/baseline_test.py
+++ b/modules/ducktests/tests/ignitetest/tests/control_utility/baseline_test.py
@@ -199,8 +199,8 @@ class BaselineTests(IgniteTest):
         if join_cluster:
             config._replace(discovery_spi=from_ignite_cluster(join_cluster))
 
-        servers = IgniteService(self.test_context, config=config, 
num_nodes=num_nodes)
+        servers = IgniteService(self.test_context, config=config, 
num_nodes=num_nodes, startup_timeout_sec=timeout_sec)
 
-        servers.start(timeout_sec=timeout_sec)
+        servers.start()
 
         return servers
diff --git 
a/modules/ducktests/tests/ignitetest/tests/control_utility/tx_test.py 
b/modules/ducktests/tests/ignitetest/tests/control_utility/tx_test.py
index fef01b9..0ea9ce9 100644
--- a/modules/ducktests/tests/ignitetest/tests/control_utility/tx_test.py
+++ b/modules/ducktests/tests/ignitetest/tests/control_utility/tx_test.py
@@ -166,9 +166,9 @@ class TransactionsTests(IgniteTest):
             caches=[CacheConfiguration(name=self.CACHE_NAME, 
atomicity_mode='TRANSACTIONAL')]
         )
 
-        servers = IgniteService(self.test_context, config=config, 
num_nodes=num_nodes)
+        servers = IgniteService(self.test_context, config=config, 
num_nodes=num_nodes, startup_timeout_sec=timeout_sec)
 
-        servers.start(timeout_sec=timeout_sec)
+        servers.start()
 
         return servers
 
diff --git a/modules/ducktests/tests/ignitetest/tests/pme_free_switch_test.py 
b/modules/ducktests/tests/ignitetest/tests/pme_free_switch_test.py
index c182c61..08a43bf 100644
--- a/modules/ducktests/tests/ignitetest/tests/pme_free_switch_test.py
+++ b/modules/ducktests/tests/ignitetest/tests/pme_free_switch_test.py
@@ -18,6 +18,7 @@ This module contains PME free switch tests.
 """
 
 import time
+from enum import IntEnum
 
 from ducktape.mark import matrix
 
@@ -28,22 +29,34 @@ from ignitetest.services.utils.ignite_configuration import 
IgniteConfiguration
 from ignitetest.services.utils.ignite_configuration.cache import 
CacheConfiguration
 from ignitetest.services.utils.ignite_configuration.discovery import 
from_ignite_cluster
 from ignitetest.utils import ignite_versions, cluster
+from ignitetest.utils.enum import constructible
 from ignitetest.utils.ignite_test import IgniteTest
 from ignitetest.utils.version import DEV_BRANCH, LATEST_2_7, V_2_8_0, 
IgniteVersion
 
 
+@constructible
+class LoadType(IntEnum):
+    """
+    Load type.
+    """
+    NONE = 0
+    EXTRA_CACHES = 1
+    LONG_TXS = 2
+
+
 # pylint: disable=W0223
+# pylint: disable=no-member
 class PmeFreeSwitchTest(IgniteTest):
     """
     Tests PME free switch scenarios.
     """
     NUM_NODES = 9
-    CACHES_AMOUNT = 100
+    EXTRA_CACHES_AMOUNT = 100
 
     @cluster(num_nodes=NUM_NODES + 2)
     @ignite_versions(str(DEV_BRANCH), str(LATEST_2_7))
-    @matrix(long_txs=[False, True])
-    def test(self, ignite_version, long_txs):
+    @matrix(load_type=[LoadType.NONE, LoadType.EXTRA_CACHES, 
LoadType.LONG_TXS])
+    def test(self, ignite_version, load_type):
         """
         Tests PME-free switch scenario (node stop).
         """
@@ -51,16 +64,21 @@ class PmeFreeSwitchTest(IgniteTest):
 
         caches = [CacheConfiguration(name='test-cache', backups=2, 
atomicity_mode='TRANSACTIONAL')]
 
+        l_type = LoadType.construct_from(load_type)
+
         # Checking PME (before 2.8) vs PME-free (2.8+) switch duration, but
         # focusing on switch duration (which depends on caches amount) when 
long_txs is false and
+        # on waiting for previously started txs before the switch (which 
depends on txs duration) when long_txs is true.
-        if not long_txs:
-            for idx in range(1, self.CACHES_AMOUNT):
+        if l_type is LoadType.EXTRA_CACHES:
+            for idx in range(1, self.EXTRA_CACHES_AMOUNT):
                 caches.append(CacheConfiguration(name="cache-%d" % idx, 
backups=2, atomicity_mode='TRANSACTIONAL'))
 
         config = IgniteConfiguration(version=IgniteVersion(ignite_version), 
caches=caches, cluster_state="INACTIVE")
 
         num_nodes = len(self.test_context.cluster) - 2
+
+        self.test_context.logger.info("Nodes amount calculated as %d." % 
num_nodes)
+
         ignites = IgniteService(self.test_context, config, num_nodes=num_nodes)
 
         ignites.start()
@@ -78,9 +96,9 @@ class PmeFreeSwitchTest(IgniteTest):
             client_config,
             
java_class_name="org.apache.ignite.internal.ducktest.tests.pme_free_switch_test.LongTxStreamerApplication",
             params={"cacheName": "test-cache"},
-            timeout_sec=180)
+            startup_timeout_sec=180)
 
-        if long_txs:
+        if l_type is LoadType.LONG_TXS:
             long_tx_streamer.start()
 
         single_key_tx_streamer = IgniteApplicationService(
@@ -89,18 +107,20 @@ class PmeFreeSwitchTest(IgniteTest):
             
java_class_name="org.apache.ignite.internal.ducktest.tests.pme_free_switch_test."
                             "SingleKeyTxStreamerApplication",
             params={"cacheName": "test-cache", "warmup": 1000},
-            timeout_sec=180)
+            startup_timeout_sec=180)
 
         single_key_tx_streamer.start()
 
         ignites.stop_node(ignites.nodes[num_nodes - 1])
 
-        if long_txs:
-            long_tx_streamer.await_event("Node left topology", 60, 
from_the_beginning=True)
+        single_key_tx_streamer.await_event("Node left topology", 60, 
from_the_beginning=True)
 
+        if l_type is LoadType.LONG_TXS:
             time.sleep(30)  # keeping txs alive for 30 seconds.
 
-            long_tx_streamer.stop()
+            long_tx_streamer.stop_async()
+
+            single_key_tx_streamer.await_event("Node left topology", 60, 
from_the_beginning=True)
 
         single_key_tx_streamer.await_event("APPLICATION_STREAMED", 60)  # 
waiting for streaming continuation.
 
diff --git a/modules/ducktests/tests/ignitetest/tests/self_test.py 
b/modules/ducktests/tests/ignitetest/tests/self_test.py
index fbeda05..e5f91e9 100644
--- a/modules/ducktests/tests/ignitetest/tests/self_test.py
+++ b/modules/ducktests/tests/ignitetest/tests/self_test.py
@@ -75,14 +75,14 @@ class SelfTest(IgniteTest):
             IgniteClientConfiguration(version=IgniteVersion(ignite_version),
                                       
discovery_spi=from_ignite_cluster(ignites)),
             
java_class_name="org.apache.ignite.internal.ducktest.tests.self_test.TestKillableApplication",
-            timeout_sec=180)
+            startup_timeout_sec=180)
 
         node2 = IgniteApplicationService(
             self.test_context,
             IgniteClientConfiguration(version=IgniteVersion(ignite_version),
                                       
discovery_spi=from_ignite_cluster(ignites)),
             
java_class_name="org.apache.ignite.internal.ducktest.tests.self_test.TestSelfKillableApplication",
-            timeout_sec=180)
+            startup_timeout_sec=180)
 
         node1.start()
 
diff --git a/modules/ducktests/tests/ignitetest/utils/ignite_test.py 
b/modules/ducktests/tests/ignitetest/utils/ignite_test.py
index feb5993..677141c 100644
--- a/modules/ducktests/tests/ignitetest/utils/ignite_test.py
+++ b/modules/ducktests/tests/ignitetest/utils/ignite_test.py
@@ -18,6 +18,7 @@ This module contains basic ignite test.
 """
 from time import monotonic
 
+from ducktape.cluster.remoteaccount import RemoteCommandError
 from ducktape.tests.test import Test
 
 
@@ -40,3 +41,17 @@ class IgniteTest(Test):
             so that only the difference between the results of consecutive 
calls is valid.
         """
         return monotonic()
+
+    # pylint: disable=W0212
+    def tearDown(self):
+        self.logger.debug("Killing all services to speed-up the tearing down.")
+
+        for service in self.test_context.services._services.values():
+            try:
+                service.kill()
+            except RemoteCommandError:
+                pass  # Process may be already self-killed on segmentation.
+
+        self.logger.debug("All services killed.")
+
+        super().tearDown()

Reply via email to