Repository: kafka
Updated Branches:
  refs/heads/trunk 0d68eb73f -> b16817a54
KAFKA-2812: improve consumer integration tests

Author: Jason Gustafson <[email protected]>

Reviewers: Geoff Anderson

Closes #500 from hachikuji/KAFKA-2812

Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/b16817a5
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/b16817a5
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/b16817a5

Branch: refs/heads/trunk
Commit: b16817a54c592eefc5a462132f45c5b4f786d5f1
Parents: 0d68eb7
Author: Jason Gustafson <[email protected]>
Authored: Fri Nov 20 15:46:42 2015 -0800
Committer: Guozhang Wang <[email protected]>
Committed: Fri Nov 20 15:46:42 2015 -0800

----------------------------------------------------------------------
 tests/kafkatest/services/verifiable_consumer.py | 249 +++++++++++++------
 tests/kafkatest/tests/consumer_test.py          | 224 ++++++++++++++---
 2 files changed, 370 insertions(+), 103 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/b16817a5/tests/kafkatest/services/verifiable_consumer.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/verifiable_consumer.py b/tests/kafkatest/services/verifiable_consumer.py
index 7d76166..eec46d7 100644
--- a/tests/kafkatest/services/verifiable_consumer.py
+++ b/tests/kafkatest/services/verifiable_consumer.py
@@ -20,12 +20,96 @@ from kafkatest.services.kafka.version import TRUNK
 from kafkatest.services.security.security_config import SecurityConfig
 from kafkatest.services.kafka import TopicPartition
-from collections import namedtuple
 import json
 import os
+import signal
 import subprocess
 import time
-import signal
+
+class ConsumerState:
+    Dead = 1
+    Rebalancing = 3
+    Joined = 2
+
+class ConsumerEventHandler(object):
+
+    def __init__(self, node):
+        self.node = node
+        self.state = ConsumerState.Dead
+        self.revoked_count = 0
+        self.assigned_count = 0
+        self.assignment = []
+        self.position = {}
+        self.committed = {}
+        self.total_consumed = 0
+
+    def handle_shutdown_complete(self):
+        self.state = ConsumerState.Dead
+        self.assignment = []
+        self.position = {}
+
+    def handle_offsets_committed(self, event):
+        if event["success"]:
+            for offset_commit in event["offsets"]:
+                topic = offset_commit["topic"]
+                partition = offset_commit["partition"]
+                tp = TopicPartition(topic, partition)
+                offset = offset_commit["offset"]
+                assert tp in self.assignment, "Committed offsets for a partition not assigned"
+                assert self.position[tp] >= offset, "The committed offset was greater than the current position"
+                self.committed[tp] = offset
+
+    def handle_records_consumed(self, event):
+        assert self.state == ConsumerState.Joined, "Consumed records should only be received when joined"
+
+        for record_batch in event["partitions"]:
+            tp = TopicPartition(topic=record_batch["topic"],
+                                partition=record_batch["partition"])
+            min_offset = record_batch["minOffset"]
+            max_offset = record_batch["maxOffset"]
+
+            assert tp in self.assignment, "Consumed records for a partition not assigned"
+            assert tp not in self.position or self.position[tp] == min_offset, \
+                "Consumed from an unexpected offset (%s, %s)" % (str(self.position[tp]), str(min_offset))
+            self.position[tp] = max_offset + 1
+
+        self.total_consumed += event["count"]
+
+    def handle_partitions_revoked(self, event):
+        self.revoked_count += 1
+        self.state = ConsumerState.Rebalancing
+        self.position = {}
+
+    def handle_partitions_assigned(self, event):
+        self.assigned_count += 1
+        self.state = ConsumerState.Joined
+        assignment = []
+        for topic_partition in event["partitions"]:
+            topic = topic_partition["topic"]
+            partition = topic_partition["partition"]
+            assignment.append(TopicPartition(topic, partition))
+        self.assignment = assignment
+
+    def handle_kill_process(self, clean_shutdown):
+        # if the shutdown was clean, then we expect the explicit
+        # shutdown event from the consumer
+        if not clean_shutdown:
+            self.handle_shutdown_complete()
+
+    def current_assignment(self):
+        return list(self.assignment)
+
+    def current_position(self, tp):
+        if tp in self.position:
+            return self.position[tp]
+        else:
+            return None
+
+    def last_commit(self, tp):
+        if tp in self.committed:
+            return self.committed[tp]
+        else:
+            return None


 class VerifiableConsumer(BackgroundThreadService):
     PERSISTENT_ROOT = "/mnt/verifiable_consumer"
@@ -49,7 +133,8 @@ class VerifiableConsumer(BackgroundThreadService):
     }
 
     def __init__(self, context, num_nodes, kafka, topic, group_id,
-                 max_messages=-1, session_timeout=30000, version=TRUNK):
+                 max_messages=-1, session_timeout=30000, enable_autocommit=False,
+                 version=TRUNK):
         super(VerifiableConsumer, self).__init__(context, num_nodes)
 
         self.log_level = "TRACE"
@@ -58,23 +143,23 @@ class VerifiableConsumer(BackgroundThreadService):
         self.group_id = group_id
         self.max_messages = max_messages
         self.session_timeout = session_timeout
+        self.enable_autocommit = enable_autocommit
+        self.prop_file = ""
+        self.security_config = kafka.security_config.client_config(self.prop_file)
+        self.prop_file += str(self.security_config)
 
-        self.assignment = {}
-        self.joined = set()
-        self.total_records = 0
-        self.consumed_positions = {}
-        self.committed_offsets = {}
-        self.revoked_count = 0
-        self.assigned_count = 0
+        self.event_handlers = {}
+        self.global_position = {}
+        self.global_committed = {}
 
         for node in self.nodes:
             node.version = version
 
-        self.prop_file = ""
-        self.security_config = kafka.security_config.client_config(self.prop_file)
-        self.prop_file += str(self.security_config)
-
     def _worker(self, idx, node):
+        if node not in self.event_handlers:
+            self.event_handlers[node] = ConsumerEventHandler(node)
+
+        handler = self.event_handlers[node]
         node.account.ssh("mkdir -p %s" % VerifiableConsumer.PERSISTENT_ROOT, allow_fail=False)
 
         # Create and upload log properties
@@ -86,7 +171,6 @@ class VerifiableConsumer(BackgroundThreadService):
 
         self.logger.info(self.prop_file)
         node.account.create_file(VerifiableConsumer.CONFIG_FILE, self.prop_file)
         self.security_config.setup_node(node)
-
         cmd = self.start_cmd(node)
         self.logger.debug("VerifiableConsumer %d command: %s" % (idx, cmd))
 
@@ -96,51 +180,42 @@ class VerifiableConsumer(BackgroundThreadService):
                 with self.lock:
                     name = event["name"]
                     if name == "shutdown_complete":
-                        self._handle_shutdown_complete(node)
+                        handler.handle_shutdown_complete()
                     if name == "offsets_committed":
-                        self._handle_offsets_committed(node, event)
+                        handler.handle_offsets_committed(event)
+                        self._update_global_committed(event)
                     elif name == "records_consumed":
-                        self._handle_records_consumed(node, event)
+                        handler.handle_records_consumed(event)
+                        self._update_global_position(event)
                     elif name == "partitions_revoked":
-                        self._handle_partitions_revoked(node, event)
+                        handler.handle_partitions_revoked(event)
                     elif name == "partitions_assigned":
-                        self._handle_partitions_assigned(node, event)
-
-    def _handle_shutdown_complete(self, node):
-        if node in self.joined:
-            self.joined.remove(node)
-
-    def _handle_offsets_committed(self, node, event):
-        if event["success"]:
-            for offset_commit in event["offsets"]:
-                topic = offset_commit["topic"]
-                partition = offset_commit["partition"]
-                tp = TopicPartition(topic, partition)
-                self.committed_offsets[tp] = offset_commit["offset"]
-
-    def _handle_records_consumed(self, node, event):
-        for topic_partition in event["partitions"]:
-            topic = topic_partition["topic"]
-            partition = topic_partition["partition"]
-            tp = TopicPartition(topic, partition)
-            self.consumed_positions[tp] = topic_partition["maxOffset"] + 1
-        self.total_records += event["count"]
-
-    def _handle_partitions_revoked(self, node, event):
-        self.revoked_count += 1
-        self.assignment[node] = []
-        if node in self.joined:
-            self.joined.remove(node)
-
-    def _handle_partitions_assigned(self, node, event):
-        self.assigned_count += 1
-        self.joined.add(node)
-        assignment =[]
-        for topic_partition in event["partitions"]:
-            topic = topic_partition["topic"]
-            partition = topic_partition["partition"]
-            assignment.append(TopicPartition(topic, partition))
-        self.assignment[node] = assignment
+                        handler.handle_partitions_assigned(event)
+
+    def _update_global_position(self, consumed_event):
+        for consumed_partition in consumed_event["partitions"]:
+            tp = TopicPartition(consumed_partition["topic"], consumed_partition["partition"])
+            if tp in self.global_committed:
+                # verify that the position never gets behind the current commit.
+                assert self.global_committed[tp] <= consumed_partition["minOffset"], \
+                    "Consumed position %d is behind the current committed offset %d" % (consumed_partition["minOffset"], self.global_committed[tp])
+
+            # the consumer cannot generally guarantee that the position increases monotonically
+            # without gaps in the face of hard failures, so we only log a warning when this happens
+            if tp in self.global_position and self.global_position[tp] != consumed_partition["minOffset"]:
+                self.logger.warn("Expected next consumed offset of %d, but instead saw %d" %
+                                 (self.global_position[tp], consumed_partition["minOffset"]))
+
+            self.global_position[tp] = consumed_partition["maxOffset"] + 1
+
+    def _update_global_committed(self, commit_event):
+        if commit_event["success"]:
+            for offset_commit in commit_event["offsets"]:
+                tp = TopicPartition(offset_commit["topic"], offset_commit["partition"])
+                offset = offset_commit["offset"]
+                assert self.global_position[tp] >= offset, \
+                    "committed offset is ahead of the current partition"
+                self.global_committed[tp] = offset
 
     def start_cmd(self, node):
         cmd = ""
@@ -148,14 +223,14 @@ class VerifiableConsumer(BackgroundThreadService):
         cmd += " export KAFKA_OPTS=%s;" % self.security_config.kafka_opts
         cmd += " export KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:%s\"; " % VerifiableConsumer.LOG4J_CONFIG
         cmd += "/opt/" + kafka_dir(node) + "/bin/kafka-run-class.sh org.apache.kafka.tools.VerifiableConsumer" \
-              " --group-id %s --topic %s --broker-list %s --session-timeout %s" % \
-              (self.group_id, self.topic, self.kafka.bootstrap_servers(), self.session_timeout)
+              " --group-id %s --topic %s --broker-list %s --session-timeout %s %s" % \
+              (self.group_id, self.topic, self.kafka.bootstrap_servers(), self.session_timeout,
+               "--enable-autocommit" if self.enable_autocommit else "")
         if self.max_messages > 0:
             cmd += " --max-messages %s" % str(self.max_messages)
 
         cmd += " --consumer.config %s" % VerifiableConsumer.CONFIG_FILE
         cmd += " 2>> %s | tee -a %s &" % (VerifiableConsumer.STDOUT_CAPTURE, VerifiableConsumer.STDOUT_CAPTURE)
-        print(cmd)
         return cmd
 
     def pids(self, node):
@@ -174,6 +249,10 @@ class VerifiableConsumer(BackgroundThreadService):
             self.logger.debug("Could not parse as json: %s" % str(string))
             return None
 
+    def stop_all(self):
+        for node in self.nodes:
+            self.stop_node(node)
+
     def kill_node(self, node, clean_shutdown=True, allow_fail=False):
         if clean_shutdown:
             sig = signal.SIGTERM
@@ -182,11 +261,10 @@ class VerifiableConsumer(BackgroundThreadService):
 
         for pid in self.pids(node):
             node.account.signal(pid, sig, allow_fail)
-        if not clean_shutdown:
-            self._handle_shutdown_complete(node)
+        self.event_handlers[node].handle_kill_process(clean_shutdown)
 
-    def stop_node(self, node, clean_shutdown=True, allow_fail=False):
-        self.kill_node(node, clean_shutdown, allow_fail)
+    def stop_node(self, node, clean_shutdown=True):
+        self.kill_node(node, clean_shutdown=clean_shutdown)
 
         if self.worker_threads is None:
             return
@@ -203,20 +281,47 @@ class VerifiableConsumer(BackgroundThreadService):
 
     def current_assignment(self):
         with self.lock:
-            return self.assignment
+            return { handler.node: handler.current_assignment() for handler in self.event_handlers.itervalues() }
 
-    def position(self, tp):
+    def current_position(self, tp):
         with self.lock:
-            return self.consumed_positions[tp]
+            if tp in self.global_position:
+                return self.global_position[tp]
+            else:
+                return None
 
     def owner(self, tp):
+        for handler in self.event_handlers.itervalues():
+            if tp in handler.current_assignment():
+                return handler.node
+        return None
+
+    def last_commit(self, tp):
         with self.lock:
-            for node, assignment in self.assignment.iteritems():
-                if tp in assignment:
-                    return node
-            return None
+            if tp in self.global_committed:
+                return self.global_committed[tp]
+            else:
+                return None
 
-    def committed(self, tp):
+    def total_consumed(self):
         with self.lock:
-            return self.committed_offsets[tp]
+            return sum(handler.total_consumed for handler in self.event_handlers.itervalues())
+    def num_rebalances(self):
+        with self.lock:
+            return max(handler.assigned_count for handler in self.event_handlers.itervalues())
+
+    def joined_nodes(self):
+        with self.lock:
+            return [handler.node for handler in self.event_handlers.itervalues()
+                    if handler.state == ConsumerState.Joined]
+
+    def rebalancing_nodes(self):
+        with self.lock:
+            return [handler.node for handler in self.event_handlers.itervalues()
+                    if handler.state == ConsumerState.Rebalancing]
+
+    def dead_nodes(self):
+        with self.lock:
+            return [handler.node for handler in self.event_handlers.itervalues()
+                    if handler.state == ConsumerState.Dead]


http://git-wip-us.apache.org/repos/asf/kafka/blob/b16817a5/tests/kafkatest/tests/consumer_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/consumer_test.py b/tests/kafkatest/tests/consumer_test.py
index 707ad2f..1b80470 100644
--- a/tests/kafkatest/tests/consumer_test.py
+++ b/tests/kafkatest/tests/consumer_test.py
@@ -23,15 +23,15 @@ from kafkatest.services.verifiable_producer import VerifiableProducer
 from kafkatest.services.verifiable_consumer import VerifiableConsumer
 from kafkatest.services.kafka import TopicPartition
 
+import signal
+
 def partitions_for(topic, num_partitions):
     partitions = set()
     for i in range(num_partitions):
         partitions.add(TopicPartition(topic=topic, partition=i))
     return partitions
 
-
 class VerifiableConsumerTest(KafkaTest):
-
     STOPIC = "simple_topic"
     TOPIC = "test_topic"
     NUM_PARTITIONS = 3
@@ -61,23 +61,160 @@ class VerifiableConsumerTest(KafkaTest):
         partitions = self._partitions(assignment)
         return len(partitions) == self.NUM_PARTITIONS and set(partitions) == self.PARTITIONS
 
-    def _setup_consumer(self, topic):
+    def _setup_consumer(self, topic, enable_autocommit=False):
         return VerifiableConsumer(self.test_context, self.num_consumers, self.kafka,
-                                  topic, self.GROUP_ID, session_timeout=self.session_timeout)
+                                  topic, self.GROUP_ID, session_timeout=self.session_timeout,
+                                  enable_autocommit=enable_autocommit)
 
     def _setup_producer(self, topic, max_messages=-1):
-        return VerifiableProducer(self.test_context, self.num_producers,
-                                  self.kafka, topic, max_messages=max_messages)
+        return VerifiableProducer(self.test_context, self.num_producers, self.kafka, topic,
+                                  max_messages=max_messages, throughput=500)
 
     def _await_all_members(self, consumer):
         # Wait until all members have joined the group
-        wait_until(lambda: len(consumer.joined) == self.num_consumers, timeout_sec=20,
+        wait_until(lambda: len(consumer.joined_nodes()) == self.num_consumers, timeout_sec=self.session_timeout+5,
                    err_msg="Consumers failed to join in a reasonable amount of time")
 
-    def test_consumer_failure(self):
+    def rolling_bounce_consumers(self, consumer, num_bounces=5, clean_shutdown=True):
+        for _ in range(num_bounces):
+            for node in consumer.nodes:
+                consumer.stop_node(node, clean_shutdown)
+
+                wait_until(lambda: len(consumer.dead_nodes()) == (self.num_consumers - 1), timeout_sec=self.session_timeout,
+                           err_msg="Timed out waiting for the consumers to shutdown")
+
+                total_consumed = consumer.total_consumed()
+
+                consumer.start_node(node)
+
+                wait_until(lambda: len(consumer.joined_nodes()) == self.num_consumers and consumer.total_consumed() > total_consumed,
+                           timeout_sec=self.session_timeout,
+                           err_msg="Timed out waiting for the consumers to rejoin and resume consumption")
+
+    def bounce_all_consumers(self, consumer, num_bounces=5, clean_shutdown=True):
+        for _ in range(num_bounces):
+            for node in consumer.nodes:
+                consumer.stop_node(node, clean_shutdown)
+
+            wait_until(lambda: len(consumer.dead_nodes()) == self.num_consumers, timeout_sec=10,
+                       err_msg="Timed out waiting for the consumers to shutdown")
+
+            total_consumed = consumer.total_consumed()
+
+            for node in consumer.nodes:
+                consumer.start_node(node)
+
+            wait_until(lambda: len(consumer.joined_nodes()) == self.num_consumers and consumer.total_consumed() > total_consumed,
+                       timeout_sec=self.session_timeout*2,
+                       err_msg="Timed out waiting for the consumers to rejoin and resume consumption")
+
+    def rolling_bounce_brokers(self, consumer, num_bounces=5, clean_shutdown=True):
+        for _ in range(num_bounces):
+            for node in self.kafka.nodes:
+                total_consumed = consumer.total_consumed()
+
+                self.kafka.restart_node(node, clean_shutdown=True)
+
+                wait_until(lambda: len(consumer.joined_nodes()) == self.num_consumers and consumer.total_consumed() > total_consumed,
+                           timeout_sec=30,
+                           err_msg="Timed out waiting for the consumers to resume consumption after the broker restart")
+
+    def bounce_all_brokers(self, consumer, num_bounces=5, clean_shutdown=True):
+        for _ in range(num_bounces):
+            for node in self.kafka.nodes:
+                self.kafka.stop_node(node)
+
+            for node in self.kafka.nodes:
+                self.kafka.start_node(node)
+
+
+    def test_broker_rolling_bounce(self):
+        """
+        Verify correct consumer behavior when the brokers are consecutively restarted.
+
+        Setup: single Kafka cluster with one producer writing messages to a single topic with one
+        partition, and a set of consumers in the same group reading from the same topic.
+
+        - Start a producer which continues producing new messages throughout the test.
+        - Start up the consumers and wait until they've joined the group.
+        - In a loop, restart each broker consecutively, waiting for the group to stabilize between
+          each broker restart.
+        - Verify delivery semantics according to the failure type and that the broker bounces
+          did not cause unexpected group rebalances.
+        """
         partition = TopicPartition(self.STOPIC, 0)
 
+        producer = self._setup_producer(self.STOPIC)
         consumer = self._setup_consumer(self.STOPIC)
+
+        producer.start()
+        wait_until(lambda: producer.num_acked > 1000, timeout_sec=10,
+                   err_msg="Producer failed waiting for messages to be written")
+
+        consumer.start()
+        self._await_all_members(consumer)
+
+        num_rebalances = consumer.num_rebalances()
+        # TODO: make this test work with hard shutdowns, which probably requires
+        # pausing before the node is restarted to ensure that any ephemeral
+        # nodes have time to expire
+        self.rolling_bounce_brokers(consumer, clean_shutdown=True)
+
+        unexpected_rebalances = consumer.num_rebalances() - num_rebalances
+        assert unexpected_rebalances == 0, \
+            "Broker rolling bounce caused %d unexpected group rebalances" % unexpected_rebalances
+
+        consumer.stop_all()
+
+        assert consumer.current_position(partition) == consumer.total_consumed(), \
+            "Total consumed records did not match consumed position"
+
+    @matrix(clean_shutdown=[True, False], bounce_mode=["all", "rolling"])
+    def test_consumer_bounce(self, clean_shutdown, bounce_mode):
+        """
+        Verify correct consumer behavior when the consumers in the group are consecutively restarted.
+
+        Setup: single Kafka cluster with one producer and a set of consumers in one group.
+
+        - Start a producer which continues producing new messages throughout the test.
+        - Start up the consumers and wait until they've joined the group.
+        - In a loop, restart each consumer, waiting for each one to rejoin the group before
+          restarting the rest.
+        - Verify delivery semantics according to the failure type.
+        """
+        partition = TopicPartition(self.STOPIC, 0)
+
+        producer = self._setup_producer(self.STOPIC)
+        consumer = self._setup_consumer(self.STOPIC)
+
+        producer.start()
+        wait_until(lambda: producer.num_acked > 1000, timeout_sec=10,
+                   err_msg="Producer failed waiting for messages to be written")
+
+        consumer.start()
+        self._await_all_members(consumer)
+
+        if bounce_mode == "all":
+            self.bounce_all_consumers(consumer, clean_shutdown=clean_shutdown)
+        else:
+            self.rolling_bounce_consumers(consumer, clean_shutdown=clean_shutdown)
+
+        consumer.stop_all()
+        if clean_shutdown:
+            # if the total records consumed matches the current position, we haven't seen any duplicates
+            # this can only be guaranteed with a clean shutdown
+            assert consumer.current_position(partition) == consumer.total_consumed(), \
+                "Total consumed records did not match consumed position"
+        else:
+            # we may have duplicates in a hard failure
+            assert consumer.current_position(partition) <= consumer.total_consumed(), \
+                "Current position greater than the total number of consumed records"
+
+    @matrix(clean_shutdown=[True, False], enable_autocommit=[True, False])
+    def test_consumer_failure(self, clean_shutdown, enable_autocommit):
+        partition = TopicPartition(self.STOPIC, 0)
+
+        consumer = self._setup_consumer(self.STOPIC, enable_autocommit=enable_autocommit)
         producer = self._setup_producer(self.STOPIC)
 
         consumer.start()
@@ -88,46 +225,72 @@ class VerifiableConsumerTest(KafkaTest):
 
         # startup the producer and ensure that some records have been written
         producer.start()
-        wait_until(lambda: producer.num_acked > 1000, timeout_sec=20,
+        wait_until(lambda: producer.num_acked > 1000, timeout_sec=10,
                    err_msg="Producer failed waiting for messages to be written")
 
         # stop the partition owner and await its shutdown
-        consumer.kill_node(partition_owner, clean_shutdown=True)
-        wait_until(lambda: len(consumer.joined) == 1, timeout_sec=20,
-                   err_msg="Timed out waiting for consumer to close")
+        consumer.kill_node(partition_owner, clean_shutdown=clean_shutdown)
+        wait_until(lambda: len(consumer.joined_nodes()) == (self.num_consumers - 1) and consumer.owner(partition) != None,
+                   timeout_sec=self.session_timeout+5, err_msg="Timed out waiting for consumer to close")
 
         # ensure that the remaining consumer does some work after rebalancing
-        current_total_records = consumer.total_records
-        wait_until(lambda: consumer.total_records > current_total_records + 1000, timeout_sec=20,
+        current_total_consumed = consumer.total_consumed()
+        wait_until(lambda: consumer.total_consumed() > current_total_consumed + 1000, timeout_sec=10,
                    err_msg="Timed out waiting for additional records to be consumed after first consumer failed")
 
-        # if the total records consumed matches the current position,
-        # we haven't seen any duplicates
-        assert consumer.position(partition) == consumer.total_records
-        assert consumer.committed(partition) <= consumer.total_records
+        consumer.stop_all()
 
-    def test_broker_failure(self):
+        if clean_shutdown:
+            # if the total records consumed matches the current position, we haven't seen any duplicates
+            # this can only be guaranteed with a clean shutdown
+            assert consumer.current_position(partition) == consumer.total_consumed(), \
+                "Total consumed records did not match consumed position"
+        else:
+            # we may have duplicates in a hard failure
+            assert consumer.current_position(partition) <= consumer.total_consumed(), \
+                "Current position greater than the total number of consumed records"
+
+        # if autocommit is not turned on, we can also verify the last committed offset
+        if not enable_autocommit:
+            assert consumer.last_commit(partition) == consumer.current_position(partition), \
+                "Last committed offset did not match last consumed position"
+
+
+    @matrix(clean_shutdown=[True, False], enable_autocommit=[True, False])
+    def test_broker_failure(self, clean_shutdown, enable_autocommit):
         partition = TopicPartition(self.STOPIC, 0)
 
-        consumer = self._setup_consumer(self.STOPIC)
+        consumer = self._setup_consumer(self.STOPIC, enable_autocommit=enable_autocommit)
         producer = self._setup_producer(self.STOPIC)
 
         producer.start()
         consumer.start()
         self._await_all_members(consumer)
 
+        num_rebalances = consumer.num_rebalances()
+
         # shutdown one of the brokers
-        self.kafka.signal_node(self.kafka.nodes[0])
+        # TODO: we need a way to target the coordinator instead of picking arbitrarily
+        self.kafka.signal_node(self.kafka.nodes[0], signal.SIGTERM if clean_shutdown else signal.SIGKILL)
 
-        # ensure that the remaining consumer does some work after broker failure
-        current_total_records = consumer.total_records
-        wait_until(lambda: consumer.total_records > current_total_records + 1000, timeout_sec=20,
+        # ensure that the consumers do some work after the broker failure
+        current_total_consumed = consumer.total_consumed()
+        wait_until(lambda: consumer.total_consumed() > current_total_consumed + 1000, timeout_sec=20,
                    err_msg="Timed out waiting for additional records to be consumed after first consumer failed")
 
-        # if the total records consumed matches the current position,
-        # we haven't seen any duplicates
-        assert consumer.position(partition) == consumer.total_records
-        assert consumer.committed(partition) <= consumer.total_records
+        # verify that there were no rebalances on failover
+        assert num_rebalances == consumer.num_rebalances(), "Broker failure should not cause a rebalance"
+
+        consumer.stop_all()
+
+        # if the total records consumed matches the current position, we haven't seen any duplicates
+        assert consumer.current_position(partition) == consumer.total_consumed(), \
+            "Total consumed records did not match consumed position"
+
+        # if autocommit is not turned on, we can also verify the last committed offset
+        if not enable_autocommit:
+            assert consumer.last_commit(partition) == consumer.current_position(partition), \
+                "Last committed offset did not match last consumed position"
 
     def test_simple_consume(self):
         total_records = 1000
@@ -144,14 +307,13 @@ class VerifiableConsumerTest(KafkaTest):
 
         wait_until(lambda: producer.num_acked == total_records, timeout_sec=20,
                    err_msg="Producer failed waiting for messages to be written")
 
-        wait_until(lambda: consumer.committed(partition) == total_records, timeout_sec=10,
+        wait_until(lambda: consumer.last_commit(partition) == total_records, timeout_sec=10,
                    err_msg="Consumer failed to read all expected messages")
 
-        assert consumer.position(partition) == total_records
+        assert consumer.current_position(partition) == total_records
 
     def test_valid_assignment(self):
         consumer = self._setup_consumer(self.TOPIC)
         consumer.start()
         self._await_all_members(consumer)
 
         assert self._valid_assignment(consumer.current_assignment())
-
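
Reviewer note (not part of the patch): the new ConsumerEventHandler is a small per-node state machine driven by the JSON events printed by org.apache.kafka.tools.VerifiableConsumer (partitions_assigned, records_consumed, offsets_committed, partitions_revoked, shutdown_complete). The sketch below feeds it synthetic events to show the Dead -> Joined -> Rebalancing transitions and the position/commit bookkeeping; the topic name and offsets are illustrative only, and it assumes the module layout introduced by this commit.

# Sketch (assumed usage, not part of the commit): drive ConsumerEventHandler with
# synthetic events to illustrate the state machine and its bookkeeping.
from kafkatest.services.kafka import TopicPartition
from kafkatest.services.verifiable_consumer import ConsumerEventHandler, ConsumerState

tp = TopicPartition("test_topic", 0)
handler = ConsumerEventHandler(node=None)   # the node is only recorded, never dereferenced here
assert handler.state == ConsumerState.Dead

# group join: the handler records the assignment and moves to Joined
handler.handle_partitions_assigned({"partitions": [{"topic": "test_topic", "partition": 0}]})
assert handler.state == ConsumerState.Joined and handler.current_assignment() == [tp]

# consumption: the position advances to maxOffset + 1 for each reported batch
handler.handle_records_consumed({"count": 100, "partitions": [
    {"topic": "test_topic", "partition": 0, "minOffset": 0, "maxOffset": 99}]})
assert handler.current_position(tp) == 100 and handler.total_consumed == 100

# commit: must never get ahead of the consumed position
handler.handle_offsets_committed({"success": True, "offsets": [
    {"topic": "test_topic", "partition": 0, "offset": 100}]})
assert handler.last_commit(tp) == 100

# rebalance: positions are cleared until a new assignment arrives
handler.handle_partitions_revoked({})
assert handler.state == ConsumerState.Rebalancing and handler.current_position(tp) is None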
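
For test authors, the reworked service replaces the old joined/position/committed fields with query methods (joined_nodes, rebalancing_nodes, dead_nodes, total_consumed, current_position, last_commit, num_rebalances, stop_all). The sketch below shows the clean-shutdown verification pattern the new tests follow; it is a minimal sketch, assuming it runs inside a VerifiableConsumerTest-style ducktape test (kafka, num_producers, num_consumers, session_timeout fixtures), and the helper name, topic, group id, message counts, and timeouts are illustrative, not part of the commit.

# Sketch (assumed usage, not part of the commit): the clean-shutdown delivery check
# written against the reworked VerifiableConsumer API.
from ducktape.utils.util import wait_until

from kafkatest.services.kafka import TopicPartition
from kafkatest.services.verifiable_consumer import VerifiableConsumer
from kafkatest.services.verifiable_producer import VerifiableProducer

def verify_clean_consumption(test, topic="simple_topic", group_id="test_group_id"):
    partition = TopicPartition(topic, 0)
    producer = VerifiableProducer(test.test_context, test.num_producers, test.kafka, topic,
                                  throughput=500)
    consumer = VerifiableConsumer(test.test_context, test.num_consumers, test.kafka, topic,
                                  group_id, session_timeout=test.session_timeout,
                                  enable_autocommit=False)

    producer.start()
    consumer.start()

    # all members must reach the Joined state before making assertions about ownership
    wait_until(lambda: len(consumer.joined_nodes()) == test.num_consumers, timeout_sec=20,
               err_msg="Consumers failed to join in a reasonable amount of time")

    # let some records flow, then stop cleanly so positions and commits are final
    wait_until(lambda: consumer.total_consumed() > 1000, timeout_sec=30,
               err_msg="Timed out waiting for the group to consume records")
    consumer.stop_all()

    # clean shutdown => no duplicates: the single partition's position equals the
    # total number of records the group consumed
    assert consumer.current_position(partition) == consumer.total_consumed(), \
        "Total consumed records did not match consumed position"

    # without autocommit, the final synchronous commit should match the final position
    assert consumer.last_commit(partition) == consumer.current_position(partition), \
        "Last committed offset did not match last consumed position"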
