This is an automated email from the ASF dual-hosted git repository.

av pushed a commit to branch ignite-ducktape
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/ignite-ducktape by this push:
     new 8e9350e  Ducktests iptables (#8211)
8e9350e is described below

commit 8e9350e676ac7ab9a631cba12c28ff69ff2ed281
Author: Vladsz83 <[email protected]>
AuthorDate: Wed Oct 21 10:09:55 2020 +0300

    Ducktests iptables (#8211)
---
 .../ducktests/tests/ignitetest/services/ignite.py  |  58 +++--
 .../ignitetest/services/utils/ignite_aware.py      |   3 +-
 .../utils/ignite_configuration/__init__.py         |   3 +
 .../utils/ignite_configuration/communication.py}   |  41 +--
 .../utils/ignite_configuration/discovery.py        |  11 +-
 .../templates/communication_macro.j2}              |  16 +-
 .../services/utils/templates/discovery_macro.j2    |  32 ++-
 .../services/utils/templates/ignite.xml.j2         |   4 +
 .../services/zk/templates/zookeeper.properties.j2  |   1 +
 .../tests/ignitetest/services/zk/zookeeper.py      |  13 +-
 .../tests/ignitetest/tests/discovery_test.py       | 287 +++++++++++++++------
 .../tests/ignitetest/utils/ignite_test.py          |  26 ++
 12 files changed, 349 insertions(+), 146 deletions(-)

diff --git a/modules/ducktests/tests/ignitetest/services/ignite.py 
b/modules/ducktests/tests/ignitetest/services/ignite.py
index f50365d..ad88db2 100644
--- a/modules/ducktests/tests/ignitetest/services/ignite.py
+++ b/modules/ducktests/tests/ignitetest/services/ignite.py
@@ -17,9 +17,8 @@
 This module contains class to start ignite cluster node.
 """
 
-import functools
-import operator
 import os
+import re
 import signal
 import time
 from datetime import datetime
@@ -70,7 +69,7 @@ class IgniteService(IgniteAwareService):
         sig = signal.SIGTERM if clean_shutdown else signal.SIGKILL
 
         for pid in pids:
-            self.__stop_node(node, pid, sig)
+            node.account.signal(pid, sig, allow_fail=False)
 
         try:
             wait_until(lambda: len(self.pids(node)) == 0, 
timeout_sec=timeout_sec,
@@ -79,21 +78,22 @@ class IgniteService(IgniteAwareService):
             self.thread_dump(node)
             raise
 
-    def stop_nodes_async(self, nodes, delay_ms=0, clean_shutdown=True, 
timeout_sec=20, wait_for_stop=False):
+    def exec_on_nodes_async(self, nodes, task, simultaneously=True, 
delay_ms=0, timeout_sec=20):
         """
-        Stops the nodes asynchronously.
+        Executes given task on the nodes.
+        :param task: a callable accepting a single node argument, executed on each node.
+        :param simultaneously: Enables or disables simultaneous start of the 
task on each node.
+        :param delay_ms: delay before task run. Begins with 0, grows by 
delay_ms for each next node in nodes.
+        :param timeout_sec: timeout to wait the task.
         """
-        sig = signal.SIGTERM if clean_shutdown else signal.SIGKILL
-
-        sem = CountDownLatch(len(nodes))
+        sem = CountDownLatch(len(nodes)) if simultaneously else None
         time_holder = AtomicValue()
 
         delay = 0
         threads = []
 
         for node in nodes:
-            thread = Thread(target=self.__stop_node,
-                            args=(node, next(iter(self.pids(node))), sig, sem, 
delay, time_holder))
+            thread = Thread(target=self.__exec_on_node, args=(node, task, sem, 
delay, time_holder))
 
             threads.append(thread)
 
@@ -104,19 +104,10 @@ class IgniteService(IgniteAwareService):
         for thread in threads:
             thread.join(timeout_sec)
 
-        if wait_for_stop:
-            try:
-                wait_until(lambda: len(functools.reduce(operator.iconcat, 
(self.pids(n) for n in nodes), [])) == 0,
-                           timeout_sec=timeout_sec, err_msg="Ignite node 
failed to stop in %d seconds" % timeout_sec)
-            except Exception:
-                for node in nodes:
-                    self.thread_dump(node)
-                raise
-
         return time_holder.get()
 
     @staticmethod
-    def __stop_node(node, pid, sig, start_waiter=None, delay_ms=0, 
time_holder=None):
+    def __exec_on_node(node, task, start_waiter=None, delay_ms=0, 
time_holder=None):
         if start_waiter:
             start_waiter.count_down()
             start_waiter.wait()
@@ -130,7 +121,7 @@ class IgniteService(IgniteAwareService):
 
             time_holder.compare_and_set(None, (mono, timestamp))
 
-        node.account.signal(pid, sig, False)
+        task(node)
 
     def clean_node(self, node):
         node.account.kill_java_processes(self.APP_SERVICE_CLASS, 
clean_shutdown=False, allow_fail=True)
@@ -154,3 +145,28 @@ class IgniteService(IgniteAwareService):
             return pid_arr
         except (RemoteCommandError, ValueError):
             return []
+
+
+def node_failed_event_pattern(failed_node_id=None):
+    """Failed node pattern in log."""
+    return "Node FAILED: .\\{1,\\}Node \\[id=" + (failed_node_id if 
failed_node_id else "") + \
+           ".\\{1,\\}\\(isClient\\|client\\)=false"
+
+
+def get_event_time(service, log_node, log_pattern, from_the_beginning=True, 
timeout=15):
+    """
+    Extracts event time from ignite log by pattern.
+    :param service: ducktape service (ignite service) responsible to search 
log.
+    :param log_node: ducktape node to search ignite log on.
+    :param log_pattern: pattern to search ignite log for.
+    :param from_the_beginning: switches searching log from its beginning.
+    :param timeout: timeout to wait for the pattern in the log.
+    """
+    service.await_event_on_node(log_pattern, log_node, timeout, 
from_the_beginning=from_the_beginning,
+                                backoff_sec=0.3)
+
+    _, stdout, _ = log_node.account.ssh_client.exec_command(
+        "grep '%s' %s" % (log_pattern, 
IgniteAwareService.STDOUT_STDERR_CAPTURE))
+
+    return datetime.strptime(re.match("^\\[[^\\[]+\\]", 
stdout.read().decode("utf-8")).group(),
+                             "[%Y-%m-%d %H:%M:%S,%f]")
diff --git a/modules/ducktests/tests/ignitetest/services/utils/ignite_aware.py 
b/modules/ducktests/tests/ignitetest/services/utils/ignite_aware.py
index fadbbe6..c964b69 100644
--- a/modules/ducktests/tests/ignitetest/services/utils/ignite_aware.py
+++ b/modules/ducktests/tests/ignitetest/services/utils/ignite_aware.py
@@ -122,7 +122,8 @@ class IgniteAwareService(BackgroundThreadService, 
IgnitePersistenceAware, metacl
                 monitor.offset = 0
 
             monitor.wait_until(evt_message, timeout_sec=timeout_sec, 
backoff_sec=backoff_sec,
-                               err_msg="Event [%s] was not triggered in %d 
seconds" % (evt_message, timeout_sec))
+                               err_msg="Event [%s] was not triggered on '%s' 
in %d seconds" % (evt_message, node.name,
+                                                                               
                timeout_sec))
 
     def await_event(self, evt_message, timeout_sec, from_the_beginning=False, 
backoff_sec=5):
         """
diff --git 
a/modules/ducktests/tests/ignitetest/services/utils/ignite_configuration/__init__.py
 
b/modules/ducktests/tests/ignitetest/services/utils/ignite_configuration/__init__.py
index b9602e4..5d86a22 100644
--- 
a/modules/ducktests/tests/ignitetest/services/utils/ignite_configuration/__init__.py
+++ 
b/modules/ducktests/tests/ignitetest/services/utils/ignite_configuration/__init__.py
@@ -19,6 +19,7 @@ This module contains IgniteConfiguration classes and 
utilities.
 
 from typing import NamedTuple
 
+from ignitetest.services.utils.ignite_configuration.communication import 
CommunicationSpi, TcpCommunicationSpi
 from ignitetest.services.utils.ignite_configuration.data_storage import 
DataStorageConfiguration
 from ignitetest.services.utils.ignite_configuration.discovery import 
DiscoverySpi, TcpDiscoverySpi
 from ignitetest.utils.version import IgniteVersion, DEV_BRANCH
@@ -29,11 +30,13 @@ class IgniteConfiguration(NamedTuple):
     Ignite configuration.
     """
     discovery_spi: DiscoverySpi = TcpDiscoverySpi()
+    communication_spi: CommunicationSpi = TcpCommunicationSpi()
     version: IgniteVersion = DEV_BRANCH
     cluster_state: str = 'ACTIVE'
     client_mode: bool = False
     consistent_id: str = None
     failure_detection_timeout: int = 10000
+    sys_worker_blocked_timeout: int = 10000
     properties: str = None
     data_storage: DataStorageConfiguration = None
     caches: list = []
diff --git a/modules/ducktests/tests/ignitetest/utils/ignite_test.py 
b/modules/ducktests/tests/ignitetest/services/utils/ignite_configuration/communication.py
similarity index 52%
copy from modules/ducktests/tests/ignitetest/utils/ignite_test.py
copy to 
modules/ducktests/tests/ignitetest/services/utils/ignite_configuration/communication.py
index feb5993..edca32c 100644
--- a/modules/ducktests/tests/ignitetest/utils/ignite_test.py
+++ 
b/modules/ducktests/tests/ignitetest/services/utils/ignite_configuration/communication.py
@@ -11,32 +11,35 @@
 # distributed under the License is distributed on an "AS IS" BASIS,
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
-# limitations under the License.
+# limitations under the License
 
 """
-This module contains basic ignite test.
+Module contains classes and utility methods to create communication 
configuration for ignite nodes.
 """
-from time import monotonic
 
-from ducktape.tests.test import Test
+from abc import ABCMeta, abstractmethod
 
 
-# pylint: disable=W0223
-class IgniteTest(Test):
+class CommunicationSpi(metaclass=ABCMeta):
     """
-    Basic ignite test.
+    Abstract class for CommunicationSpi.
     """
-    def __init__(self, test_context):
-        super().__init__(test_context=test_context)
-
-    @staticmethod
-    def monotonic():
+    @property
+    @abstractmethod
+    def type(self):
         """
-        monotonic() -> float
-
-        :return:
-            The value (in fractional seconds) of a monotonic clock, i.e. a 
clock that cannot go backwards.
-            The clock is not affected by system clock updates. The reference 
point of the returned value is undefined,
-            so that only the difference between the results of consecutive 
calls is valid.
+        Type of CommunicationSpi.
         """
-        return monotonic()
+
+
+class TcpCommunicationSpi(CommunicationSpi):
+    """
+    TcpCommunicationSpi.
+    """
+    def __init__(self, port=47100, port_range=100):
+        self.port = port
+        self.port_range = port_range
+
+    @property
+    def type(self):
+        return "TCP"
diff --git 
a/modules/ducktests/tests/ignitetest/services/utils/ignite_configuration/discovery.py
 
b/modules/ducktests/tests/ignitetest/services/utils/ignite_configuration/discovery.py
index 9f38da5..2f53361 100644
--- 
a/modules/ducktests/tests/ignitetest/services/utils/ignite_configuration/discovery.py
+++ 
b/modules/ducktests/tests/ignitetest/services/utils/ignite_configuration/discovery.py
@@ -45,8 +45,9 @@ class ZookeeperDiscoverySpi(DiscoverySpi):
     """
     ZookeeperDiscoverySpi.
     """
-    def __init__(self, connection_string, root_path):
-        self.connection_string = connection_string
+    def __init__(self, zoo_service, root_path):
+        self.connection_string = zoo_service.connection_string()
+        self.port = zoo_service.settings.client_port
         self.root_path = root_path
 
     @property
@@ -100,8 +101,10 @@ class TcpDiscoverySpi(DiscoverySpi):
     """
     TcpDiscoverySpi.
     """
-    def __init__(self, ip_finder=TcpDiscoveryVmIpFinder()):
+    def __init__(self, ip_finder=TcpDiscoveryVmIpFinder(), port=47500, 
port_range=100):
         self.ip_finder = ip_finder
+        self.port = port
+        self.port_range = port_range
 
     @property
     def type(self):
@@ -138,4 +141,4 @@ def from_zookeeper_cluster(cluster, 
root_path="/apacheIgnite"):
     """
     assert isinstance(cluster, ZookeeperService)
 
-    return ZookeeperDiscoverySpi(cluster.connection_string(), 
root_path=root_path)
+    return ZookeeperDiscoverySpi(cluster, root_path=root_path)
diff --git 
a/modules/ducktests/tests/ignitetest/services/zk/templates/zookeeper.properties.j2
 
b/modules/ducktests/tests/ignitetest/services/utils/templates/communication_macro.j2
similarity index 69%
copy from 
modules/ducktests/tests/ignitetest/services/zk/templates/zookeeper.properties.j2
copy to 
modules/ducktests/tests/ignitetest/services/utils/templates/communication_macro.j2
index 2c80e84..1f5c17c 100644
--- 
a/modules/ducktests/tests/ignitetest/services/zk/templates/zookeeper.properties.j2
+++ 
b/modules/ducktests/tests/ignitetest/services/utils/templates/communication_macro.j2
@@ -15,11 +15,11 @@
  limitations under the License.
 #}
 
-tickTime={{ settings.tick_time }}
-initLimit={{ settings.init_limit }}
-syncLimit={{ settings.sync_limit }}
-dataDir={{ DATA_DIR }}
-clientPort={{ settings.client_port }}
-{% for node in nodes %}
-server.{{ loop.index }}={{ node.account.hostname }}:2888:3888
-{% endfor %}
+{% macro communication_spi(spi) %}
+    <property name="communicationSpi">
+        <bean 
class="org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi">
+            <property name="localPort" value="{{ spi.port }}"/>
+            <property name="localPortRange" value="{{ spi.port_range }}"/>
+        </bean>
+    </property>
+{% endmacro %}
diff --git 
a/modules/ducktests/tests/ignitetest/services/utils/templates/discovery_macro.j2
 
b/modules/ducktests/tests/ignitetest/services/utils/templates/discovery_macro.j2
index fdd632a..66bdf43 100644
--- 
a/modules/ducktests/tests/ignitetest/services/utils/templates/discovery_macro.j2
+++ 
b/modules/ducktests/tests/ignitetest/services/utils/templates/discovery_macro.j2
@@ -15,20 +15,22 @@
  limitations under the License.
 #}
 
-{% macro ip_finder(finder) %}
-    {% if finder %}
+{% macro ip_finder(spi) %}
+    {% if spi.ip_finder and spi.ip_finder.type == 'VM' %}
         <property name="ipFinder">
-            {% if finder.type == 'VM' %}
-                <bean 
class="org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder">
-                    <property name="addresses">
-                        <list>
-                            {% for address in finder.addresses %}
-                                <value>{{ address }}</value>
-                            {% endfor %}
-                        </list>
-                    </property>
-                </bean>
-            {% endif %}
+            <bean 
class="org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder">
+                <property name="addresses">
+                    <list>
+                        {% for address in spi.ip_finder.addresses %}
+                            {% if spi.port_range > 0 %}
+                                <value>{{ address }}:{{ spi.port }}..{{ 
spi.port + spi.port_range }}</value>
+                            {% else %}
+                                <value>{{ address }}:{{ spi.port }}</value>
+                            {% endif %}
+                        {% endfor %}
+                    </list>
+                </property>
+            </bean>
         </property>
     {% endif %}
 {% endmacro %}
@@ -42,7 +44,9 @@
 
 {% macro tcp_discovery_spi(spi) %}
     <bean class="org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi">
-        {{ ip_finder(spi.ip_finder) }}
+        <property name="localPort" value="{{ spi.port }}"/>
+        <property name="localPortRange" value="{{ spi.port_range }}"/>
+        {{ ip_finder(spi) }}
     </bean>
 {% endmacro %}
 
diff --git 
a/modules/ducktests/tests/ignitetest/services/utils/templates/ignite.xml.j2 
b/modules/ducktests/tests/ignitetest/services/utils/templates/ignite.xml.j2
index 86abd35..798897a 100644
--- a/modules/ducktests/tests/ignitetest/services/utils/templates/ignite.xml.j2
+++ b/modules/ducktests/tests/ignitetest/services/utils/templates/ignite.xml.j2
@@ -17,6 +17,7 @@
  limitations under the License.
 #}
 
+{% import 'communication_macro.j2' as communication %}
 {% import 'discovery_macro.j2' as disco_utils %}
 {% import 'cache_macro.j2' as cache_utils %}
 {% import 'datastorage_macro.j2' as datastorage_utils %}
@@ -37,9 +38,12 @@
         <property name="clientMode" value="{{ config.client_mode or False | 
lower }}"/>
         <property name="consistentId" value="{{ config.consistent_id }}"/>
         <property name="failureDetectionTimeout" value="{{ 
config.failure_detection_timeout }}"/>
+        <property name="systemWorkerBlockedTimeout" value="{{ 
config.sys_worker_blocked_timeout }}"/>
 
         {{ misc_utils.cluster_state(config.cluster_state, config.version) }}
 
+        {{ communication.communication_spi(config.communication_spi) }}
+
         {{ disco_utils.discovery_spi(config.discovery_spi) }}
 
         {{ datastorage_utils.data_storage(config.data_storage) }}
diff --git 
a/modules/ducktests/tests/ignitetest/services/zk/templates/zookeeper.properties.j2
 
b/modules/ducktests/tests/ignitetest/services/zk/templates/zookeeper.properties.j2
index 2c80e84..441b0db 100644
--- 
a/modules/ducktests/tests/ignitetest/services/zk/templates/zookeeper.properties.j2
+++ 
b/modules/ducktests/tests/ignitetest/services/zk/templates/zookeeper.properties.j2
@@ -16,6 +16,7 @@
 #}
 
 tickTime={{ settings.tick_time }}
+minSessionTimeout={{ settings.min_session_timeout }}
 initLimit={{ settings.init_limit }}
 syncLimit={{ settings.sync_limit }}
 dataDir={{ DATA_DIR }}
diff --git a/modules/ducktests/tests/ignitetest/services/zk/zookeeper.py 
b/modules/ducktests/tests/ignitetest/services/zk/zookeeper.py
index b3da642..ca8a7fb 100644
--- a/modules/ducktests/tests/ignitetest/services/zk/zookeeper.py
+++ b/modules/ducktests/tests/ignitetest/services/zk/zookeeper.py
@@ -27,11 +27,14 @@ class ZookeeperSettings:
     """
     Settings for zookeeper quorum nodes.
     """
-    def __init__(self, tick_time=1000, init_limit=10, sync_limit=5, 
client_port=2181):
-        self.tick_time = tick_time
-        self.init_limit = init_limit
-        self.sync_limit = sync_limit
-        self.client_port = client_port
+    def __init__(self, **kwargs):
+        self.tick_time = kwargs.get('tick_time', 1000)
+        self.min_session_timeout = kwargs.get('min_session_timeout', 2000)
+        self.init_limit = kwargs.get('init_limit', 10)
+        self.sync_limit = kwargs.get('sync_limit', 5)
+        self.client_port = kwargs.get('client_port', 2181)
+
+        assert self.tick_time <= self.min_session_timeout // 2, "'tick_time' 
must be <= 'min_session_timeout' / 2"
 
 
 class ZookeeperService(Service):
diff --git a/modules/ducktests/tests/ignitetest/tests/discovery_test.py 
b/modules/ducktests/tests/ignitetest/tests/discovery_test.py
index 2c8d6a1..f2be35b 100644
--- a/modules/ducktests/tests/ignitetest/tests/discovery_test.py
+++ b/modules/ducktests/tests/ignitetest/tests/discovery_test.py
@@ -17,25 +17,24 @@
 Module contains discovery tests.
 """
 
+import os
 import random
-import re
+import sys
 from enum import IntEnum
-from datetime import datetime
 from time import monotonic
 from typing import NamedTuple
 
 from ducktape.mark import matrix
 from ducktape.mark.resource import cluster
 
-from ignitetest.services.ignite import IgniteAwareService
-from ignitetest.services.ignite import IgniteService
+from ignitetest.services.ignite import IgniteAwareService, IgniteService, 
get_event_time, node_failed_event_pattern
 from ignitetest.services.ignite_app import IgniteApplicationService
 from ignitetest.services.utils.ignite_configuration import IgniteConfiguration
 from ignitetest.services.utils.ignite_configuration.cache import 
CacheConfiguration
 from ignitetest.services.utils.ignite_configuration.discovery import 
from_zookeeper_cluster, from_ignite_cluster, \
     TcpDiscoverySpi
 from ignitetest.services.utils.time_utils import epoch_mills
-from ignitetest.services.zk.zookeeper import ZookeeperService
+from ignitetest.services.zk.zookeeper import ZookeeperService, 
ZookeeperSettings
 from ignitetest.utils import ignite_versions, version_if
 from ignitetest.utils.ignite_test import IgniteTest
 from ignitetest.utils.version import DEV_BRANCH, LATEST_2_8, V_2_8_0, 
IgniteVersion
@@ -56,8 +55,8 @@ class DiscoveryTestConfig(NamedTuple):
     """
     version: IgniteVersion
     nodes_to_kill: int = 1
-    kill_coordinator: bool = False
     load_type: ClusterLoad = ClusterLoad.NONE
+    sequential_failure: bool = False
     with_zk: bool = False
 
 
@@ -69,46 +68,76 @@ class DiscoveryTest(IgniteTest):
     2. Kill random node.
     3. Wait that survived node detects node failure.
     """
-    NUM_NODES = 7
+    NUM_NODES = 9
 
-    FAILURE_DETECTION_TIMEOUT = 2000
+    FAILURE_DETECTION_TIMEOUT_TCP = 1000
+
+    FAILURE_DETECTION_TIMEOUT_ZK = 3000
 
     DATA_AMOUNT = 5_000_000
 
     WARMUP_DATA_AMOUNT = 10_000
 
+    def __init__(self, test_context):
+        super().__init__(test_context=test_context)
+
+        self.netfilter_store_path = None
+
     @cluster(num_nodes=NUM_NODES)
     @ignite_versions(str(DEV_BRANCH), str(LATEST_2_8))
-    @matrix(kill_coordinator=[False, True],
-            nodes_to_kill=[1, 2],
+    @matrix(nodes_to_kill=[1, 2],
             load_type=[ClusterLoad.NONE, ClusterLoad.ATOMIC, 
ClusterLoad.TRANSACTIONAL])
-    def test_node_fail_tcp(self, ignite_version, kill_coordinator, 
nodes_to_kill, load_type):
+    def test_nodes_fail_not_sequential_tcp(self, ignite_version, 
nodes_to_kill, load_type):
+        """
+        Test nodes failure scenario with TcpDiscoverySpi not allowing nodes to 
fail in a row.
+        """
+        test_config = 
DiscoveryTestConfig(version=IgniteVersion(ignite_version), 
nodes_to_kill=nodes_to_kill,
+                                          load_type=load_type, 
sequential_failure=False)
+
+        return self._perform_node_fail_scenario(test_config)
+
+    @cluster(num_nodes=NUM_NODES)
+    @ignite_versions(str(DEV_BRANCH), str(LATEST_2_8))
+    @matrix(load_type=[ClusterLoad.NONE, ClusterLoad.ATOMIC, 
ClusterLoad.TRANSACTIONAL])
+    def test_2_nodes_fail_sequential_tcp(self, ignite_version, load_type):
         """
-        Test nodes failure scenario with TcpDiscoverySpi.
-        :param load_type: How to load cluster during the test: 0 - no loading; 
1 - do some loading; 2 - transactional.
+        Test 2 nodes sequential failure scenario with TcpDiscoverySpi.
         """
-        test_config = 
DiscoveryTestConfig(version=IgniteVersion(ignite_version), 
kill_coordinator=kill_coordinator,
-                                          nodes_to_kill=nodes_to_kill, 
load_type=load_type, with_zk=False)
+        test_config = 
DiscoveryTestConfig(version=IgniteVersion(ignite_version), nodes_to_kill=2, 
load_type=load_type,
+                                          sequential_failure=True)
 
         return self._perform_node_fail_scenario(test_config)
 
     @cluster(num_nodes=NUM_NODES + 3)
     @version_if(lambda version: version != V_2_8_0)  # ignite-zookeeper 
package is broken in 2.8.0
     @ignite_versions(str(DEV_BRANCH), str(LATEST_2_8))
-    @matrix(kill_coordinator=[False, True],
-            nodes_to_kill=[1, 2],
+    @matrix(nodes_to_kill=[1, 2],
             load_type=[ClusterLoad.NONE, ClusterLoad.ATOMIC, 
ClusterLoad.TRANSACTIONAL])
-    def test_node_fail_zk(self, ignite_version, kill_coordinator, 
nodes_to_kill, load_type):
+    def test_nodes_fail_not_sequential_zk(self, ignite_version, nodes_to_kill, 
load_type):
+        """
+        Test node failure scenario with ZooKeeperSpi not allowing nodes to 
fail in a row.
+        """
+        test_config = 
DiscoveryTestConfig(version=IgniteVersion(ignite_version), 
nodes_to_kill=nodes_to_kill,
+                                          load_type=load_type, 
sequential_failure=False, with_zk=True)
+
+        return self._perform_node_fail_scenario(test_config)
+
+    @cluster(num_nodes=NUM_NODES + 3)
+    @version_if(lambda version: version != V_2_8_0)  # ignite-zookeeper 
package is broken in 2.8.0
+    @ignite_versions(str(DEV_BRANCH), str(LATEST_2_8))
+    @matrix(load_type=[ClusterLoad.NONE, ClusterLoad.ATOMIC, 
ClusterLoad.TRANSACTIONAL])
+    def test_2_nodes_fail_sequential_zk(self, ignite_version, load_type):
         """
-        Test node failure scenario with ZooKeeperSpi.
-        :param load_type: How to load cluster during the test: 0 - no loading; 
1 - do some loading; 2 - transactional.
+        Test 2 nodes sequential failure scenario with ZooKeeperSpi.
         """
-        test_config = 
DiscoveryTestConfig(version=IgniteVersion(ignite_version), 
kill_coordinator=kill_coordinator,
-                                          nodes_to_kill=nodes_to_kill, 
load_type=load_type, with_zk=True)
+        test_config = 
DiscoveryTestConfig(version=IgniteVersion(ignite_version), nodes_to_kill=2, 
load_type=load_type,
+                                          sequential_failure=True, 
with_zk=True)
 
         return self._perform_node_fail_scenario(test_config)
 
     def _perform_node_fail_scenario(self, test_config):
+        results = {}
+
         modules = ['zookeeper'] if test_config.with_zk else None
 
         if test_config.with_zk:
@@ -121,7 +150,8 @@ class DiscoveryTest(IgniteTest):
         ignite_config = IgniteConfiguration(
             version=test_config.version,
             discovery_spi=discovery_spi,
-            failure_detection_timeout=self.FAILURE_DETECTION_TIMEOUT,
+            failure_detection_timeout=self.FAILURE_DETECTION_TIMEOUT_ZK if 
test_config.with_zk
+            else self.FAILURE_DETECTION_TIMEOUT_TCP,
             caches=[CacheConfiguration(
                 name='test-cache',
                 backups=1,
@@ -131,14 +161,16 @@ class DiscoveryTest(IgniteTest):
 
         servers, start_servers_sec = start_servers(self.test_context, 
self.NUM_NODES - 1, ignite_config, modules)
 
-        failed_nodes, survived_node = choose_node_to_kill(servers, 
test_config.kill_coordinator,
-                                                          
test_config.nodes_to_kill)
+        results['Ignite cluster start time (s)'] = start_servers_sec
+
+        failed_nodes, survived_node = choose_node_to_kill(servers, 
test_config.nodes_to_kill,
+                                                          
test_config.sequential_failure)
 
         if test_config.load_type is not ClusterLoad.NONE:
             load_config = ignite_config._replace(client_mode=True) if 
test_config.with_zk else \
                 ignite_config._replace(client_mode=True, 
discovery_spi=from_ignite_cluster(servers))
 
-            tran_nodes = [n.discovery_info().node_id for n in failed_nodes] \
+            tran_nodes = [node_id(n) for n in failed_nodes] \
                 if test_config.load_type == ClusterLoad.TRANSACTIONAL else None
 
             params = {"cacheName": "test-cache",
@@ -149,18 +181,128 @@ class DiscoveryTest(IgniteTest):
 
             start_load_app(self.test_context, ignite_config=load_config, 
params=params, modules=modules)
 
-        data = simulate_nodes_failure(servers, failed_nodes, survived_node)
+        results.update(self._simulate_nodes_failure(servers, 
node_fail_task(ignite_config, test_config), failed_nodes,
+                                                    survived_node))
+
+        return results
+
+    def _simulate_nodes_failure(self, servers, kill_node_task, failed_nodes, 
survived_node):
+        """
+        Perform node failure scenario
+        """
+        for node in failed_nodes:
+            self.logger.info(
+                "Simulating failure of node '%s' (order %d) on '%s'" % 
(node_id(node), order(node), node.name))
+
+        ids_to_wait = [node_id(n) for n in failed_nodes]
+
+        _, first_terminated = servers.exec_on_nodes_async(failed_nodes, 
kill_node_task)
+
+        for node in failed_nodes:
+            self.logger.debug(
+                "Netfilter activated on '%s': %s" % (node.name, 
dump_netfilter_settings(node)))
 
-        data['Ignite cluster start time (s)'] = start_servers_sec
+        # Keeps dates of logged node failures.
+        logged_timestamps = []
+        data = {}
+
+        for failed_id in ids_to_wait:
+            logged_timestamps.append(
+                get_event_time(servers, survived_node, 
node_failed_event_pattern(failed_id)))
+
+        self._check_failed_number(failed_nodes, survived_node)
+        self._check_not_segmented(failed_nodes)
+
+        logged_timestamps.sort(reverse=True)
+
+        first_kill_time = epoch_mills(first_terminated)
+        detection_delay = epoch_mills(logged_timestamps[0]) - first_kill_time
+
+        data['Detection of node(s) failure (ms)'] = detection_delay
+        data['All detection delays (ms):'] = str([epoch_mills(ts) - 
first_kill_time for ts in logged_timestamps])
+        data['Nodes failed'] = len(failed_nodes)
 
         return data
 
+    def _check_failed_number(self, failed_nodes, survived_node):
+        """Ensures number of failed nodes is correct."""
+        cmd = "grep '%s' %s | wc -l" % (node_failed_event_pattern(), 
IgniteAwareService.STDOUT_STDERR_CAPTURE)
+
+        failed_cnt = 
int(str(survived_node.account.ssh_client.exec_command(cmd)[1].read(), 
sys.getdefaultencoding()))
+
+        if failed_cnt != len(failed_nodes):
+            failed = str(survived_node.account.ssh_client.exec_command(
+                "grep '%s' %s" % (node_failed_event_pattern(), 
IgniteAwareService.STDOUT_STDERR_CAPTURE))[1].read(),
+                         sys.getdefaultencoding())
+
+            self.logger.warn("Node '%s' (%s) has detected the following 
failures:%s%s" % (
+                survived_node.name, node_id(survived_node), os.linesep, 
failed))
+
+            raise AssertionError(
+                "Wrong number of failed nodes: %d. Expected: %d. Check the 
logs." % (failed_cnt, len(failed_nodes)))
+
+    def _check_not_segmented(self, failed_nodes):
+        """Ensures only target nodes failed"""
+        for service in [srv for srv in self.test_context.services if 
isinstance(srv, IgniteAwareService)]:
+            for node in [srv_node for srv_node in service.nodes if srv_node 
not in failed_nodes]:
+                cmd = "grep -i '%s' %s | wc -l" % ("local node segmented", 
IgniteAwareService.STDOUT_STDERR_CAPTURE)
+
+                failed = 
str(node.account.ssh_client.exec_command(cmd)[1].read(), 
sys.getdefaultencoding())
+
+                if int(failed) > 0:
+                    raise AssertionError(
+                        "Wrong node failed (segmented) on '%s'. Check the 
logs." % node.name)
+
+    def setup(self):
+        super().setup()
+
+        self.netfilter_store_path = os.path.join(self.tmp_path_root, 
"iptables.bak")
+
+        # Store current network filter settings.
+        for node in self.test_context.cluster.nodes:
+            cmd = "sudo iptables-save | tee " + self.netfilter_store_path
+
+            exec_error = 
str(node.account.ssh_client.exec_command(cmd)[2].read(), 
sys.getdefaultencoding())
+
+            if "Warning: iptables-legacy tables present" in exec_error:
+                cmd = "sudo iptables-legacy-save | tee " + 
self.netfilter_store_path
+
+                exec_error = 
str(node.account.ssh_client.exec_command(cmd)[2].read(), 
sys.getdefaultencoding())
+
+            assert len(exec_error) == 0, "Failed to store iptables rules on 
'%s': %s" % (node.name, exec_error)
+
+            self.logger.debug("Netfilter before launch on '%s': %s" % 
(node.name, dump_netfilter_settings(node)))
+
+    def teardown(self):
+        # Restore previous network filter settings.
+        cmd = "sudo iptables-restore < " + self.netfilter_store_path
+
+        errors = []
+
+        for node in self.test_context.cluster.nodes:
+            exec_error = 
str(node.account.ssh_client.exec_command(cmd)[2].read(), 
sys.getdefaultencoding())
+
+            if len(exec_error) > 0:
+                errors.append("Failed to restore iptables rules on '%s': %s" % 
(node.name, exec_error))
+            else:
+                self.logger.debug("Netfilter after launch on '%s': %s" % 
(node.name, dump_netfilter_settings(node)))
+
+        if len(errors) > 0:
+            self.logger.error("Failed restoring actions:" + os.linesep + 
os.linesep.join(errors))
+
+            raise RuntimeError("Unable to restore node states. See the log 
above.")
+
+        super().teardown()
+
 
 def start_zookeeper(test_context, num_nodes):
     """
     Start zookeeper cluster.
     """
-    zk_quorum = ZookeeperService(test_context, num_nodes)
+    zk_settings = 
ZookeeperSettings(min_session_timeout=DiscoveryTest.FAILURE_DETECTION_TIMEOUT_ZK,
+                                    
tick_time=DiscoveryTest.FAILURE_DETECTION_TIMEOUT_ZK // 3)
+
+    zk_quorum = ZookeeperService(test_context, num_nodes, settings=zk_settings)
     zk_quorum.start()
     return zk_quorum
 
@@ -182,76 +324,73 @@ def start_load_app(test_context, ignite_config, params, modules=None):
     """
     Start loader application.
     """
-    loader = IgniteApplicationService(
+    IgniteApplicationService(
         test_context,
         config=ignite_config,
         
java_class_name="org.apache.ignite.internal.ducktest.tests.ContinuousDataLoadApplication",
         modules=modules,
         # mute spam in log.
         jvm_opts=["-DIGNITE_DUMP_THREADS_ON_FAILURE=false"],
-        params=params)
+        params=params).start()
 
-    loader.start()
 
+def choose_node_to_kill(servers, nodes_to_kill, sequential):
+    """Choose node to kill during test"""
+    assert nodes_to_kill > 0, "No nodes to kill passed. Check the parameters."
 
-def failed_pattern(failed_node_id):
-    """
-    Failed node pattern in log
-    """
-    return "Node FAILED: .\\{1,\\}Node \\[id=" + failed_node_id
+    idx = random.randint(0, len(servers.nodes)-1)
 
+    to_kill = servers.nodes[idx:] + servers.nodes[:idx-1]
 
-def choose_node_to_kill(servers, kill_coordinator, nodes_to_kill):
-    """Choose node to kill during test"""
-    assert nodes_to_kill > 0, "   No nodes to kill passed. Check the parameters."
+    if not sequential:
+        to_kill = to_kill[0::2]
 
-    nodes = servers.nodes
-    coordinator = nodes[0].discovery_info().coordinator
-    to_kill = []
+    idx = random.randint(0, len(to_kill) - nodes_to_kill)
+    to_kill = to_kill[idx:idx + nodes_to_kill]
 
-    if kill_coordinator:
-        to_kill.append(next(node for node in nodes if node.discovery_info().node_id == coordinator))
-        nodes_to_kill -= 1
+    survive = random.choice([node for node in servers.nodes if node not in to_kill])
 
-    if nodes_to_kill > 0:
-        choice = random.sample([n for n in nodes if n.discovery_info().node_id != coordinator], nodes_to_kill)
-        to_kill.extend([choice] if not isinstance(choice, list) else choice)
+    assert len(to_kill) == nodes_to_kill, "Unable to pick up required number of nodes to kill."
 
-    survive = random.choice([node for node in servers.nodes if node not in to_kill])
+    assert survive, "Unable to select survived node to monitor the cluster on it."
 
     return to_kill, survive
 
 
-def simulate_nodes_failure(servers, failed_nodes, survived_node):
-    """
-    Perform node failure scenario
-    """
-    ids_to_wait = [node.discovery_info().node_id for node in failed_nodes]
+def order(node):
+    """Return discovery order of the node."""
+    return node.discovery_info().order
+
 
-    _, first_terminated = servers.stop_nodes_async(failed_nodes, clean_shutdown=False, wait_for_stop=False)
+def node_id(node):
+    """Return node id."""
+    return node.discovery_info().node_id
 
-    # Keeps dates of logged node failures.
-    logged_timestamps = []
-    data = {}
 
-    for failed_id in ids_to_wait:
-        servers.await_event_on_node(failed_pattern(failed_id), survived_node, 20,
-                                    from_the_beginning=True, backoff_sec=0.1)
+def node_fail_task(ignite_config, test_config):
+    """
+    Creates proper task to simulate network failure depending on the configurations.
+    """
+    cm_spi = ignite_config.communication_spi
+    dsc_spi = ignite_config.discovery_spi
 
-        _, stdout, _ = survived_node.account.ssh_client.exec_command(
-            "grep '%s' %s" % (failed_pattern(failed_id), 
IgniteAwareService.STDOUT_STDERR_CAPTURE))
+    cm_ports = str(cm_spi.port) if cm_spi.port_range < 1 else str(cm_spi.port) + ':' + str(
+        cm_spi.port + cm_spi.port_range)
 
-        logged_timestamps.append(
-            datetime.strptime(re.match("^\\[[^\\[]+\\]", stdout.read().decode("utf-8")).group(),
-                              "[%Y-%m-%d %H:%M:%S,%f]"))
+    if test_config.with_zk:
+        dsc_ports = str(ignite_config.discovery_spi.port)
+    else:
+        dsc_ports = str(dsc_spi.port) if dsc_spi.port_range < 1 else str(dsc_spi.port) + ':' + str(
+            dsc_spi.port + dsc_spi.port_range)
 
-    logged_timestamps.sort(reverse=True)
+    cmd = f"sudo iptables -I %s 1 -p tcp -m multiport --dport 
{dsc_ports},{cm_ports} -j DROP"
 
-    first_kill_time = epoch_mills(first_terminated)
-    detection_delay = epoch_mills(logged_timestamps[0]) - first_kill_time
+    return lambda node: (node.account.ssh_client.exec_command(cmd % "INPUT"),
+                         node.account.ssh_client.exec_command(cmd % "OUTPUT"))
 
-    data['Detection of node(s) failure (ms)'] = detection_delay
-    data['All detection delays (ms):'] = str([epoch_mills(ts) - first_kill_time for ts in logged_timestamps])
-    data['Nodes failed'] = len(failed_nodes)
 
-    return data
+def dump_netfilter_settings(node):
+    """
+    Reads current netfilter settings on the node for debugging purposes.
+    """
+    return str(node.account.ssh_client.exec_command("sudo iptables -L -n")[1].read(), sys.getdefaultencoding())
diff --git a/modules/ducktests/tests/ignitetest/utils/ignite_test.py b/modules/ducktests/tests/ignitetest/utils/ignite_test.py
index feb5993..3b0323d 100644
--- a/modules/ducktests/tests/ignitetest/utils/ignite_test.py
+++ b/modules/ducktests/tests/ignitetest/utils/ignite_test.py
@@ -16,6 +16,9 @@
 """
 This module contains basic ignite test.
 """
+import os
+import random
+import string
 from time import monotonic
 
 from ducktape.tests.test import Test
@@ -29,6 +32,29 @@ class IgniteTest(Test):
     def __init__(self, test_context):
         super().__init__(test_context=test_context)
 
+        self.tmp_path_root = None
+
+    def setup(self):
+        super().setup()
+
+        self.tmp_path_root = os.path.join("/tmp", ''.join(random.choices(string.ascii_letters + string.digits, k=10)),
+                                          self.test_context.cls_name)
+
+        self.clear_tmp_dir(True)
+
+    def teardown(self):
+        self.clear_tmp_dir()
+
+        super().teardown()
+
+    def clear_tmp_dir(self, recreate=False):
+        """Creates temporary directory for current test."""
+        for node in self.test_context.cluster.nodes:
+            node.account.ssh_client.exec_command("rm -drf " + self.tmp_path_root)
+
+            if recreate:
+            node.account.ssh_client.exec_command("mkdir -p " + self.tmp_path_root)
+
     @staticmethod
     def monotonic():
         """

Reply via email to