This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 29881782c84 KAFKA-17609 Migrate broker compatibility test from ZK to
KRaft (#17603)
29881782c84 is described below
commit 29881782c842b3d8ecbd2215a12956f3536d4e31
Author: Bill Bejeck <[email protected]>
AuthorDate: Wed Oct 30 16:51:06 2024 -0400
KAFKA-17609 Migrate broker compatibility test from ZK to KRaft (#17603)
Reviewers: Matthias J. Sax <[email protected]>, Chia-Ping Tsai
<[email protected]>
---
Vagrantfile | 4 +-
.../streams/streams_broker_compatibility_test.py | 63 +++++-----------------
2 files changed, 15 insertions(+), 52 deletions(-)
diff --git a/Vagrantfile b/Vagrantfile
index a053be28d01..3f64a4a9659 100644
--- a/Vagrantfile
+++ b/Vagrantfile
@@ -55,8 +55,8 @@ ec2_iam_instance_profile_name = nil
ebs_volume_type = 'gp3'
-jdk_major = '8'
-jdk_full = '8u202-linux-x64'
+jdk_major = '11'
+jdk_full = '11.0.2-linux-x64'
local_config_file = File.join(File.dirname(__FILE__), "Vagrantfile.local")
if File.exists?(local_config_file) then
diff --git a/tests/kafkatest/tests/streams/streams_broker_compatibility_test.py
b/tests/kafkatest/tests/streams/streams_broker_compatibility_test.py
index 6ed8d7f42ce..953ce2263d4 100644
--- a/tests/kafkatest/tests/streams/streams_broker_compatibility_test.py
+++ b/tests/kafkatest/tests/streams/streams_broker_compatibility_test.py
@@ -18,23 +18,16 @@ from ducktape.mark import matrix
from ducktape.mark.resource import cluster
from ducktape.tests.test import Test
from ducktape.utils.util import wait_until
-from kafkatest.services.kafka import KafkaService
+from kafkatest.services.kafka import KafkaService, quorum
from kafkatest.services.streams import StreamsBrokerCompatibilityService
from kafkatest.services.verifiable_consumer import VerifiableConsumer
-from kafkatest.services.zookeeper import ZookeeperService
-from kafkatest.version import LATEST_1_0, LATEST_1_1, \
- LATEST_2_0, LATEST_2_1, LATEST_2_2, LATEST_2_3, LATEST_2_4, LATEST_2_5,
LATEST_2_6, LATEST_2_7, LATEST_2_8, \
- LATEST_3_0, LATEST_3_1, LATEST_3_2, LATEST_3_3, LATEST_3_4, LATEST_3_5,
LATEST_3_6, LATEST_3_7, LATEST_3_8, KafkaVersion
+from kafkatest.version import LATEST_3_0, LATEST_3_1, LATEST_3_2, LATEST_3_3,
LATEST_3_4, LATEST_3_5, LATEST_3_6, LATEST_3_7, LATEST_3_8, KafkaVersion
class StreamsBrokerCompatibility(Test):
"""
These tests validates that
- - Streams works for older brokers 0.11 (or newer)
- - Streams w/ EOS-alpha works for older brokers 0.11 (or newer)
- Streams w/ EOS-v2 works for older brokers 2.5 (or newer)
- - Streams fails fast for older brokers 0.10.0, 0.10.2, and 0.10.1
- - Streams w/ EOS-v2 fails fast for older brokers 2.4 or older
"""
input = "brokerCompatibilitySourceTopic"
@@ -42,10 +35,9 @@ class StreamsBrokerCompatibility(Test):
def __init__(self, test_context):
super(StreamsBrokerCompatibility,
self).__init__(test_context=test_context)
- self.zk = ZookeeperService(test_context, num_nodes=1)
self.kafka = KafkaService(test_context,
num_nodes=1,
- zk=self.zk,
+ zk=None,
topics={
self.input: {'partitions': 1,
'replication-factor': 1},
self.output: {'partitions': 1,
'replication-factor': 1}
@@ -60,17 +52,14 @@ class StreamsBrokerCompatibility(Test):
self.output,
"stream-broker-compatibility-verify-consumer")
- def setUp(self):
- self.zk.start()
-
@cluster(num_nodes=4)
-
@matrix(broker_version=[str(LATEST_1_0),str(LATEST_1_1),str(LATEST_2_0),str(LATEST_2_1),
-
str(LATEST_2_2),str(LATEST_2_3),str(LATEST_2_4),str(LATEST_2_5),
-
str(LATEST_2_6),str(LATEST_2_7),str(LATEST_2_8),str(LATEST_3_0),
-
str(LATEST_3_1),str(LATEST_3_2),str(LATEST_3_3),str(LATEST_3_4),
-
str(LATEST_3_5),str(LATEST_3_6),str(LATEST_3_7),str(LATEST_3_8)])
- def test_compatible_brokers_eos_disabled(self, broker_version):
+
@matrix(broker_version=[str(LATEST_3_0),str(LATEST_3_1),str(LATEST_3_2),str(LATEST_3_3),
+
str(LATEST_3_4),str(LATEST_3_5),str(LATEST_3_6),str(LATEST_3_7),
+ str(LATEST_3_8)],
+ metadata_quorum=[quorum.combined_kraft]
+ )
+ def test_compatible_brokers_eos_disabled(self, broker_version,
metadata_quorum):
self.kafka.set_version(KafkaVersion(broker_version))
self.kafka.start()
@@ -87,11 +76,11 @@ class StreamsBrokerCompatibility(Test):
self.kafka.stop()
@cluster(num_nodes=4)
-
@matrix(broker_version=[str(LATEST_2_5),str(LATEST_2_6),str(LATEST_2_7),str(LATEST_2_8),
-
str(LATEST_3_0),str(LATEST_3_1),str(LATEST_3_2),str(LATEST_3_3),
+
@matrix(broker_version=[str(LATEST_3_0),str(LATEST_3_1),str(LATEST_3_2),str(LATEST_3_3),
str(LATEST_3_4),str(LATEST_3_5),str(LATEST_3_6),str(LATEST_3_7),
- str(LATEST_3_8)])
- def test_compatible_brokers_eos_v2_enabled(self, broker_version):
+ str(LATEST_3_8)],
+ metadata_quorum=[quorum.combined_kraft])
+ def test_compatible_brokers_eos_v2_enabled(self, broker_version,
metadata_quorum):
self.kafka.set_version(KafkaVersion(broker_version))
self.kafka.start()
@@ -106,29 +95,3 @@ class StreamsBrokerCompatibility(Test):
self.consumer.stop()
self.kafka.stop()
-
- @cluster(num_nodes=4)
- @parametrize(broker_version=str(LATEST_2_4))
- @parametrize(broker_version=str(LATEST_2_3))
- @parametrize(broker_version=str(LATEST_2_2))
- @parametrize(broker_version=str(LATEST_2_1))
- @parametrize(broker_version=str(LATEST_2_0))
- @parametrize(broker_version=str(LATEST_1_1))
- @parametrize(broker_version=str(LATEST_1_0))
- def test_fail_fast_on_incompatible_brokers_if_eos_v2_enabled(self,
broker_version):
- self.kafka.set_version(KafkaVersion(broker_version))
- self.kafka.start()
-
- processor = StreamsBrokerCompatibilityService(self.test_context,
self.kafka, "exactly_once_v2")
-
- with processor.node.account.monitor_log(processor.STDERR_FILE) as
monitor:
- with processor.node.account.monitor_log(processor.LOG_FILE) as log:
- processor.start()
- log.wait_until('Shutting down because the Kafka cluster seems
to be on a too old version. Setting processing\.guarantee="exactly_once_v2"
requires broker version 2\.5 or higher\.',
- timeout_sec=60,
- err_msg="Never saw 'Shutting down because the
Kafka cluster seems to be on a too old version. Setting
`processing.guarantee=\"exactly_once_v2\"` requires broker version 2.5 or
higher.' log message " + str(processor.node.account))
- monitor.wait_until('FATAL: An unexpected exception
org.apache.kafka.common.errors.UnsupportedVersionException',
- timeout_sec=60,
- err_msg="Never saw 'FATAL: An unexpected
exception org.apache.kafka.common.errors.UnsupportedVersionException' error
message " + str(processor.node.account))
-
- self.kafka.stop()