This is an automated email from the ASF dual-hosted git repository.

schofielaj pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git

The following commit(s) were added to refs/heads/trunk by this push:
     new 8cc560e9546 Infrastructure for system tests for the new share consumer client (#18209)

8cc560e9546 is described below

commit 8cc560e9546e2aa74f36c9908d38b59edb61f70f
Author:     Chirag Wadhwa <122860692+chirag-wadh...@users.noreply.github.com>
AuthorDate: Fri Jan 17 17:33:32 2025 +0530

    Infrastructure for system tests for the new share consumer client (#18209)

    Reviewers: Apoorv Mittal <apoorvmitta...@gmail.com>, Andrew Schofield <aschofi...@confluent.io>
---
 bin/kafka-verifiable-share-consumer.sh             |  20 +
 checkstyle/suppressions.xml                        |   2 +
 tests/kafkatest/services/verifiable_client.py      |  10 +-
 .../services/verifiable_share_consumer.py          | 330 +++++++++++
 .../kafkatest/tests/client/share_consumer_test.py  | 313 ++++++++++
 .../tests/verifiable_share_consumer_test.py        | 106 ++++
 .../kafka/tools/VerifiableShareConsumer.java       | 630 +++++++++++++++++++++
 7 files changed, 1406 insertions(+), 5 deletions(-)

diff --git a/bin/kafka-verifiable-share-consumer.sh b/bin/kafka-verifiable-share-consumer.sh
new file mode 100755
index 00000000000..e30178cebca
--- /dev/null
+++ b/bin/kafka-verifiable-share-consumer.sh
@@ -0,0 +1,20 @@
+#!/bin/bash
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
+    export KAFKA_HEAP_OPTS="-Xmx512M"
+fi
+exec $(dirname $0)/kafka-run-class.sh org.apache.kafka.tools.VerifiableShareConsumer "$@"
diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index 86baed58e28..19cc0975a09 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -288,6 +288,8 @@
               files="VerifiableConsumer.java"/>
     <suppress id="dontUseSystemExit"
               files="VerifiableProducer.java"/>
+    <suppress id="dontUseSystemExit"
+              files="VerifiableShareConsumer.java"/>

     <!-- Shell -->
     <suppress checks="CyclomaticComplexity"
diff --git a/tests/kafkatest/services/verifiable_client.py b/tests/kafkatest/services/verifiable_client.py
index 4a3ea5e17da..16617d621aa 100644
--- a/tests/kafkatest/services/verifiable_client.py
+++ b/tests/kafkatest/services/verifiable_client.py
@@ -142,10 +142,10 @@ script will be called on the VM just prior to executing the client.
def create_verifiable_client_implementation(context, parent): """Factory for generating a verifiable client implementation class instance - :param parent: parent class instance, either VerifiableConsumer or VerifiableProducer + :param parent: parent class instance, either VerifiableConsumer, VerifiableProducer or VerifiableShareConsumer This will first check for a fully qualified client implementation class name - in context.globals as "Verifiable<type>" where <type> is "Producer" or "Consumer", + in context.globals as "Verifiable<type>" where <type> is "Producer" or "Consumer" or "ShareConsumer", followed by "VerifiableClient" (which should implement both). The global object layout is: {"class": "<full class name>", "..anything..": ..}. @@ -232,11 +232,11 @@ class VerifiableClient (object): class VerifiableClientJava (VerifiableClient): """ - Verifiable Consumer and Producer using the official Java client. + Verifiable Consumer, ShareConsumer and Producer using the official Java client. """ def __init__(self, parent, conf=None): """ - :param parent: The parent instance, either VerifiableConsumer or VerifiableProducer + :param parent: The parent instance, either VerifiableConsumer, VerifiableShareConsumer or VerifiableProducer :param conf: Optional conf object (the --globals VerifiableX object) """ super(VerifiableClientJava, self).__init__() @@ -267,7 +267,7 @@ class VerifiableClientDummy (VerifiableClient): """ def __init__(self, parent, conf=None): """ - :param parent: The parent instance, either VerifiableConsumer or VerifiableProducer + :param parent: The parent instance, either VerifiableConsumer, VerifiableShareConsumer or VerifiableProducer :param conf: Optional conf object (the --globals VerifiableX object) """ super(VerifiableClientDummy, self).__init__() diff --git a/tests/kafkatest/services/verifiable_share_consumer.py b/tests/kafkatest/services/verifiable_share_consumer.py new file mode 100644 index 00000000000..ff39e106cf0 --- /dev/null +++ b/tests/kafkatest/services/verifiable_share_consumer.py @@ -0,0 +1,330 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+
+import json
+import os
+
+from ducktape.services.background_thread import BackgroundThreadService
+
+from kafkatest.directory_layout.kafka_path import KafkaPathResolverMixin
+from kafkatest.services.kafka import TopicPartition
+from kafkatest.services.kafka.util import get_log4j_config_param, get_log4j_config_for_tools
+from kafkatest.services.verifiable_client import VerifiableClientMixin
+from kafkatest.version import DEV_BRANCH
+
+class ShareConsumerState:
+    Started = 1
+    Dead = 2
+
+class ShareConsumerEventHandler(object):
+
+    def __init__(self, node, idx, state=ShareConsumerState.Dead):
+        self.node = node
+        self.idx = idx
+        self.total_consumed = 0
+        self.total_acknowledged = 0
+        self.total_acknowledged_failed = 0
+        self.consumed_per_partition = {}
+        self.acknowledged_per_partition = {}
+        self.acknowledged_per_partition_failed = {}
+        self.state = state
+
+    def handle_shutdown_complete(self, node=None, logger=None):
+        self.state = ShareConsumerState.Dead
+        if node is not None and logger is not None:
+            logger.debug("Shut down %s" % node.account.hostname)
+
+    def handle_startup_complete(self, node, logger):
+        self.state = ShareConsumerState.Started
+        logger.debug("Started %s" % node.account.hostname)
+
+    def handle_offsets_acknowledged(self, event, node, logger):
+        if event["success"]:
+            self.total_acknowledged += event["count"]
+            for share_partition_data in event["partitions"]:
+                topic_partition = TopicPartition(share_partition_data["topic"], share_partition_data["partition"])
+                self.acknowledged_per_partition[topic_partition] = self.acknowledged_per_partition.get(topic_partition, 0) + share_partition_data["count"]
+            logger.debug("Offsets acknowledged for %s" % (node.account.hostname))
+        else:
+            self.total_acknowledged_failed += event["count"]
+            for share_partition_data in event["partitions"]:
+                topic_partition = TopicPartition(share_partition_data["topic"], share_partition_data["partition"])
+                self.acknowledged_per_partition_failed[topic_partition] = self.acknowledged_per_partition_failed.get(topic_partition, 0) + share_partition_data["count"]
+            logger.debug("Offset acknowledgement failed for: %s" % (node.account.hostname))
+
+    def handle_records_consumed(self, event, node, logger):
+        self.total_consumed += event["count"]
+        for share_partition_data in event["partitions"]:
+            topic_partition = TopicPartition(share_partition_data["topic"], share_partition_data["partition"])
+            self.consumed_per_partition[topic_partition] = self.consumed_per_partition.get(topic_partition, 0) + share_partition_data["count"]
+        logger.debug("Records consumed for %s" % (node.account.hostname))
+
+    def handle_kill_process(self, clean_shutdown):
+        # if the shutdown was clean, then we expect the explicit
+        # shutdown event from the share consumer
+        if not clean_shutdown:
+            self.handle_shutdown_complete()
+
+class VerifiableShareConsumer(KafkaPathResolverMixin, VerifiableClientMixin, BackgroundThreadService):
+    """This service wraps org.apache.kafka.tools.VerifiableShareConsumer for use in
+    system testing.
+
+    NOTE: this class should be treated as a PUBLIC API. Downstream users use
+    this service both directly and through class extension, so care must be
+    taken to ensure compatibility.
+ """ + + PERSISTENT_ROOT = "/mnt/verifiable_share_consumer" + STDOUT_CAPTURE = os.path.join(PERSISTENT_ROOT, "verifiable_share_consumer.stdout") + STDERR_CAPTURE = os.path.join(PERSISTENT_ROOT, "verifiable_share_consumer.stderr") + LOG_DIR = os.path.join(PERSISTENT_ROOT, "logs") + LOG_FILE = os.path.join(LOG_DIR, "verifiable_share_consumer.log") + CONFIG_FILE = os.path.join(PERSISTENT_ROOT, "verifiable_share_consumer.properties") + + logs = { + "verifiable_share_consumer_stdout": { + "path": STDOUT_CAPTURE, + "collect_default": False}, + "verifiable_share_consumer_stderr": { + "path": STDERR_CAPTURE, + "collect_default": False}, + "verifiable_share_consumer_log": { + "path": LOG_FILE, + "collect_default": True} + } + + def __init__(self, context, num_nodes, kafka, topic, group_id, + max_messages=-1, acknowledgement_mode="auto", offset_reset_strategy="", + version=DEV_BRANCH, stop_timeout_sec=60, log_level="INFO", jaas_override_variables=None, + on_record_consumed=None): + """ + :param jaas_override_variables: A dict of variables to be used in the jaas.conf template file + """ + super(VerifiableShareConsumer, self).__init__(context, num_nodes) + self.log_level = log_level + self.kafka = kafka + self.topic = topic + self.group_id = group_id + self.offset_reset_strategy = offset_reset_strategy + self.max_messages = max_messages + self.acknowledgement_mode = acknowledgement_mode + self.prop_file = "" + self.stop_timeout_sec = stop_timeout_sec + self.on_record_consumed = on_record_consumed + + self.event_handlers = {} + self.jaas_override_variables = jaas_override_variables or {} + + self.total_records_consumed = 0 + self.total_records_acknowledged = 0 + self.total_records_acknowledged_failed = 0 + self.consumed_records_offsets = set() + self.acknowledged_records_offsets = set() + self.is_offset_reset_strategy_set = False + + for node in self.nodes: + node.version = version + + def java_class_name(self): + return "VerifiableShareConsumer" + + def create_event_handler(self, idx, node): + return ShareConsumerEventHandler(node, idx) + + def _worker(self, idx, node): + with self.lock: + self.event_handlers[node] = self.create_event_handler(idx, node) + handler = self.event_handlers[node] + + node.account.ssh("mkdir -p %s" % VerifiableShareConsumer.PERSISTENT_ROOT, allow_fail=False) + + # Create and upload log properties + log_config = self.render(get_log4j_config_for_tools(node), log_file=VerifiableShareConsumer.LOG_FILE) + node.account.create_file(get_log4j_config_for_tools(node), log_config) + + # Create and upload config file + self.security_config = self.kafka.security_config.client_config(self.prop_file, node, + self.jaas_override_variables) + self.security_config.setup_node(node) + self.prop_file += str(self.security_config) + self.logger.info("verifiable_share_consumer.properties:") + self.logger.info(self.prop_file) + node.account.create_file(VerifiableShareConsumer.CONFIG_FILE, self.prop_file) + self.security_config.setup_node(node) + + cmd = self.start_cmd(node) + self.logger.debug("VerifiableShareConsumer %d command: %s" % (idx, cmd)) + + for line in node.account.ssh_capture(cmd): + event = self.try_parse_json(node, line.strip()) + if event is not None: + with self.lock: + name = event["name"] + if name == "shutdown_complete": + handler.handle_shutdown_complete(node, self.logger) + elif name == "startup_complete": + handler.handle_startup_complete(node, self.logger) + elif name == "offsets_acknowledged": + handler.handle_offsets_acknowledged(event, node, self.logger) + 
self._update_global_acknowledged(event) + elif name == "records_consumed": + handler.handle_records_consumed(event, node, self.logger) + self._update_global_consumed(event) + elif name == "record_data" and self.on_record_consumed: + self.on_record_consumed(event, node) + elif name == "offset_reset_strategy_set": + self._on_offset_reset_strategy_set() + else: + self.logger.debug("%s: ignoring unknown event: %s" % (str(node.account), event)) + + def _update_global_acknowledged(self, acknowledge_event): + if acknowledge_event["success"]: + self.total_records_acknowledged += acknowledge_event["count"] + else: + self.total_records_acknowledged_failed += acknowledge_event["count"] + for share_partition_data in acknowledge_event["partitions"]: + tpkey = str(share_partition_data["topic"]) + "-" + str(share_partition_data["partition"]) + for offset in share_partition_data["offsets"]: + key = tpkey + "-" + str(offset) + if key not in self.acknowledged_records_offsets: + self.acknowledged_records_offsets.add(key) + + def _update_global_consumed(self, consumed_event): + self.total_records_consumed += consumed_event["count"] + + for share_partition_data in consumed_event["partitions"]: + tpkey = str(share_partition_data["topic"]) + "-" + str(share_partition_data["partition"]) + for offset in share_partition_data["offsets"]: + key = tpkey + "-" + str(offset) + if key not in self.consumed_records_offsets: + self.consumed_records_offsets.add(key) + + def _on_offset_reset_strategy_set(self): + self.is_offset_reset_strategy_set = True + + def start_cmd(self, node): + cmd = "" + cmd += "export LOG_DIR=%s;" % VerifiableShareConsumer.LOG_DIR + cmd += " export KAFKA_OPTS=%s;" % self.security_config.kafka_opts + cmd += " export KAFKA_LOG4J_OPTS=\"%s%s\"; " % (get_log4j_config_param(node), get_log4j_config_for_tools(node)) + cmd += self.impl.exec_cmd(node) + if self.on_record_consumed: + cmd += " --verbose" + + cmd += " --acknowledgement-mode %s" % self.acknowledgement_mode + + cmd += " --offset-reset-strategy %s" % self.offset_reset_strategy + + cmd += " --bootstrap-server %s" % self.kafka.bootstrap_servers(self.security_config.security_protocol) + + cmd += " --group-id %s --topic %s" % (self.group_id, self.topic) + + if self.max_messages > 0: + cmd += " --max-messages %s" % str(self.max_messages) + + cmd += " --consumer.config %s" % VerifiableShareConsumer.CONFIG_FILE + cmd += " 2>> %s | tee -a %s &" % (VerifiableShareConsumer.STDOUT_CAPTURE, VerifiableShareConsumer.STDOUT_CAPTURE) + return cmd + + def pids(self, node): + return self.impl.pids(node) + + def try_parse_json(self, node, string): + """Try to parse a string as json. 
Return None if not parseable.""" + try: + return json.loads(string) + except ValueError: + self.logger.debug("%s: Could not parse as json: %s" % (str(node.account), str(string))) + return None + + def stop_all(self): + for node in self.nodes: + self.stop_node(node) + + def kill_node(self, node, clean_shutdown=True, allow_fail=False): + sig = self.impl.kill_signal(clean_shutdown) + for pid in self.pids(node): + node.account.signal(pid, sig, allow_fail) + + with self.lock: + self.event_handlers[node].handle_kill_process(clean_shutdown) + + def stop_node(self, node, clean_shutdown=True): + self.kill_node(node, clean_shutdown=clean_shutdown) + + stopped = self.wait_node(node, timeout_sec=self.stop_timeout_sec) + assert stopped, "Node %s: did not stop within the specified timeout of %s seconds" % \ + (str(node.account), str(self.stop_timeout_sec)) + + def clean_node(self, node): + self.kill_node(node, clean_shutdown=False) + node.account.ssh("rm -rf " + self.PERSISTENT_ROOT, allow_fail=False) + self.security_config.clean_node(node) + + def total_consumed(self): + with self.lock: + return self.total_records_consumed + + def total_unique_consumed(self): + with self.lock: + return len(self.consumed_records_offsets) + + def total_unique_acknowledged(self): + with self.lock: + return len(self.acknowledged_records_offsets) + + def total_acknowledged(self): + with self.lock: + return self.total_records_acknowledged + self.total_records_acknowledged_failed + + def total_successful_acknowledged(self): + with self.lock: + return self.total_records_acknowledged + + def total_failed_acknowledged(self): + with self.lock: + return self.total_records_acknowledged_failed + + def total_consumed_for_a_share_consumer(self, node): + with self.lock: + return self.event_handlers[node].total_consumed + + def total_acknowledged_for_a_share_consumer(self, node): + with self.lock: + return self.event_handlers[node].total_acknowledged + self.event_handlers[node].total_acknowledged_failed + + def total_successful_acknowledged_for_a_share_consumer(self, node): + with self.lock: + return self.event_handlers[node].total_acknowledged + + def total_failed_acknowledged_for_a_share_consumer(self, node): + with self.lock: + return self.event_handlers[node].total_acknowledged_failed + + def offset_reset_strategy_set(self): + with self.lock: + return self.is_offset_reset_strategy_set + + def dead_nodes(self): + with self.lock: + return [handler.node for handler in self.event_handlers.values() + if handler.state == ShareConsumerState.Dead] + + def alive_nodes(self): + with self.lock: + return [handler.node for handler in self.event_handlers.values() + if handler.state == ShareConsumerState.Started] \ No newline at end of file diff --git a/tests/kafkatest/tests/client/share_consumer_test.py b/tests/kafkatest/tests/client/share_consumer_test.py new file mode 100644 index 00000000000..cc74234ac0f --- /dev/null +++ b/tests/kafkatest/tests/client/share_consumer_test.py @@ -0,0 +1,313 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from ducktape.mark import matrix +from ducktape.mark.resource import cluster +from ducktape.utils.util import wait_until +from kafkatest.tests.verifiable_share_consumer_test import VerifiableShareConsumerTest + +from kafkatest.services.kafka import TopicPartition, quorum + +import signal + +class ShareConsumerTest(VerifiableShareConsumerTest): + TOPIC1 = {"name": "test_topic1", "partitions": 1,"replication_factor": 1} + TOPIC2 = {"name": "test_topic2", "partitions": 3,"replication_factor": 3} + TOPIC3 = {"name": "test_topic3", "partitions": 3,"replication_factor": 3} + + num_consumers = 3 + num_producers = 1 + num_brokers = 3 + + def __init__(self, test_context): + super(ShareConsumerTest, self).__init__(test_context, num_consumers=self.num_consumers, num_producers=self.num_producers, + num_zk=0, num_brokers=self.num_brokers, topics={ + self.TOPIC1["name"] : { 'partitions': self.TOPIC1["partitions"], 'replication-factor': self.TOPIC1["replication_factor"] }, + self.TOPIC2["name"] : { 'partitions': self.TOPIC2["partitions"], 'replication-factor': self.TOPIC2["replication_factor"] } + }) + + def setup_share_group(self, topic, **kwargs): + consumer = super(ShareConsumerTest, self).setup_share_group(topic, **kwargs) + self.mark_for_collect(consumer, 'verifiable_share_consumer_stdout') + return consumer + + def get_topic_partitions(self, topic): + return [TopicPartition(topic["name"], i) for i in range(topic["partitions"])] + + def wait_until_topic_replicas_settled(self, topic, expected_num_isr, timeout_sec=60): + for partition in range(0, topic["partitions"]): + wait_until(lambda: len(self.kafka.isr_idx_list(topic["name"], partition)) == expected_num_isr, + timeout_sec=timeout_sec, backoff_sec=1, err_msg="the expected number of ISRs did not settle in a reasonable amount of time") + + def wait_until_topic_partition_leaders_settled(self, topic, timeout_sec=60): + def leader_settled(partition_leader, topicName, partition): + try: + partition_leader(topicName, partition) + return True + except Exception: + return False + for partition in range(0, topic["partitions"]): + wait_until(lambda: leader_settled(self.kafka.leader, topic["name"], partition), + timeout_sec=timeout_sec, backoff_sec=1, err_msg="partition leaders did not settle in a reasonable amount of time") + + def rolling_bounce_brokers(self, topic, num_bounces=5, clean_shutdown=True, timeout_sec=60): + for _ in range(num_bounces): + for i in range(len(self.kafka.nodes)): + node = self.kafka.nodes[i] + self.kafka.restart_node(node, clean_shutdown=clean_shutdown) + self.wait_until_topic_replicas_settled(topic, expected_num_isr = topic["replication_factor"], timeout_sec=timeout_sec) + + def fail_brokers(self, topic, num_brokers=1, clean_shutdown=True, timeout_sec=60): + for i in range(num_brokers): + self.kafka.signal_node(self.kafka.nodes[i], signal.SIGTERM if clean_shutdown else signal.SIGKILL) + self.wait_until_topic_replicas_settled(topic, topic["replication_factor"] - (i + 1)) + self.wait_until_topic_partition_leaders_settled(topic, timeout_sec=timeout_sec) + + def rolling_bounce_share_consumers(self, consumer, keep_alive=0, 
num_bounces=5, clean_shutdown=True, timeout_sec=60): + for _ in range(num_bounces): + num_consumers_killed = 0 + for node in consumer.nodes[keep_alive:]: + consumer.stop_node(node, clean_shutdown) + num_consumers_killed += 1 + wait_until(lambda: len(consumer.dead_nodes()) == 1, + timeout_sec=timeout_sec, + err_msg="Timed out waiting for the share consumer to shutdown") + + consumer.start_node(node) + + self.await_all_members(consumer, timeout_sec=timeout_sec) + self.await_consumed_messages_by_a_consumer(consumer, node, timeout_sec=timeout_sec) + + def bounce_all_share_consumers(self, consumer, keep_alive=0, num_bounces=5, clean_shutdown=True, timeout_sec=60): + for _ in range(num_bounces): + for node in consumer.nodes[keep_alive:]: + consumer.stop_node(node, clean_shutdown) + + wait_until(lambda: len(consumer.dead_nodes()) == self.num_consumers - keep_alive, timeout_sec=timeout_sec, + err_msg="Timed out waiting for the share consumers to shutdown") + + num_alive_consumers = keep_alive + for node in consumer.nodes[keep_alive:]: + consumer.start_node(node) + num_alive_consumers += 1 + self.await_members(consumer, num_consumers=num_alive_consumers, timeout_sec=timeout_sec) + self.await_consumed_messages_by_a_consumer(consumer, node, timeout_sec=timeout_sec) + + def fail_share_consumers(self, consumer, num_consumers=1, clean_shutdown=True, timeout_sec=60): + for i in range(num_consumers): + consumer.kill_node(consumer.nodes[i], clean_shutdown=clean_shutdown) + wait_until(lambda: len(consumer.dead_nodes()) == (i + 1), + timeout_sec=timeout_sec, + err_msg="Timed out waiting for the share consumer to be killed") + + @cluster(num_nodes=10) + @matrix( + metadata_quorum=[quorum.isolated_kraft, quorum.combined_kraft] + ) + def test_share_single_topic_partition(self, metadata_quorum=quorum.isolated_kraft): + + total_messages = 100000 + producer = self.setup_producer(self.TOPIC1["name"], max_messages=total_messages) + + consumer = self.setup_share_group(self.TOPIC1["name"], offset_reset_strategy="earliest") + + producer.start() + + consumer.start() + self.await_all_members(consumer, timeout_sec=60) + + self.await_acknowledged_messages(consumer, min_messages=total_messages, timeout_sec=60) + + assert consumer.total_consumed() >= producer.num_acked + assert consumer.total_acknowledged() == producer.num_acked + + for event_handler in consumer.event_handlers.values(): + assert event_handler.total_consumed > 0 + assert event_handler.total_acknowledged > 0 + + producer.stop() + consumer.stop_all() + + @cluster(num_nodes=10) + @matrix( + metadata_quorum=[quorum.isolated_kraft, quorum.combined_kraft] + ) + def test_share_multiple_partitions(self, metadata_quorum=quorum.isolated_kraft): + + total_messages = 1000000 + producer = self.setup_producer(self.TOPIC2["name"], max_messages=total_messages, throughput=5000) + + consumer = self.setup_share_group(self.TOPIC2["name"], offset_reset_strategy="earliest") + + producer.start() + + consumer.start() + self.await_all_members(consumer, timeout_sec=60) + + self.await_acknowledged_messages(consumer, min_messages=total_messages, timeout_sec=60) + + assert consumer.total_consumed() >= producer.num_acked + assert consumer.total_acknowledged() == producer.num_acked + + for event_handler in consumer.event_handlers.values(): + assert event_handler.total_consumed > 0 + assert event_handler.total_acknowledged > 0 + for topic_partition in self.get_topic_partitions(self.TOPIC2): + assert topic_partition in event_handler.consumed_per_partition + assert 
event_handler.consumed_per_partition[topic_partition] > 0
+                assert topic_partition in event_handler.acknowledged_per_partition
+                assert event_handler.acknowledged_per_partition[topic_partition] > 0
+
+        producer.stop()
+        consumer.stop_all()
+
+    @cluster(num_nodes=10)
+    @matrix(
+        clean_shutdown=[True, False],
+        metadata_quorum=[quorum.isolated_kraft, quorum.combined_kraft]
+    )
+    def test_broker_rolling_bounce(self, clean_shutdown, metadata_quorum=quorum.isolated_kraft):
+
+        producer = self.setup_producer(self.TOPIC2["name"])
+        consumer = self.setup_share_group(self.TOPIC2["name"], offset_reset_strategy="earliest")
+
+        producer.start()
+        self.await_produced_messages(producer)
+
+        consumer.start()
+        self.await_all_members(consumer)
+
+        self.await_consumed_messages(consumer, timeout_sec=60)
+        self.rolling_bounce_brokers(self.TOPIC2, num_bounces=1, clean_shutdown=clean_shutdown)
+
+        # ensure that the share consumers do some work after the broker bounces
+        self.await_consumed_messages(consumer, min_messages=1000)
+
+        producer.stop()
+
+        self.await_unique_consumed_messages(consumer, min_messages=producer.num_acked, timeout_sec=60)
+
+        assert consumer.total_unique_consumed() >= producer.num_acked
+
+        consumer.stop_all()
+
+    @cluster(num_nodes=10)
+    @matrix(
+        clean_shutdown=[True, False],
+        metadata_quorum=[quorum.isolated_kraft],
+        num_failed_brokers=[1, 2]
+    )
+    @matrix(
+        clean_shutdown=[True, False],
+        metadata_quorum=[quorum.combined_kraft],
+        num_failed_brokers=[1]
+    )
+    def test_broker_failure(self, clean_shutdown, metadata_quorum=quorum.isolated_kraft, num_failed_brokers=1):
+
+        producer = self.setup_producer(self.TOPIC2["name"])
+        consumer = self.setup_share_group(self.TOPIC2["name"], offset_reset_strategy="earliest")
+
+        producer.start()
+        self.await_produced_messages(producer)
+
+        consumer.start()
+        self.await_all_members(consumer)
+
+        # shutdown the required number of brokers
+        self.fail_brokers(self.TOPIC2, num_brokers=num_failed_brokers, clean_shutdown=clean_shutdown)
+
+        # ensure that the share consumers do some work after the broker failure
+        self.await_consumed_messages(consumer, min_messages=1000)
+
+        producer.stop()
+
+        self.await_unique_consumed_messages(consumer, min_messages=producer.num_acked, timeout_sec=60)
+
+        assert consumer.total_unique_consumed() >= producer.num_acked
+
+        consumer.stop_all()
+
+    @cluster(num_nodes=10)
+    @matrix(
+        clean_shutdown=[True, False],
+        bounce_mode=["all", "rolling"],
+        metadata_quorum=[quorum.isolated_kraft, quorum.combined_kraft]
+    )
+    def test_share_consumer_bounce(self, clean_shutdown, bounce_mode, metadata_quorum=quorum.isolated_kraft):
+        """
+        Verify correct share consumer behavior when the share consumers in the group are consecutively restarted.
+
+        Setup: single Kafka cluster with one producer and a set of share consumers in one group.
+
+        - Start a producer which continues producing new messages throughout the test.
+        - Start up the share consumers and wait until they've joined the group.
+        - In a loop, restart each share consumer, waiting for each one to rejoin the group before
+          restarting the rest.
+        - Verify that the share consumers consume all messages produced by the producer at least once.
+ """ + + producer = self.setup_producer(self.TOPIC2["name"]) + consumer = self.setup_share_group(self.TOPIC2["name"], offset_reset_strategy="earliest") + + producer.start() + self.await_produced_messages(producer) + + consumer.start() + self.await_all_members(consumer) + + if bounce_mode == "all": + self.bounce_all_share_consumers(consumer, clean_shutdown=clean_shutdown) + else: + self.rolling_bounce_share_consumers(consumer, clean_shutdown=clean_shutdown) + + producer.stop() + + self.await_unique_consumed_messages(consumer, min_messages=producer.num_acked, timeout_sec=60) + + assert consumer.total_unique_consumed() >= producer.num_acked + + consumer.stop_all() + + @cluster(num_nodes=10) + @matrix( + clean_shutdown=[True, False], + num_failed_consumers=[1, 2], + metadata_quorum=[quorum.isolated_kraft, quorum.combined_kraft] + ) + def test_share_consumer_failure(self, clean_shutdown, metadata_quorum=quorum.zk, num_failed_consumers=1): + + producer = self.setup_producer(self.TOPIC2["name"]) + consumer = self.setup_share_group(self.TOPIC2["name"], offset_reset_strategy="earliest") + + # startup the producer and ensure that some records have been written + producer.start() + self.await_produced_messages(producer) + + consumer.start() + self.await_all_members(consumer) + + # stop the required number of share consumers + self.fail_share_consumers(consumer, num_failed_consumers, clean_shutdown=clean_shutdown) + + # ensure that the remaining consumer does some work + self.await_consumed_messages(consumer, min_messages=1000, timeout_sec=60) + + producer.stop() + + self.await_unique_consumed_messages(consumer, min_messages=producer.num_acked, timeout_sec=60) + + assert consumer.total_unique_consumed() >= producer.num_acked + + consumer.stop_all() \ No newline at end of file diff --git a/tests/kafkatest/tests/verifiable_share_consumer_test.py b/tests/kafkatest/tests/verifiable_share_consumer_test.py new file mode 100644 index 00000000000..263a40e62fb --- /dev/null +++ b/tests/kafkatest/tests/verifiable_share_consumer_test.py @@ -0,0 +1,106 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +from ducktape.utils.util import wait_until + +from kafkatest.tests.kafka_test import KafkaTest +from kafkatest.services.verifiable_producer import VerifiableProducer +from kafkatest.services.verifiable_share_consumer import VerifiableShareConsumer +from kafkatest.services.kafka import TopicPartition + +class VerifiableShareConsumerTest(KafkaTest): + PRODUCER_REQUEST_TIMEOUT_SEC = 30 + + def __init__(self, test_context, num_consumers=1, num_producers=0, **kwargs): + super(VerifiableShareConsumerTest, self).__init__(test_context, **kwargs) + self.num_consumers = num_consumers + self.num_producers = num_producers + + def _all_partitions(self, topic, num_partitions): + partitions = set() + for i in range(num_partitions): + partitions.add(TopicPartition(topic=topic, partition=i)) + return partitions + + def _partitions(self, assignment): + partitions = [] + for parts in assignment.values(): + partitions += parts + return partitions + + def valid_assignment(self, topic, num_partitions, assignment): + all_partitions = self._all_partitions(topic, num_partitions) + partitions = self._partitions(assignment) + return len(partitions) == num_partitions and set(partitions) == all_partitions + + def min_cluster_size(self): + """Override this since we're adding services outside of the constructor""" + return super(VerifiableShareConsumerTest, self).min_cluster_size() + self.num_consumers + self.num_producers + + def setup_share_group(self, topic, acknowledgement_mode="auto", group_id="test_group_id", offset_reset_strategy="", **kwargs): + return VerifiableShareConsumer(self.test_context, self.num_consumers, self.kafka, + topic, group_id, acknowledgement_mode=acknowledgement_mode, + offset_reset_strategy=offset_reset_strategy, log_level="TRACE", **kwargs) + + def setup_producer(self, topic, max_messages=-1, throughput=500): + return VerifiableProducer(self.test_context, self.num_producers, self.kafka, topic, + max_messages=max_messages, throughput=throughput, + request_timeout_sec=self.PRODUCER_REQUEST_TIMEOUT_SEC, + log_level="DEBUG") + + def await_produced_messages(self, producer, min_messages=1000, timeout_sec=10): + current_acked = producer.num_acked + wait_until(lambda: producer.num_acked >= current_acked + min_messages, timeout_sec=timeout_sec, + err_msg="Timeout awaiting messages to be produced and acked") + + def await_consumed_messages(self, consumer, min_messages=1, timeout_sec=10, total=False): + current_total = 0 + if total is False: + current_total = consumer.total_consumed() + wait_until(lambda: consumer.total_consumed() >= current_total + min_messages, + timeout_sec=timeout_sec, + err_msg="Timed out waiting for consumption") + + def await_consumed_messages_by_a_consumer(self, consumer, node, min_messages=1, timeout_sec=10, total=False): + current_total = 0 + if total is False: + current_total = consumer.total_consumed_for_a_share_consumer(node) + wait_until(lambda: consumer.total_consumed_for_a_share_consumer(node) >= current_total + min_messages, + timeout_sec=timeout_sec, + err_msg="Timed out waiting for consumption") + + def await_unique_consumed_messages(self, consumer, min_messages=1, timeout_sec=10): + wait_until(lambda: consumer.total_unique_consumed() >= min_messages, + timeout_sec=timeout_sec, + err_msg="Timed out waiting for consumption") + + def await_acknowledged_messages(self, consumer, min_messages=1, timeout_sec=10): + wait_until(lambda: consumer.total_acknowledged() >= min_messages, + timeout_sec=timeout_sec, + err_msg="Timed out waiting for consumption") + + def 
await_unique_acknowledged_messages(self, consumer, min_messages=1, timeout_sec=10): + wait_until(lambda: consumer.total_unique_acknowledged() >= min_messages, + timeout_sec=timeout_sec, + err_msg="Timed out waiting for consumption") + + def await_members(self, consumer, num_consumers, timeout_sec=10): + # Wait until all members have started + wait_until(lambda: len(consumer.alive_nodes()) == num_consumers, + timeout_sec=timeout_sec, + err_msg="Consumers failed to start in a reasonable amount of time") + + def await_all_members(self, consumer, timeout_sec=10): + self.await_members(consumer, self.num_consumers, timeout_sec=timeout_sec) \ No newline at end of file diff --git a/tools/src/main/java/org/apache/kafka/tools/VerifiableShareConsumer.java b/tools/src/main/java/org/apache/kafka/tools/VerifiableShareConsumer.java new file mode 100644 index 00000000000..654a2960204 --- /dev/null +++ b/tools/src/main/java/org/apache/kafka/tools/VerifiableShareConsumer.java @@ -0,0 +1,630 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.tools; + +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.clients.admin.AlterConfigOp; +import org.apache.kafka.clients.admin.AlterConfigsOptions; +import org.apache.kafka.clients.admin.ConfigEntry; +import org.apache.kafka.clients.consumer.AcknowledgementCommitCallback; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaShareConsumer; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.TopicIdPartition; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.config.ConfigResource; +import org.apache.kafka.common.errors.WakeupException; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.coordinator.group.GroupConfig; +import org.apache.kafka.coordinator.group.ShareGroupAutoOffsetResetStrategy; + +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonPropertyOrder; +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.JsonSerializer; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.SerializerProvider; +import com.fasterxml.jackson.databind.module.SimpleModule; + +import net.sourceforge.argparse4j.ArgumentParsers; +import net.sourceforge.argparse4j.inf.ArgumentParser; +import net.sourceforge.argparse4j.inf.ArgumentParserException; +import net.sourceforge.argparse4j.inf.MutuallyExclusiveGroup; +import net.sourceforge.argparse4j.inf.Namespace; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Closeable; +import java.io.IOException; +import java.io.PrintStream; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.Properties; +import java.util.Set; +import java.util.TreeSet; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import static net.sourceforge.argparse4j.impl.Arguments.store; +import static net.sourceforge.argparse4j.impl.Arguments.storeTrue; + +public class VerifiableShareConsumer implements Closeable, AcknowledgementCommitCallback { + + private static final Logger log = LoggerFactory.getLogger(VerifiableShareConsumer.class); + + private final ObjectMapper mapper = new ObjectMapper(); + private final PrintStream out; + private final KafkaShareConsumer<String, String> consumer; + private final String topic; + private final AcknowledgementMode acknowledgementMode; + private final String offsetResetStrategy; + private final Boolean verbose; + private final int maxMessages; + private Integer totalAcknowledged = 0; + private final String brokerHostandPort; + private final String groupId; + private final CountDownLatch shutdownLatch = new CountDownLatch(1); + + public static class PartitionData { + private final String topic; + private final int partition; + + public PartitionData(String topic, int partition) { + this.topic = topic; + this.partition = 
partition; + } + + @JsonProperty + public String topic() { + return topic; + } + + @JsonProperty + public int partition() { + return partition; + } + } + + public static class RecordSetSummary extends PartitionData { + private final long count; + private final Set<Long> offsets; + + public RecordSetSummary(String topic, int partition, Set<Long> offsets) { + super(topic, partition); + this.offsets = offsets; + this.count = offsets.size(); + } + + @JsonProperty + public long count() { + return count; + } + + @JsonProperty + public Set<Long> offsets() { + return offsets; + } + + } + + protected static class AcknowledgedData extends PartitionData { + private final long count; + private final Set<Long> offsets; + + public AcknowledgedData(String topic, int partition, Set<Long> offsets) { + super(topic, partition); + this.offsets = offsets; + this.count = offsets.size(); + } + + @JsonProperty + public long count() { + return count; + } + + @JsonProperty + public Set<Long> offsets() { + return offsets; + } + } + + @JsonPropertyOrder({ "timestamp", "name" }) + private abstract static class ShareConsumerEvent { + private final long timestamp = System.currentTimeMillis(); + + @JsonProperty + public abstract String name(); + + @JsonProperty + public long timestamp() { + return timestamp; + } + } + + protected static class StartupComplete extends ShareConsumerEvent { + + @Override + public String name() { + return "startup_complete"; + } + } + + @JsonPropertyOrder({ "timestamp", "name", "offsetResetStrategy" }) + protected static class OffsetResetStrategySet extends ShareConsumerEvent { + + private final String offsetResetStrategy; + + public OffsetResetStrategySet(String offsetResetStrategy) { + this.offsetResetStrategy = offsetResetStrategy; + } + + @Override + public String name() { + return "offset_reset_strategy_set"; + } + + @JsonProperty + public String offsetResetStrategy() { + return offsetResetStrategy; + } + } + + protected static class ShutdownComplete extends ShareConsumerEvent { + + @Override + public String name() { + return "shutdown_complete"; + } + } + + @JsonPropertyOrder({ "timestamp", "name", "count", "partitions" }) + public static class RecordsConsumed extends ShareConsumerEvent { + private final long count; + private final List<RecordSetSummary> partitionSummaries; + + public RecordsConsumed(long count, List<RecordSetSummary> partitionSummaries) { + this.count = count; + this.partitionSummaries = partitionSummaries; + } + + @Override + public String name() { + return "records_consumed"; + } + + @JsonProperty + public long count() { + return count; + } + + @JsonProperty + public List<RecordSetSummary> partitions() { + return partitionSummaries; + } + } + + @JsonPropertyOrder({ "timestamp", "name", "count", "partitions", "success", "error" }) + protected static class OffsetsAcknowledged extends ShareConsumerEvent { + + private final long count; + private final List<AcknowledgedData> partitions; + private final String error; + private final boolean success; + + public OffsetsAcknowledged(long count, List<AcknowledgedData> partitions, String error, boolean success) { + this.count = count; + this.partitions = partitions; + this.error = error; + this.success = success; + } + + @Override + public String name() { + return "offsets_acknowledged"; + } + + @JsonProperty + public long count() { + return count; + } + + @JsonProperty + public List<AcknowledgedData> partitions() { + return partitions; + } + + @JsonProperty + @JsonInclude(JsonInclude.Include.NON_NULL) + public String error() { 
+ return error; + } + + @JsonProperty + public boolean success() { + return success; + } + + } + + @JsonPropertyOrder({ "timestamp", "name", "key", "value", "topic", "partition", "offset" }) + public static class RecordData extends ShareConsumerEvent { + + private final ConsumerRecord<String, String> record; + + public RecordData(ConsumerRecord<String, String> record) { + this.record = record; + } + + @Override + public String name() { + return "record_data"; + } + + @JsonProperty + public String topic() { + return record.topic(); + } + + @JsonProperty + public int partition() { + return record.partition(); + } + + @JsonProperty + public String key() { + return record.key(); + } + + @JsonProperty + public String value() { + return record.value(); + } + + @JsonProperty + public long offset() { + return record.offset(); + } + + } + + public VerifiableShareConsumer(KafkaShareConsumer<String, String> consumer, + PrintStream out, + Integer maxMessages, + String topic, + AcknowledgementMode acknowledgementMode, + String offsetResetStrategy, + String brokerHostandPort, + String groupId, + Boolean verbose) { + this.out = out; + this.consumer = consumer; + this.topic = topic; + this.acknowledgementMode = acknowledgementMode; + this.offsetResetStrategy = offsetResetStrategy; + this.verbose = verbose; + this.maxMessages = maxMessages; + this.brokerHostandPort = brokerHostandPort; + this.groupId = groupId; + addKafkaSerializerModule(); + } + + private void addKafkaSerializerModule() { + SimpleModule kafka = new SimpleModule(); + kafka.addSerializer(TopicPartition.class, new JsonSerializer<>() { + @Override + public void serialize(TopicPartition tp, JsonGenerator gen, SerializerProvider serializers) throws IOException { + gen.writeStartObject(); + gen.writeObjectField("topic", tp.topic()); + gen.writeObjectField("partition", tp.partition()); + gen.writeEndObject(); + } + }); + mapper.registerModule(kafka); + } + + private void onRecordsReceived(ConsumerRecords<String, String> records) { + List<RecordSetSummary> summaries = new ArrayList<>(); + for (TopicPartition tp : records.partitions()) { + List<ConsumerRecord<String, String>> partitionRecords = records.records(tp); + + if (partitionRecords.isEmpty()) + continue; + + TreeSet<Long> partitionOffsets = new TreeSet<>(); + + for (ConsumerRecord<String, String> record : partitionRecords) { + partitionOffsets.add(record.offset()); + } + + summaries.add(new RecordSetSummary(tp.topic(), tp.partition(), partitionOffsets)); + + if (this.verbose) { + for (ConsumerRecord<String, String> record : partitionRecords) { + printJson(new RecordData(record)); + } + } + } + + printJson(new RecordsConsumed(records.count(), summaries)); + } + + @Override + public void onComplete(Map<TopicIdPartition, Set<Long>> offsetsMap, Exception exception) { + List<AcknowledgedData> acknowledgedOffsets = new ArrayList<>(); + int totalAcknowledged = 0; + for (Map.Entry<TopicIdPartition, Set<Long>> offsetEntry : offsetsMap.entrySet()) { + TopicIdPartition tp = offsetEntry.getKey(); + acknowledgedOffsets.add(new AcknowledgedData(tp.topic(), tp.partition(), offsetEntry.getValue())); + totalAcknowledged += offsetEntry.getValue().size(); + } + + boolean success = true; + String error = null; + if (exception != null) { + success = false; + error = exception.getMessage(); + } + printJson(new OffsetsAcknowledged(totalAcknowledged, acknowledgedOffsets, error, success)); + if (success) { + this.totalAcknowledged += totalAcknowledged; + } + } + + public void run() { + try { + printJson(new 
StartupComplete()); + + if (!Objects.equals(offsetResetStrategy, "")) { + ShareGroupAutoOffsetResetStrategy offsetResetStrategy = + ShareGroupAutoOffsetResetStrategy.fromString(this.offsetResetStrategy); + + Properties adminClientProps = new Properties(); + adminClientProps.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, brokerHostandPort); + + Admin adminClient = Admin.create(adminClientProps); + + ConfigResource configResource = new ConfigResource(ConfigResource.Type.GROUP, groupId); + Map<ConfigResource, Collection<AlterConfigOp>> alterEntries = new HashMap<>(); + alterEntries.put(configResource, List.of(new AlterConfigOp(new ConfigEntry( + GroupConfig.SHARE_AUTO_OFFSET_RESET_CONFIG, offsetResetStrategy.type().toString()), AlterConfigOp.OpType.SET))); + AlterConfigsOptions alterOptions = new AlterConfigsOptions(); + + // Setting the share group auto offset reset strategy + adminClient.incrementalAlterConfigs(alterEntries, alterOptions) + .all() + .get(60, TimeUnit.SECONDS); + + printJson(new OffsetResetStrategySet(offsetResetStrategy.type().toString())); + } + + consumer.subscribe(Collections.singleton(this.topic)); + consumer.setAcknowledgementCommitCallback(this); + while (!(maxMessages >= 0 && totalAcknowledged >= maxMessages)) { + ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(5000)); + if (!records.isEmpty()) { + onRecordsReceived(records); + + if (acknowledgementMode == AcknowledgementMode.ASYNC) { + consumer.commitAsync(); + } else if (acknowledgementMode == AcknowledgementMode.SYNC) { + Map<TopicIdPartition, Optional<KafkaException>> result = consumer.commitSync(); + for (Map.Entry<TopicIdPartition, Optional<KafkaException>> resultEntry : result.entrySet()) { + if (resultEntry.getValue().isPresent()) { + log.error("Failed to commit offset synchronously for topic partition: {}", resultEntry.getKey()); + } + } + } + } + } + } catch (WakeupException e) { + // ignore, we are closing + log.trace("Caught WakeupException because share consumer is shutdown, ignore and terminate.", e); + } catch (Throwable t) { + // Log the error, so it goes to the service log and not stdout + log.error("Error during processing, terminating share consumer process: ", t); + } finally { + consumer.close(); + printJson(new ShutdownComplete()); + shutdownLatch.countDown(); + } + } + + public void close() { + boolean interrupted = false; + try { + consumer.wakeup(); + while (true) { + try { + shutdownLatch.await(); + return; + } catch (InterruptedException e) { + interrupted = true; + } + } + } finally { + if (interrupted) + Thread.currentThread().interrupt(); + } + } + + protected synchronized void printJson(Object data) { + try { + out.println(mapper.writeValueAsString(data)); + } catch (JsonProcessingException e) { + out.println("Bad data can't be written as json: " + e.getMessage()); + } + } + + public enum AcknowledgementMode { + AUTO, ASYNC, SYNC; + + @Override + public String toString() { + return super.toString().toLowerCase(Locale.ROOT); + } + } + + private static ArgumentParser argParser() { + ArgumentParser parser = ArgumentParsers + .newArgumentParser("verifiable-share-consumer") + .defaultHelp(true) + .description("This tool creates a share group and consumes messages from a specific topic and emits share consumer events (e.g. 
share consumer startup, received messages, and offsets acknowledged) as JSON objects to STDOUT."); + MutuallyExclusiveGroup connectionGroup = parser.addMutuallyExclusiveGroup("Connection Group") + .description("Group of arguments for connection to brokers") + .required(true); + connectionGroup.addArgument("--bootstrap-server") + .action(store()) + .required(true) + .type(String.class) + .metavar("HOST1:PORT1[,HOST2:PORT2[...]]") + .dest("bootstrapServer") + .help("The server(s) to connect to. Comma-separated list of Kafka brokers in the form HOST1:PORT1,HOST2:PORT2,..."); + + parser.addArgument("--topic") + .action(store()) + .required(true) + .type(String.class) + .metavar("TOPIC") + .help("Consumes messages from this topic."); + + parser.addArgument("--group-id") + .action(store()) + .required(true) + .type(String.class) + .metavar("GROUP_ID") + .dest("groupId") + .help("The groupId shared among members of the share group"); + + parser.addArgument("--max-messages") + .action(store()) + .required(false) + .type(Integer.class) + .setDefault(-1) + .metavar("MAX-MESSAGES") + .dest("maxMessages") + .help("Consume this many messages. If -1 (the default), the share consumers will consume until the process is killed externally"); + + parser.addArgument("--verbose") + .action(storeTrue()) + .type(Boolean.class) + .metavar("VERBOSE") + .help("Enable to log individual consumed records"); + + parser.addArgument("--acknowledgement-mode") + .action(store()) + .required(false) + .setDefault("auto") + .type(String.class) + .dest("acknowledgementMode") + .help("Acknowledgement mode for the share consumers (must be either 'auto', 'sync' or 'async')"); + + parser.addArgument("--offset-reset-strategy") + .action(store()) + .required(false) + .setDefault("") + .type(String.class) + .dest("offsetResetStrategy") + .help("Set share group reset strategy (must be either 'earliest' or 'latest')"); + + parser.addArgument("--consumer.config") + .action(store()) + .required(false) + .type(String.class) + .metavar("CONFIG_FILE") + .help("Consumer config properties file (config options shared with command line parameters will be overridden)."); + + return parser; + } + + public static VerifiableShareConsumer createFromArgs(ArgumentParser parser, String[] args) throws ArgumentParserException { + Namespace res = parser.parseArgs(args); + + AcknowledgementMode acknowledgementMode = + AcknowledgementMode.valueOf(res.getString("acknowledgementMode").toUpperCase(Locale.ROOT)); + String offsetResetStrategy = res.getString("offsetResetStrategy").toLowerCase(Locale.ROOT); + String configFile = res.getString("consumer.config"); + String brokerHostandPort = res.getString("bootstrapServer"); + + Properties consumerProps = new Properties(); + if (configFile != null) { + try { + consumerProps.putAll(Utils.loadProps(configFile)); + } catch (IOException e) { + throw new ArgumentParserException(e.getMessage(), parser); + } + } + + String groupId = res.getString("groupId"); + + consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); + + consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerHostandPort); + + String topic = res.getString("topic"); + int maxMessages = res.getInt("maxMessages"); + boolean verbose = res.getBoolean("verbose"); + + StringDeserializer deserializer = new StringDeserializer(); + KafkaShareConsumer<String, String> consumer = new KafkaShareConsumer<>(consumerProps, deserializer, deserializer); + + return new VerifiableShareConsumer( + consumer, + System.out, + maxMessages, + topic, + 
acknowledgementMode, + offsetResetStrategy, + brokerHostandPort, + groupId, + verbose); + } + + public static void main(String[] args) { + ArgumentParser parser = argParser(); + if (args.length == 0) { + parser.printHelp(); + // Can't use `Exit.exit` here because it didn't exist until 0.11.0.0. + System.exit(0); + } + try { + final VerifiableShareConsumer shareConsumer = createFromArgs(parser, args); + // Can't use `Exit.addShutdownHook` here because it didn't exist until 2.5.0. + Runtime.getRuntime().addShutdownHook(new Thread(shareConsumer::close, "verifiable-share-consumer-shutdown-hook")); + shareConsumer.run(); + } catch (ArgumentParserException e) { + parser.handleError(e); + // Can't use `Exit.exit` here because it didn't exist until 0.11.0.0. + System.exit(1); + } + } + +}
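
For reference, the tool above writes one JSON event per line to stdout, in the style of the existing VerifiableConsumer protocol: startup_complete, offset_reset_strategy_set, records_consumed, record_data (only with --verbose), offsets_acknowledged and shutdown_complete. The following is a minimal standalone reader for that stream, not part of the commit: the tolerant parsing and event dispatch mirror try_parse_json and _worker in verifiable_share_consumer.py, while the tallying is illustrative only.

import json
import sys

def try_parse_json(line):
    # Mirrors VerifiableShareConsumer.try_parse_json: skip any non-JSON noise.
    try:
        return json.loads(line)
    except ValueError:
        return None

total_consumed = 0
total_acknowledged = 0
for raw in sys.stdin:
    event = try_parse_json(raw.strip())
    if event is None:
        continue
    if event["name"] == "records_consumed":
        total_consumed += event["count"]
    elif event["name"] == "offsets_acknowledged" and event["success"]:
        total_acknowledged += event["count"]
    elif event["name"] == "shutdown_complete":
        break
print("consumed=%d acknowledged=%d" % (total_consumed, total_acknowledged))

This could be fed from, for example, bin/kafka-verifiable-share-consumer.sh --bootstrap-server localhost:9092 --topic test_topic1 --group-id test_group_id --offset-reset-strategy earliest --max-messages 1000, where the host, topic and group id are placeholders.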
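
A note on the accounting the test assertions depend on: share groups deliver records at least once, so after broker or consumer bounces the tests compare producer.num_acked against total_unique_consumed() rather than total_consumed(). The service derives the unique count by folding every delivery into a set of "topic-partition-offset" keys in _update_global_consumed. A condensed sketch of that bookkeeping, with made-up event data:

# Condensed from _update_global_consumed: the running total counts every
# delivery, while the key set counts each record exactly once.
consumed_records_offsets = set()
total_records_consumed = 0

def update_global_consumed(event):
    global total_records_consumed
    total_records_consumed += event["count"]
    for p in event["partitions"]:
        for offset in p["offsets"]:
            consumed_records_offsets.add("%s-%d-%d" % (p["topic"], p["partition"], offset))

# A redelivery of offset 7 bumps the total but not the unique count.
update_global_consumed({"count": 2, "partitions": [{"topic": "t", "partition": 0, "offsets": [6, 7]}]})
update_global_consumed({"count": 1, "partitions": [{"topic": "t", "partition": 0, "offsets": [7]}]})
assert total_records_consumed == 3
assert len(consumed_records_offsets) == 2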
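
Finally, for test authors building on this infrastructure, a compact sketch of how the new pieces compose in a ducktape test, condensed from ShareConsumerTest above; the class name, topic name and node counts are invented for illustration:

from ducktape.mark.resource import cluster
from kafkatest.tests.verifiable_share_consumer_test import VerifiableShareConsumerTest

class ExampleShareGroupTest(VerifiableShareConsumerTest):
    # Hypothetical example, not part of this commit.
    def __init__(self, test_context):
        super(ExampleShareGroupTest, self).__init__(
            test_context, num_consumers=1, num_producers=1, num_zk=0, num_brokers=1,
            topics={"example_topic": {"partitions": 1, "replication-factor": 1}})

    @cluster(num_nodes=4)
    def test_consume_and_acknowledge(self):
        producer = self.setup_producer("example_topic", max_messages=1000)
        consumer = self.setup_share_group("example_topic", offset_reset_strategy="earliest")
        producer.start()
        consumer.start()
        self.await_all_members(consumer, timeout_sec=60)
        self.await_acknowledged_messages(consumer, min_messages=1000, timeout_sec=120)
        assert consumer.total_acknowledged() == producer.num_acked
        producer.stop()
        consumer.stop_all()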