This is an automated email from the ASF dual-hosted git repository.
schofielaj pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new f57fd2d9fd1 MINOR: Logs warning message when user invokes producer#flush within callback (#18112)
f57fd2d9fd1 is described below
commit f57fd2d9fd1c8a240d6c15ad35e83fd9283a958f
Author: TengYao Chi <[email protected]>
AuthorDate: Tue Dec 10 23:27:42 2024 +0800
MINOR: Logs warning message when user invokes producer#flush within callback (#18112)
Reviewers: Andrew Schofield <[email protected]>
---
.../java/org/apache/kafka/clients/producer/KafkaProducer.java | 8 ++++++++
1 file changed, 8 insertions(+)
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index 1b1ffcc9d15..0229c43cb8b 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -1219,11 +1219,19 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
* flush all buffered records before performing the commit. This ensures that all the {@link #send(ProducerRecord)}
* calls made since the previous {@link #beginTransaction()} are completed before the commit.
* </p>
+ * <p>
+ * <b>Important:</b> This method should not be used within the callback provided to
+ * {@link #send(ProducerRecord, Callback)}. Invoking <code>flush()</code> in this context will cause a deadlock.
+ * </p>
*
* @throws InterruptException If the thread is interrupted while blocked
*/
@Override
public void flush() {
+ if (Thread.currentThread() == this.ioThread) {
+ log.error("KafkaProducer.flush() invocation inside a callback will cause a deadlock.");
+ }
+
log.trace("Flushing accumulated records in producer.");
long start = time.nanoseconds();