This is an automated email from the ASF dual-hosted git repository.
cmccabe pushed a commit to branch 3.9
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.9 by this push:
new c7cc4d0b687 KAFKA-17434: Do not test impossible scenarios in
upgrade_test.py (#17024)
c7cc4d0b687 is described below
commit c7cc4d0b687ab83c0992ae800fa3e76a15cd3ad0
Author: Colin Patrick McCabe <[email protected]>
AuthorDate: Thu Aug 29 12:51:42 2024 -0700
KAFKA-17434: Do not test impossible scenarios in upgrade_test.py (#17024)
Because of KIP-902 (Upgrade Zookeeper version to 3.8.2), it is not possible
to upgrade from a Kafka version earlier than 2.4 to a version later than 2.4.
Therefore, we should not test these upgrade scenarios in upgrade_test.py.
They do happen to work sometimes, but only in the trivial case where we don't
create topics or make changes during the upgrade (which would reveal the ZK
incompatibility).
Instead, we should test only supported scenarios.
Reviewers: José Armando García Sancio <[email protected]>
---
tests/kafkatest/tests/core/downgrade_test.py | 12 +------
tests/kafkatest/tests/core/upgrade_test.py | 49 +++-------------------------
2 files changed, 5 insertions(+), 56 deletions(-)
diff --git a/tests/kafkatest/tests/core/downgrade_test.py
b/tests/kafkatest/tests/core/downgrade_test.py
index 68f102e8744..e35f6f75154 100644
--- a/tests/kafkatest/tests/core/downgrade_test.py
+++ b/tests/kafkatest/tests/core/downgrade_test.py
@@ -19,7 +19,7 @@ from ducktape.utils.util import wait_until
from kafkatest.services.kafka import config_property
from kafkatest.tests.end_to_end import EndToEndTest
-from kafkatest.version import LATEST_1_1, LATEST_2_0, LATEST_2_1, LATEST_2_2,
LATEST_2_3, LATEST_2_4, LATEST_2_5, \
+from kafkatest.version import LATEST_2_4, LATEST_2_5, \
LATEST_2_6, LATEST_2_7, LATEST_2_8, LATEST_3_0, LATEST_3_1, LATEST_3_2,
LATEST_3_3, LATEST_3_4, LATEST_3_5, \
LATEST_3_6, LATEST_3_7, LATEST_3_8, DEV_BRANCH, KafkaVersion
@@ -122,16 +122,6 @@ class TestDowngrade(EndToEndTest):
# required static membership to be enabled
@parametrize(version=str(LATEST_2_4), compression_types=["none"],
static_membership=True)
@parametrize(version=str(LATEST_2_4), compression_types=["zstd"],
security_protocol="SASL_SSL", static_membership=True)
- @parametrize(version=str(LATEST_2_3), compression_types=["none"])
- @parametrize(version=str(LATEST_2_3), compression_types=["zstd"],
security_protocol="SASL_SSL")
- @parametrize(version=str(LATEST_2_2), compression_types=["none"])
- @parametrize(version=str(LATEST_2_2), compression_types=["zstd"],
security_protocol="SASL_SSL")
- @parametrize(version=str(LATEST_2_1), compression_types=["none"])
- @parametrize(version=str(LATEST_2_1), compression_types=["lz4"],
security_protocol="SASL_SSL")
- @parametrize(version=str(LATEST_2_0), compression_types=["none"])
- @parametrize(version=str(LATEST_2_0), compression_types=["snappy"],
security_protocol="SASL_SSL")
- @parametrize(version=str(LATEST_1_1), compression_types=["none"])
- @parametrize(version=str(LATEST_1_1), compression_types=["lz4"],
security_protocol="SASL_SSL")
def test_upgrade_and_downgrade(self, version, compression_types,
security_protocol="PLAINTEXT",
static_membership=False):
"""Test upgrade and downgrade of Kafka cluster from old versions to
the current version
diff --git a/tests/kafkatest/tests/core/upgrade_test.py
b/tests/kafkatest/tests/core/upgrade_test.py
index f4548b3c87d..279579363bf 100644
--- a/tests/kafkatest/tests/core/upgrade_test.py
+++ b/tests/kafkatest/tests/core/upgrade_test.py
@@ -24,10 +24,9 @@ from kafkatest.services.zookeeper import ZookeeperService
from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest
from kafkatest.utils import is_int
from kafkatest.utils.remote_account import java_version
-from kafkatest.version import LATEST_0_8_2, LATEST_0_9, LATEST_0_10,
LATEST_0_10_0, LATEST_0_10_1, LATEST_0_10_2, \
- LATEST_0_11_0, LATEST_1_0, LATEST_1_1, LATEST_2_0, LATEST_2_1, LATEST_2_2,
LATEST_2_3, LATEST_2_4, LATEST_2_5, \
- LATEST_2_6, LATEST_2_7, LATEST_2_8, LATEST_3_0, LATEST_3_1, LATEST_3_2,
LATEST_3_3, LATEST_3_4, LATEST_3_5, \
- LATEST_3_6, LATEST_3_7, LATEST_3_8, V_0_11_0_0, V_2_8_0, V_3_0_0,
DEV_BRANCH, KafkaVersion
+from kafkatest.version import LATEST_2_4, LATEST_2_5, LATEST_2_6, LATEST_2_7,
LATEST_2_8, \
+ LATEST_3_0, LATEST_3_1, LATEST_3_2, LATEST_3_3, LATEST_3_4, LATEST_3_5,
LATEST_3_6, \
+ LATEST_3_7, LATEST_3_8, V_2_8_0, DEV_BRANCH, KafkaVersion
from kafkatest.services.kafka.util import new_jdk_not_supported
class TestUpgrade(ProduceConsumeValidateTest):
@@ -86,9 +85,6 @@ class TestUpgrade(ProduceConsumeValidateTest):
if to_message_format_version is None:
del node.config[config_property.MESSAGE_FORMAT_VERSION]
else:
- # older message formats are not supported with IBP 3.0 or
higher
- if to_message_format_version < V_0_11_0_0:
- node.config[config_property.INTER_BROKER_PROTOCOL_VERSION]
= str(V_2_8_0)
node.config[config_property.MESSAGE_FORMAT_VERSION] =
to_message_format_version
self.kafka.start_node(node)
self.wait_until_rejoin()
@@ -134,38 +130,6 @@ class TestUpgrade(ProduceConsumeValidateTest):
@parametrize(from_kafka_version=str(LATEST_2_5),
to_message_format_version=None, compression_types=["zstd"])
@parametrize(from_kafka_version=str(LATEST_2_4),
to_message_format_version=None, compression_types=["none"])
@parametrize(from_kafka_version=str(LATEST_2_4),
to_message_format_version=None, compression_types=["zstd"])
- @parametrize(from_kafka_version=str(LATEST_2_3),
to_message_format_version=None, compression_types=["none"])
- @parametrize(from_kafka_version=str(LATEST_2_3),
to_message_format_version=None, compression_types=["zstd"])
- @parametrize(from_kafka_version=str(LATEST_2_2),
to_message_format_version=None, compression_types=["none"])
- @parametrize(from_kafka_version=str(LATEST_2_2),
to_message_format_version=None, compression_types=["zstd"])
- @parametrize(from_kafka_version=str(LATEST_2_1),
to_message_format_version=None, compression_types=["none"])
- @parametrize(from_kafka_version=str(LATEST_2_1),
to_message_format_version=None, compression_types=["lz4"])
- @parametrize(from_kafka_version=str(LATEST_2_0),
to_message_format_version=None, compression_types=["none"])
- @parametrize(from_kafka_version=str(LATEST_2_0),
to_message_format_version=None, compression_types=["snappy"])
- @parametrize(from_kafka_version=str(LATEST_1_1),
to_message_format_version=None, compression_types=["none"])
- @parametrize(from_kafka_version=str(LATEST_1_1),
to_message_format_version=None, compression_types=["lz4"])
- @parametrize(from_kafka_version=str(LATEST_1_0),
to_message_format_version=None, compression_types=["none"])
- @parametrize(from_kafka_version=str(LATEST_1_0),
to_message_format_version=None, compression_types=["snappy"])
- @parametrize(from_kafka_version=str(LATEST_0_11_0),
to_message_format_version=None, compression_types=["gzip"])
- @parametrize(from_kafka_version=str(LATEST_0_11_0),
to_message_format_version=None, compression_types=["lz4"])
- @parametrize(from_kafka_version=str(LATEST_0_10_2),
to_message_format_version=str(LATEST_0_9), compression_types=["none"])
- @parametrize(from_kafka_version=str(LATEST_0_10_2),
to_message_format_version=str(LATEST_0_10), compression_types=["snappy"])
- @parametrize(from_kafka_version=str(LATEST_0_10_2),
to_message_format_version=None, compression_types=["none"])
- @parametrize(from_kafka_version=str(LATEST_0_10_2),
to_message_format_version=None, compression_types=["lz4"])
- @parametrize(from_kafka_version=str(LATEST_0_10_1),
to_message_format_version=None, compression_types=["lz4"])
- @parametrize(from_kafka_version=str(LATEST_0_10_1),
to_message_format_version=None, compression_types=["snappy"])
- @parametrize(from_kafka_version=str(LATEST_0_10_0),
to_message_format_version=None, compression_types=["snappy"])
- @parametrize(from_kafka_version=str(LATEST_0_10_0),
to_message_format_version=None, compression_types=["lz4"])
- @cluster(num_nodes=7)
- @parametrize(from_kafka_version=str(LATEST_0_9),
to_message_format_version=None, compression_types=["none"],
security_protocol="SASL_SSL")
- @cluster(num_nodes=6)
- @parametrize(from_kafka_version=str(LATEST_0_9),
to_message_format_version=None, compression_types=["snappy"])
- @parametrize(from_kafka_version=str(LATEST_0_9),
to_message_format_version=None, compression_types=["lz4"])
- @parametrize(from_kafka_version=str(LATEST_0_9),
to_message_format_version=str(LATEST_0_9), compression_types=["none"])
- @parametrize(from_kafka_version=str(LATEST_0_9),
to_message_format_version=str(LATEST_0_9), compression_types=["lz4"])
- @cluster(num_nodes=7)
- @parametrize(from_kafka_version=str(LATEST_0_8_2),
to_message_format_version=None, compression_types=["none"])
- @parametrize(from_kafka_version=str(LATEST_0_8_2),
to_message_format_version=None, compression_types=["snappy"])
def test_upgrade(self, from_kafka_version, to_message_format_version,
compression_types,
security_protocol="PLAINTEXT"):
"""Test upgrade of Kafka broker cluster from various versions to the
current version
@@ -213,12 +177,7 @@ class TestUpgrade(ProduceConsumeValidateTest):
compression_types=compression_types,
version=KafkaVersion(from_kafka_version))
- if from_kafka_version <= LATEST_0_10_0:
- assert self.kafka.cluster_id() is None
-
- # With older message formats before KIP-101, message loss may occur
due to truncation
- # after leader change. Tolerate limited data loss for this case to
avoid transient test failures.
- self.may_truncate_acked_records = False if from_kafka_version >=
V_0_11_0_0 else True
+ self.may_truncate_acked_records = False
new_consumer = fromKafkaVersion.consumer_supports_bootstrap_server()
# TODO - reduce the timeout