This is an automated email from the ASF dual-hosted git repository.

schofielaj pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new f57fd2d9fd1 MINOR: Logs warning message when user invoke 
producer#flush within callback (#18112)
f57fd2d9fd1 is described below

commit f57fd2d9fd1c8a240d6c15ad35e83fd9283a958f
Author: TengYao Chi <[email protected]>
AuthorDate: Tue Dec 10 23:27:42 2024 +0800

    MINOR: Logs warning message when user invoke producer#flush within callback 
(#18112)
    
    Reviewers: Andrew Schofield <[email protected]>
---
 .../java/org/apache/kafka/clients/producer/KafkaProducer.java     | 8 ++++++++
 1 file changed, 8 insertions(+)

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java 
b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index 1b1ffcc9d15..0229c43cb8b 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -1219,11 +1219,19 @@ public class KafkaProducer<K, V> implements Producer<K, 
V> {
      * flush all buffered records before performing the commit. This ensures 
that all the {@link #send(ProducerRecord)}
      * calls made since the previous {@link #beginTransaction()} are completed 
before the commit.
      * </p>
+     * <p>
+     * <b>Important:</b> This method should not be used within the callback 
provided to
+     * {@link #send(ProducerRecord, Callback)}. Invoking <code>flush()</code> 
in this context will cause a deadlock.
+     * </p>
      *
      * @throws InterruptException If the thread is interrupted while blocked
      */
     @Override
     public void flush() {
+        if (Thread.currentThread() == this.ioThread) {
+            log.error("KafkaProducer.flush() invocation inside a callback will 
cause a deadlock.");
+        }
+
         log.trace("Flushing accumulated records in producer.");
 
         long start = time.nanoseconds();

Reply via email to