This is an automated email from the ASF dual-hosted git repository.
schofielaj pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new abeed20168d KAFKA-10790: Add deadlock detection to producer#flush
(#17946)
abeed20168d is described below
commit abeed20168da0d2f3979fdec85b905f93ee88e5b
Author: TengYao Chi <[email protected]>
AuthorDate: Wed Jan 8 00:32:43 2025 +0800
KAFKA-10790: Add deadlock detection to producer#flush (#17946)
Reviewers: Chia-Ping Tsai <[email protected]>, Andrew Schofield
<[email protected]>, TaiJuWu <[email protected]>
---
.../kafka/clients/producer/KafkaProducer.java | 9 ++++---
.../kafka/clients/producer/KafkaProducerTest.java | 28 ++++++++++++++++++++++
docs/upgrade.html | 10 ++++++++
3 files changed, 44 insertions(+), 3 deletions(-)
diff --git
a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index 137eaaaa335..608bde98b6d 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -1181,16 +1181,19 @@ public class KafkaProducer<K, V> implements Producer<K,
V> {
* calls made since the previous {@link #beginTransaction()} are completed
before the commit.
* </p>
* <p>
- * <b>Important:</b> This method should not be used within the callback
provided to
- * {@link #send(ProducerRecord, Callback)}. Invoking <code>flush()</code>
in this context will cause a deadlock.
+ * <b>Important:</b> This method must not be called from within the
callback provided to
+ * {@link #send(ProducerRecord, Callback)}. Invoking <code>flush()</code>
in this context will result in a
+ * {@link KafkaException} being thrown, as it will cause a deadlock.
* </p>
*
* @throws InterruptException If the thread is interrupted while blocked
+ * @throws KafkaException If the method is invoked inside a {@link
#send(ProducerRecord, Callback)} callback
*/
@Override
public void flush() {
if (Thread.currentThread() == this.ioThread) {
- log.error("KafkaProducer.flush() invocation inside a callback will
cause a deadlock.");
+ log.error("KafkaProducer.flush() invocation inside a callback is
not permitted because it may lead to deadlock.");
+ throw new KafkaException("KafkaProducer.flush() invocation inside
a callback is not permitted because it may lead to deadlock.");
}
log.trace("Flushing accumulated records in producer.");
diff --git
a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
index 09d386d5312..d058a80f57e 100644
---
a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
@@ -2273,6 +2273,34 @@ public class KafkaProducerTest {
}
}
+ @Test
+ public void shouldNotInvokeFlushInCallback() {
+ Map<String, Object> configs = new HashMap<>();
+ configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9000");
+ // only test in idempotence disabled producer for simplicity
+ configs.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, false);
+
+ Time time = new MockTime(1);
+ MetadataResponse initialUpdateResponse =
RequestTestUtils.metadataUpdateWith(1, singletonMap("topic", 1));
+ ProducerMetadata metadata = newMetadata(0, 0, Long.MAX_VALUE);
+
+ MockClient client = new MockClient(time, metadata);
+ client.updateMetadata(initialUpdateResponse);
+ AtomicReference<KafkaException> kafkaException = new
AtomicReference<>();
+
+ try (Producer<String, String> producer = kafkaProducer(configs, new
StringSerializer(),
+ new StringSerializer(), metadata, client, null, time)) {
+ producer.send(
+ new ProducerRecord<>("topic", "value"),
+ (recordMetadata, exception) ->
kafkaException.set(assertThrows(KafkaException.class, producer::flush))
+ );
+ }
+
+ assertNotNull(kafkaException.get());
+ assertEquals("KafkaProducer.flush() invocation inside a callback is
not permitted because it may lead to deadlock.",
+ kafkaException.get().getMessage());
+ }
+
@Test
public void negativePartitionShouldThrow() {
Map<String, Object> configs = new HashMap<>();
diff --git a/docs/upgrade.html b/docs/upgrade.html
index 57cc1882ea5..24e28868220 100644
--- a/docs/upgrade.html
+++ b/docs/upgrade.html
@@ -19,6 +19,16 @@
<script id="upgrade-template" type="text/x-handlebars-template">
+<h4><a id="upgrade_4_1_0" href="#upgrade_4_1_0">Upgrading to 4.1.0 from any
version 0.8.x through 4.0.x</a></h4>
+ <h5><a id="upgrade_410_notable" href="#upgrade_410_notable">Notable
changes in 4.1.0</a></h5>
+ <ul>
+ <li><b>Producer</b>
+ <ul>
+ <li>The <code>flush</code> method now detects potential
deadlocks and prohibits its use inside a callback. This change prevents
unintended blocking behavior, which was a known risk in earlier versions.
+ </li>
+ </ul>
+ </li>
+ </ul>
<h4><a id="upgrade_4_0_0" href="#upgrade_4_0_0">Upgrading to 4.0.0 from any
version 0.8.x through 3.9.x</a></h4>
<h5><a id="upgrade_400_notable" href="#upgrade_400_notable">Notable
changes in 4.0.0</a></h5>
<ul>