This is an automated email from the ASF dual-hosted git repository.
av pushed a commit to branch ignite-ducktape
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/ignite-ducktape by this push:
new 8e9350e Ducktests iptables (#8211)
8e9350e is described below
commit 8e9350e676ac7ab9a631cba12c28ff69ff2ed281
Author: Vladsz83 <[email protected]>
AuthorDate: Wed Oct 21 10:09:55 2020 +0300
Ducktests iptables (#8211)
---
.../ducktests/tests/ignitetest/services/ignite.py | 58 +++--
.../ignitetest/services/utils/ignite_aware.py | 3 +-
.../utils/ignite_configuration/__init__.py | 3 +
.../utils/ignite_configuration/communication.py} | 41 +--
.../utils/ignite_configuration/discovery.py | 11 +-
.../templates/communication_macro.j2} | 16 +-
.../services/utils/templates/discovery_macro.j2 | 32 ++-
.../services/utils/templates/ignite.xml.j2 | 4 +
.../services/zk/templates/zookeeper.properties.j2 | 1 +
.../tests/ignitetest/services/zk/zookeeper.py | 13 +-
.../tests/ignitetest/tests/discovery_test.py | 287 +++++++++++++++------
.../tests/ignitetest/utils/ignite_test.py | 26 ++
12 files changed, 349 insertions(+), 146 deletions(-)
diff --git a/modules/ducktests/tests/ignitetest/services/ignite.py b/modules/ducktests/tests/ignitetest/services/ignite.py
index f50365d..ad88db2 100644
--- a/modules/ducktests/tests/ignitetest/services/ignite.py
+++ b/modules/ducktests/tests/ignitetest/services/ignite.py
@@ -17,9 +17,8 @@
This module contains class to start ignite cluster node.
"""
-import functools
-import operator
import os
+import re
import signal
import time
from datetime import datetime
@@ -70,7 +69,7 @@ class IgniteService(IgniteAwareService):
sig = signal.SIGTERM if clean_shutdown else signal.SIGKILL
for pid in pids:
- self.__stop_node(node, pid, sig)
+ node.account.signal(pid, sig, allow_fail=False)
try:
wait_until(lambda: len(self.pids(node)) == 0,
timeout_sec=timeout_sec,
@@ -79,21 +78,22 @@ class IgniteService(IgniteAwareService):
self.thread_dump(node)
raise
- def stop_nodes_async(self, nodes, delay_ms=0, clean_shutdown=True, timeout_sec=20, wait_for_stop=False):
+ def exec_on_nodes_async(self, nodes, task, simultaneously=True, delay_ms=0, timeout_sec=20):
"""
- Stops the nodes asynchronously.
+ Executes given task on the nodes.
+ :param task: a callable to run on a node, e.g. 'lambda node: ...'.
+ :param simultaneously: Enables or disables simultaneous start of the task on each node.
+ :param delay_ms: delay before the task runs. Begins at 0 and grows by delay_ms for each next node in nodes.
+ :param timeout_sec: timeout to wait for the task.
"""
- sig = signal.SIGTERM if clean_shutdown else signal.SIGKILL
-
- sem = CountDownLatch(len(nodes))
+ sem = CountDownLatch(len(nodes)) if simultaneously else None
time_holder = AtomicValue()
delay = 0
threads = []
for node in nodes:
- thread = Thread(target=self.__stop_node,
- args=(node, next(iter(self.pids(node))), sig, sem, delay, time_holder))
+ thread = Thread(target=self.__exec_on_node, args=(node, task, sem, delay, time_holder))
threads.append(thread)
@@ -104,19 +104,10 @@ class IgniteService(IgniteAwareService):
for thread in threads:
thread.join(timeout_sec)
- if wait_for_stop:
- try:
- wait_until(lambda: len(functools.reduce(operator.iconcat, (self.pids(n) for n in nodes), [])) == 0,
- timeout_sec=timeout_sec, err_msg="Ignite node failed to stop in %d seconds" % timeout_sec)
- except Exception:
- for node in nodes:
- self.thread_dump(node)
- raise
-
return time_holder.get()
@staticmethod
- def __stop_node(node, pid, sig, start_waiter=None, delay_ms=0, time_holder=None):
+ def __exec_on_node(node, task, start_waiter=None, delay_ms=0, time_holder=None):
if start_waiter:
start_waiter.count_down()
start_waiter.wait()
@@ -130,7 +121,7 @@ class IgniteService(IgniteAwareService):
time_holder.compare_and_set(None, (mono, timestamp))
- node.account.signal(pid, sig, False)
+ task(node)
def clean_node(self, node):
node.account.kill_java_processes(self.APP_SERVICE_CLASS, clean_shutdown=False, allow_fail=True)
@@ -154,3 +145,28 @@ class IgniteService(IgniteAwareService):
return pid_arr
except (RemoteCommandError, ValueError):
return []
+
+
+def node_failed_event_pattern(failed_node_id=None):
+ """Failed node pattern in log."""
+ return "Node FAILED: .\\{1,\\}Node \\[id=" + (failed_node_id if
failed_node_id else "") + \
+ ".\\{1,\\}\\(isClient\\|client\\)=false"
+
+
+def get_event_time(service, log_node, log_pattern, from_the_beginning=True, timeout=15):
+ """
+ Extracts event time from the ignite log by pattern.
+ :param service: ducktape service (ignite service) responsible for searching the log.
+ :param log_node: ducktape node to search ignite log on.
+ :param log_pattern: pattern to search ignite log for.
+ :param from_the_beginning: if True, searches the log from its beginning.
+ :param timeout: timeout to wait for the pattern in the log.
+ """
+ service.await_event_on_node(log_pattern, log_node, timeout, from_the_beginning=from_the_beginning,
+ backoff_sec=0.3)
+
+ _, stdout, _ = log_node.account.ssh_client.exec_command(
+ "grep '%s' %s" % (log_pattern,
IgniteAwareService.STDOUT_STDERR_CAPTURE))
+
+ return datetime.strptime(re.match("^\\[[^\\[]+\\]", stdout.read().decode("utf-8")).group(),
+ "[%Y-%m-%d %H:%M:%S,%f]")
diff --git a/modules/ducktests/tests/ignitetest/services/utils/ignite_aware.py b/modules/ducktests/tests/ignitetest/services/utils/ignite_aware.py
index fadbbe6..c964b69 100644
--- a/modules/ducktests/tests/ignitetest/services/utils/ignite_aware.py
+++ b/modules/ducktests/tests/ignitetest/services/utils/ignite_aware.py
@@ -122,7 +122,8 @@ class IgniteAwareService(BackgroundThreadService, IgnitePersistenceAware, metacl
monitor.offset = 0
monitor.wait_until(evt_message, timeout_sec=timeout_sec, backoff_sec=backoff_sec,
- err_msg="Event [%s] was not triggered in %d seconds" % (evt_message, timeout_sec))
+ err_msg="Event [%s] was not triggered on '%s' in %d seconds" % (evt_message, node.name,
+ timeout_sec))
def await_event(self, evt_message, timeout_sec, from_the_beginning=False, backoff_sec=5):
"""
diff --git a/modules/ducktests/tests/ignitetest/services/utils/ignite_configuration/__init__.py b/modules/ducktests/tests/ignitetest/services/utils/ignite_configuration/__init__.py
index b9602e4..5d86a22 100644
--- a/modules/ducktests/tests/ignitetest/services/utils/ignite_configuration/__init__.py
+++ b/modules/ducktests/tests/ignitetest/services/utils/ignite_configuration/__init__.py
@@ -19,6 +19,7 @@ This module contains IgniteConfiguration classes and utilities.
from typing import NamedTuple
+from ignitetest.services.utils.ignite_configuration.communication import CommunicationSpi, TcpCommunicationSpi
from ignitetest.services.utils.ignite_configuration.data_storage import DataStorageConfiguration
from ignitetest.services.utils.ignite_configuration.discovery import DiscoverySpi, TcpDiscoverySpi
from ignitetest.utils.version import IgniteVersion, DEV_BRANCH
@@ -29,11 +30,13 @@ class IgniteConfiguration(NamedTuple):
Ignite configuration.
"""
discovery_spi: DiscoverySpi = TcpDiscoverySpi()
+ communication_spi: CommunicationSpi = TcpCommunicationSpi()
version: IgniteVersion = DEV_BRANCH
cluster_state: str = 'ACTIVE'
client_mode: bool = False
consistent_id: str = None
failure_detection_timeout: int = 10000
+ sys_worker_blocked_timeout: int = 10000
properties: str = None
data_storage: DataStorageConfiguration = None
caches: list = []
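
The new fields slot into the existing NamedTuple defaults. For example, a test could override them like this (illustrative values only):

    from ignitetest.services.utils.ignite_configuration import IgniteConfiguration
    from ignitetest.services.utils.ignite_configuration.communication import TcpCommunicationSpi

    # Both new fields fall back to the defaults above when omitted.
    config = IgniteConfiguration(
        communication_spi=TcpCommunicationSpi(port=47100, port_range=100),
        failure_detection_timeout=1000,
        sys_worker_blocked_timeout=10000)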
diff --git a/modules/ducktests/tests/ignitetest/utils/ignite_test.py b/modules/ducktests/tests/ignitetest/services/utils/ignite_configuration/communication.py
similarity index 52%
copy from modules/ducktests/tests/ignitetest/utils/ignite_test.py
copy to modules/ducktests/tests/ignitetest/services/utils/ignite_configuration/communication.py
index feb5993..edca32c 100644
--- a/modules/ducktests/tests/ignitetest/utils/ignite_test.py
+++ b/modules/ducktests/tests/ignitetest/services/utils/ignite_configuration/communication.py
@@ -11,32 +11,35 @@
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
-# limitations under the License.
+# limitations under the License
"""
-This module contains basic ignite test.
+Module contains classes and utility methods to create communication configuration for ignite nodes.
"""
-from time import monotonic
-from ducktape.tests.test import Test
+from abc import ABCMeta, abstractmethod
-# pylint: disable=W0223
-class IgniteTest(Test):
+class CommunicationSpi(metaclass=ABCMeta):
"""
- Basic ignite test.
+ Abstract class for CommunicationSpi.
"""
- def __init__(self, test_context):
- super().__init__(test_context=test_context)
-
- @staticmethod
- def monotonic():
+ @property
+ @abstractmethod
+ def type(self):
"""
- monotonic() -> float
-
- :return:
- The value (in fractional seconds) of a monotonic clock, i.e. a clock that cannot go backwards.
- The clock is not affected by system clock updates. The reference point of the returned value is undefined,
- so that only the difference between the results of consecutive calls is valid.
+ Type of CommunicationSpi.
"""
- return monotonic()
+
+
+class TcpCommunicationSpi(CommunicationSpi):
+ """
+ TcpCommunicationSpi.
+ """
+ def __init__(self, port=47100, port_range=100):
+ self.port = port
+ self.port_range = port_range
+
+ @property
+ def type(self):
+ return "TCP"
diff --git a/modules/ducktests/tests/ignitetest/services/utils/ignite_configuration/discovery.py b/modules/ducktests/tests/ignitetest/services/utils/ignite_configuration/discovery.py
index 9f38da5..2f53361 100644
--- a/modules/ducktests/tests/ignitetest/services/utils/ignite_configuration/discovery.py
+++ b/modules/ducktests/tests/ignitetest/services/utils/ignite_configuration/discovery.py
@@ -45,8 +45,9 @@ class ZookeeperDiscoverySpi(DiscoverySpi):
"""
ZookeeperDiscoverySpi.
"""
- def __init__(self, connection_string, root_path):
- self.connection_string = connection_string
+ def __init__(self, zoo_service, root_path):
+ self.connection_string = zoo_service.connection_string()
+ self.port = zoo_service.settings.client_port
self.root_path = root_path
@property
@@ -100,8 +101,10 @@ class TcpDiscoverySpi(DiscoverySpi):
"""
TcpDiscoverySpi.
"""
- def __init__(self, ip_finder=TcpDiscoveryVmIpFinder()):
+ def __init__(self, ip_finder=TcpDiscoveryVmIpFinder(), port=47500, port_range=100):
self.ip_finder = ip_finder
+ self.port = port
+ self.port_range = port_range
@property
def type(self):
@@ -138,4 +141,4 @@ def from_zookeeper_cluster(cluster, root_path="/apacheIgnite"):
"""
assert isinstance(cluster, ZookeeperService)
- return ZookeeperDiscoverySpi(cluster.connection_string(), root_path=root_path)
+ return ZookeeperDiscoverySpi(cluster, root_path=root_path)
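
Passing the whole ZookeeperService (rather than just its connection string) lets the SPI capture the client port, which the iptables task in discovery_test.py below needs. A usage sketch, assuming a ducktape test_context and the settings keyword seen later in this diff:

    from ignitetest.services.utils.ignite_configuration.discovery import from_zookeeper_cluster
    from ignitetest.services.zk.zookeeper import ZookeeperService, ZookeeperSettings

    # Start a 3-node quorum with default settings.
    zk_quorum = ZookeeperService(test_context, num_nodes=3, settings=ZookeeperSettings())
    zk_quorum.start()

    discovery_spi = from_zookeeper_cluster(zk_quorum, root_path="/apacheIgnite")
    assert discovery_spi.port == zk_quorum.settings.client_port  # 2181 by default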
diff --git a/modules/ducktests/tests/ignitetest/services/zk/templates/zookeeper.properties.j2 b/modules/ducktests/tests/ignitetest/services/utils/templates/communication_macro.j2
similarity index 69%
copy from modules/ducktests/tests/ignitetest/services/zk/templates/zookeeper.properties.j2
copy to modules/ducktests/tests/ignitetest/services/utils/templates/communication_macro.j2
index 2c80e84..1f5c17c 100644
--- a/modules/ducktests/tests/ignitetest/services/zk/templates/zookeeper.properties.j2
+++ b/modules/ducktests/tests/ignitetest/services/utils/templates/communication_macro.j2
@@ -15,11 +15,11 @@
limitations under the License.
#}
-tickTime={{ settings.tick_time }}
-initLimit={{ settings.init_limit }}
-syncLimit={{ settings.sync_limit }}
-dataDir={{ DATA_DIR }}
-clientPort={{ settings.client_port }}
-{% for node in nodes %}
-server.{{ loop.index }}={{ node.account.hostname }}:2888:3888
-{% endfor %}
+{% macro communication_spi(spi) %}
+ <property name="communicationSpi">
+ <bean
class="org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi">
+ <property name="localPort" value="{{ spi.port }}"/>
+ <property name="localPortRange" value="{{ spi.port_range }}"/>
+ </bean>
+ </property>
+{% endmacro %}
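
With the TcpCommunicationSpi defaults introduced above (port=47100, port_range=100), this macro renders localPort=47100 and localPortRange=100, so a node may bind its communication port anywhere in 47100..47200; the iptables task in discovery_test.py below blocks exactly this range.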
diff --git a/modules/ducktests/tests/ignitetest/services/utils/templates/discovery_macro.j2 b/modules/ducktests/tests/ignitetest/services/utils/templates/discovery_macro.j2
index fdd632a..66bdf43 100644
--- a/modules/ducktests/tests/ignitetest/services/utils/templates/discovery_macro.j2
+++ b/modules/ducktests/tests/ignitetest/services/utils/templates/discovery_macro.j2
@@ -15,20 +15,22 @@
limitations under the License.
#}
-{% macro ip_finder(finder) %}
- {% if finder %}
+{% macro ip_finder(spi) %}
+ {% if spi.ip_finder and spi.ip_finder.type == 'VM' %}
<property name="ipFinder">
- {% if finder.type == 'VM' %}
- <bean
class="org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder">
- <property name="addresses">
- <list>
- {% for address in finder.addresses %}
- <value>{{ address }}</value>
- {% endfor %}
- </list>
- </property>
- </bean>
- {% endif %}
+ <bean
class="org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder">
+ <property name="addresses">
+ <list>
+ {% for address in spi.ip_finder.addresses %}
+ {% if spi.port_range > 0 %}
+ <value>{{ address }}:{{ spi.port }}..{{
spi.port + spi.port_range }}</value>
+ {% else %}
+ <value>{{ address }}:{{ spi.port }}</value>
+ {% endif %}
+ {% endfor %}
+ </list>
+ </property>
+ </bean>
</property>
{% endif %}
{% endmacro %}
@@ -42,7 +44,9 @@
{% macro tcp_discovery_spi(spi) %}
<bean class="org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi">
- {{ ip_finder(spi.ip_finder) }}
+ <property name="localPort" value="{{ spi.port }}"/>
+ <property name="localPortRange" value="{{ spi.port_range }}"/>
+ {{ ip_finder(spi) }}
</bean>
{% endmacro %}
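
With port_range > 0 the rewritten ip_finder macro emits ranged addresses such as 172.20.0.3:47500..47600 (sample host for illustration), while port_range = 0 falls back to plain host:port entries, keeping the static IP finder in sync with the localPort/localPortRange pair now set on the SPI bean.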
diff --git a/modules/ducktests/tests/ignitetest/services/utils/templates/ignite.xml.j2 b/modules/ducktests/tests/ignitetest/services/utils/templates/ignite.xml.j2
index 86abd35..798897a 100644
--- a/modules/ducktests/tests/ignitetest/services/utils/templates/ignite.xml.j2
+++ b/modules/ducktests/tests/ignitetest/services/utils/templates/ignite.xml.j2
@@ -17,6 +17,7 @@
limitations under the License.
#}
+{% import 'communication_macro.j2' as communication %}
{% import 'discovery_macro.j2' as disco_utils %}
{% import 'cache_macro.j2' as cache_utils %}
{% import 'datastorage_macro.j2' as datastorage_utils %}
@@ -37,9 +38,12 @@
<property name="clientMode" value="{{ config.client_mode or False |
lower }}"/>
<property name="consistentId" value="{{ config.consistent_id }}"/>
<property name="failureDetectionTimeout" value="{{
config.failure_detection_timeout }}"/>
+ <property name="systemWorkerBlockedTimeout" value="{{
config.sys_worker_blocked_timeout }}"/>
{{ misc_utils.cluster_state(config.cluster_state, config.version) }}
+ {{ communication.communication_spi(config.communication_spi) }}
+
{{ disco_utils.discovery_spi(config.discovery_spi) }}
{{ datastorage_utils.data_storage(config.data_storage) }}
diff --git a/modules/ducktests/tests/ignitetest/services/zk/templates/zookeeper.properties.j2 b/modules/ducktests/tests/ignitetest/services/zk/templates/zookeeper.properties.j2
index 2c80e84..441b0db 100644
--- a/modules/ducktests/tests/ignitetest/services/zk/templates/zookeeper.properties.j2
+++ b/modules/ducktests/tests/ignitetest/services/zk/templates/zookeeper.properties.j2
@@ -16,6 +16,7 @@
#}
tickTime={{ settings.tick_time }}
+minSessionTimeout={{ settings.min_session_timeout }}
initLimit={{ settings.init_limit }}
syncLimit={{ settings.sync_limit }}
dataDir={{ DATA_DIR }}
diff --git a/modules/ducktests/tests/ignitetest/services/zk/zookeeper.py b/modules/ducktests/tests/ignitetest/services/zk/zookeeper.py
index b3da642..ca8a7fb 100644
--- a/modules/ducktests/tests/ignitetest/services/zk/zookeeper.py
+++ b/modules/ducktests/tests/ignitetest/services/zk/zookeeper.py
@@ -27,11 +27,14 @@ class ZookeeperSettings:
"""
Settings for zookeeper quorum nodes.
"""
- def __init__(self, tick_time=1000, init_limit=10, sync_limit=5, client_port=2181):
- self.tick_time = tick_time
- self.init_limit = init_limit
- self.sync_limit = sync_limit
- self.client_port = client_port
+ def __init__(self, **kwargs):
+ self.tick_time = kwargs.get('tick_time', 1000)
+ self.min_session_timeout = kwargs.get('min_session_timeout', 2000)
+ self.init_limit = kwargs.get('init_limit', 10)
+ self.sync_limit = kwargs.get('sync_limit', 5)
+ self.client_port = kwargs.get('client_port', 2181)
+
+ assert self.tick_time <= self.min_session_timeout // 2, "'tick_time' must be <= 'min_session_timeout' / 2"
class ZookeeperService(Service):
diff --git a/modules/ducktests/tests/ignitetest/tests/discovery_test.py b/modules/ducktests/tests/ignitetest/tests/discovery_test.py
index 2c8d6a1..f2be35b 100644
--- a/modules/ducktests/tests/ignitetest/tests/discovery_test.py
+++ b/modules/ducktests/tests/ignitetest/tests/discovery_test.py
@@ -17,25 +17,24 @@
Module contains discovery tests.
"""
+import os
import random
-import re
+import sys
from enum import IntEnum
-from datetime import datetime
from time import monotonic
from typing import NamedTuple
from ducktape.mark import matrix
from ducktape.mark.resource import cluster
-from ignitetest.services.ignite import IgniteAwareService
-from ignitetest.services.ignite import IgniteService
+from ignitetest.services.ignite import IgniteAwareService, IgniteService, get_event_time, node_failed_event_pattern
from ignitetest.services.ignite_app import IgniteApplicationService
from ignitetest.services.utils.ignite_configuration import IgniteConfiguration
from ignitetest.services.utils.ignite_configuration.cache import CacheConfiguration
from ignitetest.services.utils.ignite_configuration.discovery import from_zookeeper_cluster, from_ignite_cluster, \
    TcpDiscoverySpi
from ignitetest.services.utils.time_utils import epoch_mills
-from ignitetest.services.zk.zookeeper import ZookeeperService
+from ignitetest.services.zk.zookeeper import ZookeeperService, ZookeeperSettings
from ignitetest.utils import ignite_versions, version_if
from ignitetest.utils.ignite_test import IgniteTest
from ignitetest.utils.version import DEV_BRANCH, LATEST_2_8, V_2_8_0, IgniteVersion
@@ -56,8 +55,8 @@ class DiscoveryTestConfig(NamedTuple):
"""
version: IgniteVersion
nodes_to_kill: int = 1
- kill_coordinator: bool = False
load_type: ClusterLoad = ClusterLoad.NONE
+ sequential_failure: bool = False
with_zk: bool = False
@@ -69,46 +68,76 @@ class DiscoveryTest(IgniteTest):
2. Kill random node.
3. Wait that survived node detects node failure.
"""
- NUM_NODES = 7
+ NUM_NODES = 9
- FAILURE_DETECTION_TIMEOUT = 2000
+ FAILURE_DETECTION_TIMEOUT_TCP = 1000
+
+ FAILURE_DETECTION_TIMEOUT_ZK = 3000
DATA_AMOUNT = 5_000_000
WARMUP_DATA_AMOUNT = 10_000
+ def __init__(self, test_context):
+ super().__init__(test_context=test_context)
+
+ self.netfilter_store_path = None
+
@cluster(num_nodes=NUM_NODES)
@ignite_versions(str(DEV_BRANCH), str(LATEST_2_8))
- @matrix(kill_coordinator=[False, True],
- nodes_to_kill=[1, 2],
+ @matrix(nodes_to_kill=[1, 2],
load_type=[ClusterLoad.NONE, ClusterLoad.ATOMIC, ClusterLoad.TRANSACTIONAL])
- def test_node_fail_tcp(self, ignite_version, kill_coordinator, nodes_to_kill, load_type):
+ def test_nodes_fail_not_sequential_tcp(self, ignite_version, nodes_to_kill, load_type):
+ """
+ Test the failure scenario with TcpDiscoverySpi where the failed nodes are not sequential.
+ """
+ test_config = DiscoveryTestConfig(version=IgniteVersion(ignite_version), nodes_to_kill=nodes_to_kill,
+ load_type=load_type, sequential_failure=False)
+
+ return self._perform_node_fail_scenario(test_config)
+
+ @cluster(num_nodes=NUM_NODES)
+ @ignite_versions(str(DEV_BRANCH), str(LATEST_2_8))
+ @matrix(load_type=[ClusterLoad.NONE, ClusterLoad.ATOMIC, ClusterLoad.TRANSACTIONAL])
+ def test_2_nodes_fail_sequential_tcp(self, ignite_version, load_type):
"""
- Test nodes failure scenario with TcpDiscoverySpi.
- :param load_type: How to load cluster during the test: 0 - no loading; 1 - do some loading; 2 - transactional.
+ Test 2 nodes sequential failure scenario with TcpDiscoverySpi.
"""
- test_config = DiscoveryTestConfig(version=IgniteVersion(ignite_version), kill_coordinator=kill_coordinator,
- nodes_to_kill=nodes_to_kill, load_type=load_type, with_zk=False)
+ test_config = DiscoveryTestConfig(version=IgniteVersion(ignite_version), nodes_to_kill=2, load_type=load_type,
+ sequential_failure=True)
return self._perform_node_fail_scenario(test_config)
@cluster(num_nodes=NUM_NODES + 3)
@version_if(lambda version: version != V_2_8_0) # ignite-zookeeper package is broken in 2.8.0
@ignite_versions(str(DEV_BRANCH), str(LATEST_2_8))
- @matrix(kill_coordinator=[False, True],
- nodes_to_kill=[1, 2],
+ @matrix(nodes_to_kill=[1, 2],
load_type=[ClusterLoad.NONE, ClusterLoad.ATOMIC, ClusterLoad.TRANSACTIONAL])
- def test_node_fail_zk(self, ignite_version, kill_coordinator, nodes_to_kill, load_type):
+ def test_nodes_fail_not_sequential_zk(self, ignite_version, nodes_to_kill, load_type):
+ """
+ Test the failure scenario with ZooKeeperSpi where the failed nodes are not sequential.
+ """
+ test_config = DiscoveryTestConfig(version=IgniteVersion(ignite_version), nodes_to_kill=nodes_to_kill,
+ load_type=load_type, sequential_failure=False, with_zk=True)
+
+ return self._perform_node_fail_scenario(test_config)
+
+ @cluster(num_nodes=NUM_NODES + 3)
+ @version_if(lambda version: version != V_2_8_0) # ignite-zookeeper package is broken in 2.8.0
+ @ignite_versions(str(DEV_BRANCH), str(LATEST_2_8))
+ @matrix(load_type=[ClusterLoad.NONE, ClusterLoad.ATOMIC, ClusterLoad.TRANSACTIONAL])
+ def test_2_nodes_fail_sequential_zk(self, ignite_version, load_type):
"""
- Test node failure scenario with ZooKeeperSpi.
- :param load_type: How to load cluster during the test: 0 - no loading; 1 - do some loading; 2 - transactional.
+ Test 2 nodes sequential failure scenario with ZooKeeperSpi.
"""
- test_config = DiscoveryTestConfig(version=IgniteVersion(ignite_version), kill_coordinator=kill_coordinator,
- nodes_to_kill=nodes_to_kill, load_type=load_type, with_zk=True)
+ test_config = DiscoveryTestConfig(version=IgniteVersion(ignite_version), nodes_to_kill=2, load_type=load_type,
+ sequential_failure=True, with_zk=True)
return self._perform_node_fail_scenario(test_config)
def _perform_node_fail_scenario(self, test_config):
+ results = {}
+
modules = ['zookeeper'] if test_config.with_zk else None
if test_config.with_zk:
@@ -121,7 +150,8 @@ class DiscoveryTest(IgniteTest):
ignite_config = IgniteConfiguration(
version=test_config.version,
discovery_spi=discovery_spi,
- failure_detection_timeout=self.FAILURE_DETECTION_TIMEOUT,
+ failure_detection_timeout=self.FAILURE_DETECTION_TIMEOUT_ZK if test_config.with_zk
+ else self.FAILURE_DETECTION_TIMEOUT_TCP,
caches=[CacheConfiguration(
name='test-cache',
backups=1,
@@ -131,14 +161,16 @@ class DiscoveryTest(IgniteTest):
servers, start_servers_sec = start_servers(self.test_context, self.NUM_NODES - 1, ignite_config, modules)
- failed_nodes, survived_node = choose_node_to_kill(servers, test_config.kill_coordinator,
- test_config.nodes_to_kill)
+ results['Ignite cluster start time (s)'] = start_servers_sec
+
+ failed_nodes, survived_node = choose_node_to_kill(servers, test_config.nodes_to_kill,
+ test_config.sequential_failure)
if test_config.load_type is not ClusterLoad.NONE:
load_config = ignite_config._replace(client_mode=True) if test_config.with_zk else \
    ignite_config._replace(client_mode=True, discovery_spi=from_ignite_cluster(servers))
- tran_nodes = [n.discovery_info().node_id for n in failed_nodes] \
+ tran_nodes = [node_id(n) for n in failed_nodes] \
if test_config.load_type == ClusterLoad.TRANSACTIONAL else None
params = {"cacheName": "test-cache",
@@ -149,18 +181,128 @@ class DiscoveryTest(IgniteTest):
start_load_app(self.test_context, ignite_config=load_config, params=params, modules=modules)
- data = simulate_nodes_failure(servers, failed_nodes, survived_node)
+ results.update(self._simulate_nodes_failure(servers, node_fail_task(ignite_config, test_config), failed_nodes,
+ survived_node))
+
+ return results
+
+ def _simulate_nodes_failure(self, servers, kill_node_task, failed_nodes, survived_node):
+ """
+ Perform node failure scenario
+ """
+ for node in failed_nodes:
+ self.logger.info(
+ "Simulating failure of node '%s' (order %d) on '%s'" %
(node_id(node), order(node), node.name))
+
+ ids_to_wait = [node_id(n) for n in failed_nodes]
+
+ _, first_terminated = servers.exec_on_nodes_async(failed_nodes, kill_node_task)
+
+ for node in failed_nodes:
+ self.logger.debug(
+ "Netfilter activated on '%s': %s" % (node.name,
dump_netfilter_settings(node)))
- data['Ignite cluster start time (s)'] = start_servers_sec
+ # Keeps dates of logged node failures.
+ logged_timestamps = []
+ data = {}
+
+ for failed_id in ids_to_wait:
+ logged_timestamps.append(
+ get_event_time(servers, survived_node, node_failed_event_pattern(failed_id)))
+
+ self._check_failed_number(failed_nodes, survived_node)
+ self._check_not_segmented(failed_nodes)
+
+ logged_timestamps.sort(reverse=True)
+
+ first_kill_time = epoch_mills(first_terminated)
+ detection_delay = epoch_mills(logged_timestamps[0]) - first_kill_time
+
+ data['Detection of node(s) failure (ms)'] = detection_delay
+ data['All detection delays (ms):'] = str([epoch_mills(ts) - first_kill_time for ts in logged_timestamps])
+ data['Nodes failed'] = len(failed_nodes)
return data
+ def _check_failed_number(self, failed_nodes, survived_node):
+ """Ensures number of failed nodes is correct."""
+ cmd = "grep '%s' %s | wc -l" % (node_failed_event_pattern(),
IgniteAwareService.STDOUT_STDERR_CAPTURE)
+
+ failed_cnt = int(str(survived_node.account.ssh_client.exec_command(cmd)[1].read(), sys.getdefaultencoding()))
+
+ if failed_cnt != len(failed_nodes):
+ failed = str(survived_node.account.ssh_client.exec_command(
+ "grep '%s' %s" % (node_failed_event_pattern(),
IgniteAwareService.STDOUT_STDERR_CAPTURE))[1].read(),
+ sys.getdefaultencoding())
+
+ self.logger.warn("Node '%s' (%s) has detected the following
failures:%s%s" % (
+ survived_node.name, node_id(survived_node), os.linesep,
failed))
+
+ raise AssertionError(
+ "Wrong number of failed nodes: %d. Expected: %d. Check the
logs." % (failed_cnt, len(failed_nodes)))
+
+ def _check_not_segmented(self, failed_nodes):
+ """Ensures only target nodes failed"""
+ for service in [srv for srv in self.test_context.services if isinstance(srv, IgniteAwareService)]:
+ for node in [srv_node for srv_node in service.nodes if srv_node not in failed_nodes]:
+ cmd = "grep -i '%s' %s | wc -l" % ("local node segmented", IgniteAwareService.STDOUT_STDERR_CAPTURE)
+
+ failed = str(node.account.ssh_client.exec_command(cmd)[1].read(), sys.getdefaultencoding())
+
+ if int(failed) > 0:
+ raise AssertionError(
+ "Wrong node failed (segmented) on '%s'. Check the
logs." % node.name)
+
+ def setup(self):
+ super().setup()
+
+ self.netfilter_store_path = os.path.join(self.tmp_path_root, "iptables.bak")
+
+ # Store current network filter settings.
+ for node in self.test_context.cluster.nodes:
+ cmd = "sudo iptables-save | tee " + self.netfilter_store_path
+
+ exec_error = str(node.account.ssh_client.exec_command(cmd)[2].read(), sys.getdefaultencoding())
+
+ if "Warning: iptables-legacy tables present" in exec_error:
+ cmd = "sudo iptables-legacy-save | tee " +
self.netfilter_store_path
+
+ exec_error = str(node.account.ssh_client.exec_command(cmd)[2].read(), sys.getdefaultencoding())
+
+ assert len(exec_error) == 0, "Failed to store iptables rules on '%s': %s" % (node.name, exec_error)
+
+ self.logger.debug("Netfilter before launch on '%s': %s" %
(node.name, dump_netfilter_settings(node)))
+
+ def teardown(self):
+ # Restore previous network filter settings.
+ cmd = "sudo iptables-restore < " + self.netfilter_store_path
+
+ errors = []
+
+ for node in self.test_context.cluster.nodes:
+ exec_error = str(node.account.ssh_client.exec_command(cmd)[2].read(), sys.getdefaultencoding())
+
+ if len(exec_error) > 0:
+ errors.append("Failed to restore iptables rules on '%s': %s" %
(node.name, exec_error))
+ else:
+ self.logger.debug("Netfilter after launch on '%s': %s" %
(node.name, dump_netfilter_settings(node)))
+
+ if len(errors) > 0:
+ self.logger.error("Failed restoring actions:" + os.linesep +
os.linesep.join(errors))
+
+ raise RuntimeError("Unable to restore node states. See the log
above.")
+
+ super().teardown()
+
def start_zookeeper(test_context, num_nodes):
"""
Start zookeeper cluster.
"""
- zk_quorum = ZookeeperService(test_context, num_nodes)
+ zk_settings = ZookeeperSettings(min_session_timeout=DiscoveryTest.FAILURE_DETECTION_TIMEOUT_ZK,
+ tick_time=DiscoveryTest.FAILURE_DETECTION_TIMEOUT_ZK // 3)
+
+ zk_quorum = ZookeeperService(test_context, num_nodes, settings=zk_settings)
zk_quorum.start()
return zk_quorum
@@ -182,76 +324,73 @@ def start_load_app(test_context, ignite_config, params, modules=None):
"""
Start loader application.
"""
- loader = IgniteApplicationService(
+ IgniteApplicationService(
test_context,
config=ignite_config,
java_class_name="org.apache.ignite.internal.ducktest.tests.ContinuousDataLoadApplication",
modules=modules,
# mute spam in log.
jvm_opts=["-DIGNITE_DUMP_THREADS_ON_FAILURE=false"],
- params=params)
+ params=params).start()
- loader.start()
+def choose_node_to_kill(servers, nodes_to_kill, sequential):
+ """Choose node to kill during test"""
+ assert nodes_to_kill > 0, "No nodes to kill passed. Check the parameters."
-def failed_pattern(failed_node_id):
- """
- Failed node pattern in log
- """
- return "Node FAILED: .\\{1,\\}Node \\[id=" + failed_node_id
+ idx = random.randint(0, len(servers.nodes)-1)
+ to_kill = servers.nodes[idx:] + servers.nodes[:idx-1]
-def choose_node_to_kill(servers, kill_coordinator, nodes_to_kill):
- """Choose node to kill during test"""
- assert nodes_to_kill > 0, " No nodes to kill passed. Check the
parameters."
+ if not sequential:
+ to_kill = to_kill[0::2]
- nodes = servers.nodes
- coordinator = nodes[0].discovery_info().coordinator
- to_kill = []
+ idx = random.randint(0, len(to_kill) - nodes_to_kill)
+ to_kill = to_kill[idx:idx + nodes_to_kill]
- if kill_coordinator:
- to_kill.append(next(node for node in nodes if node.discovery_info().node_id == coordinator))
- nodes_to_kill -= 1
+ survive = random.choice([node for node in servers.nodes if node not in to_kill])
- if nodes_to_kill > 0:
- choice = random.sample([n for n in nodes if n.discovery_info().node_id != coordinator], nodes_to_kill)
- to_kill.extend([choice] if not isinstance(choice, list) else choice)
+ assert len(to_kill) == nodes_to_kill, "Unable to pick up required number of nodes to kill."
- survive = random.choice([node for node in servers.nodes if node not in to_kill])
+ assert survive, "Unable to select survived node to monitor the cluster on
it."
return to_kill, survive
-def simulate_nodes_failure(servers, failed_nodes, survived_node):
- """
- Perform node failure scenario
- """
- ids_to_wait = [node.discovery_info().node_id for node in failed_nodes]
+def order(node):
+ """Return discovery order of the node."""
+ return node.discovery_info().order
+
- _, first_terminated = servers.stop_nodes_async(failed_nodes, clean_shutdown=False, wait_for_stop=False)
+def node_id(node):
+ """Return node id."""
+ return node.discovery_info().node_id
- # Keeps dates of logged node failures.
- logged_timestamps = []
- data = {}
- for failed_id in ids_to_wait:
- servers.await_event_on_node(failed_pattern(failed_id), survived_node, 20,
- from_the_beginning=True, backoff_sec=0.1)
+def node_fail_task(ignite_config, test_config):
+ """
+ Creates proper task to simulate network failure depending on the configurations.
+ """
+ cm_spi = ignite_config.communication_spi
+ dsc_spi = ignite_config.discovery_spi
- _, stdout, _ = survived_node.account.ssh_client.exec_command(
- "grep '%s' %s" % (failed_pattern(failed_id),
IgniteAwareService.STDOUT_STDERR_CAPTURE))
+ cm_ports = str(cm_spi.port) if cm_spi.port_range < 1 else str(cm_spi.port) + ':' + str(
+ cm_spi.port + cm_spi.port_range)
- logged_timestamps.append(
- datetime.strptime(re.match("^\\[[^\\[]+\\]", stdout.read().decode("utf-8")).group(),
- "[%Y-%m-%d %H:%M:%S,%f]"))
+ if test_config.with_zk:
+ dsc_ports = str(ignite_config.discovery_spi.port)
+ else:
+ dsc_ports = str(dsc_spi.port) if dsc_spi.port_range < 1 else str(dsc_spi.port) + ':' + str(
+ dsc_spi.port + dsc_spi.port_range)
- logged_timestamps.sort(reverse=True)
+ cmd = f"sudo iptables -I %s 1 -p tcp -m multiport --dport
{dsc_ports},{cm_ports} -j DROP"
- first_kill_time = epoch_mills(first_terminated)
- detection_delay = epoch_mills(logged_timestamps[0]) - first_kill_time
+ return lambda node: (node.account.ssh_client.exec_command(cmd % "INPUT"),
+ node.account.ssh_client.exec_command(cmd % "OUTPUT"))
- data['Detection of node(s) failure (ms)'] = detection_delay
- data['All detection delays (ms):'] = str([epoch_mills(ts) - first_kill_time for ts in logged_timestamps])
- data['Nodes failed'] = len(failed_nodes)
- return data
+def dump_netfilter_settings(node):
+ """
+ Reads current netfilter settings on the node for debugging purposes.
+ """
+ return str(node.account.ssh_client.exec_command("sudo iptables -L -n")[1].read(), sys.getdefaultencoding())
diff --git a/modules/ducktests/tests/ignitetest/utils/ignite_test.py b/modules/ducktests/tests/ignitetest/utils/ignite_test.py
index feb5993..3b0323d 100644
--- a/modules/ducktests/tests/ignitetest/utils/ignite_test.py
+++ b/modules/ducktests/tests/ignitetest/utils/ignite_test.py
@@ -16,6 +16,9 @@
"""
This module contains basic ignite test.
"""
+import os
+import random
+import string
from time import monotonic
from ducktape.tests.test import Test
@@ -29,6 +32,29 @@ class IgniteTest(Test):
def __init__(self, test_context):
super().__init__(test_context=test_context)
+ self.tmp_path_root = None
+
+ def setup(self):
+ super().setup()
+
+ self.tmp_path_root = os.path.join("/tmp", ''.join(random.choices(string.ascii_letters + string.digits, k=10)),
+ self.test_context.cls_name)
+
+ self.clear_tmp_dir(True)
+
+ def teardown(self):
+ self.clear_tmp_dir()
+
+ super().teardown()
+
+ def clear_tmp_dir(self, recreate=False):
+ """Creates temporary directory for current test."""
+ for node in self.test_context.cluster.nodes:
+ node.account.ssh_client.exec_command("rm -drf " + self.tmp_path_root)
+
+ if recreate:
+ node.account.ssh_client.exec_command("mkdir -p " + self.tmp_path_root)
+
@staticmethod
def monotonic():
"""