This is an automated email from the ASF dual-hosted git repository.

schofielaj pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 2dffe32c2a3 KAFKA-19249: Replace Consumer#close(Duration) with Consumer#close(CloseOptions) (#20983)
2dffe32c2a3 is described below

commit 2dffe32c2a36dc40e0fbcec3d2438275b4f268be
Author: Chang-Yu Huang <[email protected]>
AuthorDate: Tue Nov 25 09:27:03 2025 -0500

    KAFKA-19249: Replace Consumer#close(Duration) with Consumer#close(CloseOptions) (#20983)
    
    # Description
    `Consumer#close(Duration)` has been deprecated since 4.1. This commit
    replaces calls to it with `Consumer#close(CloseOptions)`.
    
    Reviewers: Kirk True <[email protected]>, Andrew Schofield
     <[email protected]>
---
 .../main/java/org/apache/kafka/clients/consumer/MockConsumer.java  | 2 +-
 .../test/scala/integration/kafka/api/IntegrationTestHarness.scala  | 7 ++++---
 .../kafka/server/DynamicBrokerReconfigurationTest.scala            | 4 ++--
 .../kafka/server/log/remote/metadata/storage/ConsumerTask.java     | 4 ++--
 4 files changed, 9 insertions(+), 8 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java 
b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
index 5178eedc91a..395f26338cd 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
@@ -572,7 +572,7 @@ public class MockConsumer<K, V> implements Consumer<K, V> {
 
     @Override
     public void close() {
-        close(Duration.ofMillis(DEFAULT_CLOSE_TIMEOUT_MS));
+        close(CloseOptions.timeout(Duration.ofMillis(DEFAULT_CLOSE_TIMEOUT_MS)));
     }
 
     @Deprecated
diff --git 
a/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala 
b/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
index eb14858a944..32cf45061c3 100644
--- a/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
+++ b/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
@@ -18,7 +18,7 @@
 package kafka.api
 
 import java.time.Duration
-import org.apache.kafka.clients.consumer.{Consumer, ConsumerConfig, KafkaConsumer, KafkaShareConsumer, ShareConsumer}
+import org.apache.kafka.clients.consumer.{CloseOptions, Consumer, ConsumerConfig, KafkaConsumer, KafkaShareConsumer, ShareConsumer}
 import kafka.utils.TestUtils
 import kafka.utils.Implicits._
 
@@ -307,13 +307,14 @@ abstract class IntegrationTestHarness extends 
KafkaServerTestHarness {
   @AfterEach
   override def tearDown(): Unit = {
     try {
+      val closeOptions = CloseOptions.timeout(Duration.ZERO)
       producers.foreach(_.close(Duration.ZERO))
       consumers.foreach(_.wakeup())
-      consumers.foreach(_.close(Duration.ZERO))
+      consumers.foreach(_.close(closeOptions))
       shareConsumers.foreach(_.wakeup())
       shareConsumers.foreach(_.close(Duration.ZERO))
       streamsConsumers.foreach(_.wakeup())
-      streamsConsumers.foreach(_.close(Duration.ZERO))
+      streamsConsumers.foreach(_.close(closeOptions))
       adminClients.foreach(_.close(Duration.ZERO))
 
       producers.clear()
diff --git 
a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
 
b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
index 170ee3679f4..43f8ad8e49f 100644
--- 
a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
+++ 
b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
@@ -38,7 +38,7 @@ import org.apache.kafka.clients.CommonClientConfigs
 import org.apache.kafka.clients.admin.AlterConfigOp.OpType
 import org.apache.kafka.clients.admin.ConfigEntry.{ConfigSource, ConfigSynonym}
 import org.apache.kafka.clients.admin._
-import org.apache.kafka.clients.consumer.{Consumer, ConsumerConfig, 
ConsumerRecord, ConsumerRecords, KafkaConsumer}
+import org.apache.kafka.clients.consumer.{CloseOptions, Consumer, 
ConsumerConfig, ConsumerRecord, ConsumerRecords, KafkaConsumer}
 import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, 
ProducerRecord}
 import org.apache.kafka.common.{ClusterResource, ClusterResourceListener, 
Reconfigurable, TopicPartition, TopicPartitionInfo}
 import org.apache.kafka.common.config.{ConfigException, ConfigResource}
@@ -171,7 +171,7 @@ class DynamicBrokerReconfigurationTest extends 
QuorumTestHarness with SaslSetup
     clientThreads.foreach(_.join(5 * 1000))
     executors.foreach(_.shutdownNow())
     producers.foreach(_.close(Duration.ZERO))
-    consumers.foreach(_.close(Duration.ofMillis(0)))
+    consumers.foreach(_.close(CloseOptions.timeout(Duration.ZERO)))
     adminClients.foreach(_.close())
     TestUtils.shutdownServers(servers)
     super.tearDown()
diff --git 
a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTask.java
 
b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTask.java
index f5c5755c4eb..804659039ec 100644
--- 
a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTask.java
+++ 
b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTask.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.server.log.remote.metadata.storage;
 
+import org.apache.kafka.clients.consumer.CloseOptions;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
@@ -154,10 +155,9 @@ class ConsumerTask implements Runnable, Closeable {
     }
 
     // visible for testing
-    @SuppressWarnings("deprecation")
     void closeConsumer() {
         try {
-            consumer.close(Duration.ofSeconds(30));
+            consumer.close(CloseOptions.timeout(Duration.ofSeconds(30)));
         } catch (final Exception e) {
             log.error("Error encountered while closing the consumer", e);
         }

Reply via email to