This is an automated email from the ASF dual-hosted git repository.

poorbarcode pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 09035ffddad [improve][client] In cases where there is a risk of message loss, adjust the log level to error (#25854)
09035ffddad is described below

commit 09035ffddada146f3ef014777ee5e5766f01f3f2
Author: fengyubiao <[email protected]>
AuthorDate: Sun May 24 20:24:03 2026 +0800

    [improve][client] In cases where there is a risk of message loss, adjust the log level to error (#25854)
---
 .../apache/pulsar/client/impl/ConsumerBase.java    | 23 +++++++++++++++-------
 .../apache/pulsar/client/impl/ConsumerImpl.java    | 14 +++++++++++++
 2 files changed, 30 insertions(+), 7 deletions(-)

diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
index e8ccfcb272f..fe9ec2d59c4 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
@@ -350,10 +350,15 @@ public abstract class ConsumerBase<T> extends 
HandlerState implements Consumer<T
 
     protected void completePendingReceive(CompletableFuture<Message<T>> 
receivedFuture, Message<T> message) {
         getInternalExecutor(message).execute(() -> {
-            if (!receivedFuture.complete(message)) {
-                log.warn().attr("cancelled", receivedFuture.isCancelled())
-                        .attr("message", message)
-                        .log("Race condition detected, receive future was 
already completed and message was dropped");
+            if (!receivedFuture.complete(message) && getState() != 
State.Closing && getState() != State.Closed) {
+                log.error().attr("cancelled", receivedFuture.isCancelled())
+                    .attr("message", message)
+                    .log("Race condition detected, receive future was already 
completed and message was dropped."
+                        + " In other words, the message was dropped 
internally, the client-side will encounter a"
+                        + " crucial issue: this message will never be consumed 
until the consumer is restarted or"
+                        + " the topic is unloaded. Under normal circumstances, 
this won't happen. It only occurs when"
+                        + " user itself has completed the completable future 
object returned by"
+                        + " \"consumer.receiveAsync()\"");
             }
         });
     }
@@ -1118,9 +1123,13 @@ public abstract class ConsumerBase<T> extends 
HandlerState implements Consumer<T
     protected void completePendingBatchReceive(CompletableFuture<Messages<T>> 
future, Messages<T> messages) {
         if (!future.complete(messages)) {
             log.warn().attr("cancelled", future.isCancelled())
-                    .attr("messages", messages)
-                    .log("Race condition detected, batch receive future was"
-                            + " already completed and messages were dropped");
+                .attr("messages", messages)
+                .log("Race condition detected, receive future was already 
completed and message was dropped."
+                    + " In other words, the message was dropped internally, 
the client-side will encounter a"
+                    + " crucial issue: these message will never be consumed 
until the consumer is restarted or"
+                    + " the topic is unloaded. Under normal circumstances, 
this won't happen. It only occurs when"
+                    + " user itself has completed the completable future 
object returned by"
+                    + " \"consumer.batchReceiveAsync()\"");
         }
     }
 
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index 5862fce64de..f6567751d51 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -1720,12 +1720,26 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
      */
     void notifyPendingReceivedCallback(final Message<T> message, Exception 
exception) {
         if (pendingReceives.isEmpty()) {
+            if (getState() != State.Closing && getState() != State.Closed) {
+                log.error().attr("message", message)
+                    .attr("pendingReceives-size", pendingReceives.size())
+                    .log("If you received this log, it means that you 
encountered a bug: a message was"
+                        + " dropped internally, the client-side will encounter 
a crucial issue: this message will"
+                        + " never be consumed until the consumer is restarted 
or the topic is unloaded.");
+            }
             return;
         }
 
         // fetch receivedCallback from queue
         final CompletableFuture<Message<T>> receivedFuture = 
nextPendingReceive();
         if (receivedFuture == null) {
+            if (getState() != State.Closing && getState() != State.Closed) {
+                log.error().attr("message", message)
+                    .log("The pendingReceives pulled out a null 
conpletableFuture object. If you received this log,"
+                        + " it means that you encountered a bug: a message was"
+                        + " dropped internally, the client-side will encounter 
a crucial issue: this message will never"
+                        + " be consumed until the consumer is restarted or the 
topic is unloaded.");
+            }
             return;
         }
 

Reply via email to