David Jacot created KAFKA-16194:
-----------------------------------
Summary: KafkaConsumer.groupMetadata() should be correct when first records are returned
Key: KAFKA-16194
URL: https://issues.apache.org/jira/browse/KAFKA-16194
Project: Kafka
Issue Type: Sub-task
Reporter: David Jacot
In the following code, `poll` returns records before the group metadata is
updated. As a result, the first transaction ever run by the Producer/Consumer fails.
{code:java}
Producer<String, String> txnProducer = new KafkaProducer<>(txnProducerProps);
Consumer<String, String> consumer = new KafkaConsumer<>(consumerProps);

txnProducer.initTransactions();
System.out.println("Init transactions called");
try {
    txnProducer.beginTransaction();
    System.out.println("Begin transactions called");

    consumer.subscribe(Collections.singletonList("input"));
    System.out.println("Consumer subscribed to topic -> input");

    ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(10));
    System.out.println("Returned " + records.count() + " records.");

    // Process and send txn messages.
    for (ConsumerRecord<String, String> processedRecord : records) {
        txnProducer.send(new ProducerRecord<>("output", processedRecord.key(),
            "Processed: " + processedRecord.value()));
    }

    // Read the group metadata after the first poll; this is where it is stale.
    ConsumerGroupMetadata groupMetadata = consumer.groupMetadata();
    System.out.println("Group metadata inside test" + groupMetadata);

    Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = new HashMap<>();
    for (ConsumerRecord<String, String> record : records) {
        offsetsToCommit.put(new TopicPartition(record.topic(), record.partition()),
            new OffsetAndMetadata(record.offset() + 1));
    }
    System.out.println("Offsets to commit" + offsetsToCommit);

    // Send offsets to transaction with ConsumerGroupMetadata.
    txnProducer.sendOffsetsToTransaction(offsetsToCommit, groupMetadata);
    System.out.println("Send offsets to transaction done");

    // Commit the transaction.
    txnProducer.commitTransaction();
    System.out.println("Commit transaction done");
} catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
    // Unrecoverable errors: the producer can only be closed.
    e.printStackTrace();
    txnProducer.close();
} catch (KafkaException e) {
    // Any other error: abort the transaction.
    e.printStackTrace();
    txnProducer.abortTransaction();
} finally {
    txnProducer.close();
    consumer.close();
}
{code}
The issue seems to be that the event that updates the group metadata is not
processed while the consumer waits in `poll`.
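
To make the suspected ordering problem concrete, here is a toy model in plain Java. It is not the consumer's actual implementation: `ToyConsumer`, its update queue, the `drainEventsInPoll` flag and the `-1` placeholder generation are all hypothetical names and values chosen for illustration. It only shows how a `poll` that returns records without first applying queued metadata updates leaves the caller looking at stale metadata.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Toy model only: none of these names are Kafka classes.
class StaleMetadataDemo {

    // Hypothetical placeholder for "the consumer has not joined the group yet".
    static final int UNKNOWN_GENERATION = -1;

    static final class ToyConsumer {
        // Updates queued by a hypothetical background thread, not yet applied.
        private final Queue<Integer> pendingGenerationUpdates = new ArrayDeque<>();
        private final boolean drainEventsInPoll;
        private int generationId = UNKNOWN_GENERATION;

        ToyConsumer(boolean drainEventsInPoll) {
            this.drainEventsInPoll = drainEventsInPoll;
        }

        // Background side: the member joined the group at this generation.
        void enqueueGenerationUpdate(int newGeneration) {
            pendingGenerationUpdates.add(newGeneration);
        }

        // Application side: returns the number of records fetched.
        // Queued updates are applied first only when drainEventsInPoll is true.
        int poll(int recordsAvailable) {
            if (drainEventsInPoll) {
                applyPendingUpdates();
            }
            return recordsAvailable;
        }

        private void applyPendingUpdates() {
            Integer next;
            while ((next = pendingGenerationUpdates.poll()) != null) {
                generationId = next;
            }
        }

        // What the application sees when it asks for the group metadata.
        int generationId() {
            return generationId;
        }
    }

    // Joins at generation 1, polls once, and reports the generation seen afterwards.
    static int generationAfterFirstPoll(boolean drainEventsInPoll) {
        ToyConsumer consumer = new ToyConsumer(drainEventsInPoll);
        consumer.enqueueGenerationUpdate(1);
        consumer.poll(3);
        return consumer.generationId();
    }

    public static void main(String[] args) {
        System.out.println("without draining: " + generationAfterFirstPoll(false));
        System.out.println("with draining: " + generationAfterFirstPoll(true));
    }
}
```

In this model the run without draining still reports the placeholder generation after records have been returned, which mirrors the symptom described above. The run that drains the queue inside `poll` reports generation 1.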