This is an automated email from the ASF dual-hosted git repository. lucasbru pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new f22ad6645bf KAFKA-16272: Adding new coordinator related changes for connect_distributed.py (#15594) f22ad6645bf is described below commit f22ad6645bfec0b38e820e0090261c9f6b421a74 Author: vamossagar12 <sagarmeansoc...@gmail.com> AuthorDate: Fri Apr 19 20:59:50 2024 +0530 KAFKA-16272: Adding new coordinator related changes for connect_distributed.py (#15594) Summary of the changes: Parameterizes the tests to use new coordinator and pass in consumer group protocol. This would be applicable to sink connectors only. Enhances the sink connector creation code in system tests to accept a new optional parameter for consumer group protocol to be used. Sets the consumer group protocol via the consumer.override.group.protocol config when the new group coordinator is enabled. Note about testing: There are 288 tests that need to be run and running on my local takes a lot of time. I will try to post the test results once I have a full run. Reviewers: Kirk True <kt...@confluent.io>, Lucas Brutschy <lbruts...@confluent.io>, Philip Nee <p...@confluent.io> --- tests/kafkatest/services/connect.py | 20 ++++-- .../tests/connect/connect_distributed_test.py | 72 ++++++++++++---------- 2 files changed, 54 insertions(+), 38 deletions(-) diff --git a/tests/kafkatest/services/connect.py b/tests/kafkatest/services/connect.py index d6d5e0d2791..c84a3ec43c3 100644 --- a/tests/kafkatest/services/connect.py +++ b/tests/kafkatest/services/connect.py @@ -519,12 +519,13 @@ class VerifiableSink(VerifiableConnector): Helper class for running a verifiable sink connector on a Kafka Connect cluster and analyzing the output. 
""" - def __init__(self, cc, name="verifiable-sink", tasks=1, topics=["verifiable"]): + def __init__(self, cc, name="verifiable-sink", tasks=1, topics=["verifiable"], consumer_group_protocol=None): self.cc = cc self.logger = self.cc.logger self.name = name self.tasks = tasks self.topics = topics + self.consumer_group_protocol = consumer_group_protocol def flushed_messages(self): return list(filter(lambda m: 'flushed' in m and m['flushed'], self.messages())) @@ -534,33 +535,40 @@ class VerifiableSink(VerifiableConnector): def start(self): self.logger.info("Creating connector VerifiableSinkConnector %s", self.name) - self.cc.create_connector({ + connector_config = { 'name': self.name, 'connector.class': 'org.apache.kafka.connect.tools.VerifiableSinkConnector', 'tasks.max': self.tasks, 'topics': ",".join(self.topics) - }) + } + if self.consumer_group_protocol is not None: + connector_config["consumer.override.group.protocol"] = self.consumer_group_protocol + self.cc.create_connector(connector_config) class MockSink(object): - def __init__(self, cc, topics, mode=None, delay_sec=10, name="mock-sink"): + def __init__(self, cc, topics, mode=None, delay_sec=10, name="mock-sink", consumer_group_protocol=None): self.cc = cc self.logger = self.cc.logger self.name = name self.mode = mode self.delay_sec = delay_sec self.topics = topics + self.consumer_group_protocol = consumer_group_protocol def start(self): self.logger.info("Creating connector MockSinkConnector %s", self.name) - self.cc.create_connector({ + connector_config = { 'name': self.name, 'connector.class': 'org.apache.kafka.connect.tools.MockSinkConnector', 'tasks.max': 1, 'topics': ",".join(self.topics), 'mock_mode': self.mode, 'delay_ms': self.delay_sec * 1000 - }) + } + if self.consumer_group_protocol is not None: + connector_config["consumer.override.group.protocol"] = self.consumer_group_protocol + self.cc.create_connector(connector_config) class MockSource(object): diff --git 
a/tests/kafkatest/tests/connect/connect_distributed_test.py b/tests/kafkatest/tests/connect/connect_distributed_test.py index e1686e809e0..1e605452398 100644 --- a/tests/kafkatest/tests/connect/connect_distributed_test.py +++ b/tests/kafkatest/tests/connect/connect_distributed_test.py @@ -108,9 +108,10 @@ class ConnectDistributedTest(Test): self.zk.start() self.kafka.start() - def _start_connector(self, config_file): + def _start_connector(self, config_file, extra_config={}): connector_props = self.render(config_file) connector_config = dict([line.strip().split('=', 1) for line in connector_props.split('\n') if line.strip() and not line.strip().startswith('#')]) + connector_config.update(extra_config) self.cc.create_connector(connector_config) def _connector_status(self, connector, node=None): @@ -174,16 +175,17 @@ class ConnectDistributedTest(Test): @matrix( exactly_once_source=[True, False], connect_protocol=['sessioned', 'compatible', 'eager'], - metadata_quorum=[quorum.zk], + metadata_quorum=[quorum.zk, quorum.isolated_kraft], use_new_coordinator=[False] ) @matrix( exactly_once_source=[True, False], connect_protocol=['sessioned', 'compatible', 'eager'], metadata_quorum=[quorum.isolated_kraft], - use_new_coordinator=[True, False] + use_new_coordinator=[True], + group_protocol=consumer_group.all_group_protocols ) - def test_restart_failed_connector(self, exactly_once_source, connect_protocol, metadata_quorum, use_new_coordinator=False): + def test_restart_failed_connector(self, exactly_once_source, connect_protocol, metadata_quorum, use_new_coordinator=False, group_protocol=None): self.EXACTLY_ONCE_SOURCE_SUPPORT = 'enabled' if exactly_once_source else 'disabled' self.CONNECT_PROTOCOL = connect_protocol self.setup_services() @@ -193,7 +195,7 @@ class ConnectDistributedTest(Test): if exactly_once_source: self.connector = MockSource(self.cc, mode='connector-failure', delay_sec=5) else: - self.connector = MockSink(self.cc, self.topics.keys(), 
mode='connector-failure', delay_sec=5) + self.connector = MockSink(self.cc, self.topics.keys(), mode='connector-failure', delay_sec=5, consumer_group_protocol=group_protocol) self.connector.start() wait_until(lambda: self.connector_is_failed(self.connector), timeout_sec=15, @@ -208,16 +210,17 @@ class ConnectDistributedTest(Test): @matrix( connector_type=['source', 'exactly-once source', 'sink'], connect_protocol=['sessioned', 'compatible', 'eager'], - metadata_quorum=[quorum.zk], + metadata_quorum=[quorum.zk, quorum.isolated_kraft], use_new_coordinator=[False] ) @matrix( connector_type=['source', 'exactly-once source', 'sink'], connect_protocol=['sessioned', 'compatible', 'eager'], metadata_quorum=[quorum.isolated_kraft], - use_new_coordinator=[True, False] + use_new_coordinator=[True], + group_protocol=consumer_group.all_group_protocols ) - def test_restart_failed_task(self, connector_type, connect_protocol, metadata_quorum, use_new_coordinator=False): + def test_restart_failed_task(self, connector_type, connect_protocol, metadata_quorum, use_new_coordinator=False, group_protocol=None): self.EXACTLY_ONCE_SOURCE_SUPPORT = 'enabled' if connector_type == 'exactly-once source' else 'disabled' self.CONNECT_PROTOCOL = connect_protocol self.setup_services() @@ -226,7 +229,7 @@ class ConnectDistributedTest(Test): connector = None if connector_type == "sink": - connector = MockSink(self.cc, self.topics.keys(), mode='task-failure', delay_sec=5) + connector = MockSink(self.cc, self.topics.keys(), mode='task-failure', delay_sec=5, consumer_group_protocol=group_protocol) else: connector = MockSource(self.cc, mode='task-failure', delay_sec=5) @@ -244,21 +247,22 @@ class ConnectDistributedTest(Test): @cluster(num_nodes=5) @matrix( connect_protocol=['sessioned', 'compatible', 'eager'], - metadata_quorum=[quorum.zk], + metadata_quorum=[quorum.zk, quorum.isolated_kraft], use_new_coordinator=[False] ) @matrix( connect_protocol=['sessioned', 'compatible', 'eager'], 
metadata_quorum=[quorum.isolated_kraft], - use_new_coordinator=[True, False] + use_new_coordinator=[True], + group_protocol=consumer_group.all_group_protocols ) - def test_restart_connector_and_tasks_failed_connector(self, connect_protocol, metadata_quorum, use_new_coordinator=False): + def test_restart_connector_and_tasks_failed_connector(self, connect_protocol, metadata_quorum, use_new_coordinator=False, group_protocol=None): self.CONNECT_PROTOCOL = connect_protocol self.setup_services() self.cc.set_configs(lambda node: self.render("connect-distributed.properties", node=node)) self.cc.start() - self.sink = MockSink(self.cc, self.topics.keys(), mode='connector-failure', delay_sec=5) + self.sink = MockSink(self.cc, self.topics.keys(), mode='connector-failure', delay_sec=5, consumer_group_protocol=group_protocol) self.sink.start() wait_until(lambda: self.connector_is_failed(self.sink), timeout_sec=15, @@ -273,16 +277,17 @@ class ConnectDistributedTest(Test): @matrix( connector_type=['source', 'sink'], connect_protocol=['sessioned', 'compatible', 'eager'], - metadata_quorum=[quorum.zk], + metadata_quorum=[quorum.zk, quorum.isolated_kraft], use_new_coordinator=[False] ) @matrix( connector_type=['source', 'sink'], connect_protocol=['sessioned', 'compatible', 'eager'], metadata_quorum=[quorum.isolated_kraft], - use_new_coordinator=[True, False] + use_new_coordinator=[True], + group_protocol=consumer_group.all_group_protocols ) - def test_restart_connector_and_tasks_failed_task(self, connector_type, connect_protocol, metadata_quorum, use_new_coordinator=False): + def test_restart_connector_and_tasks_failed_task(self, connector_type, connect_protocol, metadata_quorum, use_new_coordinator=False, group_protocol=None): self.CONNECT_PROTOCOL = connect_protocol self.setup_services() self.cc.set_configs(lambda node: self.render("connect-distributed.properties", node=node)) @@ -290,7 +295,7 @@ class ConnectDistributedTest(Test): connector = None if connector_type == "sink": - 
connector = MockSink(self.cc, self.topics.keys(), mode='task-failure', delay_sec=5) + connector = MockSink(self.cc, self.topics.keys(), mode='task-failure', delay_sec=5, consumer_group_protocol=group_protocol) else: connector = MockSource(self.cc, mode='task-failure', delay_sec=5) @@ -361,15 +366,16 @@ class ConnectDistributedTest(Test): @cluster(num_nodes=5) @matrix( connect_protocol=['sessioned', 'compatible', 'eager'], - metadata_quorum=[quorum.zk], + metadata_quorum=[quorum.zk, quorum.isolated_kraft], use_new_coordinator=[False] ) @matrix( connect_protocol=['sessioned', 'compatible', 'eager'], metadata_quorum=[quorum.isolated_kraft], - use_new_coordinator=[True, False] + use_new_coordinator=[True], + group_protocol=consumer_group.all_group_protocols ) - def test_pause_and_resume_sink(self, connect_protocol, metadata_quorum, use_new_coordinator=False): + def test_pause_and_resume_sink(self, connect_protocol, metadata_quorum, use_new_coordinator=False, group_protocol=None): """ Verify that sink connectors stop consuming records when paused and begin again after being resumed. 
@@ -387,7 +393,7 @@ class ConnectDistributedTest(Test): wait_until(lambda: len(self.source.committed_messages()) > 0, timeout_sec=30, err_msg="Timeout expired waiting for source task to produce a message") - self.sink = VerifiableSink(self.cc, topics=[self.TOPIC]) + self.sink = VerifiableSink(self.cc, topics=[self.TOPIC], consumer_group_protocol=group_protocol) self.sink.start() wait_until(lambda: self.is_running(self.sink), timeout_sec=30, @@ -637,7 +643,7 @@ class ConnectDistributedTest(Test): security_protocol=[SecurityConfig.PLAINTEXT, SecurityConfig.SASL_SSL], exactly_once_source=[True, False], connect_protocol=['sessioned', 'compatible', 'eager'], - metadata_quorum=[quorum.zk], + metadata_quorum=[quorum.zk, quorum.isolated_kraft], use_new_coordinator=[False] ) @matrix( @@ -645,9 +651,10 @@ class ConnectDistributedTest(Test): exactly_once_source=[True, False], connect_protocol=['sessioned', 'compatible', 'eager'], metadata_quorum=[quorum.isolated_kraft], - use_new_coordinator=[True, False] + use_new_coordinator=[True], + group_protocol=consumer_group.all_group_protocols ) - def test_file_source_and_sink(self, security_protocol, exactly_once_source, connect_protocol, metadata_quorum, use_new_coordinator=False): + def test_file_source_and_sink(self, security_protocol, exactly_once_source, connect_protocol, metadata_quorum, use_new_coordinator=False, group_protocol=None): """ Tests that a basic file connector works across clean rolling bounces. This validates that the connector is correctly created, tasks instantiated, and as nodes restart the work is rebalanced across nodes. 
@@ -662,7 +669,10 @@ class ConnectDistributedTest(Test): self.logger.info("Creating connectors") self._start_connector("connect-file-source.properties") - self._start_connector("connect-file-sink.properties") + if group_protocol is not None: + self._start_connector("connect-file-sink.properties", {"consumer.override.group.protocol" : group_protocol}) + else: + self._start_connector("connect-file-sink.properties") # Generating data on the source node should generate new records and create new output on the sink node. Timeouts # here need to be more generous than they are for standalone mode because a) it takes longer to write configs, @@ -708,7 +718,7 @@ class ConnectDistributedTest(Test): self.source = VerifiableSource(self.cc, topic=self.TOPIC, tasks=num_tasks, throughput=100) self.source.start() - self.sink = VerifiableSink(self.cc, tasks=num_tasks, topics=[self.TOPIC]) + self.sink = VerifiableSink(self.cc, tasks=num_tasks, topics=[self.TOPIC], consumer_group_protocol=group_protocol) self.sink.start() for i in range(3): @@ -816,17 +826,16 @@ class ConnectDistributedTest(Test): @matrix( clean=[True, False], connect_protocol=['sessioned', 'compatible', 'eager'], - metadata_quorum=[quorum.zk, quorum.isolated_kraft], + metadata_quorum=[quorum.zk], use_new_coordinator=[False] ) @matrix( clean=[True, False], connect_protocol=['sessioned', 'compatible', 'eager'], metadata_quorum=[quorum.isolated_kraft], - use_new_coordinator=[True], - group_protocol=consumer_group.all_group_protocols + use_new_coordinator=[True, False] ) - def test_exactly_once_source(self, clean, connect_protocol, metadata_quorum, use_new_coordinator=False, group_protocol=None): + def test_exactly_once_source(self, clean, connect_protocol, metadata_quorum, use_new_coordinator=False): """ Validates that source tasks run correctly and deliver messages exactly once when Kafka Connect workers undergo bounces, both clean and unclean. 
@@ -881,8 +890,7 @@ class ConnectDistributedTest(Test): self.source.stop() self.cc.stop() - consumer_properties = consumer_group.maybe_set_group_protocol(group_protocol) - consumer = ConsoleConsumer(self.test_context, 1, self.kafka, self.source.topic, message_validator=json.loads, consumer_timeout_ms=1000, isolation_level="read_committed", consumer_properties=consumer_properties) + consumer = ConsoleConsumer(self.test_context, 1, self.kafka, self.source.topic, message_validator=json.loads, consumer_timeout_ms=1000, isolation_level="read_committed") consumer.run() src_messages = consumer.messages_consumed[1]