This is an automated email from the ASF dual-hosted git repository.
av pushed a commit to branch ignite-ducktape
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/ignite-ducktape by this push:
new 41606e9 IGNITE-13703 Check difference between TcpDiscovery and
ZookeeperDiscovery at Cellular switch (#8454)
41606e9 is described below
commit 41606e95859fdaa098931e0e2ce874ffb3f17a60
Author: Anton Vinogradov <[email protected]>
AuthorDate: Wed Nov 25 15:31:50 2020 +0300
IGNITE-13703 Check difference between TcpDiscovery and ZookeeperDiscovery
at Cellular switch (#8454)
---
.../SingleKeyTxStreamerApplication.java | 2 +-
.../ducktests/tests/ignitetest/services/ignite.py | 42 +-----
.../tests/ignitetest/services/ignite_app.py | 60 ++------
.../ducktests/tests/ignitetest/services/spark.py | 10 +-
.../ignitetest/services/utils/ignite_aware.py | 86 ++++++++++-
.../tests/ignitetest/services/utils/log_utils.py | 42 ++++++
.../tests/ignitetest/services/zk/zookeeper.py | 11 +-
.../ignitetest/tests/add_node_rebalance_test.py | 2 +-
.../ignitetest/tests/cellular_affinity_test.py | 166 ++++++++++++++-------
.../tests/ignitetest/tests/client_test.py | 7 +-
.../tests/control_utility/baseline_test.py | 4 +-
.../ignitetest/tests/control_utility/tx_test.py | 4 +-
.../tests/ignitetest/tests/pme_free_switch_test.py | 42 ++++--
.../ducktests/tests/ignitetest/tests/self_test.py | 4 +-
.../tests/ignitetest/utils/ignite_test.py | 15 ++
15 files changed, 327 insertions(+), 170 deletions(-)
diff --git
a/modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/tests/pme_free_switch_test/SingleKeyTxStreamerApplication.java
b/modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/tests/pme_free_switch_test/SingleKeyTxStreamerApplication.java
index bb91df8..c22fd37 100644
---
a/modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/tests/pme_free_switch_test/SingleKeyTxStreamerApplication.java
+++
b/modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/tests/pme_free_switch_test/SingleKeyTxStreamerApplication.java
@@ -61,7 +61,7 @@ public class SingleKeyTxStreamerApplication extends
IgniteAwareApplication {
maxLatency = latency;
}
- if (cnt % 1000 == 0)
+ if (cnt % 100 == 0)
log.info("APPLICATION_STREAMED " + cnt + " transactions [max="
+ maxLatency + "]");
}
diff --git a/modules/ducktests/tests/ignitetest/services/ignite.py
b/modules/ducktests/tests/ignitetest/services/ignite.py
index a0158ef..e5ec3fa 100644
--- a/modules/ducktests/tests/ignitetest/services/ignite.py
+++ b/modules/ducktests/tests/ignitetest/services/ignite.py
@@ -23,7 +23,6 @@ import signal
from datetime import datetime
from ducktape.cluster.remoteaccount import RemoteCommandError
-from ducktape.utils.util import wait_until
from ignitetest.services.utils.ignite_aware import IgniteAwareService
@@ -36,43 +35,10 @@ class IgniteService(IgniteAwareService):
HEAP_DUMP_FILE = os.path.join(IgniteAwareService.PERSISTENT_ROOT,
"ignite-heap.bin")
# pylint: disable=R0913
- def __init__(self, context, config, num_nodes, jvm_opts=None,
modules=None):
- super().__init__(context, config, num_nodes, modules=modules,
jvm_opts=jvm_opts)
-
- # pylint: disable=W0221
- def start(self, timeout_sec=180):
- super().start()
-
- self.logger.info("Waiting for Ignite(s) to start...")
-
- for node in self.nodes:
- self.await_node_started(node, timeout_sec)
-
- def await_node_started(self, node, timeout_sec):
- """
- Await topology ready event on node start.
- :param node: Node to wait
- :param timeout_sec: Number of seconds to wait event.
- """
- self.await_event_on_node("Topology snapshot", node, timeout_sec,
from_the_beginning=True)
-
- if len(self.pids(node)) == 0:
- raise Exception("No process ids recorded on node %s" %
node.account.hostname)
-
- # pylint: disable=W0221
- def stop_node(self, node, clean_shutdown=True, timeout_sec=60):
- pids = self.pids(node)
- sig = signal.SIGTERM if clean_shutdown else signal.SIGKILL
-
- for pid in pids:
- node.account.signal(pid, sig, allow_fail=False)
-
- try:
- wait_until(lambda: len(self.pids(node)) == 0,
timeout_sec=timeout_sec,
- err_msg="Ignite node failed to stop in %d seconds" %
timeout_sec)
- except Exception:
- self.thread_dump(node)
- raise
+ def __init__(self, context, config, num_nodes, jvm_opts=None,
startup_timeout_sec=60, shutdown_timeout_sec=10,
+ modules=None):
+ super().__init__(context, config, num_nodes, startup_timeout_sec,
shutdown_timeout_sec, modules=modules,
+ jvm_opts=jvm_opts)
def clean_node(self, node):
node.account.kill_java_processes(self.APP_SERVICE_CLASS,
clean_shutdown=False, allow_fail=True)
diff --git a/modules/ducktests/tests/ignitetest/services/ignite_app.py
b/modules/ducktests/tests/ignitetest/services/ignite_app.py
index fa5cc5d..bb93725 100644
--- a/modules/ducktests/tests/ignitetest/services/ignite_app.py
+++ b/modules/ducktests/tests/ignitetest/services/ignite_app.py
@@ -34,62 +34,26 @@ class IgniteApplicationService(IgniteAwareService):
SERVICE_JAVA_CLASS_NAME =
"org.apache.ignite.internal.ducktest.utils.IgniteAwareApplicationService"
# pylint: disable=R0913
- def __init__(self, context, config, java_class_name, num_nodes=1,
params="", timeout_sec=60, modules=None,
- servicejava_class_name=SERVICE_JAVA_CLASS_NAME,
jvm_opts=None, start_ignite=True):
- super().__init__(context, config, num_nodes, modules=modules,
servicejava_class_name=servicejava_class_name,
- java_class_name=java_class_name, params=params,
jvm_opts=jvm_opts, start_ignite=start_ignite)
+ def __init__(self, context, config, java_class_name, num_nodes=1,
params="", startup_timeout_sec=60,
+ shutdown_timeout_sec=10, modules=None,
servicejava_class_name=SERVICE_JAVA_CLASS_NAME, jvm_opts=None,
+ start_ignite=True):
+ super().__init__(context, config, num_nodes, startup_timeout_sec,
shutdown_timeout_sec, modules=modules,
+ servicejava_class_name=servicejava_class_name,
java_class_name=java_class_name, params=params,
+ jvm_opts=jvm_opts, start_ignite=start_ignite)
self.servicejava_class_name = servicejava_class_name
self.java_class_name = java_class_name
- self.timeout_sec = timeout_sec
self.params = params
- def start(self):
- super().start()
+ def await_started(self):
+ super().await_started()
- self.logger.info("Waiting for Ignite aware Application (%s) to
start..." % self.java_class_name)
+ self.__check_status("IGNITE_APPLICATION_INITIALIZED",
timeout=self.startup_timeout_sec)
- self.await_event("Topology snapshot", self.timeout_sec,
from_the_beginning=True)
+ def await_stopped(self):
+ super().await_stopped()
- self.__check_status("IGNITE_APPLICATION_INITIALIZED",
timeout=self.timeout_sec)
-
- def stop_async(self, clean_shutdown=True):
- """
- Stop in async way.
- """
- for node in self.nodes:
- self.stop_node(node=node, clean_shutdown=clean_shutdown)
-
- # pylint: disable=W0221
- def stop_node(self, node, clean_shutdown=True):
- """
- Stops node in async way.
- """
- self.logger.info("%s Stopping node %s" % (self.__class__.__name__,
str(node.account)))
- node.account.kill_java_processes(self.servicejava_class_name,
clean_shutdown=clean_shutdown,
- allow_fail=True)
-
- def await_stopped(self, timeout_sec=10):
- """
- Awaits node stop finish.
- """
- for node in self.nodes:
- stopped = self.wait_node(node, timeout_sec=timeout_sec)
- assert stopped, "Node %s: did not stop within the specified
timeout of %s seconds" % \
- (str(node.account), str(timeout_sec))
-
- self.__check_status("IGNITE_APPLICATION_FINISHED", timeout=timeout_sec)
-
- # pylint: disable=W0221
- def stop(self, clean_shutdown=True, timeout_sec=10):
- """
- Stop services.
- """
- if clean_shutdown:
- self.stop_async(clean_shutdown)
- self.await_stopped(timeout_sec)
- else:
- self.stop_async(clean_shutdown)
+ self.__check_status("IGNITE_APPLICATION_FINISHED")
def __check_status(self, desired, timeout=1):
self.await_event("%s\\|IGNITE_APPLICATION_BROKEN" % desired, timeout,
from_the_beginning=True)
diff --git a/modules/ducktests/tests/ignitetest/services/spark.py
b/modules/ducktests/tests/ignitetest/services/spark.py
index 16d90e1..77756eb 100644
--- a/modules/ducktests/tests/ignitetest/services/spark.py
+++ b/modules/ducktests/tests/ignitetest/services/spark.py
@@ -21,7 +21,9 @@ import os.path
from ducktape.cluster.remoteaccount import RemoteCommandError
from ducktape.services.background_thread import BackgroundThreadService
+
from ignitetest.services.utils.ignite_persistence import PersistenceAware
+from ignitetest.services.utils.log_utils import monitor_log
class SparkService(BackgroundThreadService, PersistenceAware):
@@ -91,7 +93,7 @@ class SparkService(BackgroundThreadService, PersistenceAware):
self.logger.debug("Monitoring - %s" % log_file)
timeout_sec = 30
- with node.account.monitor_log(log_file) as monitor:
+ with monitor_log(node, log_file) as monitor:
node.account.ssh(cmd)
monitor.wait_until(log_msg, timeout_sec=timeout_sec, backoff_sec=5,
err_msg="Spark doesn't start at %d seconds" %
timeout_sec)
@@ -156,3 +158,9 @@ class SparkService(BackgroundThreadService,
PersistenceAware):
userID=node.account.user,
instance=1,
host=node.account.hostname)
+
+ def kill(self):
+ """
+ Kills the service.
+ """
+ self.stop()
diff --git a/modules/ducktests/tests/ignitetest/services/utils/ignite_aware.py
b/modules/ducktests/tests/ignitetest/services/utils/ignite_aware.py
index 5045e98..513e0e5 100644
--- a/modules/ducktests/tests/ignitetest/services/utils/ignite_aware.py
+++ b/modules/ducktests/tests/ignitetest/services/utils/ignite_aware.py
@@ -17,6 +17,7 @@
This module contains the base class to build services aware of Ignite.
"""
import os
+import signal
import socket
import sys
import time
@@ -31,6 +32,7 @@ from ignitetest.services.utils.concurrent import
CountDownLatch, AtomicValue
from ignitetest.services.utils.ignite_persistence import IgnitePersistenceAware
from ignitetest.services.utils.ignite_spec import resolve_spec
from ignitetest.services.utils.jmx_utils import ignite_jmx_mixin
+from ignitetest.services.utils.log_utils import monitor_log
class IgniteAwareService(BackgroundThreadService, IgnitePersistenceAware,
metaclass=ABCMeta):
@@ -41,7 +43,7 @@ class IgniteAwareService(BackgroundThreadService,
IgnitePersistenceAware, metacl
NETFILTER_STORE_PATH = os.path.join(IgnitePersistenceAware.TEMP_DIR,
"iptables.bak")
# pylint: disable=R0913
- def __init__(self, context, config, num_nodes, **kwargs):
+ def __init__(self, context, config, num_nodes, startup_timeout_sec,
shutdown_timeout_sec, **kwargs):
"""
**kwargs are params that passed to IgniteSpec
"""
@@ -52,20 +54,95 @@ class IgniteAwareService(BackgroundThreadService,
IgnitePersistenceAware, metacl
self.log_level = "DEBUG"
self.config = config
+ self.startup_timeout_sec = startup_timeout_sec
+ self.shutdown_timeout_sec = shutdown_timeout_sec
self.spec = resolve_spec(self, context, config, **kwargs)
self.disconnected_nodes = []
+ self.killed = False
+
+ def start_async(self):
+ """
+ Starts in async way.
+ """
+ super().start()
+
+ def start(self):
+ self.start_async()
+ self.await_started()
+
+ def await_started(self):
+ """
+ Awaits start finished.
+ """
+ self.logger.info("Waiting for IgniteAware(s) to start ...")
+
+ self.await_event("Topology snapshot", self.startup_timeout_sec,
from_the_beginning=True)
def start_node(self, node):
self.init_persistent(node)
super().start_node(node)
- wait_until(lambda: len(self.pids(node)) > 0, timeout_sec=10)
+ wait_until(lambda: self.alive(node), timeout_sec=10)
ignite_jmx_mixin(node, self.pids(node))
+ def stop_async(self):
+ """
+ Stop in async way.
+ """
+ super().stop()
+
+ def stop(self):
+ if not self.killed:
+ self.stop_async()
+ self.await_stopped()
+ else:
+ self.logger.debug("Skipping node stop since it already killed.")
+
+ def await_stopped(self):
+ """
+ Awaits stop finished.
+ """
+ self.logger.info("Waiting for IgniteAware(s) to stop ...")
+
+ for node in self.nodes:
+ stopped = self.wait_node(node,
timeout_sec=self.shutdown_timeout_sec)
+ assert stopped, "Node %s's worker thread did not stop in %d
seconds" % \
+ (str(node.account), self.shutdown_timeout_sec)
+
+ for node in self.nodes:
+ wait_until(lambda: not self.alive(node),
timeout_sec=self.shutdown_timeout_sec,
+ err_msg="Node %s's remote processes failed to stop in
%d seconds" %
+ (str(node.account), self.shutdown_timeout_sec))
+
+ def stop_node(self, node):
+ pids = self.pids(node)
+
+ for pid in pids:
+ node.account.signal(pid, signal.SIGTERM, allow_fail=False)
+
+ def kill(self):
+ """
+ Kills nodes.
+ """
+ self.logger.info("Killing IgniteAware(s) ...")
+
+ for node in self.nodes:
+ pids = self.pids(node)
+
+ for pid in pids:
+ node.account.signal(pid, signal.SIGKILL, allow_fail=False)
+
+ for node in self.nodes:
+ wait_until(lambda: not self.alive(node),
timeout_sec=self.shutdown_timeout_sec,
+ err_msg="Node %s's remote processes failed to be killed
in %d seconds" %
+ (str(node.account), self.shutdown_timeout_sec))
+
+ self.killed = True
+
def clean(self):
self.__restore_iptables()
@@ -134,10 +211,7 @@ class IgniteAwareService(BackgroundThreadService,
IgnitePersistenceAware, metacl
:param backoff_sec: Number of seconds to back off between each failure
to meet the condition
before checking again.
"""
- with node.account.monitor_log(self.STDOUT_STDERR_CAPTURE) as monitor:
- if from_the_beginning:
- monitor.offset = 0
-
+ with monitor_log(node, self.STDOUT_STDERR_CAPTURE, from_the_beginning)
as monitor:
monitor.wait_until(evt_message, timeout_sec=timeout_sec,
backoff_sec=backoff_sec,
err_msg="Event [%s] was not triggered on '%s'
in %d seconds" % (evt_message, node.name,
timeout_sec))
diff --git a/modules/ducktests/tests/ignitetest/services/utils/log_utils.py
b/modules/ducktests/tests/ignitetest/services/utils/log_utils.py
new file mode 100644
index 0000000..58cb5e4
--- /dev/null
+++ b/modules/ducktests/tests/ignitetest/services/utils/log_utils.py
@@ -0,0 +1,42 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+This module contains log utils.
+"""
+
+from contextlib import contextmanager
+
+from ducktape.cluster.remoteaccount import LogMonitor
+
+
+# pylint: disable=W0703
+@contextmanager
+def monitor_log(node, log, from_the_beginning=False):
+ """
+ Context manager that returns an object that helps you wait for events to
+ occur in a log. This checks the size of the log at the beginning of the
+ block and makes a helper object available with convenience methods for
+ checking or waiting for a pattern to appear in the log. This will commonly
+ be used to start a process, then wait for a log message indicating the
+ process is in a ready state.
+
+ See ``LogMonitor`` for more usage information.
+ """
+ try:
+ offset = 0 if from_the_beginning else int(node.account.ssh_output("wc
-c %s" % log).split()[0])
+ except Exception:
+ offset = 0
+ yield LogMonitor(node.account, log, offset)
diff --git a/modules/ducktests/tests/ignitetest/services/zk/zookeeper.py
b/modules/ducktests/tests/ignitetest/services/zk/zookeeper.py
index ca8a7fb..d6dffa4 100644
--- a/modules/ducktests/tests/ignitetest/services/zk/zookeeper.py
+++ b/modules/ducktests/tests/ignitetest/services/zk/zookeeper.py
@@ -22,6 +22,8 @@ import os.path
from ducktape.services.service import Service
from ducktape.utils.util import wait_until
+from ignitetest.services.utils.log_utils import monitor_log
+
class ZookeeperSettings:
"""
@@ -102,8 +104,7 @@ class ZookeeperService(Service):
:param node: Zookeeper service node.
:param timeout: Wait timeout.
"""
- with node.account.monitor_log(self.LOG_FILE) as monitor:
- monitor.offset = 0
+ with monitor_log(node, self.LOG_FILE, from_the_beginning=True) as
monitor:
monitor.wait_until(
"LEADER ELECTION TOOK",
timeout_sec=timeout,
@@ -150,3 +151,9 @@ class ZookeeperService(Service):
(self.__class__.__name__, node.account))
node.account.kill_process("zookeeper", clean_shutdown=False,
allow_fail=True)
node.account.ssh("rm -rf %s %s %s" % (self.CONFIG_ROOT, self.DATA_DIR,
self.LOG_FILE), allow_fail=False)
+
+ def kill(self):
+ """
+ Kills the service.
+ """
+ self.stop()
diff --git
a/modules/ducktests/tests/ignitetest/tests/add_node_rebalance_test.py
b/modules/ducktests/tests/ignitetest/tests/add_node_rebalance_test.py
index 1a1ee41..5511805 100644
--- a/modules/ducktests/tests/ignitetest/tests/add_node_rebalance_test.py
+++ b/modules/ducktests/tests/ignitetest/tests/add_node_rebalance_test.py
@@ -55,7 +55,7 @@ class AddNodeRebalanceTest(IgniteTest):
IgniteApplicationService(self.test_context, config=app_config,
java_class_name="org.apache.ignite.internal.ducktest.tests.DataGenerationApplication",
params={"cacheName": "test-cache", "range":
self.DATA_AMOUNT},
- timeout_sec=self.PRELOAD_TIMEOUT).run()
+
startup_timeout_sec=self.PRELOAD_TIMEOUT).run()
ignite = IgniteService(self.test_context,
node_config._replace(discovery_spi=from_ignite_cluster(ignites)),
num_nodes=1)
diff --git a/modules/ducktests/tests/ignitetest/tests/cellular_affinity_test.py
b/modules/ducktests/tests/ignitetest/tests/cellular_affinity_test.py
index 381c5d2..cb56546 100644
--- a/modules/ducktests/tests/ignitetest/tests/cellular_affinity_test.py
+++ b/modules/ducktests/tests/ignitetest/tests/cellular_affinity_test.py
@@ -16,6 +16,7 @@
"""
This module contains Cellular Affinity tests.
"""
+import math
from enum import IntEnum
from ducktape.mark import matrix
@@ -25,11 +26,13 @@ from ignitetest.services.ignite import IgniteService
from ignitetest.services.ignite_app import IgniteApplicationService
from ignitetest.services.utils.control_utility import ControlUtility
from ignitetest.services.utils.ignite_configuration import
IgniteConfiguration, IgniteClientConfiguration
-from ignitetest.services.utils.ignite_configuration.discovery import
from_ignite_cluster
+from ignitetest.services.utils.ignite_configuration.discovery import
from_ignite_cluster, from_zookeeper_cluster, \
+ TcpDiscoverySpi
+from ignitetest.services.zk.zookeeper import ZookeeperSettings,
ZookeeperService
from ignitetest.utils import ignite_versions, version_if, cluster
+from ignitetest.utils.enum import constructible
from ignitetest.utils.ignite_test import IgniteTest
from ignitetest.utils.version import DEV_BRANCH, IgniteVersion, LATEST_2_8
-from ignitetest.utils.enum import constructible
@constructible
@@ -39,7 +42,16 @@ class StopType(IntEnum):
"""
SIGTERM = 0
SIGKILL = 1
- DISCONNECT = 2
+ DROP_NETWORK = 2
+
+
+@constructible
+class DiscoreryType(IntEnum):
+ """
+ Discovery type.
+ """
+ ZooKeeper = 0
+ TCP = 1
# pylint: disable=W0223
@@ -47,7 +59,8 @@ class CellularAffinity(IgniteTest):
"""
Tests Cellular Affinity scenarios.
"""
- NUM_NODES = 3
+ NODES_PER_CELL = 3
+ ZOOKEPER_CLUSTER_SIZE = 3
ATTRIBUTE = "CELL"
@@ -69,7 +82,7 @@ class CellularAffinity(IgniteTest):
</bean>
</property>
<property name="name" value="{{ cacheName }}"/>
- <property name="backups" value="{{ nodes }}"/>
+ <property name="backups" value="{{ backups }}"/>
<property name="atomicityMode" value="TRANSACTIONAL"/>
</bean>
</list>
@@ -82,11 +95,12 @@ class CellularAffinity(IgniteTest):
:return: Configuration properties.
"""
return Template(CellularAffinity.CONFIG_TEMPLATE) \
- .render(nodes=CellularAffinity.NUM_NODES, # bigger than cell
capacity (to handle single cell useless test)
- attr=CellularAffinity.ATTRIBUTE,
- cacheName=CellularAffinity.CACHE_NAME)
+ .render(
+ backups=CellularAffinity.NODES_PER_CELL, # bigger than cell
capacity (to handle single cell useless test)
+ attr=CellularAffinity.ATTRIBUTE,
+ cacheName=CellularAffinity.CACHE_NAME)
- @cluster(num_nodes=NUM_NODES * 3 + 1)
+ @cluster(num_nodes=NODES_PER_CELL * 3 + 1)
@version_if(lambda version: version >= DEV_BRANCH)
@ignite_versions(str(DEV_BRANCH))
def test_distribution(self, ignite_version):
@@ -94,9 +108,15 @@ class CellularAffinity(IgniteTest):
Tests Cellular Affinity scenario (partition distribution).
"""
cell1 = self.start_cell(ignite_version, ['-D' +
CellularAffinity.ATTRIBUTE + '=1'])
- self.start_cell(ignite_version, ['-D' + CellularAffinity.ATTRIBUTE +
'=2'], joined_cluster=cell1)
- self.start_cell(ignite_version, ['-D' + CellularAffinity.ATTRIBUTE +
'=XXX', '-DRANDOM=42'],
- joined_cluster=cell1)
+
+ discovery_spi = from_ignite_cluster(cell1)
+
+ cell2 = self.start_cell(ignite_version, ['-D' +
CellularAffinity.ATTRIBUTE + '=2'], discovery_spi)
+ cell3 = self.start_cell(ignite_version, ['-D' +
CellularAffinity.ATTRIBUTE + '=XXX', '-DRANDOM=42'],
+ discovery_spi)
+
+ for cell in [cell1, cell2, cell3]:
+ cell.await_started()
ControlUtility(cell1, self.test_context).activate()
@@ -106,42 +126,82 @@ class CellularAffinity(IgniteTest):
java_class_name="org.apache.ignite.internal.ducktest.tests.cellular_affinity_test.DistributionChecker",
params={"cacheName": CellularAffinity.CACHE_NAME,
"attr": CellularAffinity.ATTRIBUTE,
- "nodesPerCell": self.NUM_NODES})
+ "nodesPerCell": self.NODES_PER_CELL})
checker.run()
+ # pylint: disable=R0912
# pylint: disable=R0914
- @cluster(num_nodes=NUM_NODES * (3 + 1))
+ # pylint: disable=no-member
+    @cluster(num_nodes=2 * (NODES_PER_CELL + 1) + 3)  # cell_cnt *
(srv_per_cell + cell_streamer) + zookeeper_cluster
@ignite_versions(str(DEV_BRANCH), str(LATEST_2_8))
- @matrix(stop_type=[
- StopType.DISCONNECT,
- StopType.SIGKILL,
- StopType.SIGTERM])
- def test_latency(self, ignite_version, stop_type):
+ @matrix(stop_type=[StopType.DROP_NETWORK, StopType.SIGKILL,
StopType.SIGTERM],
+ discovery_type=[DiscoreryType.ZooKeeper, DiscoreryType.TCP])
+ def test_latency(self, ignite_version, stop_type, discovery_type):
"""
Tests Cellular switch tx latency.
"""
+ cluster_size = len(self.test_context.cluster)
+
+ cells_amount = math.floor((cluster_size - self.ZOOKEPER_CLUSTER_SIZE)
/ (self.NODES_PER_CELL + 1))
+
+ assert cells_amount >= 2
+
+ self.test_context.logger.info(
+ "Cells amount calculated as %d at cluster with %d nodes in total"
% (cells_amount, cluster_size))
+
data = {}
- cell1, prepared_tx_loader1 =
self.start_cell_with_prepared_txs(ignite_version, "C1")
- _, prepared_tx_loader2 =
self.start_cell_with_prepared_txs(ignite_version, "C2", joined_cluster=cell1)
- _, prepared_tx_loader3 =
self.start_cell_with_prepared_txs(ignite_version, "C3", joined_cluster=cell1)
+ discovery_spi = None
+
+ modules = []
+
+ d_type = DiscoreryType.construct_from(discovery_type)
- loaders = [prepared_tx_loader1, prepared_tx_loader2,
prepared_tx_loader3]
+ if d_type is DiscoreryType.ZooKeeper:
+ zk_settings = ZookeeperSettings()
+ zk_quorum = ZookeeperService(self.test_context,
self.ZOOKEPER_CLUSTER_SIZE, settings=zk_settings)
+ zk_quorum.start()
- failed_loader = prepared_tx_loader3
+ modules.append('zookeeper')
- tx_streamer1 = self.start_tx_streamer(ignite_version, "C1",
joined_cluster=cell1)
- tx_streamer2 = self.start_tx_streamer(ignite_version, "C2",
joined_cluster=cell1)
- tx_streamer3 = self.start_tx_streamer(ignite_version, "C3",
joined_cluster=cell1)
+ discovery_spi = from_zookeeper_cluster(zk_quorum)
- streamers = [tx_streamer1, tx_streamer2, tx_streamer3]
+ cell0, prepared_tx_loader1 =
self.start_cell_with_prepared_txs(ignite_version, "C0", discovery_spi, modules)
+
+ if d_type is DiscoreryType.TCP:
+ discovery_spi = from_ignite_cluster(cell0)
+
+ assert discovery_spi is not None
+
+ loaders = [prepared_tx_loader1]
+ nodes = [cell0]
+
+ for cell in range(1, cells_amount):
+ node, prepared_tx_loader = \
+ self.start_cell_with_prepared_txs(ignite_version, "C%d" %
cell, discovery_spi, modules)
+
+ loaders.append(prepared_tx_loader)
+ nodes.append(node)
+
+ failed_loader = loaders[1]
+
+ for node in [*nodes, *loaders]:
+ node.await_started()
+
+ streamers = []
+
+ for cell in range(0, cells_amount):
+ streamers.append(self.start_tx_streamer(ignite_version, "C%d" %
cell, discovery_spi, modules))
for streamer in streamers: # starts tx streaming with latency record
(with some warmup).
- streamer.start()
+ streamer.start_async()
- ControlUtility(cell1,
self.test_context).disable_baseline_auto_adjust() # baseline set.
- ControlUtility(cell1, self.test_context).activate()
+ for streamer in streamers:
+ streamer.await_started()
+
+ ControlUtility(cell0,
self.test_context).disable_baseline_auto_adjust() # baseline set.
+ ControlUtility(cell0, self.test_context).activate()
for loader in loaders:
loader.await_event("ALL_TRANSACTIONS_PREPARED", 180,
from_the_beginning=True)
@@ -150,13 +210,12 @@ class CellularAffinity(IgniteTest):
streamer.await_event("WARMUP_FINISHED", 180,
from_the_beginning=True)
# node left with prepared txs.
- # pylint: disable=no-member
with StopType.construct_from(stop_type) as s_type:
if s_type is StopType.SIGTERM:
failed_loader.stop_async()
elif s_type is StopType.SIGKILL:
- failed_loader.stop(clean_shutdown=False)
- elif s_type is StopType.DISCONNECT:
+ failed_loader.kill()
+ elif s_type is StopType.DROP_NETWORK:
failed_loader.drop_network()
for streamer in streamers:
@@ -176,64 +235,63 @@ class CellularAffinity(IgniteTest):
cell = streamer.params["cell"]
- data["[%s cell %s]" % ("alive" if cell is not
failed_loader.params["cell"] else "broken", cell)] = \
+ data["[%s cell %s]" % ("alive" if cell !=
failed_loader.params["cell"] else "broken", cell)] = \
"worst_latency=%s, tx_streamed=%s, measure_duration=%s" % (
streamer.extract_result("WORST_LATENCY"),
streamer.extract_result("STREAMED"),
streamer.extract_result("MEASURE_DURATION"))
return data
- def start_tx_streamer(self, version, cell, joined_cluster):
+ def start_tx_streamer(self, version, cell, discovery_spi, modules):
"""
Starts transaction streamer.
"""
return IgniteApplicationService(
self.test_context,
IgniteClientConfiguration(version=IgniteVersion(version),
properties=self.properties(),
-
discovery_spi=from_ignite_cluster(joined_cluster)),
+ discovery_spi=discovery_spi),
java_class_name="org.apache.ignite.internal.ducktest.tests.cellular_affinity_test.CellularTxStreamer",
params={"cacheName": CellularAffinity.CACHE_NAME,
"attr": CellularAffinity.ATTRIBUTE,
"cell": cell,
"warmup": 10000},
- timeout_sec=180)
+ modules=modules, startup_timeout_sec=180)
- def start_cell_with_prepared_txs(self, version, cell_id,
joined_cluster=None):
+ def start_cell_with_prepared_txs(self, version, cell_id, discovery_spi,
modules):
"""
Starts cell with prepared transactions.
"""
- nodes = self.start_cell(version, ['-D' + CellularAffinity.ATTRIBUTE +
'=' + cell_id],
- CellularAffinity.NUM_NODES - 1, joined_cluster)
+ nodes = self.start_cell(version, ['-D' + CellularAffinity.ATTRIBUTE +
'=' + cell_id], discovery_spi, modules,
+ CellularAffinity.NODES_PER_CELL - 1)
prepared_tx_streamer = IgniteApplicationService( # last server node
at the cell.
self.test_context,
IgniteConfiguration(version=IgniteVersion(version),
properties=self.properties(),
- discovery_spi=from_ignite_cluster(nodes)), #
Server node.
+ discovery_spi=from_ignite_cluster(nodes) if
discovery_spi is None else discovery_spi),
java_class_name="org.apache.ignite.internal.ducktest.tests.cellular_affinity_test."
"CellularPreparedTxStreamer",
params={"cacheName": CellularAffinity.CACHE_NAME,
"attr": CellularAffinity.ATTRIBUTE,
"cell": cell_id,
"txCnt": CellularAffinity.PREPARED_TX_CNT},
- jvm_opts=['-D' + CellularAffinity.ATTRIBUTE + '=' + cell_id],
- timeout_sec=180)
+ jvm_opts=['-D' + CellularAffinity.ATTRIBUTE + '=' + cell_id],
modules=modules, startup_timeout_sec=180)
- prepared_tx_streamer.start() # starts last server node and creates
prepared txs on it.
+ prepared_tx_streamer.start_async() # starts last server node and
creates prepared txs on it.
return nodes, prepared_tx_streamer
- def start_cell(self, version, jvm_opts, nodes_cnt=NUM_NODES,
joined_cluster=None):
+ # pylint: disable=R0913
+ def start_cell(self, version, jvm_opts, discovery_spi=None, modules=None,
nodes_cnt=NODES_PER_CELL):
"""
Starts cell.
"""
- config = IgniteConfiguration(version=IgniteVersion(version),
properties=self.properties(),
- cluster_state="INACTIVE")
-
- if joined_cluster:
- config =
config._replace(discovery_spi=from_ignite_cluster(joined_cluster))
-
- ignites = IgniteService(self.test_context, config,
num_nodes=nodes_cnt, jvm_opts=jvm_opts)
+ ignites = IgniteService(
+ self.test_context,
+ IgniteConfiguration(version=IgniteVersion(version),
properties=self.properties(),
+ cluster_state="INACTIVE",
+ discovery_spi=TcpDiscoverySpi() if
discovery_spi is None else discovery_spi),
+ num_nodes=nodes_cnt, modules=modules, jvm_opts=jvm_opts,
startup_timeout_sec=180)
- ignites.start()
+ ignites.start_async()
return ignites
diff --git a/modules/ducktests/tests/ignitetest/tests/client_test.py
b/modules/ducktests/tests/ignitetest/tests/client_test.py
index c6ee4ed..2da41d5 100644
--- a/modules/ducktests/tests/ignitetest/tests/client_test.py
+++ b/modules/ducktests/tests/ignitetest/tests/client_test.py
@@ -74,7 +74,7 @@ class ClientTest(IgniteTest):
# pylint: disable=R0914
# pylint: disable=R0913
- def ignite_start_stop(self, ignite_version, correct_stop_temp_node,
nodes_num, static_clients_num, temp_client,
+ def ignite_start_stop(self, ignite_version, graceful_shutdown, nodes_num,
static_clients_num, temp_client,
iteration_count, client_work_time):
"""
Test for starting and stopping fat clients.
@@ -128,7 +128,10 @@ class ClientTest(IgniteTest):
time.sleep(client_work_time)
- temp_clients.stop(correct_stop_temp_node)
+ if graceful_shutdown:
+ temp_clients.stop()
+ else:
+ temp_clients.kill()
current_top_v += temp_client
diff --git
a/modules/ducktests/tests/ignitetest/tests/control_utility/baseline_test.py
b/modules/ducktests/tests/ignitetest/tests/control_utility/baseline_test.py
index 710e317..6a38f7c 100644
--- a/modules/ducktests/tests/ignitetest/tests/control_utility/baseline_test.py
+++ b/modules/ducktests/tests/ignitetest/tests/control_utility/baseline_test.py
@@ -199,8 +199,8 @@ class BaselineTests(IgniteTest):
if join_cluster:
config._replace(discovery_spi=from_ignite_cluster(join_cluster))
- servers = IgniteService(self.test_context, config=config,
num_nodes=num_nodes)
+ servers = IgniteService(self.test_context, config=config,
num_nodes=num_nodes, startup_timeout_sec=timeout_sec)
- servers.start(timeout_sec=timeout_sec)
+ servers.start()
return servers
diff --git
a/modules/ducktests/tests/ignitetest/tests/control_utility/tx_test.py
b/modules/ducktests/tests/ignitetest/tests/control_utility/tx_test.py
index fef01b9..0ea9ce9 100644
--- a/modules/ducktests/tests/ignitetest/tests/control_utility/tx_test.py
+++ b/modules/ducktests/tests/ignitetest/tests/control_utility/tx_test.py
@@ -166,9 +166,9 @@ class TransactionsTests(IgniteTest):
caches=[CacheConfiguration(name=self.CACHE_NAME,
atomicity_mode='TRANSACTIONAL')]
)
- servers = IgniteService(self.test_context, config=config,
num_nodes=num_nodes)
+ servers = IgniteService(self.test_context, config=config,
num_nodes=num_nodes, startup_timeout_sec=timeout_sec)
- servers.start(timeout_sec=timeout_sec)
+ servers.start()
return servers
diff --git a/modules/ducktests/tests/ignitetest/tests/pme_free_switch_test.py
b/modules/ducktests/tests/ignitetest/tests/pme_free_switch_test.py
index c182c61..08a43bf 100644
--- a/modules/ducktests/tests/ignitetest/tests/pme_free_switch_test.py
+++ b/modules/ducktests/tests/ignitetest/tests/pme_free_switch_test.py
@@ -18,6 +18,7 @@ This module contains PME free switch tests.
"""
import time
+from enum import IntEnum
from ducktape.mark import matrix
@@ -28,22 +29,34 @@ from ignitetest.services.utils.ignite_configuration import
IgniteConfiguration
from ignitetest.services.utils.ignite_configuration.cache import
CacheConfiguration
from ignitetest.services.utils.ignite_configuration.discovery import
from_ignite_cluster
from ignitetest.utils import ignite_versions, cluster
+from ignitetest.utils.enum import constructible
from ignitetest.utils.ignite_test import IgniteTest
from ignitetest.utils.version import DEV_BRANCH, LATEST_2_7, V_2_8_0,
IgniteVersion
+@constructible
+class LoadType(IntEnum):
+ """
+ Load type.
+ """
+ NONE = 0
+ EXTRA_CACHES = 1
+ LONG_TXS = 2
+
+
# pylint: disable=W0223
+# pylint: disable=no-member
class PmeFreeSwitchTest(IgniteTest):
"""
Tests PME free switch scenarios.
"""
NUM_NODES = 9
- CACHES_AMOUNT = 100
+ EXTRA_CACHES_AMOUNT = 100
@cluster(num_nodes=NUM_NODES + 2)
@ignite_versions(str(DEV_BRANCH), str(LATEST_2_7))
- @matrix(long_txs=[False, True])
- def test(self, ignite_version, long_txs):
+ @matrix(load_type=[LoadType.NONE, LoadType.EXTRA_CACHES,
LoadType.LONG_TXS])
+ def test(self, ignite_version, load_type):
"""
Tests PME-free switch scenario (node stop).
"""
@@ -51,16 +64,21 @@ class PmeFreeSwitchTest(IgniteTest):
caches = [CacheConfiguration(name='test-cache', backups=2,
atomicity_mode='TRANSACTIONAL')]
+ l_type = LoadType.construct_from(load_type)
+
# Checking PME (before 2.8) vs PME-free (2.8+) switch duration, but
# focusing on switch duration (which depends on caches amount) when
long_txs is false and
# on waiting for previously started txs before the switch (which
depends on txs duration) when long_txs of true.
- if not long_txs:
- for idx in range(1, self.CACHES_AMOUNT):
+ if l_type is LoadType.EXTRA_CACHES:
+ for idx in range(1, self.EXTRA_CACHES_AMOUNT):
caches.append(CacheConfiguration(name="cache-%d" % idx,
backups=2, atomicity_mode='TRANSACTIONAL'))
config = IgniteConfiguration(version=IgniteVersion(ignite_version),
caches=caches, cluster_state="INACTIVE")
num_nodes = len(self.test_context.cluster) - 2
+
+ self.test_context.logger.info("Nodes amount calculated as %d." %
num_nodes)
+
ignites = IgniteService(self.test_context, config, num_nodes=num_nodes)
ignites.start()
@@ -78,9 +96,9 @@ class PmeFreeSwitchTest(IgniteTest):
client_config,
java_class_name="org.apache.ignite.internal.ducktest.tests.pme_free_switch_test.LongTxStreamerApplication",
params={"cacheName": "test-cache"},
- timeout_sec=180)
+ startup_timeout_sec=180)
- if long_txs:
+ if l_type is LoadType.LONG_TXS:
long_tx_streamer.start()
single_key_tx_streamer = IgniteApplicationService(
@@ -89,18 +107,20 @@ class PmeFreeSwitchTest(IgniteTest):
java_class_name="org.apache.ignite.internal.ducktest.tests.pme_free_switch_test."
"SingleKeyTxStreamerApplication",
params={"cacheName": "test-cache", "warmup": 1000},
- timeout_sec=180)
+ startup_timeout_sec=180)
single_key_tx_streamer.start()
ignites.stop_node(ignites.nodes[num_nodes - 1])
- if long_txs:
- long_tx_streamer.await_event("Node left topology", 60,
from_the_beginning=True)
+ single_key_tx_streamer.await_event("Node left topology", 60,
from_the_beginning=True)
+ if l_type is LoadType.LONG_TXS:
time.sleep(30) # keeping txs alive for 30 seconds.
- long_tx_streamer.stop()
+ long_tx_streamer.stop_async()
+
+ single_key_tx_streamer.await_event("Node left topology", 60,
from_the_beginning=True)
single_key_tx_streamer.await_event("APPLICATION_STREAMED", 60) #
waiting for streaming continuation.
diff --git a/modules/ducktests/tests/ignitetest/tests/self_test.py
b/modules/ducktests/tests/ignitetest/tests/self_test.py
index fbeda05..e5f91e9 100644
--- a/modules/ducktests/tests/ignitetest/tests/self_test.py
+++ b/modules/ducktests/tests/ignitetest/tests/self_test.py
@@ -75,14 +75,14 @@ class SelfTest(IgniteTest):
IgniteClientConfiguration(version=IgniteVersion(ignite_version),
discovery_spi=from_ignite_cluster(ignites)),
java_class_name="org.apache.ignite.internal.ducktest.tests.self_test.TestKillableApplication",
- timeout_sec=180)
+ startup_timeout_sec=180)
node2 = IgniteApplicationService(
self.test_context,
IgniteClientConfiguration(version=IgniteVersion(ignite_version),
discovery_spi=from_ignite_cluster(ignites)),
java_class_name="org.apache.ignite.internal.ducktest.tests.self_test.TestSelfKillableApplication",
- timeout_sec=180)
+ startup_timeout_sec=180)
node1.start()
diff --git a/modules/ducktests/tests/ignitetest/utils/ignite_test.py
b/modules/ducktests/tests/ignitetest/utils/ignite_test.py
index feb5993..677141c 100644
--- a/modules/ducktests/tests/ignitetest/utils/ignite_test.py
+++ b/modules/ducktests/tests/ignitetest/utils/ignite_test.py
@@ -18,6 +18,7 @@ This module contains basic ignite test.
"""
from time import monotonic
+from ducktape.cluster.remoteaccount import RemoteCommandError
from ducktape.tests.test import Test
@@ -40,3 +41,17 @@ class IgniteTest(Test):
so that only the difference between the results of consecutive
calls is valid.
"""
return monotonic()
+
+ # pylint: disable=W0212
+ def tearDown(self):
+ self.logger.debug("Killing all services to speed-up the tearing down.")
+
+ for service in self.test_context.services._services.values():
+ try:
+ service.kill()
+ except RemoteCommandError:
+ pass # Process may be already self-killed on segmentation.
+
+ self.logger.debug("All services killed.")
+
+ super().tearDown()