This is an automated email from the ASF dual-hosted git repository.
schofielaj pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 2dffe32c2a3 KAFKA-19249: Replace Consumer#close(Duration) with
Consumer#close(CloseOptions) (#20983)
2dffe32c2a3 is described below
commit 2dffe32c2a36dc40e0fbcec3d2438275b4f268be
Author: Chang-Yu Huang <[email protected]>
AuthorDate: Tue Nov 25 09:27:03 2025 -0500
KAFKA-19249: Replace Consumer#close(Duration) with
Consumer#close(CloseOptions) (#20983)
# Description
`Consumer#close(Duration)` was deprecated since 4.1. Replaced calls with
`Consumer#close(CloseOptions)`.
Reviewers: Kirk True <[email protected]>, Andrew Schofield
<[email protected]>
---
.../main/java/org/apache/kafka/clients/consumer/MockConsumer.java | 2 +-
.../test/scala/integration/kafka/api/IntegrationTestHarness.scala | 7 ++++---
.../kafka/server/DynamicBrokerReconfigurationTest.scala | 4 ++--
.../kafka/server/log/remote/metadata/storage/ConsumerTask.java | 4 ++--
4 files changed, 9 insertions(+), 8 deletions(-)
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
index 5178eedc91a..395f26338cd 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
@@ -572,7 +572,7 @@ public class MockConsumer<K, V> implements Consumer<K, V> {
@Override
public void close() {
- close(Duration.ofMillis(DEFAULT_CLOSE_TIMEOUT_MS));
+
close(CloseOptions.timeout(Duration.ofMillis(DEFAULT_CLOSE_TIMEOUT_MS)));
}
@Deprecated
diff --git
a/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
b/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
index eb14858a944..32cf45061c3 100644
--- a/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
+++ b/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
@@ -18,7 +18,7 @@
package kafka.api
import java.time.Duration
-import org.apache.kafka.clients.consumer.{Consumer, ConsumerConfig,
KafkaConsumer, KafkaShareConsumer, ShareConsumer}
+import org.apache.kafka.clients.consumer.{CloseOptions, Consumer,
ConsumerConfig, KafkaConsumer, KafkaShareConsumer, ShareConsumer}
import kafka.utils.TestUtils
import kafka.utils.Implicits._
@@ -307,13 +307,14 @@ abstract class IntegrationTestHarness extends
KafkaServerTestHarness {
@AfterEach
override def tearDown(): Unit = {
try {
+ val closeOptions = CloseOptions.timeout(Duration.ZERO)
producers.foreach(_.close(Duration.ZERO))
consumers.foreach(_.wakeup())
- consumers.foreach(_.close(Duration.ZERO))
+ consumers.foreach(_.close(closeOptions))
shareConsumers.foreach(_.wakeup())
shareConsumers.foreach(_.close(Duration.ZERO))
streamsConsumers.foreach(_.wakeup())
- streamsConsumers.foreach(_.close(Duration.ZERO))
+ streamsConsumers.foreach(_.close(closeOptions))
adminClients.foreach(_.close(Duration.ZERO))
producers.clear()
diff --git
a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
index 170ee3679f4..43f8ad8e49f 100644
---
a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
+++
b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
@@ -38,7 +38,7 @@ import org.apache.kafka.clients.CommonClientConfigs
import org.apache.kafka.clients.admin.AlterConfigOp.OpType
import org.apache.kafka.clients.admin.ConfigEntry.{ConfigSource, ConfigSynonym}
import org.apache.kafka.clients.admin._
-import org.apache.kafka.clients.consumer.{Consumer, ConsumerConfig,
ConsumerRecord, ConsumerRecords, KafkaConsumer}
+import org.apache.kafka.clients.consumer.{CloseOptions, Consumer,
ConsumerConfig, ConsumerRecord, ConsumerRecords, KafkaConsumer}
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig,
ProducerRecord}
import org.apache.kafka.common.{ClusterResource, ClusterResourceListener,
Reconfigurable, TopicPartition, TopicPartitionInfo}
import org.apache.kafka.common.config.{ConfigException, ConfigResource}
@@ -171,7 +171,7 @@ class DynamicBrokerReconfigurationTest extends
QuorumTestHarness with SaslSetup
clientThreads.foreach(_.join(5 * 1000))
executors.foreach(_.shutdownNow())
producers.foreach(_.close(Duration.ZERO))
- consumers.foreach(_.close(Duration.ofMillis(0)))
+ consumers.foreach(_.close(CloseOptions.timeout(Duration.ZERO)))
adminClients.foreach(_.close())
TestUtils.shutdownServers(servers)
super.tearDown()
diff --git
a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTask.java
b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTask.java
index f5c5755c4eb..804659039ec 100644
---
a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTask.java
+++
b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTask.java
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.server.log.remote.metadata.storage;
+import org.apache.kafka.clients.consumer.CloseOptions;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
@@ -154,10 +155,9 @@ class ConsumerTask implements Runnable, Closeable {
}
// visible for testing
- @SuppressWarnings("deprecation")
void closeConsumer() {
try {
- consumer.close(Duration.ofSeconds(30));
+ consumer.close(CloseOptions.timeout(Duration.ofSeconds(30)));
} catch (final Exception e) {
log.error("Error encountered while closing the consumer", e);
}