This is an automated email from the ASF dual-hosted git repository.

schofielaj pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new abeed20168d KAFKA-10790: Add deadlock detection to producer#flush 
(#17946)
abeed20168d is described below

commit abeed20168da0d2f3979fdec85b905f93ee88e5b
Author: TengYao Chi <[email protected]>
AuthorDate: Wed Jan 8 00:32:43 2025 +0800

    KAFKA-10790: Add deadlock detection to producer#flush (#17946)
    
    Reviewers: Chia-Ping Tsai <[email protected]>, Andrew Schofield 
<[email protected]>, TaiJuWu <[email protected]>
---
 .../kafka/clients/producer/KafkaProducer.java      |  9 ++++---
 .../kafka/clients/producer/KafkaProducerTest.java  | 28 ++++++++++++++++++++++
 docs/upgrade.html                                  | 10 ++++++++
 3 files changed, 44 insertions(+), 3 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java 
b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index 137eaaaa335..608bde98b6d 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -1181,16 +1181,19 @@ public class KafkaProducer<K, V> implements Producer<K, 
V> {
      * calls made since the previous {@link #beginTransaction()} are completed 
before the commit.
      * </p>
      * <p>
-     * <b>Important:</b> This method should not be used within the callback 
provided to
-     * {@link #send(ProducerRecord, Callback)}. Invoking <code>flush()</code> 
in this context will cause a deadlock.
+     * <b>Important:</b> This method must not be called from within the 
callback provided to
+     * {@link #send(ProducerRecord, Callback)}. Invoking <code>flush()</code> 
in this context will result in a
+     * {@link KafkaException} being thrown, because it would otherwise cause a deadlock.
      * </p>
      *
      * @throws InterruptException If the thread is interrupted while blocked
+     * @throws KafkaException If the method is invoked inside a {@link 
#send(ProducerRecord, Callback)} callback
      */
     @Override
     public void flush() {
         if (Thread.currentThread() == this.ioThread) {
-            log.error("KafkaProducer.flush() invocation inside a callback will 
cause a deadlock.");
+            log.error("KafkaProducer.flush() invocation inside a callback is 
not permitted because it may lead to deadlock.");
+            throw new KafkaException("KafkaProducer.flush() invocation inside 
a callback is not permitted because it may lead to deadlock.");
         }
 
         log.trace("Flushing accumulated records in producer.");
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
index 09d386d5312..d058a80f57e 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
@@ -2273,6 +2273,34 @@ public class KafkaProducerTest {
         }
     }
 
+    @Test
+    public void shouldNotInvokeFlushInCallback() {
+        Map<String, Object> configs = new HashMap<>();
+        configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9000");
+        // only test in idempotence disabled producer for simplicity
+        configs.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, false);
+
+        Time time = new MockTime(1);
+        MetadataResponse initialUpdateResponse = 
RequestTestUtils.metadataUpdateWith(1, singletonMap("topic", 1));
+        ProducerMetadata metadata = newMetadata(0, 0, Long.MAX_VALUE);
+
+        MockClient client = new MockClient(time, metadata);
+        client.updateMetadata(initialUpdateResponse);
+        AtomicReference<KafkaException> kafkaException = new 
AtomicReference<>();
+
+        try (Producer<String, String> producer = kafkaProducer(configs, new 
StringSerializer(),
+            new StringSerializer(), metadata, client, null, time)) {
+            producer.send(
+                new ProducerRecord<>("topic", "value"),
+                (recordMetadata, exception) -> 
kafkaException.set(assertThrows(KafkaException.class, producer::flush))
+            );
+        }
+
+        assertNotNull(kafkaException.get());
+        assertEquals("KafkaProducer.flush() invocation inside a callback is 
not permitted because it may lead to deadlock.",
+            kafkaException.get().getMessage());
+    }
+
     @Test
     public void negativePartitionShouldThrow() {
         Map<String, Object> configs = new HashMap<>();
diff --git a/docs/upgrade.html b/docs/upgrade.html
index 57cc1882ea5..24e28868220 100644
--- a/docs/upgrade.html
+++ b/docs/upgrade.html
@@ -19,6 +19,16 @@
 
 <script id="upgrade-template" type="text/x-handlebars-template">
 
+<h4><a id="upgrade_4_1_0" href="#upgrade_4_1_0">Upgrading to 4.1.0 from any 
version 0.8.x through 4.0.x</a></h4>
+    <h5><a id="upgrade_410_notable" href="#upgrade_410_notable">Notable 
changes in 4.1.0</a></h5>
+        <ul>
+            <li><b>Producer</b>
+                <ul>
+                    <li>The <code>flush</code> method now detects the potential 
deadlock caused by calling it inside a <code>send</code> callback and throws a <code>KafkaException</code> instead of blocking. This change prevents 
unintended blocking behavior, which was a known risk in earlier versions.
+                    </li>
+                </ul>
+            </li>
+        </ul>
 <h4><a id="upgrade_4_0_0" href="#upgrade_4_0_0">Upgrading to 4.0.0 from any 
version 0.8.x through 3.9.x</a></h4>
     <h5><a id="upgrade_400_notable" href="#upgrade_400_notable">Notable 
changes in 4.0.0</a></h5>
     <ul>

Reply via email to