This is an automated email from the ASF dual-hosted git repository.
av pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new c0d37c6fe5a IGNITE-26717 [ducktests] Add ducktest for Rolling Upgrade
(#12451)
c0d37c6fe5a is described below
commit c0d37c6fe5af7425c4418c2b5a6cd542c9178264
Author: Maksim Davydov <[email protected]>
AuthorDate: Tue Dec 23 14:06:05 2025 +0300
IGNITE-26717 [ducktests] Add ducktest for Rolling Upgrade (#12451)
Authored-by: Maksim Davydov <[email protected]>
---
.../DataLoaderAndCheckerApplication.java | 6 +-
.../ignitetest/services/utils/control_utility.py | 27 ++++
.../ignitetest/services/utils/ignite_aware.py | 24 ++--
.../utils/ignite_configuration/discovery.py | 25 ++++
.../ignitetest/tests/rolling_upgrade/__init__.py | 18 +++
.../add_remove_node_upgrade_test.py | 80 ++++++++++++
.../rolling_upgrade/in_place_node_upgrade_test.py | 64 ++++++++++
.../tests/ignitetest/tests/rolling_upgrade/util.py | 140 +++++++++++++++++++++
8 files changed, 376 insertions(+), 8 deletions(-)
diff --git
a/modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/tests/persistence_upgrade_test/DataLoaderAndCheckerApplication.java
b/modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/tests/persistence_upgrade_test/DataLoaderAndCheckerApplication.java
index 11d8e707a82..82108b3e5ef 100644
---
a/modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/tests/persistence_upgrade_test/DataLoaderAndCheckerApplication.java
+++
b/modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/tests/persistence_upgrade_test/DataLoaderAndCheckerApplication.java
@@ -34,17 +34,21 @@ public class DataLoaderAndCheckerApplication extends
IgniteAwareApplication {
/** {@inheritDoc} */
@Override public void run(JsonNode jNode) throws
IgniteInterruptedCheckedException {
boolean check = jNode.get("check").asBoolean();
+ int backups = jNode.path("backups").asInt(0);
+ int entryCnt = jNode.path("entryCount").asInt(10_000);
markInitialized();
waitForActivation();
CacheConfiguration<Integer, CustomObject> cacheCfg = new
CacheConfiguration<>("cache");
+ cacheCfg.setBackups(backups);
+
IgniteCache<Integer, CustomObject> cache =
ignite.getOrCreateCache(cacheCfg);
log.info(check ? "Checking..." : " Preparing...");
- for (int i = 0; i < 10_000; i++) {
+ for (int i = 0; i < entryCnt; i++) {
CustomObject obj = new CustomObject(i);
if (!check)
diff --git
a/modules/ducktests/tests/ignitetest/services/utils/control_utility.py
b/modules/ducktests/tests/ignitetest/services/utils/control_utility.py
index d95f97b0a13..404744b5c76 100644
--- a/modules/ducktests/tests/ignitetest/services/utils/control_utility.py
+++ b/modules/ducktests/tests/ignitetest/services/utils/control_utility.py
@@ -231,6 +231,33 @@ class ControlUtility:
return res
+ def enable_rolling_upgrade(self, target_version: str, force: bool = False):
+ """
+ Enable Rolling Upgrade with the target Ignite version.
+ :param target_version: Target Ignite version.
+ :param force: If `True`, skips target version compatibility
checks and forcibly enables rolling upgrade.
+ This flag does not override an already active upgrade
configuration.
+ """
+
+ if force:
+ result = self.__run(f"--rolling-upgrade enable {target_version}
--force --enable-experimental --yes")
+ else:
+ result = self.__run(f"--rolling-upgrade enable {target_version}
--enable-experimental --yes")
+
+ assert "Rolling upgrade enabled" in result, f"Unexpected response:
{result}"
+
+ return result
+
+ def disable_rolling_upgrade(self):
+ """
+ Disable Rolling Upgrade.
+ """
+ result = self.__run(f"--rolling-upgrade disable --enable-experimental
--yes")
+
+ assert "Rolling upgrade disabled" in result, f"Unexpected response:
{result}"
+
+ return result
+
def start_performance_statistics(self):
"""
Start performance statistics collecting in the cluster.
diff --git a/modules/ducktests/tests/ignitetest/services/utils/ignite_aware.py
b/modules/ducktests/tests/ignitetest/services/utils/ignite_aware.py
index b6e2fe05e14..7d475844a2e 100644
--- a/modules/ducktests/tests/ignitetest/services/utils/ignite_aware.py
+++ b/modules/ducktests/tests/ignitetest/services/utils/ignite_aware.py
@@ -142,14 +142,21 @@ class IgniteAwareService(BackgroundThreadService,
IgnitePathAware, JvmProcessMix
self.logger.info("Waiting for IgniteAware(s) to stop ...")
for node in self.nodes:
- stopped = self.wait_node(node,
timeout_sec=self.shutdown_timeout_sec)
- assert stopped, "Node %s's worker thread did not stop in %d
seconds" % \
- (str(node.account), self.shutdown_timeout_sec)
+ self.await_stopped_node(node)
- for node in self.nodes:
- wait_until(lambda: not self.alive(node),
timeout_sec=self.shutdown_timeout_sec,
- err_msg="Node %s's remote processes failed to stop in
%d seconds" %
- (str(node.account), self.shutdown_timeout_sec))
+ def await_stopped_node(self, node):
+ """
+ Waits until the node's worker thread and remote processes have stopped.
+ """
+ self.logger.info(f"Waiting for {self.who_am_i(node)}) to stop ...")
+
+ stopped = self.wait_node(node, timeout_sec=self.shutdown_timeout_sec)
+ assert stopped, "Node %s's worker thread did not stop in %d seconds" %
\
+ (str(node.account), self.shutdown_timeout_sec)
+
+ wait_until(lambda: not self.alive(node),
timeout_sec=self.shutdown_timeout_sec,
+ err_msg="Node %s's remote processes failed to stop in %d
seconds" %
+ (str(node.account), self.shutdown_timeout_sec))
def stop_node(self, node, force_stop=False, **kwargs):
pids = self.pids(node, self.main_java_class)
@@ -157,6 +164,9 @@ class IgniteAwareService(BackgroundThreadService,
IgnitePathAware, JvmProcessMix
for pid in pids:
node.account.signal(pid, signal.SIGKILL if force_stop else
signal.SIGTERM, allow_fail=False)
+ if not force_stop:
+ self.await_stopped_node(node)
+
def clean(self, **kwargs):
self.__restore_iptables()
diff --git
a/modules/ducktests/tests/ignitetest/services/utils/ignite_configuration/discovery.py
b/modules/ducktests/tests/ignitetest/services/utils/ignite_configuration/discovery.py
index 4fea023e038..e9b3188c94b 100644
---
a/modules/ducktests/tests/ignitetest/services/utils/ignite_configuration/discovery.py
+++
b/modules/ducktests/tests/ignitetest/services/utils/ignite_configuration/discovery.py
@@ -19,6 +19,9 @@ Module contains classes and utility methods to create
discovery configuration fo
from abc import ABCMeta, abstractmethod
+from typing import List
+
+from ignitetest.services.ignite import IgniteService
from ignitetest.services.utils.ignite_aware import IgniteAwareService
from ignitetest.services.zk.zookeeper import ZookeeperService
@@ -27,6 +30,7 @@ class DiscoverySpi(metaclass=ABCMeta):
"""
Abstract class for DiscoverySpi.
"""
+
@property
@abstractmethod
def type(self):
@@ -45,6 +49,7 @@ class ZookeeperDiscoverySpi(DiscoverySpi):
"""
ZookeeperDiscoverySpi.
"""
+
def __init__(self, zoo_service, root_path):
self.connection_string = zoo_service.connection_string()
self.port = zoo_service.settings.client_port
@@ -63,6 +68,7 @@ class TcpDiscoveryIpFinder(metaclass=ABCMeta):
"""
Abstract class for TcpDiscoveryIpFinder.
"""
+
@property
@abstractmethod
def type(self):
@@ -81,6 +87,7 @@ class TcpDiscoveryVmIpFinder(TcpDiscoveryIpFinder):
"""
IpFinder with static ips, obtained from cluster nodes.
"""
+
def __init__(self, nodes=None):
self.addresses = TcpDiscoveryVmIpFinder.__get_addresses(nodes) if
nodes else None
@@ -102,6 +109,7 @@ class TcpDiscoverySpi(DiscoverySpi):
"""
TcpDiscoverySpi.
"""
+
def __init__(self, ip_finder=TcpDiscoveryVmIpFinder(), port=47500,
port_range=100, local_address=None):
self.ip_finder = ip_finder
self.port = port
@@ -134,6 +142,23 @@ def from_ignite_cluster(cluster, subset=None):
return TcpDiscoverySpi(ip_finder=TcpDiscoveryVmIpFinder(nodes))
+def from_ignite_services(ignite_service_list: List[IgniteService]):
+ """
+ Constructs a `TcpDiscoverySpi` instance from the provided Ignite services.
+ :param ignite_service_list: A list of `IgniteService` objects representing
the cluster.
+ :return: A configured `TcpDiscoverySpi` containing the static IP addresses
of the services.
+ """
+ assert isinstance(ignite_service_list, list)
+ assert all(isinstance(ignite_service, IgniteService) for ignite_service in
ignite_service_list)
+
+ nodes = []
+
+ for ignite_service in ignite_service_list:
+ nodes.extend(ignite_service.nodes)
+
+ return TcpDiscoverySpi(ip_finder=TcpDiscoveryVmIpFinder(nodes))
+
+
def from_zookeeper_cluster(cluster, root_path="/apacheIgnite"):
"""
Form ZookeeperDiscoverySpi from zookeeper service cluster.
diff --git
a/modules/ducktests/tests/ignitetest/tests/rolling_upgrade/__init__.py
b/modules/ducktests/tests/ignitetest/tests/rolling_upgrade/__init__.py
new file mode 100644
index 00000000000..4d4648b20e1
--- /dev/null
+++ b/modules/ducktests/tests/ignitetest/tests/rolling_upgrade/__init__.py
@@ -0,0 +1,18 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+This package contains rolling upgrade tests.
+"""
\ No newline at end of file
diff --git
a/modules/ducktests/tests/ignitetest/tests/rolling_upgrade/add_remove_node_upgrade_test.py
b/modules/ducktests/tests/ignitetest/tests/rolling_upgrade/add_remove_node_upgrade_test.py
new file mode 100644
index 00000000000..b55afb438e6
--- /dev/null
+++
b/modules/ducktests/tests/ignitetest/tests/rolling_upgrade/add_remove_node_upgrade_test.py
@@ -0,0 +1,80 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+Module contains rolling upgrade tests with new nodes introduced into the
topology and older nodes gracefully removed.
+"""
+from ducktape.mark import defaults, matrix
+
+from ignitetest.services.ignite import IgniteService
+from ignitetest.services.utils.control_utility import ControlUtility
+from ignitetest.services.utils.ignite_configuration.discovery import
from_ignite_services
+from ignitetest.tests.rebalance.persistent_test import
await_and_check_rebalance
+from ignitetest.tests.rolling_upgrade.util import BaseRollingUpgradeTest,
PRELOADERS_COUNT, NUM_NODES
+from ignitetest.utils import cluster, ignite_versions
+from ignitetest.utils.version import LATEST, DEV_BRANCH, IgniteVersion
+
+
+class AddRemoveNodeUpgradeTest(BaseRollingUpgradeTest):
+
+ @cluster(num_nodes=2 * NUM_NODES + PRELOADERS_COUNT)
+ @ignite_versions(str(LATEST))
+ @matrix(with_persistence=[True, False])
+ @defaults(upgrade_version=[str(DEV_BRANCH)], force=[False], backups=[1],
entry_count=[15_000])
+ def test_add_remove_rolling_upgrade(self, ignite_version, upgrade_version,
force, with_persistence,
+ backups, entry_count):
+ node_count = (self.test_context.expected_num_nodes - PRELOADERS_COUNT)
// 2
+
+ self.check_rolling_upgrade(ignite_version, upgrade_version, force,
with_persistence,
+ backups, entry_count,
self._upgrade_ignite_cluster, node_count)
+
+ def _upgrade_ignite_cluster(self, ignites, upgrade_version, force,
with_persistence):
+ control_sh = ControlUtility(ignites)
+
+
control_sh.enable_rolling_upgrade(IgniteVersion(upgrade_version).vstring, force)
+
+ self.logger.info(f"Starting rolling upgrade.")
+
+ upgraded_nodes = []
+
+ for ignite in ignites.nodes:
+ new_node_cfg = ignites.config._replace(
+ version=IgniteVersion(upgrade_version),
+ discovery_spi=from_ignite_services([ignites] + upgraded_nodes)
+ )
+
+ new_node = IgniteService(self.test_context, new_node_cfg,
num_nodes=1)
+
+ new_node.start()
+
+ control_sh = ControlUtility(new_node)
+
+ if with_persistence:
+ control_sh.add_to_baseline(new_node.nodes)
+
+ await_and_check_rebalance(new_node)
+
+ upgraded_nodes.append(new_node)
+
+ ignites.stop_node(ignite)
+
+ if with_persistence:
+ control_sh.remove_from_baseline([ignite])
+
+ self.logger.info(f"Cluster upgrade is complete.")
+
+ control_sh.disable_rolling_upgrade()
+
+ return upgraded_nodes
diff --git
a/modules/ducktests/tests/ignitetest/tests/rolling_upgrade/in_place_node_upgrade_test.py
b/modules/ducktests/tests/ignitetest/tests/rolling_upgrade/in_place_node_upgrade_test.py
new file mode 100644
index 00000000000..3d373400e59
--- /dev/null
+++
b/modules/ducktests/tests/ignitetest/tests/rolling_upgrade/in_place_node_upgrade_test.py
@@ -0,0 +1,64 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+ Module contains in-place rolling upgrade tests.
+"""
+from ducktape.mark import matrix, defaults
+
+from ignitetest.services.utils.control_utility import ControlUtility
+from ignitetest.tests.rolling_upgrade.util import BaseRollingUpgradeTest,
NUM_NODES
+from ignitetest.utils import cluster, ignite_versions
+from ignitetest.utils.version import IgniteVersion, LATEST, DEV_BRANCH
+
+
+class InPlaceNodeUpgradeTest(BaseRollingUpgradeTest):
+ @cluster(num_nodes=NUM_NODES)
+ @ignite_versions(str(LATEST))
+ @matrix(with_persistence=[True, False], upgrade_coordinator_first=[True,
False])
+ @defaults(upgrade_version=[str(DEV_BRANCH)], force=[False], backups=[1],
entry_count=[15_000])
+ def test_in_place_rolling_upgrade(self, ignite_version, upgrade_version,
force, with_persistence,
+ backups, entry_count,
upgrade_coordinator_first):
+ self.upgrade_coordinator_first = upgrade_coordinator_first
+
+ self.check_rolling_upgrade(ignite_version, upgrade_version, force,
with_persistence,
+ backups, entry_count,
self._upgrade_ignite_cluster)
+
+ def _upgrade_ignite_cluster(self, ignites, upgrade_version, force,
with_persistence):
+ control_sh = ControlUtility(ignites)
+
+
control_sh.enable_rolling_upgrade(IgniteVersion(upgrade_version).vstring, force)
+
+ self.logger.info(
+ f"Starting in-place rolling upgrade "
+ f"{'with coordinator going first' if
self.upgrade_coordinator_first else 'from the last node in the ring'}"
+ )
+
+ ignites.config =
ignites.config._replace(version=IgniteVersion(upgrade_version))
+
+ for ignite in ignites.nodes if self.upgrade_coordinator_first else
reversed(ignites.nodes):
+ self.logger.debug(f"Upgrading {ignites.who_am_i(ignite)}")
+
+ ignites.stop_node(ignite)
+
+ ignites.start_node(ignite)
+
+ ignites.await_started([ignite])
+
+ self.logger.info(f"Cluster upgrade is complete.")
+
+ control_sh.disable_rolling_upgrade()
+
+ return [ignites]
diff --git a/modules/ducktests/tests/ignitetest/tests/rolling_upgrade/util.py
b/modules/ducktests/tests/ignitetest/tests/rolling_upgrade/util.py
new file mode 100644
index 00000000000..0801a76f34c
--- /dev/null
+++ b/modules/ducktests/tests/ignitetest/tests/rolling_upgrade/util.py
@@ -0,0 +1,140 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+Utils for rolling upgrade tests.
+"""
+from typing import List
+
+from ignitetest.services.ignite import IgniteService
+from ignitetest.services.ignite_app import IgniteApplicationService
+from ignitetest.services.utils.control_utility import ControlUtility
+from ignitetest.services.utils.ignite_configuration import
IgniteConfiguration, DataStorageConfiguration
+from ignitetest.services.utils.ignite_configuration.data_storage import
DataRegionConfiguration
+from ignitetest.services.utils.ignite_configuration.discovery import
from_ignite_cluster, from_ignite_services
+from ignitetest.utils.ignite_test import IgniteTest
+from ignitetest.utils.version import IgniteVersion
+
+NUM_NODES = 4
+PRELOADERS_COUNT = 1
+JAVA_CLASS_NAME =
"org.apache.ignite.internal.ducktest.tests.persistence_upgrade_test.DataLoaderAndCheckerApplication"
+
+
+class BaseRollingUpgradeTest(IgniteTest):
+ """
+ Base class for rolling upgrade tests on an Ignite cluster.
+
+ This class provides a template method `check_rolling_upgrade` that performs
the
+ following steps:
+ 1. Start an Ignite cluster with the given version.
+ 2. Preload data into the cluster.
+ 3. Perform a rolling upgrade via a provided upgrade function.
+ 4. Verify the data integrity after the upgrade.
+ 5. Ensure all nodes are alive and stop the cluster.
+ """
+ def check_rolling_upgrade(self, ignite_version, upgrade_version, force,
with_persistence, backups, entry_count,
+ upgrade_func, init_cluster_size=None):
+ """
+ Template test for performing a rolling upgrade.
+
+ :param ignite_version: Version of Ignite to start the cluster with.
+ :param upgrade_version: Version to upgrade the cluster nodes to.
+ :param force: Whether to force the upgrade.
+ :param with_persistence: Enable persistence for the cluster nodes.
+ :param backups: Number of backup copies for each cache.
+ :param entry_count: Number of entries per cache.
+ :param upgrade_func: Function performing the rolling upgrade on the
cluster.
+ Must accept the following signature:
+ `(cluster: IgniteService, upgrade_version, force,
with_persistence) -> List[IgniteService]`
+ :param init_cluster_size: (optional) Initial cluster size
+ """
+ self.logger.info(
+ f"Initiating Rolling Upgrade test from {ignite_version} to
{upgrade_version} "
+ f"with {'persistent' if with_persistence else 'in-memory'} mode"
+ )
+
+ if init_cluster_size is None:
+ node_count = self.test_context.expected_num_nodes -
PRELOADERS_COUNT
+ else:
+ node_count = init_cluster_size
+
+ ignites = self._start_ignite_cluster(ignite_version, node_count,
with_persistence)
+
+ app = self._configure_data_handler_app(ignites, backups, entry_count)
+
+ self._preload_data(app)
+
+ upgraded_ignites = upgrade_func(ignites, upgrade_version, force,
with_persistence)
+
+ self._check_data(upgraded_ignites, app)
+
+ total_alive = sum(len(ignite.alive_nodes) for ignite in
upgraded_ignites)
+
+ assert total_alive == node_count, f"All nodes should be alive
[expected={node_count}, actual={total_alive}]"
+
+ for ignite in upgraded_ignites:
+ ignite.stop()
+
+ def _start_ignite_cluster(self, ignite_version, node_count,
with_persistence):
+ ignite_cfg = IgniteConfiguration(
+ version=IgniteVersion(ignite_version),
+
metric_exporters={"org.apache.ignite.spi.metric.jmx.JmxMetricExporterSpi"})
+
+ if with_persistence:
+ ignite_cfg =
ignite_cfg._replace(data_storage=DataStorageConfiguration(
+ default=DataRegionConfiguration(persistence_enabled=True)))
+
+ ignites = IgniteService(self.test_context, ignite_cfg,
num_nodes=node_count)
+
+ ignites.start()
+
+ self.logger.debug(f"Initial cluster is up
[nodes={len(ignites.nodes)}].")
+
+ control_sh = ControlUtility(ignites)
+
+ if with_persistence:
+ control_sh.activate()
+
+ return ignites
+
+ def _configure_data_handler_app(self, ignites, backups, entry_count):
+ return IgniteApplicationService(
+ self.test_context,
+ config=ignites.config._replace(client_mode=True,
discovery_spi=from_ignite_cluster(ignites)),
+ java_class_name=JAVA_CLASS_NAME,
+ params={
+ "backups": backups,
+ "entryCount": entry_count
+ })
+
+ def _preload_data(self, app: IgniteApplicationService):
+ app.params["check"] = False
+
+ app.start()
+ app.stop()
+
+ self.logger.debug("Data generation is done.")
+
+ def _check_data(self, upgraded_ignites: List[IgniteService], app:
IgniteApplicationService):
+ assert len(upgraded_ignites) > 0, "Upgraded cluster is empty!"
+
+ app.config =
app.config._replace(discovery_spi=from_ignite_services(upgraded_ignites))
+
+ app.params["check"] = True
+
+ app.start()
+ app.stop()
+
+ self.logger.debug("Data check is complete.")