This is an automated email from the ASF dual-hosted git repository.

lucasbru pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new f22ad6645bf KAFKA-16272: Adding new coordinator related changes for connect_distributed.py (#15594)
f22ad6645bf is described below

commit f22ad6645bfec0b38e820e0090261c9f6b421a74
Author: vamossagar12 <sagarmeansoc...@gmail.com>
AuthorDate: Fri Apr 19 20:59:50 2024 +0530

    KAFKA-16272: Adding new coordinator related changes for connect_distributed.py (#15594)
    
    Summary of the changes:
    
    Parameterizes the tests to use the new coordinator and to pass in a consumer group protocol. This applies to sink connectors only.
    Enhances the sink connector creation code in the system tests to accept a new optional parameter specifying the consumer group protocol to use.
    Sets the consumer group protocol via the consumer.override.group.protocol override config when the new group coordinator is enabled.
    Note about testing: there are 288 tests to run, and running them locally takes a long time. I will try to post the test results once I have a full run.
    
    Reviewers: Kirk True <kt...@confluent.io>, Lucas Brutschy <lbruts...@confluent.io>, Philip Nee <p...@confluent.io>
---
 tests/kafkatest/services/connect.py                | 20 ++++--
 .../tests/connect/connect_distributed_test.py      | 72 ++++++++++++----------
 2 files changed, 54 insertions(+), 38 deletions(-)

diff --git a/tests/kafkatest/services/connect.py 
b/tests/kafkatest/services/connect.py
index d6d5e0d2791..c84a3ec43c3 100644
--- a/tests/kafkatest/services/connect.py
+++ b/tests/kafkatest/services/connect.py
@@ -519,12 +519,13 @@ class VerifiableSink(VerifiableConnector):
     Helper class for running a verifiable sink connector on a Kafka Connect 
cluster and analyzing the output.
     """
 
-    def __init__(self, cc, name="verifiable-sink", tasks=1, 
topics=["verifiable"]):
+    def __init__(self, cc, name="verifiable-sink", tasks=1, 
topics=["verifiable"], consumer_group_protocol=None):
         self.cc = cc
         self.logger = self.cc.logger
         self.name = name
         self.tasks = tasks
         self.topics = topics
+        self.consumer_group_protocol = consumer_group_protocol
 
     def flushed_messages(self):
         return list(filter(lambda m: 'flushed' in m and m['flushed'], 
self.messages()))
@@ -534,33 +535,40 @@ class VerifiableSink(VerifiableConnector):
 
     def start(self):
         self.logger.info("Creating connector VerifiableSinkConnector %s", 
self.name)
-        self.cc.create_connector({
+        connector_config = {
             'name': self.name,
             'connector.class': 
'org.apache.kafka.connect.tools.VerifiableSinkConnector',
             'tasks.max': self.tasks,
             'topics': ",".join(self.topics)
-        })
+        }
+        if self.consumer_group_protocol is not None:
+            connector_config["consumer.override.group.protocol"] = 
self.consumer_group_protocol
+        self.cc.create_connector(connector_config)
 
 class MockSink(object):
 
-    def __init__(self, cc, topics, mode=None, delay_sec=10, name="mock-sink"):
+    def __init__(self, cc, topics, mode=None, delay_sec=10, name="mock-sink", 
consumer_group_protocol=None):
         self.cc = cc
         self.logger = self.cc.logger
         self.name = name
         self.mode = mode
         self.delay_sec = delay_sec
         self.topics = topics
+        self.consumer_group_protocol = consumer_group_protocol
 
     def start(self):
         self.logger.info("Creating connector MockSinkConnector %s", self.name)
-        self.cc.create_connector({
+        connector_config = {
             'name': self.name,
             'connector.class': 
'org.apache.kafka.connect.tools.MockSinkConnector',
             'tasks.max': 1,
             'topics': ",".join(self.topics),
             'mock_mode': self.mode,
             'delay_ms': self.delay_sec * 1000
-        })
+        }
+        if self.consumer_group_protocol is not None:
+            connector_config["consumer.override.group.protocol"] = 
self.consumer_group_protocol
+        self.cc.create_connector(connector_config)
 
 class MockSource(object):
 
diff --git a/tests/kafkatest/tests/connect/connect_distributed_test.py 
b/tests/kafkatest/tests/connect/connect_distributed_test.py
index e1686e809e0..1e605452398 100644
--- a/tests/kafkatest/tests/connect/connect_distributed_test.py
+++ b/tests/kafkatest/tests/connect/connect_distributed_test.py
@@ -108,9 +108,10 @@ class ConnectDistributedTest(Test):
             self.zk.start()
         self.kafka.start()
 
-    def _start_connector(self, config_file):
+    def _start_connector(self, config_file, extra_config={}):
         connector_props = self.render(config_file)
         connector_config = dict([line.strip().split('=', 1) for line in 
connector_props.split('\n') if line.strip() and not 
line.strip().startswith('#')])
+        connector_config.update(extra_config)
         self.cc.create_connector(connector_config)
             
     def _connector_status(self, connector, node=None):
@@ -174,16 +175,17 @@ class ConnectDistributedTest(Test):
     @matrix(
         exactly_once_source=[True, False],
         connect_protocol=['sessioned', 'compatible', 'eager'],
-        metadata_quorum=[quorum.zk],
+        metadata_quorum=[quorum.zk, quorum.isolated_kraft],
         use_new_coordinator=[False]
     )
     @matrix(
         exactly_once_source=[True, False],
         connect_protocol=['sessioned', 'compatible', 'eager'],
         metadata_quorum=[quorum.isolated_kraft],
-        use_new_coordinator=[True, False]
+        use_new_coordinator=[True],
+        group_protocol=consumer_group.all_group_protocols
     )
-    def test_restart_failed_connector(self, exactly_once_source, 
connect_protocol, metadata_quorum, use_new_coordinator=False):
+    def test_restart_failed_connector(self, exactly_once_source, 
connect_protocol, metadata_quorum, use_new_coordinator=False, 
group_protocol=None):
         self.EXACTLY_ONCE_SOURCE_SUPPORT = 'enabled' if exactly_once_source 
else 'disabled'
         self.CONNECT_PROTOCOL = connect_protocol
         self.setup_services()
@@ -193,7 +195,7 @@ class ConnectDistributedTest(Test):
         if exactly_once_source:
             self.connector = MockSource(self.cc, mode='connector-failure', 
delay_sec=5)
         else:
-            self.connector = MockSink(self.cc, self.topics.keys(), 
mode='connector-failure', delay_sec=5)
+            self.connector = MockSink(self.cc, self.topics.keys(), 
mode='connector-failure', delay_sec=5, consumer_group_protocol=group_protocol)
         self.connector.start()
 
         wait_until(lambda: self.connector_is_failed(self.connector), 
timeout_sec=15,
@@ -208,16 +210,17 @@ class ConnectDistributedTest(Test):
     @matrix(
         connector_type=['source', 'exactly-once source', 'sink'],
         connect_protocol=['sessioned', 'compatible', 'eager'],
-        metadata_quorum=[quorum.zk],
+        metadata_quorum=[quorum.zk, quorum.isolated_kraft],
         use_new_coordinator=[False]
     )
     @matrix(
         connector_type=['source', 'exactly-once source', 'sink'],
         connect_protocol=['sessioned', 'compatible', 'eager'],
         metadata_quorum=[quorum.isolated_kraft],
-        use_new_coordinator=[True, False]
+        use_new_coordinator=[True],
+        group_protocol=consumer_group.all_group_protocols
     )
-    def test_restart_failed_task(self, connector_type, connect_protocol, 
metadata_quorum, use_new_coordinator=False):
+    def test_restart_failed_task(self, connector_type, connect_protocol, 
metadata_quorum, use_new_coordinator=False, group_protocol=None):
         self.EXACTLY_ONCE_SOURCE_SUPPORT = 'enabled' if connector_type == 
'exactly-once source' else 'disabled'
         self.CONNECT_PROTOCOL = connect_protocol
         self.setup_services()
@@ -226,7 +229,7 @@ class ConnectDistributedTest(Test):
 
         connector = None
         if connector_type == "sink":
-            connector = MockSink(self.cc, self.topics.keys(), 
mode='task-failure', delay_sec=5)
+            connector = MockSink(self.cc, self.topics.keys(), 
mode='task-failure', delay_sec=5, consumer_group_protocol=group_protocol)
         else:
             connector = MockSource(self.cc, mode='task-failure', delay_sec=5)
             
@@ -244,21 +247,22 @@ class ConnectDistributedTest(Test):
     @cluster(num_nodes=5)
     @matrix(
         connect_protocol=['sessioned', 'compatible', 'eager'],
-        metadata_quorum=[quorum.zk],
+        metadata_quorum=[quorum.zk, quorum.isolated_kraft],
         use_new_coordinator=[False]
     )
     @matrix(
         connect_protocol=['sessioned', 'compatible', 'eager'],
         metadata_quorum=[quorum.isolated_kraft],
-        use_new_coordinator=[True, False]
+        use_new_coordinator=[True],
+        group_protocol=consumer_group.all_group_protocols
     )
-    def test_restart_connector_and_tasks_failed_connector(self, 
connect_protocol, metadata_quorum, use_new_coordinator=False):
+    def test_restart_connector_and_tasks_failed_connector(self, 
connect_protocol, metadata_quorum, use_new_coordinator=False, 
group_protocol=None):
         self.CONNECT_PROTOCOL = connect_protocol
         self.setup_services()
         self.cc.set_configs(lambda node: 
self.render("connect-distributed.properties", node=node))
         self.cc.start()
 
-        self.sink = MockSink(self.cc, self.topics.keys(), 
mode='connector-failure', delay_sec=5)
+        self.sink = MockSink(self.cc, self.topics.keys(), 
mode='connector-failure', delay_sec=5, consumer_group_protocol=group_protocol)
         self.sink.start()
 
         wait_until(lambda: self.connector_is_failed(self.sink), timeout_sec=15,
@@ -273,16 +277,17 @@ class ConnectDistributedTest(Test):
     @matrix(
         connector_type=['source', 'sink'],
         connect_protocol=['sessioned', 'compatible', 'eager'],
-        metadata_quorum=[quorum.zk],
+        metadata_quorum=[quorum.zk, quorum.isolated_kraft],
         use_new_coordinator=[False]
     )
     @matrix(
         connector_type=['source', 'sink'],
         connect_protocol=['sessioned', 'compatible', 'eager'],
         metadata_quorum=[quorum.isolated_kraft],
-        use_new_coordinator=[True, False]
+        use_new_coordinator=[True],
+        group_protocol=consumer_group.all_group_protocols
     )
-    def test_restart_connector_and_tasks_failed_task(self, connector_type, 
connect_protocol, metadata_quorum, use_new_coordinator=False):
+    def test_restart_connector_and_tasks_failed_task(self, connector_type, 
connect_protocol, metadata_quorum, use_new_coordinator=False, 
group_protocol=None):
         self.CONNECT_PROTOCOL = connect_protocol
         self.setup_services()
         self.cc.set_configs(lambda node: 
self.render("connect-distributed.properties", node=node))
@@ -290,7 +295,7 @@ class ConnectDistributedTest(Test):
 
         connector = None
         if connector_type == "sink":
-            connector = MockSink(self.cc, self.topics.keys(), 
mode='task-failure', delay_sec=5)
+            connector = MockSink(self.cc, self.topics.keys(), 
mode='task-failure', delay_sec=5, consumer_group_protocol=group_protocol)
         else:
             connector = MockSource(self.cc, mode='task-failure', delay_sec=5)
 
@@ -361,15 +366,16 @@ class ConnectDistributedTest(Test):
     @cluster(num_nodes=5)
     @matrix(
         connect_protocol=['sessioned', 'compatible', 'eager'],
-        metadata_quorum=[quorum.zk],
+        metadata_quorum=[quorum.zk, quorum.isolated_kraft],
         use_new_coordinator=[False]
     )
     @matrix(
         connect_protocol=['sessioned', 'compatible', 'eager'],
         metadata_quorum=[quorum.isolated_kraft],
-        use_new_coordinator=[True, False]
+        use_new_coordinator=[True],
+        group_protocol=consumer_group.all_group_protocols
     )
-    def test_pause_and_resume_sink(self, connect_protocol, metadata_quorum, 
use_new_coordinator=False):
+    def test_pause_and_resume_sink(self, connect_protocol, metadata_quorum, 
use_new_coordinator=False, group_protocol=None):
         """
         Verify that sink connectors stop consuming records when paused and 
begin again after
         being resumed.
@@ -387,7 +393,7 @@ class ConnectDistributedTest(Test):
         wait_until(lambda: len(self.source.committed_messages()) > 0, 
timeout_sec=30,
                    err_msg="Timeout expired waiting for source task to produce 
a message")
 
-        self.sink = VerifiableSink(self.cc, topics=[self.TOPIC])
+        self.sink = VerifiableSink(self.cc, topics=[self.TOPIC], 
consumer_group_protocol=group_protocol)
         self.sink.start()
 
         wait_until(lambda: self.is_running(self.sink), timeout_sec=30,
@@ -637,7 +643,7 @@ class ConnectDistributedTest(Test):
         security_protocol=[SecurityConfig.PLAINTEXT, SecurityConfig.SASL_SSL],
         exactly_once_source=[True, False],
         connect_protocol=['sessioned', 'compatible', 'eager'],
-        metadata_quorum=[quorum.zk],
+        metadata_quorum=[quorum.zk, quorum.isolated_kraft],
         use_new_coordinator=[False]
     )
     @matrix(
@@ -645,9 +651,10 @@ class ConnectDistributedTest(Test):
         exactly_once_source=[True, False], 
         connect_protocol=['sessioned', 'compatible', 'eager'],
         metadata_quorum=[quorum.isolated_kraft],
-        use_new_coordinator=[True, False]
+        use_new_coordinator=[True],
+        group_protocol=consumer_group.all_group_protocols
     )
-    def test_file_source_and_sink(self, security_protocol, 
exactly_once_source, connect_protocol, metadata_quorum, 
use_new_coordinator=False):
+    def test_file_source_and_sink(self, security_protocol, 
exactly_once_source, connect_protocol, metadata_quorum, 
use_new_coordinator=False, group_protocol=None):
         """
         Tests that a basic file connector works across clean rolling bounces. 
This validates that the connector is
         correctly created, tasks instantiated, and as nodes restart the work 
is rebalanced across nodes.
@@ -662,7 +669,10 @@ class ConnectDistributedTest(Test):
 
         self.logger.info("Creating connectors")
         self._start_connector("connect-file-source.properties")
-        self._start_connector("connect-file-sink.properties")
+        if group_protocol is not None:
+            self._start_connector("connect-file-sink.properties", 
{"consumer.override.group.protocol" : group_protocol})
+        else:
+            self._start_connector("connect-file-sink.properties")
         
         # Generating data on the source node should generate new records and 
create new output on the sink node. Timeouts
         # here need to be more generous than they are for standalone mode 
because a) it takes longer to write configs,
@@ -708,7 +718,7 @@ class ConnectDistributedTest(Test):
 
         self.source = VerifiableSource(self.cc, topic=self.TOPIC, 
tasks=num_tasks, throughput=100)
         self.source.start()
-        self.sink = VerifiableSink(self.cc, tasks=num_tasks, 
topics=[self.TOPIC])
+        self.sink = VerifiableSink(self.cc, tasks=num_tasks, 
topics=[self.TOPIC], consumer_group_protocol=group_protocol)
         self.sink.start()
 
         for i in range(3):
@@ -816,17 +826,16 @@ class ConnectDistributedTest(Test):
     @matrix(
         clean=[True, False],
         connect_protocol=['sessioned', 'compatible', 'eager'],
-        metadata_quorum=[quorum.zk, quorum.isolated_kraft],
+        metadata_quorum=[quorum.zk],
         use_new_coordinator=[False]
     )
     @matrix(
         clean=[True, False],
         connect_protocol=['sessioned', 'compatible', 'eager'],
         metadata_quorum=[quorum.isolated_kraft],
-        use_new_coordinator=[True],
-        group_protocol=consumer_group.all_group_protocols
+        use_new_coordinator=[True, False]
     )
-    def test_exactly_once_source(self, clean, connect_protocol, 
metadata_quorum, use_new_coordinator=False, group_protocol=None):
+    def test_exactly_once_source(self, clean, connect_protocol, 
metadata_quorum, use_new_coordinator=False):
         """
         Validates that source tasks run correctly and deliver messages exactly 
once
         when Kafka Connect workers undergo bounces, both clean and unclean.
@@ -881,8 +890,7 @@ class ConnectDistributedTest(Test):
         self.source.stop()
         self.cc.stop()
 
-        consumer_properties = 
consumer_group.maybe_set_group_protocol(group_protocol)
-        consumer = ConsoleConsumer(self.test_context, 1, self.kafka, 
self.source.topic, message_validator=json.loads, consumer_timeout_ms=1000, 
isolation_level="read_committed", consumer_properties=consumer_properties)
+        consumer = ConsoleConsumer(self.test_context, 1, self.kafka, 
self.source.topic, message_validator=json.loads, consumer_timeout_ms=1000, 
isolation_level="read_committed")
         consumer.run()
         src_messages = consumer.messages_consumed[1]
 

Reply via email to