This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 29881782c84 KAFKA-17609 Migrate broker compatibility test from ZK to KRaft (#17603)
29881782c84 is described below

commit 29881782c842b3d8ecbd2215a12956f3536d4e31
Author: Bill Bejeck <[email protected]>
AuthorDate: Wed Oct 30 16:51:06 2024 -0400

    KAFKA-17609 Migrate broker compatibility test from ZK to KRaft (#17603)
    
    Reviewers: Matthias J. Sax <[email protected]>, Chia-Ping Tsai <[email protected]>
---
 Vagrantfile                                        |  4 +-
 .../streams/streams_broker_compatibility_test.py   | 63 +++++-----------------
 2 files changed, 15 insertions(+), 52 deletions(-)

diff --git a/Vagrantfile b/Vagrantfile
index a053be28d01..3f64a4a9659 100644
--- a/Vagrantfile
+++ b/Vagrantfile
@@ -55,8 +55,8 @@ ec2_iam_instance_profile_name = nil
 
 ebs_volume_type = 'gp3'
 
-jdk_major = '8'
-jdk_full = '8u202-linux-x64'
+jdk_major = '11'
+jdk_full = '11.0.2-linux-x64'
 
 local_config_file = File.join(File.dirname(__FILE__), "Vagrantfile.local")
 if File.exists?(local_config_file) then
diff --git a/tests/kafkatest/tests/streams/streams_broker_compatibility_test.py b/tests/kafkatest/tests/streams/streams_broker_compatibility_test.py
index 6ed8d7f42ce..953ce2263d4 100644
--- a/tests/kafkatest/tests/streams/streams_broker_compatibility_test.py
+++ b/tests/kafkatest/tests/streams/streams_broker_compatibility_test.py
@@ -18,23 +18,16 @@ from ducktape.mark import matrix
 from ducktape.mark.resource import cluster
 from ducktape.tests.test import Test
 from ducktape.utils.util import wait_until
-from kafkatest.services.kafka import KafkaService
+from kafkatest.services.kafka import KafkaService, quorum
 from kafkatest.services.streams import StreamsBrokerCompatibilityService
 from kafkatest.services.verifiable_consumer import VerifiableConsumer
-from kafkatest.services.zookeeper import ZookeeperService
-from kafkatest.version import LATEST_1_0, LATEST_1_1, \
-    LATEST_2_0, LATEST_2_1, LATEST_2_2, LATEST_2_3, LATEST_2_4, LATEST_2_5, 
LATEST_2_6, LATEST_2_7, LATEST_2_8, \
-    LATEST_3_0, LATEST_3_1, LATEST_3_2, LATEST_3_3, LATEST_3_4, LATEST_3_5, 
LATEST_3_6, LATEST_3_7, LATEST_3_8, KafkaVersion
+from kafkatest.version import LATEST_3_0, LATEST_3_1, LATEST_3_2, LATEST_3_3, 
LATEST_3_4, LATEST_3_5, LATEST_3_6, LATEST_3_7, LATEST_3_8, KafkaVersion
 
 
 class StreamsBrokerCompatibility(Test):
     """
     These tests validates that
-    - Streams works for older brokers 0.11 (or newer)
-    - Streams w/ EOS-alpha works for older brokers 0.11 (or newer)
     - Streams w/ EOS-v2 works for older brokers 2.5 (or newer)
-    - Streams fails fast for older brokers 0.10.0, 0.10.2, and 0.10.1
-    - Streams w/ EOS-v2 fails fast for older brokers 2.4 or older
     """
 
     input = "brokerCompatibilitySourceTopic"
@@ -42,10 +35,9 @@ class StreamsBrokerCompatibility(Test):
 
     def __init__(self, test_context):
         super(StreamsBrokerCompatibility, 
self).__init__(test_context=test_context)
-        self.zk = ZookeeperService(test_context, num_nodes=1)
         self.kafka = KafkaService(test_context,
                                   num_nodes=1,
-                                  zk=self.zk,
+                                  zk=None,
                                   topics={
                                       self.input: {'partitions': 1, 
'replication-factor': 1},
                                       self.output: {'partitions': 1, 
'replication-factor': 1}
@@ -60,17 +52,14 @@ class StreamsBrokerCompatibility(Test):
                                            self.output,
                                            
"stream-broker-compatibility-verify-consumer")
 
-    def setUp(self):
-        self.zk.start()
-
 
     @cluster(num_nodes=4)
-    
@matrix(broker_version=[str(LATEST_1_0),str(LATEST_1_1),str(LATEST_2_0),str(LATEST_2_1),
-                            
str(LATEST_2_2),str(LATEST_2_3),str(LATEST_2_4),str(LATEST_2_5),
-                            
str(LATEST_2_6),str(LATEST_2_7),str(LATEST_2_8),str(LATEST_3_0),
-                            
str(LATEST_3_1),str(LATEST_3_2),str(LATEST_3_3),str(LATEST_3_4),
-                            
str(LATEST_3_5),str(LATEST_3_6),str(LATEST_3_7),str(LATEST_3_8)])
-    def test_compatible_brokers_eos_disabled(self, broker_version):
+    
@matrix(broker_version=[str(LATEST_3_0),str(LATEST_3_1),str(LATEST_3_2),str(LATEST_3_3),
+                            
str(LATEST_3_4),str(LATEST_3_5),str(LATEST_3_6),str(LATEST_3_7),
+                            str(LATEST_3_8)],
+            metadata_quorum=[quorum.combined_kraft]
+            )
+    def test_compatible_brokers_eos_disabled(self, broker_version, 
metadata_quorum):
         self.kafka.set_version(KafkaVersion(broker_version))
         self.kafka.start()
 
@@ -87,11 +76,11 @@ class StreamsBrokerCompatibility(Test):
         self.kafka.stop()
 
     @cluster(num_nodes=4)
-    
@matrix(broker_version=[str(LATEST_2_5),str(LATEST_2_6),str(LATEST_2_7),str(LATEST_2_8),
-                            
str(LATEST_3_0),str(LATEST_3_1),str(LATEST_3_2),str(LATEST_3_3),
+    
@matrix(broker_version=[str(LATEST_3_0),str(LATEST_3_1),str(LATEST_3_2),str(LATEST_3_3),
                             
str(LATEST_3_4),str(LATEST_3_5),str(LATEST_3_6),str(LATEST_3_7),
-                            str(LATEST_3_8)])
-    def test_compatible_brokers_eos_v2_enabled(self, broker_version):
+                            str(LATEST_3_8)],
+            metadata_quorum=[quorum.combined_kraft])
+    def test_compatible_brokers_eos_v2_enabled(self, broker_version, 
metadata_quorum):
         self.kafka.set_version(KafkaVersion(broker_version))
         self.kafka.start()
 
@@ -106,29 +95,3 @@ class StreamsBrokerCompatibility(Test):
 
         self.consumer.stop()
         self.kafka.stop()
-
-    @cluster(num_nodes=4)
-    @parametrize(broker_version=str(LATEST_2_4))
-    @parametrize(broker_version=str(LATEST_2_3))
-    @parametrize(broker_version=str(LATEST_2_2))
-    @parametrize(broker_version=str(LATEST_2_1))
-    @parametrize(broker_version=str(LATEST_2_0))
-    @parametrize(broker_version=str(LATEST_1_1))
-    @parametrize(broker_version=str(LATEST_1_0))
-    def test_fail_fast_on_incompatible_brokers_if_eos_v2_enabled(self, 
broker_version):
-        self.kafka.set_version(KafkaVersion(broker_version))
-        self.kafka.start()
-
-        processor = StreamsBrokerCompatibilityService(self.test_context, 
self.kafka, "exactly_once_v2")
-
-        with processor.node.account.monitor_log(processor.STDERR_FILE) as 
monitor:
-            with processor.node.account.monitor_log(processor.LOG_FILE) as log:
-                processor.start()
-                log.wait_until('Shutting down because the Kafka cluster seems 
to be on a too old version. Setting processing\.guarantee="exactly_once_v2" 
requires broker version 2\.5 or higher\.',
-                               timeout_sec=60,
-                               err_msg="Never saw 'Shutting down because the 
Kafka cluster seems to be on a too old version. Setting 
`processing.guarantee=\"exactly_once_v2\"` requires broker version 2.5 or 
higher.' log message " + str(processor.node.account))
-                monitor.wait_until('FATAL: An unexpected exception 
org.apache.kafka.common.errors.UnsupportedVersionException',
-                                   timeout_sec=60,
-                                   err_msg="Never saw 'FATAL: An unexpected 
exception org.apache.kafka.common.errors.UnsupportedVersionException' error 
message " + str(processor.node.account))
-
-        self.kafka.stop()

Reply via email to