This is an automated email from the ASF dual-hosted git repository. chia7712 pushed a commit to branch 4.0 in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/4.0 by this push: new 5690489b6b6 KAFKA-18346 Fix e2e TestKRaftUpgrade for v3.3.2 (#18386) 5690489b6b6 is described below commit 5690489b6b6bbb2f36965b1c2c98eba98d660961 Author: TaiJuWu <tjwu1...@gmail.com> AuthorDate: Wed Jan 15 20:37:55 2025 +0800 KAFKA-18346 Fix e2e TestKRaftUpgrade for v3.3.2 (#18386) Due to an issue with handling folders in Kafka version 3.3.2 (see https://github.com/apache/kafka/pull/13130), this end-to-end test requires using a single folder for upgrade/downgrade scenarios involving 3.3.2. Reviewers: Ismael Juma <ism...@juma.me.uk>, Chia-Ping Tsai <chia7...@gmail.com> --- tests/kafkatest/services/kafka/kafka.py | 5 +++-- tests/kafkatest/tests/core/kraft_upgrade_test.py | 13 +++++++++++-- 2 files changed, 14 insertions(+), 4 deletions(-) diff --git a/tests/kafkatest/services/kafka/kafka.py b/tests/kafkatest/services/kafka/kafka.py index af1133716f7..018bf382529 100644 --- a/tests/kafkatest/services/kafka/kafka.py +++ b/tests/kafkatest/services/kafka/kafka.py @@ -255,7 +255,7 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service): :param jmx_attributes: :param int zk_connect_timeout: :param int zk_session_timeout: - :param list[list] server_prop_overrides: overrides for kafka.properties file + :param list[list] server_prop_overrides: overrides for kafka.properties file, if the second value is None or "", it will be filtered e.g: [["config1", "true"], ["config2", "1000"]] :param str zk_chroot: :param bool zk_client_secure: connect to Zookeeper over secure client port (TLS) when True @@ -781,7 +781,8 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service): #update template configs with test override configs configs.update(override_configs) - prop_file = self.render_configs(configs) + filtered_configs = {k: v for k, v in configs.items() if v not in [None, ""]} + prop_file = self.render_configs(filtered_configs) return prop_file def render_configs(self, configs): diff --git 
a/tests/kafkatest/tests/core/kraft_upgrade_test.py b/tests/kafkatest/tests/core/kraft_upgrade_test.py index 604ffa01803..ab724a91e9e 100644 --- a/tests/kafkatest/tests/core/kraft_upgrade_test.py +++ b/tests/kafkatest/tests/core/kraft_upgrade_test.py @@ -17,7 +17,7 @@ from ducktape.mark import parametrize, matrix from ducktape.mark.resource import cluster from ducktape.utils.util import wait_until from kafkatest.services.console_consumer import ConsoleConsumer -from kafkatest.services.kafka import KafkaService +from kafkatest.services.kafka import config_property, KafkaService from kafkatest.services.kafka.quorum import isolated_kraft, combined_kraft from kafkatest.services.verifiable_producer import VerifiableProducer from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest @@ -138,6 +138,14 @@ class TestKRaftUpgrade(ProduceConsumeValidateTest): - Perform rolling downgrade. - Finally, validate that every message acked by the producer was consumed by the consumer. """ + + # Due to a compatibility issue with version 3.3, we need to use a single folder. Using multiple folders + # will cause the broker to throw InconsistentBrokerMetadataException during startup. 
+ # see https://github.com/apache/kafka/pull/13130 + server_prop_overrides = None + if starting_kafka_version == str(LATEST_3_3): + server_prop_overrides = [[config_property.LOG_DIRS, "/mnt/kafka/kafka-metadata-logs"], [config_property.METADATA_LOG_DIR, ""]] + fromKafkaVersion = KafkaVersion(starting_kafka_version) self.kafka = KafkaService(self.test_context, num_nodes=3, @@ -145,7 +153,8 @@ class TestKRaftUpgrade(ProduceConsumeValidateTest): version=fromKafkaVersion, topics={self.topic: {"partitions": self.partitions, "replication-factor": self.replication_factor, - 'configs': {"min.insync.replicas": 2}}}) + 'configs': {"min.insync.replicas": 2}}}, + server_prop_overrides = server_prop_overrides) self.kafka.start() self.producer = VerifiableProducer(self.test_context, self.num_producers, self.kafka, self.topic, throughput=self.producer_throughput,