This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new b348b556be4 KAFKA-17202 surround consumer with try-resource statement 
(#16702)
b348b556be4 is described below

commit b348b556be415adc9fadc3d86d04a74e4c1eb3f7
Author: TengYao Chi <[email protected]>
AuthorDate: Tue Jul 30 01:06:11 2024 +0800

    KAFKA-17202 surround consumer with try-resource statement (#16702)
    
    Reviewers: Chia-Ping Tsai <[email protected]>
---
 .../streams/integration/EosIntegrationTest.java    | 45 +++++++++++++---------
 1 file changed, 27 insertions(+), 18 deletions(-)

diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java
index 7cf725440c6..38e7e5cc0ca 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java
@@ -999,27 +999,36 @@ public class EosIntegrationTest {
     }
 
     private void verifyChangelogMaxRecordOffsetMatchesCheckpointedOffset(final 
TopicPartition tp, final long checkpointedOffset) {
-        final KafkaConsumer<String, String> consumer = new 
KafkaConsumer<>(consumerConfig(CLUSTER.bootstrapServers(), 
Serdes.ByteArray().deserializer().getClass(), 
Serdes.ByteArray().deserializer().getClass()));
-        final List<TopicPartition> partitions = Collections.singletonList(tp);
-        consumer.assign(partitions);
-        consumer.seekToEnd(partitions);
-        final long topicEndOffset = consumer.position(tp);
-
-        assertTrue(topicEndOffset >= checkpointedOffset,
-                "changelog topic end " + topicEndOffset + " is less than 
checkpointed offset " + checkpointedOffset);
-
-        consumer.seekToBeginning(partitions);
-
-        Long maxRecordOffset = null;
-        while (consumer.position(tp) != topicEndOffset) {
-            final List<ConsumerRecord<String, String>> records = 
consumer.poll(Duration.ofMillis(0)).records(tp);
-            if (!records.isEmpty()) {
-                maxRecordOffset = records.get(records.size() - 1).offset();
+        try (
+            KafkaConsumer<String, String> consumer = new KafkaConsumer<>(
+                consumerConfig(
+                    CLUSTER.bootstrapServers(),
+                    Serdes.ByteArray().deserializer().getClass(),
+                    Serdes.ByteArray().deserializer().getClass()
+                )
+            )
+        ) {
+            final List<TopicPartition> partitions = 
Collections.singletonList(tp);
+            consumer.assign(partitions);
+            consumer.seekToEnd(partitions);
+            final long topicEndOffset = consumer.position(tp);
+
+            assertTrue(topicEndOffset >= checkpointedOffset,
+                    "changelog topic end " + topicEndOffset + " is less than 
checkpointed offset " + checkpointedOffset);
+
+            consumer.seekToBeginning(partitions);
+
+            Long maxRecordOffset = null;
+            while (consumer.position(tp) != topicEndOffset) {
+                final List<ConsumerRecord<String, String>> records = 
consumer.poll(Duration.ofMillis(0)).records(tp);
+                if (!records.isEmpty()) {
+                    maxRecordOffset = records.get(records.size() - 1).offset();
+                }
             }
-        }
 
-        assertEquals(maxRecordOffset, (Long) checkpointedOffset,
+            assertEquals(maxRecordOffset, (Long) checkpointedOffset,
                 "Checkpointed offset does not match end of changelog");
+        }
     }
 
     private List<KeyValue<Long, Long>> prepareData(final long fromInclusive,

Reply via email to