This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new b348b556be4 KAFKA-17202 surround consumer with try-resource statement (#16702)
b348b556be4 is described below
commit b348b556be415adc9fadc3d86d04a74e4c1eb3f7
Author: TengYao Chi <[email protected]>
AuthorDate: Tue Jul 30 01:06:11 2024 +0800
KAFKA-17202 surround consumer with try-resource statement (#16702)
Reviewers: Chia-Ping Tsai <[email protected]>
---
.../streams/integration/EosIntegrationTest.java | 45 +++++++++++++---------
1 file changed, 27 insertions(+), 18 deletions(-)
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java
index 7cf725440c6..38e7e5cc0ca 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java
@@ -999,27 +999,36 @@ public class EosIntegrationTest {
}
private void verifyChangelogMaxRecordOffsetMatchesCheckpointedOffset(final TopicPartition tp, final long checkpointedOffset) {
- final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerConfig(CLUSTER.bootstrapServers(), Serdes.ByteArray().deserializer().getClass(), Serdes.ByteArray().deserializer().getClass()));
- final List<TopicPartition> partitions = Collections.singletonList(tp);
- consumer.assign(partitions);
- consumer.seekToEnd(partitions);
- final long topicEndOffset = consumer.position(tp);
-
- assertTrue(topicEndOffset >= checkpointedOffset,
- "changelog topic end " + topicEndOffset + " is less than checkpointed offset " + checkpointedOffset);
-
- consumer.seekToBeginning(partitions);
-
- Long maxRecordOffset = null;
- while (consumer.position(tp) != topicEndOffset) {
- final List<ConsumerRecord<String, String>> records = consumer.poll(Duration.ofMillis(0)).records(tp);
- if (!records.isEmpty()) {
- maxRecordOffset = records.get(records.size() - 1).offset();
+ try (
+ KafkaConsumer<String, String> consumer = new KafkaConsumer<>(
+ consumerConfig(
+ CLUSTER.bootstrapServers(),
+ Serdes.ByteArray().deserializer().getClass(),
+ Serdes.ByteArray().deserializer().getClass()
+ )
+ )
+ ) {
+ final List<TopicPartition> partitions = Collections.singletonList(tp);
+ consumer.assign(partitions);
+ consumer.seekToEnd(partitions);
+ final long topicEndOffset = consumer.position(tp);
+
+ assertTrue(topicEndOffset >= checkpointedOffset,
+ "changelog topic end " + topicEndOffset + " is less than checkpointed offset " + checkpointedOffset);
+
+ consumer.seekToBeginning(partitions);
+
+ Long maxRecordOffset = null;
+ while (consumer.position(tp) != topicEndOffset) {
+ final List<ConsumerRecord<String, String>> records = consumer.poll(Duration.ofMillis(0)).records(tp);
+ if (!records.isEmpty()) {
+ maxRecordOffset = records.get(records.size() - 1).offset();
+ }
}
- }
- assertEquals(maxRecordOffset, (Long) checkpointedOffset,
+ assertEquals(maxRecordOffset, (Long) checkpointedOffset,
"Checkpointed offset does not match end of changelog");
+ }
}
private List<KeyValue<Long, Long>> prepareData(final long fromInclusive,