This is an automated email from the ASF dual-hosted git repository.

schofielaj pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 8cc560e9546 Infrastructure for system tests for the new share consumer client (#18209)
8cc560e9546 is described below

commit 8cc560e9546e2aa74f36c9908d38b59edb61f70f
Author: Chirag Wadhwa <122860692+chirag-wadh...@users.noreply.github.com>
AuthorDate: Fri Jan 17 17:33:32 2025 +0530

    Infrastructure for system tests for the new share consumer client (#18209)
    
    Reviewers: Apoorv Mittal <apoorvmitta...@gmail.com>, Andrew Schofield <aschofi...@confluent.io>
---
 bin/kafka-verifiable-share-consumer.sh             |  20 +
 checkstyle/suppressions.xml                        |   2 +
 tests/kafkatest/services/verifiable_client.py      |  10 +-
 .../services/verifiable_share_consumer.py          | 330 +++++++++++
 .../kafkatest/tests/client/share_consumer_test.py  | 313 ++++++++++
 .../tests/verifiable_share_consumer_test.py        | 106 ++++
 .../kafka/tools/VerifiableShareConsumer.java       | 630 +++++++++++++++++++++
 7 files changed, 1406 insertions(+), 5 deletions(-)

diff --git a/bin/kafka-verifiable-share-consumer.sh b/bin/kafka-verifiable-share-consumer.sh
new file mode 100755
index 00000000000..e30178cebca
--- /dev/null
+++ b/bin/kafka-verifiable-share-consumer.sh
@@ -0,0 +1,20 @@
+#!/bin/bash
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
+    export KAFKA_HEAP_OPTS="-Xmx512M"
+fi
+exec $(dirname $0)/kafka-run-class.sh org.apache.kafka.tools.VerifiableShareConsumer "$@"
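
For a sense of how this wrapper is driven, a minimal smoke run might look like the
sketch below. The flag names are the ones assembled by start_cmd and the argument
parser later in this patch; the broker address, topic and group id are hypothetical.

    import subprocess

    # Launch the verifiable share consumer against a locally running broker
    # (hypothetical address). Each line on stdout is a JSON event.
    proc = subprocess.Popen(
        ["bin/kafka-verifiable-share-consumer.sh",
         "--bootstrap-server", "localhost:9092",
         "--group-id", "test_group_id",
         "--topic", "test_topic",
         "--acknowledgement-mode", "auto",
         "--offset-reset-strategy", "earliest",
         "--max-messages", "100"],
        stdout=subprocess.PIPE)
    for line in proc.stdout:
        print(line.decode().strip())
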
diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index 86baed58e28..19cc0975a09 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -288,6 +288,8 @@
               files="VerifiableConsumer.java"/>
     <suppress id="dontUseSystemExit"
               files="VerifiableProducer.java"/>
+    <suppress id="dontUseSystemExit"
+              files="VerifiableShareConsumer.java"/>
 
     <!-- Shell -->
     <suppress checks="CyclomaticComplexity"
diff --git a/tests/kafkatest/services/verifiable_client.py b/tests/kafkatest/services/verifiable_client.py
index 4a3ea5e17da..16617d621aa 100644
--- a/tests/kafkatest/services/verifiable_client.py
+++ b/tests/kafkatest/services/verifiable_client.py
@@ -142,10 +142,10 @@ script will be called on the VM just prior to executing the client.
 def create_verifiable_client_implementation(context, parent):
     """Factory for generating a verifiable client implementation class instance
 
-    :param parent: parent class instance, either VerifiableConsumer or VerifiableProducer
+    :param parent: parent class instance, either VerifiableConsumer, VerifiableProducer or VerifiableShareConsumer
 
     This will first check for a fully qualified client implementation class name
-    in context.globals as "Verifiable<type>" where <type> is "Producer" or "Consumer",
+    in context.globals as "Verifiable<type>" where <type> is "Producer" or "Consumer" or "ShareConsumer",
     followed by "VerifiableClient" (which should implement both).
     The global object layout is: {"class": "<full class name>", "..anything..": ..}.
 
@@ -232,11 +232,11 @@ class VerifiableClient (object):
 
 class VerifiableClientJava (VerifiableClient):
     """
-    Verifiable Consumer and Producer using the official Java client.
+    Verifiable Consumer, ShareConsumer and Producer using the official Java client.
     """
     def __init__(self, parent, conf=None):
         """
-        :param parent: The parent instance, either VerifiableConsumer or VerifiableProducer
+        :param parent: The parent instance, either VerifiableConsumer, VerifiableShareConsumer or VerifiableProducer
         :param conf: Optional conf object (the --globals VerifiableX object)
         """
         super(VerifiableClientJava, self).__init__()
@@ -267,7 +267,7 @@ class VerifiableClientDummy (VerifiableClient):
     """
     def __init__(self, parent, conf=None):
         """
-        :param parent: The parent instance, either VerifiableConsumer or VerifiableProducer
+        :param parent: The parent instance, either VerifiableConsumer, VerifiableShareConsumer or VerifiableProducer
         :param conf: Optional conf object (the --globals VerifiableX object)
         """
         super(VerifiableClientDummy, self).__init__()
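
The docstrings above describe how a pluggable implementation is chosen: ducktape's
context.globals is checked for a "Verifiable<type>" key whose value names a class.
A hedged sketch of such a globals object, routing the new share consumer service to
a custom implementation (the class path is hypothetical), is:

    # Passed to ducktape via --globals; keys follow the "Verifiable<type>"
    # convention described in create_verifiable_client_implementation.
    globals_json = {
        "VerifiableShareConsumer": {
            "class": "my.tests.MyVerifiableShareConsumerClient"  # hypothetical
        }
    }
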
diff --git a/tests/kafkatest/services/verifiable_share_consumer.py b/tests/kafkatest/services/verifiable_share_consumer.py
new file mode 100644
index 00000000000..ff39e106cf0
--- /dev/null
+++ b/tests/kafkatest/services/verifiable_share_consumer.py
@@ -0,0 +1,330 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import json
+import os
+
+from ducktape.services.background_thread import BackgroundThreadService
+
+from kafkatest.directory_layout.kafka_path import KafkaPathResolverMixin
+from kafkatest.services.kafka import TopicPartition
+from kafkatest.services.kafka.util import get_log4j_config_param, get_log4j_config_for_tools
+from kafkatest.services.verifiable_client import VerifiableClientMixin
+from kafkatest.version import DEV_BRANCH
+
+class ShareConsumerState:
+    Started = 1
+    Dead = 2
+
+class ShareConsumerEventHandler(object):
+
+    def __init__(self, node, idx, state=ShareConsumerState.Dead):
+        self.node = node
+        self.idx = idx
+        self.total_consumed = 0
+        self.total_acknowledged = 0
+        self.total_acknowledged_failed = 0
+        self.consumed_per_partition = {}
+        self.acknowledged_per_partition = {}
+        self.acknowledged_per_partition_failed = {}
+        self.state = state
+
+    def handle_shutdown_complete(self, node=None, logger=None):
+        self.state = ShareConsumerState.Dead
+        if node is not None and logger is not None:
+            logger.debug("Shut down %s" % node.account.hostname)
+
+    def handle_startup_complete(self, node, logger):
+        self.state = ShareConsumerState.Started
+        logger.debug("Started %s" % node.account.hostname)
+
+    def handle_offsets_acknowledged(self, event, node, logger):
+        if event["success"]:
+            self.total_acknowledged += event["count"]
+            for share_partition_data in event["partitions"]:
+                topic_partition = TopicPartition(share_partition_data["topic"], share_partition_data["partition"])
+                self.acknowledged_per_partition[topic_partition] = self.acknowledged_per_partition.get(topic_partition, 0) + share_partition_data["count"]
+            logger.debug("Offsets acknowledged for %s" % (node.account.hostname))
+        else:
+            self.total_acknowledged_failed += event["count"]
+            for share_partition_data in event["partitions"]:
+                topic_partition = TopicPartition(share_partition_data["topic"], share_partition_data["partition"])
+                self.acknowledged_per_partition_failed[topic_partition] = self.acknowledged_per_partition_failed.get(topic_partition, 0) + share_partition_data["count"]
+            logger.debug("Offset acknowledgement failed for: %s" % (node.account.hostname))
+
+    def handle_records_consumed(self, event, node, logger):
+        self.total_consumed += event["count"]
+        for share_partition_data in event["partitions"]:
+            topic_partition = TopicPartition(share_partition_data["topic"], share_partition_data["partition"])
+            self.consumed_per_partition[topic_partition] = self.consumed_per_partition.get(topic_partition, 0) + share_partition_data["count"]
+        logger.debug("Offsets consumed for %s" % (node.account.hostname))
+
+
+    def handle_kill_process(self, clean_shutdown):
+        # if the shutdown was clean, then we expect the explicit
+        # shutdown event from the share consumer
+        if not clean_shutdown:
+            self.handle_shutdown_complete()
+
+class VerifiableShareConsumer(KafkaPathResolverMixin, VerifiableClientMixin, BackgroundThreadService):
+    """This service wraps org.apache.kafka.tools.VerifiableShareConsumer for use in
+    system testing.
+
+    NOTE: this class should be treated as a PUBLIC API. Downstream users use
+    this service both directly and through class extension, so care must be
+    taken to ensure compatibility.
+    """
+
+    PERSISTENT_ROOT = "/mnt/verifiable_share_consumer"
+    STDOUT_CAPTURE = os.path.join(PERSISTENT_ROOT, "verifiable_share_consumer.stdout")
+    STDERR_CAPTURE = os.path.join(PERSISTENT_ROOT, "verifiable_share_consumer.stderr")
+    LOG_DIR = os.path.join(PERSISTENT_ROOT, "logs")
+    LOG_FILE = os.path.join(LOG_DIR, "verifiable_share_consumer.log")
+    CONFIG_FILE = os.path.join(PERSISTENT_ROOT, "verifiable_share_consumer.properties")
+
+    logs = {
+            "verifiable_share_consumer_stdout": {
+                "path": STDOUT_CAPTURE,
+                "collect_default": False},
+            "verifiable_share_consumer_stderr": {
+                "path": STDERR_CAPTURE,
+                "collect_default": False},
+            "verifiable_share_consumer_log": {
+                "path": LOG_FILE,
+                "collect_default": True}
+            }
+
+    def __init__(self, context, num_nodes, kafka, topic, group_id,
+                 max_messages=-1, acknowledgement_mode="auto", offset_reset_strategy="",
+                 version=DEV_BRANCH, stop_timeout_sec=60, log_level="INFO", jaas_override_variables=None,
+                 on_record_consumed=None):
+        """
+        :param jaas_override_variables: A dict of variables to be used in the jaas.conf template file
+        """
+        super(VerifiableShareConsumer, self).__init__(context, num_nodes)
+        self.log_level = log_level
+        self.kafka = kafka
+        self.topic = topic
+        self.group_id = group_id
+        self.offset_reset_strategy = offset_reset_strategy
+        self.max_messages = max_messages
+        self.acknowledgement_mode = acknowledgement_mode
+        self.prop_file = ""
+        self.stop_timeout_sec = stop_timeout_sec
+        self.on_record_consumed = on_record_consumed
+
+        self.event_handlers = {}
+        self.jaas_override_variables = jaas_override_variables or {}
+
+        self.total_records_consumed = 0
+        self.total_records_acknowledged = 0
+        self.total_records_acknowledged_failed = 0
+        self.consumed_records_offsets = set()
+        self.acknowledged_records_offsets = set()
+        self.is_offset_reset_strategy_set = False
+
+        for node in self.nodes:
+            node.version = version
+
+    def java_class_name(self):
+        return "VerifiableShareConsumer"
+
+    def create_event_handler(self, idx, node):
+        return ShareConsumerEventHandler(node, idx)
+
+    def _worker(self, idx, node):
+        with self.lock:
+            self.event_handlers[node] = self.create_event_handler(idx, node)
+            handler = self.event_handlers[node]
+
+        node.account.ssh("mkdir -p %s" % 
VerifiableShareConsumer.PERSISTENT_ROOT, allow_fail=False)
+
+        # Create and upload log properties
+        log_config = self.render(get_log4j_config_for_tools(node), log_file=VerifiableShareConsumer.LOG_FILE)
+        node.account.create_file(get_log4j_config_for_tools(node), log_config)
+
+        # Create and upload config file
+        self.security_config = self.kafka.security_config.client_config(self.prop_file, node,
+                                                                        self.jaas_override_variables)
+        self.security_config.setup_node(node)
+        self.prop_file += str(self.security_config)
+        self.logger.info("verifiable_share_consumer.properties:")
+        self.logger.info(self.prop_file)
+        node.account.create_file(VerifiableShareConsumer.CONFIG_FILE, self.prop_file)
+        self.security_config.setup_node(node)
+
+        cmd = self.start_cmd(node)
+        self.logger.debug("VerifiableShareConsumer %d command: %s" % (idx, 
cmd))
+
+        for line in node.account.ssh_capture(cmd):
+            event = self.try_parse_json(node, line.strip())
+            if event is not None:
+                with self.lock:
+                    name = event["name"]
+                    if name == "shutdown_complete":
+                        handler.handle_shutdown_complete(node, self.logger)
+                    elif name == "startup_complete":
+                        handler.handle_startup_complete(node, self.logger)
+                    elif name == "offsets_acknowledged":
+                        handler.handle_offsets_acknowledged(event, node, self.logger)
+                        self._update_global_acknowledged(event)
+                    elif name == "records_consumed":
+                        handler.handle_records_consumed(event, node, self.logger)
+                        self._update_global_consumed(event)
+                    elif name == "record_data" and self.on_record_consumed:
+                        self.on_record_consumed(event, node)
+                    elif name == "offset_reset_strategy_set":
+                        self._on_offset_reset_strategy_set()
+                    else:
+                        self.logger.debug("%s: ignoring unknown event: %s" % 
(str(node.account), event))
+
+    def _update_global_acknowledged(self, acknowledge_event):
+        if acknowledge_event["success"]:
+            self.total_records_acknowledged += acknowledge_event["count"]
+        else:
+            self.total_records_acknowledged_failed += acknowledge_event["count"]
+        for share_partition_data in acknowledge_event["partitions"]:
+            tpkey = str(share_partition_data["topic"]) + "-" + str(share_partition_data["partition"])
+            for offset in share_partition_data["offsets"]:
+                key = tpkey + "-" + str(offset)
+                if key not in self.acknowledged_records_offsets:
+                    self.acknowledged_records_offsets.add(key)
+
+    def _update_global_consumed(self, consumed_event):
+        self.total_records_consumed += consumed_event["count"]
+
+        for share_partition_data in consumed_event["partitions"]:
+            tpkey = str(share_partition_data["topic"]) + "-" + str(share_partition_data["partition"])
+            for offset in share_partition_data["offsets"]:
+                key = tpkey + "-" + str(offset)
+                if key not in self.consumed_records_offsets:
+                    self.consumed_records_offsets.add(key)
+
+    def _on_offset_reset_strategy_set(self):
+        self.is_offset_reset_strategy_set = True
+
+    def start_cmd(self, node):
+        cmd = ""
+        cmd += "export LOG_DIR=%s;" % VerifiableShareConsumer.LOG_DIR
+        cmd += " export KAFKA_OPTS=%s;" % self.security_config.kafka_opts
+        cmd += " export KAFKA_LOG4J_OPTS=\"%s%s\"; " % 
(get_log4j_config_param(node), get_log4j_config_for_tools(node))
+        cmd += self.impl.exec_cmd(node)
+        if self.on_record_consumed:
+            cmd += " --verbose"
+
+        cmd += " --acknowledgement-mode %s" % self.acknowledgement_mode
+
+        cmd += " --offset-reset-strategy %s" % self.offset_reset_strategy
+
+        cmd += " --bootstrap-server %s" % 
self.kafka.bootstrap_servers(self.security_config.security_protocol)
+
+        cmd += " --group-id %s --topic %s" % (self.group_id, self.topic)
+
+        if self.max_messages > 0:
+            cmd += " --max-messages %s" % str(self.max_messages)
+
+        cmd += " --consumer.config %s" % VerifiableShareConsumer.CONFIG_FILE
+        cmd += " 2>> %s | tee -a %s &" % 
(VerifiableShareConsumer.STDOUT_CAPTURE, VerifiableShareConsumer.STDOUT_CAPTURE)
+        return cmd
+
+    def pids(self, node):
+        return self.impl.pids(node)
+
+    def try_parse_json(self, node, string):
+        """Try to parse a string as json. Return None if not parseable."""
+        try:
+            return json.loads(string)
+        except ValueError:
+            self.logger.debug("%s: Could not parse as json: %s" % 
(str(node.account), str(string)))
+            return None
+
+    def stop_all(self):
+        for node in self.nodes:
+            self.stop_node(node)
+
+    def kill_node(self, node, clean_shutdown=True, allow_fail=False):
+        sig = self.impl.kill_signal(clean_shutdown)
+        for pid in self.pids(node):
+            node.account.signal(pid, sig, allow_fail)
+
+        with self.lock:
+            self.event_handlers[node].handle_kill_process(clean_shutdown)
+
+    def stop_node(self, node, clean_shutdown=True):
+        self.kill_node(node, clean_shutdown=clean_shutdown)
+
+        stopped = self.wait_node(node, timeout_sec=self.stop_timeout_sec)
+        assert stopped, "Node %s: did not stop within the specified timeout of %s seconds" % \
+                        (str(node.account), str(self.stop_timeout_sec))
+
+    def clean_node(self, node):
+        self.kill_node(node, clean_shutdown=False)
+        node.account.ssh("rm -rf " + self.PERSISTENT_ROOT, allow_fail=False)
+        self.security_config.clean_node(node)
+
+    def total_consumed(self):
+        with self.lock:
+            return self.total_records_consumed
+        
+    def total_unique_consumed(self):
+        with self.lock:
+            return len(self.consumed_records_offsets)
+
+    def total_unique_acknowledged(self):
+        with self.lock:
+            return len(self.acknowledged_records_offsets)
+
+    def total_acknowledged(self):
+        with self.lock:
+            return self.total_records_acknowledged + self.total_records_acknowledged_failed
+        
+    def total_successful_acknowledged(self):
+        with self.lock:
+            return self.total_records_acknowledged
+        
+    def total_failed_acknowledged(self):
+        with self.lock:
+            return self.total_records_acknowledged_failed
+
+    def total_consumed_for_a_share_consumer(self, node):
+        with self.lock:
+            return self.event_handlers[node].total_consumed
+
+    def total_acknowledged_for_a_share_consumer(self, node):
+        with self.lock:
+            return self.event_handlers[node].total_acknowledged + self.event_handlers[node].total_acknowledged_failed
+        
+    def total_successful_acknowledged_for_a_share_consumer(self, node):
+        with self.lock:
+            return self.event_handlers[node].total_acknowledged
+        
+    def total_failed_acknowledged_for_a_share_consumer(self, node):
+        with self.lock:
+            return self.event_handlers[node].total_acknowledged_failed
+
+    def offset_reset_strategy_set(self):
+        with self.lock:
+            return self.is_offset_reset_strategy_set
+
+    def dead_nodes(self):
+        with self.lock:
+            return [handler.node for handler in self.event_handlers.values()
+                    if handler.state == ShareConsumerState.Dead]
+
+    def alive_nodes(self):
+        with self.lock:
+            return [handler.node for handler in self.event_handlers.values()
+                    if handler.state == ShareConsumerState.Started]
\ No newline at end of file
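
A minimal sketch of driving this service from a ducktape test, mirroring the
constructor signature and accessors above (the test_context and kafka fixtures
come from the surrounding framework):

    from kafkatest.services.verifiable_share_consumer import VerifiableShareConsumer

    consumer = VerifiableShareConsumer(test_context, 1, kafka,
                                       "test_topic", "test_group_id",
                                       acknowledgement_mode="auto",
                                       offset_reset_strategy="earliest")
    consumer.start()
    # ... produce some records ...
    total = consumer.total_consumed()           # summed from records_consumed events
    unique = consumer.total_unique_consumed()   # de-duplicated topic-partition-offset keys
    consumer.stop_all()
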
diff --git a/tests/kafkatest/tests/client/share_consumer_test.py b/tests/kafkatest/tests/client/share_consumer_test.py
new file mode 100644
index 00000000000..cc74234ac0f
--- /dev/null
+++ b/tests/kafkatest/tests/client/share_consumer_test.py
@@ -0,0 +1,313 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ducktape.mark import matrix
+from ducktape.mark.resource import cluster
+from ducktape.utils.util import wait_until
+from kafkatest.tests.verifiable_share_consumer_test import VerifiableShareConsumerTest
+
+from kafkatest.services.kafka import TopicPartition, quorum
+
+import signal
+
+class ShareConsumerTest(VerifiableShareConsumerTest):
+    TOPIC1 = {"name": "test_topic1", "partitions": 1,"replication_factor": 1}
+    TOPIC2 = {"name": "test_topic2", "partitions": 3,"replication_factor": 3}
+    TOPIC3 = {"name": "test_topic3", "partitions": 3,"replication_factor": 3}
+
+    num_consumers = 3
+    num_producers = 1
+    num_brokers = 3
+
+    def __init__(self, test_context):
+        super(ShareConsumerTest, self).__init__(test_context, num_consumers=self.num_consumers, num_producers=self.num_producers,
+                                        num_zk=0, num_brokers=self.num_brokers, topics={
+                self.TOPIC1["name"] : { 'partitions': self.TOPIC1["partitions"], 'replication-factor': self.TOPIC1["replication_factor"] },
+                self.TOPIC2["name"] : { 'partitions': self.TOPIC2["partitions"], 'replication-factor': self.TOPIC2["replication_factor"] }
+            })
+
+    def setup_share_group(self, topic, **kwargs):
+        consumer = super(ShareConsumerTest, self).setup_share_group(topic, **kwargs)
+        self.mark_for_collect(consumer, 'verifiable_share_consumer_stdout')
+        return consumer
+
+    def get_topic_partitions(self, topic):
+        return [TopicPartition(topic["name"], i) for i in 
range(topic["partitions"])]
+
+    def wait_until_topic_replicas_settled(self, topic, expected_num_isr, timeout_sec=60):
+        for partition in range(0, topic["partitions"]):
+            wait_until(lambda: len(self.kafka.isr_idx_list(topic["name"], partition)) == expected_num_isr,
+                       timeout_sec=timeout_sec, backoff_sec=1, err_msg="the expected number of ISRs did not settle in a reasonable amount of time")
+
+    def wait_until_topic_partition_leaders_settled(self, topic, timeout_sec=60):
+        def leader_settled(partition_leader, topicName, partition):
+            try:
+                partition_leader(topicName, partition)
+                return True
+            except Exception:
+                return False
+        for partition in range(0, topic["partitions"]):
+            wait_until(lambda: leader_settled(self.kafka.leader, topic["name"], partition),
+                       timeout_sec=timeout_sec, backoff_sec=1, err_msg="partition leaders did not settle in a reasonable amount of time")
+
+    def rolling_bounce_brokers(self, topic, num_bounces=5, clean_shutdown=True, timeout_sec=60):
+        for _ in range(num_bounces):
+            for i in range(len(self.kafka.nodes)):
+                node = self.kafka.nodes[i]
+                self.kafka.restart_node(node, clean_shutdown=clean_shutdown)
+                self.wait_until_topic_replicas_settled(topic, expected_num_isr = topic["replication_factor"], timeout_sec=timeout_sec)
+
+    def fail_brokers(self, topic, num_brokers=1, clean_shutdown=True, timeout_sec=60):
+        for i in range(num_brokers):
+            self.kafka.signal_node(self.kafka.nodes[i], signal.SIGTERM if clean_shutdown else signal.SIGKILL)
+            self.wait_until_topic_replicas_settled(topic, topic["replication_factor"] - (i + 1))
+        self.wait_until_topic_partition_leaders_settled(topic, timeout_sec=timeout_sec)
+
+    def rolling_bounce_share_consumers(self, consumer, keep_alive=0, num_bounces=5, clean_shutdown=True, timeout_sec=60):
+        for _ in range(num_bounces):
+            num_consumers_killed = 0
+            for node in consumer.nodes[keep_alive:]:
+                consumer.stop_node(node, clean_shutdown)
+                num_consumers_killed += 1
+                wait_until(lambda: len(consumer.dead_nodes()) == 1,
+                           timeout_sec=timeout_sec,
+                           err_msg="Timed out waiting for the share consumer 
to shutdown")
+
+                consumer.start_node(node)
+
+                self.await_all_members(consumer, timeout_sec=timeout_sec)
+                self.await_consumed_messages_by_a_consumer(consumer, node, timeout_sec=timeout_sec)
+
+    def bounce_all_share_consumers(self, consumer, keep_alive=0, num_bounces=5, clean_shutdown=True, timeout_sec=60):
+        for _ in range(num_bounces):
+            for node in consumer.nodes[keep_alive:]:
+                consumer.stop_node(node, clean_shutdown)
+
+            wait_until(lambda: len(consumer.dead_nodes()) == self.num_consumers - keep_alive, timeout_sec=timeout_sec,
+                       err_msg="Timed out waiting for the share consumers to shutdown")
+
+            num_alive_consumers = keep_alive
+            for node in consumer.nodes[keep_alive:]:
+                consumer.start_node(node)
+                num_alive_consumers += 1
+                self.await_members(consumer, num_consumers=num_alive_consumers, timeout_sec=timeout_sec)
+                self.await_consumed_messages_by_a_consumer(consumer, node, timeout_sec=timeout_sec)
+
+    def fail_share_consumers(self, consumer, num_consumers=1, clean_shutdown=True, timeout_sec=60):
+        for i in range(num_consumers):
+            consumer.kill_node(consumer.nodes[i], clean_shutdown=clean_shutdown)
+            wait_until(lambda: len(consumer.dead_nodes()) == (i + 1),
+                       timeout_sec=timeout_sec,
+                       err_msg="Timed out waiting for the share consumer to be 
killed")
+
+    @cluster(num_nodes=10)
+    @matrix(
+        metadata_quorum=[quorum.isolated_kraft, quorum.combined_kraft]
+    )
+    def test_share_single_topic_partition(self, metadata_quorum=quorum.isolated_kraft):
+
+        total_messages = 100000
+        producer = self.setup_producer(self.TOPIC1["name"], 
max_messages=total_messages)
+
+        consumer = self.setup_share_group(self.TOPIC1["name"], 
offset_reset_strategy="earliest")
+
+        producer.start()
+
+        consumer.start()
+        self.await_all_members(consumer, timeout_sec=60)
+
+        self.await_acknowledged_messages(consumer, min_messages=total_messages, timeout_sec=60)
+
+        assert consumer.total_consumed() >= producer.num_acked
+        assert consumer.total_acknowledged() == producer.num_acked
+
+        for event_handler in consumer.event_handlers.values():
+            assert event_handler.total_consumed > 0
+            assert event_handler.total_acknowledged > 0
+
+        producer.stop()
+        consumer.stop_all()
+
+    @cluster(num_nodes=10)
+    @matrix(
+        metadata_quorum=[quorum.isolated_kraft, quorum.combined_kraft]
+    )
+    def test_share_multiple_partitions(self, metadata_quorum=quorum.isolated_kraft):
+
+        total_messages = 1000000
+        producer = self.setup_producer(self.TOPIC2["name"], 
max_messages=total_messages, throughput=5000)
+
+        consumer = self.setup_share_group(self.TOPIC2["name"], 
offset_reset_strategy="earliest")
+
+        producer.start()
+
+        consumer.start()
+        self.await_all_members(consumer, timeout_sec=60)
+
+        self.await_acknowledged_messages(consumer, min_messages=total_messages, timeout_sec=60)
+
+        assert consumer.total_consumed() >= producer.num_acked
+        assert consumer.total_acknowledged() == producer.num_acked
+
+        for event_handler in consumer.event_handlers.values():
+            assert event_handler.total_consumed > 0
+            assert event_handler.total_acknowledged > 0
+            for topic_partition in self.get_topic_partitions(self.TOPIC2):
+                assert topic_partition in event_handler.consumed_per_partition
+                assert event_handler.consumed_per_partition[topic_partition] > 0
+                assert topic_partition in event_handler.acknowledged_per_partition
+                assert event_handler.acknowledged_per_partition[topic_partition] > 0
+
+        producer.stop()
+        consumer.stop_all()
+
+    @cluster(num_nodes=10)
+    @matrix(
+        clean_shutdown=[True, False],
+        metadata_quorum=[quorum.isolated_kraft, quorum.combined_kraft]
+    )
+    def test_broker_rolling_bounce(self, clean_shutdown, metadata_quorum=quorum.isolated_kraft):
+
+        producer = self.setup_producer(self.TOPIC2["name"])
+        consumer = self.setup_share_group(self.TOPIC2["name"], 
offset_reset_strategy="earliest")
+
+        producer.start()
+        self.await_produced_messages(producer)
+
+        consumer.start()
+        self.await_all_members(consumer)
+
+        self.await_consumed_messages(consumer, timeout_sec=60)
+        self.rolling_bounce_brokers(self.TOPIC2, num_bounces=1, clean_shutdown=clean_shutdown)
+
+        # ensure that the share consumers do some work after the broker bounces
+        self.await_consumed_messages(consumer, min_messages=1000)
+
+        producer.stop()
+
+        self.await_unique_consumed_messages(consumer, min_messages=producer.num_acked, timeout_sec=60)
+
+        assert consumer.total_unique_consumed() >= producer.num_acked
+
+        consumer.stop_all()
+
+    @cluster(num_nodes=10)
+    @matrix(
+        clean_shutdown=[True, False],
+        metadata_quorum=[quorum.isolated_kraft],
+        num_failed_brokers=[1, 2]
+    )
+    @matrix(
+        clean_shutdown=[True, False],
+        metadata_quorum=[quorum.combined_kraft],
+        num_failed_brokers=[1]
+    )
+    def test_broker_failure(self, clean_shutdown, metadata_quorum=quorum.isolated_kraft, num_failed_brokers=1):
+
+        producer = self.setup_producer(self.TOPIC2["name"])
+        consumer = self.setup_share_group(self.TOPIC2["name"], 
offset_reset_strategy="earliest")
+
+        producer.start()
+        self.await_produced_messages(producer)
+
+        consumer.start()
+        self.await_all_members(consumer)
+
+        # shutdown the required number of brokers
+        self.fail_brokers(self.TOPIC2, num_brokers=num_failed_brokers, clean_shutdown=clean_shutdown)
+
+        # ensure that the share consumers do some work after the broker failure
+        self.await_consumed_messages(consumer, min_messages=1000)
+
+        producer.stop()
+
+        self.await_unique_consumed_messages(consumer, min_messages=producer.num_acked, timeout_sec=60)
+
+        assert consumer.total_unique_consumed() >= producer.num_acked
+
+        consumer.stop_all()
+
+    @cluster(num_nodes=10)
+    @matrix(
+        clean_shutdown=[True, False],
+        bounce_mode=["all", "rolling"],
+        metadata_quorum=[quorum.isolated_kraft, quorum.combined_kraft]
+    )
+    def test_share_consumer_bounce(self, clean_shutdown, bounce_mode, metadata_quorum=quorum.isolated_kraft):
+        """
+        Verify correct share consumer behavior when the share consumers in the group are consecutively restarted.
+
+        Setup: single Kafka cluster with one producer and a set of share consumers in one group.
+
+        - Start a producer which continues producing new messages throughout the test.
+        - Start up the share consumers and wait until they've joined the group.
+        - In a loop, restart each share consumer, waiting for each one to rejoin the group before
+          restarting the rest.
+        - Verify that the share consumers consume all messages produced by the producer at least once.
+        """
+
+        producer = self.setup_producer(self.TOPIC2["name"])
+        consumer = self.setup_share_group(self.TOPIC2["name"], 
offset_reset_strategy="earliest")
+
+        producer.start()
+        self.await_produced_messages(producer)
+
+        consumer.start()
+        self.await_all_members(consumer)
+
+        if bounce_mode == "all":
+            self.bounce_all_share_consumers(consumer, clean_shutdown=clean_shutdown)
+        else:
+            self.rolling_bounce_share_consumers(consumer, clean_shutdown=clean_shutdown)
+
+        producer.stop()
+
+        self.await_unique_consumed_messages(consumer, min_messages=producer.num_acked, timeout_sec=60)
+
+        assert consumer.total_unique_consumed() >= producer.num_acked
+
+        consumer.stop_all()
+
+    @cluster(num_nodes=10)
+    @matrix(
+        clean_shutdown=[True, False],
+        num_failed_consumers=[1, 2],
+        metadata_quorum=[quorum.isolated_kraft, quorum.combined_kraft]
+    )
+    def test_share_consumer_failure(self, clean_shutdown, metadata_quorum=quorum.isolated_kraft, num_failed_consumers=1):
+
+        producer = self.setup_producer(self.TOPIC2["name"])
+        consumer = self.setup_share_group(self.TOPIC2["name"], 
offset_reset_strategy="earliest")
+
+        # startup the producer and ensure that some records have been written
+        producer.start()
+        self.await_produced_messages(producer)
+
+        consumer.start()
+        self.await_all_members(consumer)
+
+        # stop the required number of share consumers
+        self.fail_share_consumers(consumer, num_failed_consumers, clean_shutdown=clean_shutdown)
+
+        # ensure that the remaining consumer does some work
+        self.await_consumed_messages(consumer, min_messages=1000, timeout_sec=60)
+
+        producer.stop()
+
+        self.await_unique_consumed_messages(consumer, min_messages=producer.num_acked, timeout_sec=60)
+
+        assert consumer.total_unique_consumed() >= producer.num_acked
+
+        consumer.stop_all()
\ No newline at end of file
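
The contract between the Java tool and the Python service above is one JSON event
per stdout line. Based on the event classes in VerifiableShareConsumer.java below,
a records_consumed event would look roughly like this (values illustrative):

    import json

    line = ('{"timestamp": 1737103412000, "name": "records_consumed", "count": 2, '
            '"partitions": [{"topic": "test_topic1", "partition": 0, '
            '"count": 2, "offsets": [0, 1]}]}')

    # This is what VerifiableShareConsumer.try_parse_json feeds into
    # ShareConsumerEventHandler.handle_records_consumed.
    event = json.loads(line)
    assert event["name"] == "records_consumed"
    for p in event["partitions"]:
        print(p["topic"], p["partition"], p["count"])
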
diff --git a/tests/kafkatest/tests/verifiable_share_consumer_test.py b/tests/kafkatest/tests/verifiable_share_consumer_test.py
new file mode 100644
index 00000000000..263a40e62fb
--- /dev/null
+++ b/tests/kafkatest/tests/verifiable_share_consumer_test.py
@@ -0,0 +1,106 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ducktape.utils.util import wait_until
+
+from kafkatest.tests.kafka_test import KafkaTest
+from kafkatest.services.verifiable_producer import VerifiableProducer
+from kafkatest.services.verifiable_share_consumer import VerifiableShareConsumer
+from kafkatest.services.kafka import TopicPartition
+
+class VerifiableShareConsumerTest(KafkaTest):
+    PRODUCER_REQUEST_TIMEOUT_SEC = 30
+
+    def __init__(self, test_context, num_consumers=1, num_producers=0, **kwargs):
+        super(VerifiableShareConsumerTest, self).__init__(test_context, **kwargs)
+        self.num_consumers = num_consumers
+        self.num_producers = num_producers
+
+    def _all_partitions(self, topic, num_partitions):
+        partitions = set()
+        for i in range(num_partitions):
+            partitions.add(TopicPartition(topic=topic, partition=i))
+        return partitions
+
+    def _partitions(self, assignment):
+        partitions = []
+        for parts in assignment.values():
+            partitions += parts
+        return partitions
+
+    def valid_assignment(self, topic, num_partitions, assignment):
+        all_partitions = self._all_partitions(topic, num_partitions)
+        partitions = self._partitions(assignment)
+        return len(partitions) == num_partitions and set(partitions) == all_partitions
+
+    def min_cluster_size(self):
+        """Override this since we're adding services outside of the 
constructor"""
+        return super(VerifiableShareConsumerTest, self).min_cluster_size() + 
self.num_consumers + self.num_producers
+
+    def setup_share_group(self, topic, acknowledgement_mode="auto", 
group_id="test_group_id", offset_reset_strategy="", **kwargs):
+        return VerifiableShareConsumer(self.test_context, self.num_consumers, 
self.kafka,
+                                  topic, group_id, 
acknowledgement_mode=acknowledgement_mode,
+                                  offset_reset_strategy=offset_reset_strategy, 
log_level="TRACE", **kwargs)
+
+    def setup_producer(self, topic, max_messages=-1, throughput=500):
+        return VerifiableProducer(self.test_context, self.num_producers, self.kafka, topic,
+                                  max_messages=max_messages, throughput=throughput,
+                                  request_timeout_sec=self.PRODUCER_REQUEST_TIMEOUT_SEC,
+                                  log_level="DEBUG")
+
+    def await_produced_messages(self, producer, min_messages=1000, timeout_sec=10):
+        current_acked = producer.num_acked
+        wait_until(lambda: producer.num_acked >= current_acked + min_messages, timeout_sec=timeout_sec,
+                   err_msg="Timeout awaiting messages to be produced and acked")
+
+    def await_consumed_messages(self, consumer, min_messages=1, timeout_sec=10, total=False):
+        current_total = 0
+        if total is False:
+            current_total = consumer.total_consumed()
+        wait_until(lambda: consumer.total_consumed() >= current_total + min_messages,
+                   timeout_sec=timeout_sec,
+                   err_msg="Timed out waiting for consumption")
+
+    def await_consumed_messages_by_a_consumer(self, consumer, node, min_messages=1, timeout_sec=10, total=False):
+        current_total = 0
+        if total is False:
+            current_total = consumer.total_consumed_for_a_share_consumer(node)
+        wait_until(lambda: consumer.total_consumed_for_a_share_consumer(node) >= current_total + min_messages,
+                   timeout_sec=timeout_sec,
+                   err_msg="Timed out waiting for consumption")
+        
+    def await_unique_consumed_messages(self, consumer, min_messages=1, timeout_sec=10):
+        wait_until(lambda: consumer.total_unique_consumed() >= min_messages,
+                   timeout_sec=timeout_sec,
+                   err_msg="Timed out waiting for consumption")
+
+    def await_acknowledged_messages(self, consumer, min_messages=1, timeout_sec=10):
+        wait_until(lambda: consumer.total_acknowledged() >= min_messages,
+                   timeout_sec=timeout_sec,
+                   err_msg="Timed out waiting for consumption")
+        
+    def await_unique_acknowledged_messages(self, consumer, min_messages=1, timeout_sec=10):
+        wait_until(lambda: consumer.total_unique_acknowledged() >= min_messages,
+                   timeout_sec=timeout_sec,
+                   err_msg="Timed out waiting for consumption")
+
+    def await_members(self, consumer, num_consumers, timeout_sec=10):
+        # Wait until all members have started
+        wait_until(lambda: len(consumer.alive_nodes()) == num_consumers,
+                   timeout_sec=timeout_sec,
+                   err_msg="Consumers failed to start in a reasonable amount 
of time")
+
+    def await_all_members(self, consumer, timeout_sec=10):
+        self.await_members(consumer, self.num_consumers, timeout_sec=timeout_sec)
\ No newline at end of file
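
Putting the base class to work, a condensed sketch of the produce/consume/verify
pattern its helpers support (a trimmed, hypothetical variant of the tests above;
cluster sizing and quorum matrices omitted):

    class MyShareGroupTest(VerifiableShareConsumerTest):  # hypothetical subclass
        def test_basic_share_consume(self):
            producer = self.setup_producer("test_topic", max_messages=1000)
            consumer = self.setup_share_group("test_topic", offset_reset_strategy="earliest")
            producer.start()
            consumer.start()
            self.await_all_members(consumer, timeout_sec=60)
            self.await_unique_consumed_messages(consumer, min_messages=1000, timeout_sec=60)
            assert consumer.total_unique_consumed() >= producer.num_acked
            consumer.stop_all()
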
diff --git a/tools/src/main/java/org/apache/kafka/tools/VerifiableShareConsumer.java b/tools/src/main/java/org/apache/kafka/tools/VerifiableShareConsumer.java
new file mode 100644
index 00000000000..654a2960204
--- /dev/null
+++ b/tools/src/main/java/org/apache/kafka/tools/VerifiableShareConsumer.java
@@ -0,0 +1,630 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.AlterConfigOp;
+import org.apache.kafka.clients.admin.AlterConfigsOptions;
+import org.apache.kafka.clients.admin.ConfigEntry;
+import org.apache.kafka.clients.consumer.AcknowledgementCommitCallback;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaShareConsumer;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.config.ConfigResource;
+import org.apache.kafka.common.errors.WakeupException;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.coordinator.group.GroupConfig;
+import org.apache.kafka.coordinator.group.ShareGroupAutoOffsetResetStrategy;
+
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonPropertyOrder;
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonSerializer;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.SerializerProvider;
+import com.fasterxml.jackson.databind.module.SimpleModule;
+
+import net.sourceforge.argparse4j.ArgumentParsers;
+import net.sourceforge.argparse4j.inf.ArgumentParser;
+import net.sourceforge.argparse4j.inf.ArgumentParserException;
+import net.sourceforge.argparse4j.inf.MutuallyExclusiveGroup;
+import net.sourceforge.argparse4j.inf.Namespace;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import static net.sourceforge.argparse4j.impl.Arguments.store;
+import static net.sourceforge.argparse4j.impl.Arguments.storeTrue;
+
+public class VerifiableShareConsumer implements Closeable, AcknowledgementCommitCallback {
+
+    private static final Logger log = LoggerFactory.getLogger(VerifiableShareConsumer.class);
+
+    private final ObjectMapper mapper = new ObjectMapper();
+    private final PrintStream out;
+    private final KafkaShareConsumer<String, String> consumer;
+    private final String topic;
+    private final AcknowledgementMode acknowledgementMode;
+    private final String offsetResetStrategy;
+    private final Boolean verbose;
+    private final int maxMessages;
+    private Integer totalAcknowledged = 0;
+    private final String brokerHostandPort;
+    private final String groupId;
+    private final CountDownLatch shutdownLatch = new CountDownLatch(1);
+
+    public static class PartitionData {
+        private final String topic;
+        private final int partition;
+
+        public PartitionData(String topic, int partition) {
+            this.topic = topic;
+            this.partition = partition;
+        }
+
+        @JsonProperty
+        public String topic() {
+            return topic;
+        }
+
+        @JsonProperty
+        public int partition() {
+            return partition;
+        }
+    }
+
+    public static class RecordSetSummary extends PartitionData {
+        private final long count;
+        private final Set<Long> offsets;
+
+        public RecordSetSummary(String topic, int partition, Set<Long> 
offsets) {
+            super(topic, partition);
+            this.offsets = offsets;
+            this.count = offsets.size();
+        }
+
+        @JsonProperty
+        public long count() {
+            return count;
+        }
+
+        @JsonProperty
+        public Set<Long> offsets() {
+            return offsets;
+        }
+
+    }
+
+    protected static class AcknowledgedData extends PartitionData {
+        private final long count;
+        private final Set<Long> offsets;
+
+        public AcknowledgedData(String topic, int partition, Set<Long> 
offsets) {
+            super(topic, partition);
+            this.offsets = offsets;
+            this.count = offsets.size();
+        }
+
+        @JsonProperty
+        public long count() {
+            return count;
+        }
+
+        @JsonProperty
+        public Set<Long> offsets() {
+            return offsets;
+        }
+    }
+
+    @JsonPropertyOrder({ "timestamp", "name" })
+    private abstract static class ShareConsumerEvent {
+        private final long timestamp = System.currentTimeMillis();
+
+        @JsonProperty
+        public abstract String name();
+
+        @JsonProperty
+        public long timestamp() {
+            return timestamp;
+        }
+    }
+
+    protected static class StartupComplete extends ShareConsumerEvent {
+
+        @Override
+        public String name() {
+            return "startup_complete";
+        }
+    }
+
+    @JsonPropertyOrder({ "timestamp", "name", "offsetResetStrategy" })
+    protected static class OffsetResetStrategySet extends ShareConsumerEvent {
+
+        private final String offsetResetStrategy;
+
+        public OffsetResetStrategySet(String offsetResetStrategy) {
+            this.offsetResetStrategy = offsetResetStrategy;
+        }
+
+        @Override
+        public String name() {
+            return "offset_reset_strategy_set";
+        }
+
+        @JsonProperty
+        public String offsetResetStrategy() {
+            return offsetResetStrategy;
+        }
+    }
+
+    protected static class ShutdownComplete extends ShareConsumerEvent {
+
+        @Override
+        public String name() {
+            return "shutdown_complete";
+        }
+    }
+
+    @JsonPropertyOrder({ "timestamp", "name", "count", "partitions" })
+    public static class RecordsConsumed extends ShareConsumerEvent {
+        private final long count;
+        private final List<RecordSetSummary> partitionSummaries;
+
+        public RecordsConsumed(long count, List<RecordSetSummary> 
partitionSummaries) {
+            this.count = count;
+            this.partitionSummaries = partitionSummaries;
+        }
+
+        @Override
+        public String name() {
+            return "records_consumed";
+        }
+
+        @JsonProperty
+        public long count() {
+            return count;
+        }
+
+        @JsonProperty
+        public List<RecordSetSummary> partitions() {
+            return partitionSummaries;
+        }
+    }
+
+    @JsonPropertyOrder({ "timestamp", "name", "count", "partitions", 
"success", "error" })
+    protected static class OffsetsAcknowledged extends ShareConsumerEvent {
+
+        private final long count;
+        private final List<AcknowledgedData> partitions;
+        private final String error;
+        private final boolean success;
+
+        public OffsetsAcknowledged(long count, List<AcknowledgedData> 
partitions, String error, boolean success) {
+            this.count = count;
+            this.partitions = partitions;
+            this.error = error;
+            this.success = success;
+        }
+
+        @Override
+        public String name() {
+            return "offsets_acknowledged";
+        }
+
+        @JsonProperty
+        public long count() {
+            return count;
+        }
+
+        @JsonProperty
+        public List<AcknowledgedData> partitions() {
+            return partitions;
+        }
+
+        @JsonProperty
+        @JsonInclude(JsonInclude.Include.NON_NULL)
+        public String error() {
+            return error;
+        }
+
+        @JsonProperty
+        public boolean success() {
+            return success;
+        }
+
+    }
+
+    @JsonPropertyOrder({ "timestamp", "name", "key", "value", "topic", 
"partition", "offset" })
+    public static class RecordData extends ShareConsumerEvent {
+
+        private final ConsumerRecord<String, String> record;
+
+        public RecordData(ConsumerRecord<String, String> record) {
+            this.record = record;
+        }
+
+        @Override
+        public String name() {
+            return "record_data";
+        }
+
+        @JsonProperty
+        public String topic() {
+            return record.topic();
+        }
+
+        @JsonProperty
+        public int partition() {
+            return record.partition();
+        }
+
+        @JsonProperty
+        public String key() {
+            return record.key();
+        }
+
+        @JsonProperty
+        public String value() {
+            return record.value();
+        }
+
+        @JsonProperty
+        public long offset() {
+            return record.offset();
+        }
+
+    }
+
+    public VerifiableShareConsumer(KafkaShareConsumer<String, String> consumer,
+                                   PrintStream out,
+                                   Integer maxMessages,
+                                   String topic,
+                                   AcknowledgementMode acknowledgementMode,
+                                   String offsetResetStrategy,
+                                   String brokerHostandPort,
+                                   String groupId,
+                                   Boolean verbose) {
+        this.out = out;
+        this.consumer = consumer;
+        this.topic = topic;
+        this.acknowledgementMode = acknowledgementMode;
+        this.offsetResetStrategy = offsetResetStrategy;
+        this.verbose = verbose;
+        this.maxMessages = maxMessages;
+        this.brokerHostandPort = brokerHostandPort;
+        this.groupId = groupId;
+        addKafkaSerializerModule();
+    }
+
+    private void addKafkaSerializerModule() {
+        SimpleModule kafka = new SimpleModule();
+        kafka.addSerializer(TopicPartition.class, new JsonSerializer<>() {
+            @Override
+            public void serialize(TopicPartition tp, JsonGenerator gen, SerializerProvider serializers) throws IOException {
+                gen.writeStartObject();
+                gen.writeObjectField("topic", tp.topic());
+                gen.writeObjectField("partition", tp.partition());
+                gen.writeEndObject();
+            }
+        });
+        mapper.registerModule(kafka);
+    }
+
+    private void onRecordsReceived(ConsumerRecords<String, String> records) {
+        List<RecordSetSummary> summaries = new ArrayList<>();
+        for (TopicPartition tp : records.partitions()) {
+            List<ConsumerRecord<String, String>> partitionRecords = 
records.records(tp);
+
+            if (partitionRecords.isEmpty())
+                continue;
+
+            TreeSet<Long> partitionOffsets = new TreeSet<>();
+
+            for (ConsumerRecord<String, String> record : partitionRecords) {
+                partitionOffsets.add(record.offset());
+            }
+
+            summaries.add(new RecordSetSummary(tp.topic(), tp.partition(), partitionOffsets));
+
+            if (this.verbose) {
+                for (ConsumerRecord<String, String> record : partitionRecords) 
{
+                    printJson(new RecordData(record));
+                }
+            }
+        }
+
+        printJson(new RecordsConsumed(records.count(), summaries));
+    }
+
+    @Override
+    public void onComplete(Map<TopicIdPartition, Set<Long>> offsetsMap, 
Exception exception) {
+        List<AcknowledgedData> acknowledgedOffsets = new ArrayList<>();
+        int totalAcknowledged = 0;
+        for (Map.Entry<TopicIdPartition, Set<Long>> offsetEntry : 
offsetsMap.entrySet()) {
+            TopicIdPartition tp = offsetEntry.getKey();
+            acknowledgedOffsets.add(new AcknowledgedData(tp.topic(), tp.partition(), offsetEntry.getValue()));
+            totalAcknowledged += offsetEntry.getValue().size();
+        }
+
+        boolean success = true;
+        String error = null;
+        if (exception != null) {
+            success = false;
+            error = exception.getMessage();
+        }
+        printJson(new OffsetsAcknowledged(totalAcknowledged, acknowledgedOffsets, error, success));
+        if (success) {
+            this.totalAcknowledged += totalAcknowledged;
+        }
+    }
+
+    public void run() {
+        try {
+            printJson(new StartupComplete());
+
+            if (!Objects.equals(offsetResetStrategy, "")) {
+                ShareGroupAutoOffsetResetStrategy offsetResetStrategy =
+                    ShareGroupAutoOffsetResetStrategy.fromString(this.offsetResetStrategy);
+
+                Properties adminClientProps = new Properties();
+                adminClientProps.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, brokerHostandPort);
+
+                Admin adminClient = Admin.create(adminClientProps);
+
+                ConfigResource configResource = new ConfigResource(ConfigResource.Type.GROUP, groupId);
+                Map<ConfigResource, Collection<AlterConfigOp>> alterEntries = new HashMap<>();
+                alterEntries.put(configResource, List.of(new AlterConfigOp(new ConfigEntry(
+                    GroupConfig.SHARE_AUTO_OFFSET_RESET_CONFIG, offsetResetStrategy.type().toString()), AlterConfigOp.OpType.SET)));
+                AlterConfigsOptions alterOptions = new AlterConfigsOptions();
+
+                // Setting the share group auto offset reset strategy
+                adminClient.incrementalAlterConfigs(alterEntries, alterOptions)
+                    .all()
+                    .get(60, TimeUnit.SECONDS);
+
+                printJson(new OffsetResetStrategySet(offsetResetStrategy.type().toString()));
+            }
+
+            consumer.subscribe(Collections.singleton(this.topic));
+            consumer.setAcknowledgementCommitCallback(this);
+            while (!(maxMessages >= 0 && totalAcknowledged >= maxMessages)) {
+                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(5000));
+                if (!records.isEmpty()) {
+                    onRecordsReceived(records);
+
+                    if (acknowledgementMode == AcknowledgementMode.ASYNC) {
+                        consumer.commitAsync();
+                    } else if (acknowledgementMode == AcknowledgementMode.SYNC) {
+                        Map<TopicIdPartition, Optional<KafkaException>> result = consumer.commitSync();
+                        for (Map.Entry<TopicIdPartition, Optional<KafkaException>> resultEntry : result.entrySet()) {
+                            if (resultEntry.getValue().isPresent()) {
+                                log.error("Failed to commit offset synchronously for topic partition: {}", resultEntry.getKey());
+                            }
+                        }
+                    }
+                }
+            }
+        } catch (WakeupException e) {
+            // ignore, we are closing
+            log.trace("Caught WakeupException because share consumer is 
shutdown, ignore and terminate.", e);
+        } catch (Throwable t) {
+            // Log the error, so it goes to the service log and not stdout
+            log.error("Error during processing, terminating share consumer 
process: ", t);
+        } finally {
+            consumer.close();
+            printJson(new ShutdownComplete());
+            shutdownLatch.countDown();
+        }
+    }
+
+    public void close() {
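+        // Invoked from the shutdown hook: wake the consumer out of poll() and wait
+        // (uninterruptibly) for run() to finish its cleanup before the JVM exits.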
+        boolean interrupted = false;
+        try {
+            consumer.wakeup();
+            while (true) {
+                try {
+                    shutdownLatch.await();
+                    return;
+                } catch (InterruptedException e) {
+                    interrupted = true;
+                }
+            }
+        } finally {
+            if (interrupted)
+                Thread.currentThread().interrupt();
+        }
+    }
+
+    protected synchronized void printJson(Object data) {
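+        // Synchronized so that events emitted from the poll loop and from the
+        // acknowledgement callback cannot interleave on stdout.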
+        try {
+            out.println(mapper.writeValueAsString(data));
+        } catch (JsonProcessingException e) {
+            out.println("Bad data can't be written as json: " + 
e.getMessage());
+        }
+    }
+
+    public enum AcknowledgementMode {
+        AUTO, ASYNC, SYNC;
+
+        @Override
+        public String toString() {
+            return super.toString().toLowerCase(Locale.ROOT);
+        }
+    }
+
+    private static ArgumentParser argParser() {
+        ArgumentParser parser = ArgumentParsers
+            .newArgumentParser("verifiable-share-consumer")
+            .defaultHelp(true)
+            .description("This tool creates a share group and consumes 
messages from a specific topic and emits share consumer events (e.g. share 
consumer startup, received messages, and offsets acknowledged) as JSON objects 
to STDOUT.");
+        MutuallyExclusiveGroup connectionGroup = 
parser.addMutuallyExclusiveGroup("Connection Group")
+            .description("Group of arguments for connection to brokers")
+            .required(true);
+        connectionGroup.addArgument("--bootstrap-server")
+            .action(store())
+            .required(true)
+            .type(String.class)
+            .metavar("HOST1:PORT1[,HOST2:PORT2[...]]")
+            .dest("bootstrapServer")
+            .help("The server(s) to connect to. Comma-separated list of Kafka 
brokers in the form HOST1:PORT1,HOST2:PORT2,...");
+
+        parser.addArgument("--topic")
+            .action(store())
+            .required(true)
+            .type(String.class)
+            .metavar("TOPIC")
+            .help("Consumes messages from this topic.");
+
+        parser.addArgument("--group-id")
+            .action(store())
+            .required(true)
+            .type(String.class)
+            .metavar("GROUP_ID")
+            .dest("groupId")
+            .help("The groupId shared among members of the share group");
+
+        parser.addArgument("--max-messages")
+            .action(store())
+            .required(false)
+            .type(Integer.class)
+            .setDefault(-1)
+            .metavar("MAX-MESSAGES")
+            .dest("maxMessages")
+            .help("Consume this many messages. If -1 (the default), the share 
consumers will consume until the process is killed externally");
+
+        parser.addArgument("--verbose")
+            .action(storeTrue())
+            .type(Boolean.class)
+            .metavar("VERBOSE")
+            .help("Enable to log individual consumed records");
+
+        parser.addArgument("--acknowledgement-mode")
+            .action(store())
+            .required(false)
+            .setDefault("auto")
+            .type(String.class)
+            .dest("acknowledgementMode")
+            .help("Acknowledgement mode for the share consumers (must be 
either 'auto', 'sync' or 'async')");
+
+        parser.addArgument("--offset-reset-strategy")
+            .action(store())
+            .required(false)
+            .setDefault("")
+            .type(String.class)
+            .dest("offsetResetStrategy")
+            .help("Set share group reset strategy (must be either 'earliest' 
or 'latest')");
+
+        parser.addArgument("--consumer.config")
+            .action(store())
+            .required(false)
+            .type(String.class)
+            .metavar("CONFIG_FILE")
+            .help("Consumer config properties file (config options shared with 
command line parameters will be overridden).");
+
+        return parser;
+    }
+
+    public static VerifiableShareConsumer createFromArgs(ArgumentParser parser, String[] args) throws ArgumentParserException {
+        Namespace res = parser.parseArgs(args);
+
+        AcknowledgementMode acknowledgementMode =
+            AcknowledgementMode.valueOf(res.getString("acknowledgementMode").toUpperCase(Locale.ROOT));
+        String offsetResetStrategy = res.getString("offsetResetStrategy").toLowerCase(Locale.ROOT);
+        String configFile = res.getString("consumer.config");
+        String brokerHostandPort = res.getString("bootstrapServer");
+
+        Properties consumerProps = new Properties();
+        if (configFile != null) {
+            try {
+                consumerProps.putAll(Utils.loadProps(configFile));
+            } catch (IOException e) {
+                throw new ArgumentParserException(e.getMessage(), parser);
+            }
+        }
+
+        String groupId = res.getString("groupId");
+
+        consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
+
+        consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerHostandPort);
+
+        String topic = res.getString("topic");
+        int maxMessages = res.getInt("maxMessages");
+        boolean verbose = res.getBoolean("verbose");
+
+        StringDeserializer deserializer = new StringDeserializer();
+        KafkaShareConsumer<String, String> consumer = new KafkaShareConsumer<>(consumerProps, deserializer, deserializer);
+
+        return new VerifiableShareConsumer(
+            consumer,
+            System.out,
+            maxMessages,
+            topic,
+            acknowledgementMode,
+            offsetResetStrategy,
+            brokerHostandPort,
+            groupId,
+            verbose);
+    }
+
+    public static void main(String[] args) {
+        ArgumentParser parser = argParser();
+        if (args.length == 0) {
+            parser.printHelp();
+            // Can't use `Exit.exit` here because it didn't exist until 0.11.0.0.
+            System.exit(0);
+        }
+        try {
+            final VerifiableShareConsumer shareConsumer = createFromArgs(parser, args);
+            // Can't use `Exit.addShutdownHook` here because it didn't exist until 2.5.0.
+            Runtime.getRuntime().addShutdownHook(new Thread(shareConsumer::close, "verifiable-share-consumer-shutdown-hook"));
+            shareConsumer.run();
+        } catch (ArgumentParserException e) {
+            parser.handleError(e);
+            // Can't use `Exit.exit` here because it didn't exist until 0.11.0.0.
+            System.exit(1);
+        }
+    }
+
+}
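
For reference, a typical manual invocation of the new tool through the wrapper
script added in this commit might look like the following; the broker address,
topic and group name are purely illustrative:

    bin/kafka-verifiable-share-consumer.sh \
        --bootstrap-server localhost:9092 \
        --topic test-topic \
        --group-id test-share-group \
        --acknowledgement-mode sync \
        --max-messages 1000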
