This is an automated email from the ASF dual-hosted git repository.
poorbarcode pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 09035ffddad [improve][client] In cases where there is a risk of
message loss, adjust the log level to error (#25854)
09035ffddad is described below
commit 09035ffddada146f3ef014777ee5e5766f01f3f2
Author: fengyubiao <[email protected]>
AuthorDate: Sun May 24 20:24:03 2026 +0800
[improve][client] In cases where there is a risk of message loss, adjust
the log level to error (#25854)
---
.../apache/pulsar/client/impl/ConsumerBase.java | 23 +++++++++++++++-------
.../apache/pulsar/client/impl/ConsumerImpl.java | 14 +++++++++++++
2 files changed, 30 insertions(+), 7 deletions(-)
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
index e8ccfcb272f..fe9ec2d59c4 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
@@ -350,10 +350,15 @@ public abstract class ConsumerBase<T> extends HandlerState implements Consumer<T
protected void completePendingReceive(CompletableFuture<Message<T>>
receivedFuture, Message<T> message) {
getInternalExecutor(message).execute(() -> {
- if (!receivedFuture.complete(message)) {
- log.warn().attr("cancelled", receivedFuture.isCancelled())
- .attr("message", message)
- .log("Race condition detected, receive future was
already completed and message was dropped");
+ if (!receivedFuture.complete(message) && getState() !=
State.Closing && getState() != State.Closed) {
+ log.error().attr("cancelled", receivedFuture.isCancelled())
+ .attr("message", message)
+ .log("Race condition detected, receive future was already
completed and message was dropped."
+ + " In other words, the message was dropped
internally, the client-side will encounter a"
+ + " crucial issue: this message will never be consumed
until the consumer is restarted or"
+ + " the topic is unloaded. Under normal circumstances,
this won't happen. It only occurs when"
+ + " user itself has completed the completable future
object returned by"
+ + " \"consumer.receiveAsync()\"");
}
});
}
@@ -1118,9 +1123,13 @@ public abstract class ConsumerBase<T> extends HandlerState implements Consumer<T
protected void completePendingBatchReceive(CompletableFuture<Messages<T>>
future, Messages<T> messages) {
if (!future.complete(messages)) {
log.warn().attr("cancelled", future.isCancelled())
- .attr("messages", messages)
- .log("Race condition detected, batch receive future was"
- + " already completed and messages were dropped");
+ .attr("messages", messages)
+ .log("Race condition detected, receive future was already
completed and message was dropped."
+ + " In other words, the message was dropped internally,
the client-side will encounter a"
+ + " crucial issue: these message will never be consumed
until the consumer is restarted or"
+ + " the topic is unloaded. Under normal circumstances,
this won't happen. It only occurs when"
+ + " user itself has completed the completable future
object returned by"
+ + " \"consumer.batchReceiveAsync()\"");
}
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index 5862fce64de..f6567751d51 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -1720,12 +1720,26 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
*/
void notifyPendingReceivedCallback(final Message<T> message, Exception
exception) {
if (pendingReceives.isEmpty()) {
+ if (getState() != State.Closing && getState() != State.Closed) {
+ log.error().attr("message", message)
+ .attr("pendingReceives-size", pendingReceives.size())
+ .log("If you received this log, it means that you
encountered a bug: a message was"
+ + " dropped internally, the client-side will encounter
a crucial issue: this message will"
+ + " never be consumed until the consumer is restarted
or the topic is unloaded.");
+ }
return;
}
// fetch receivedCallback from queue
final CompletableFuture<Message<T>> receivedFuture =
nextPendingReceive();
if (receivedFuture == null) {
+ if (getState() != State.Closing && getState() != State.Closed) {
+ log.error().attr("message", message)
+ .log("The pendingReceives pulled out a null
conpletableFuture object. If you received this log,"
+ + " it means that you encountered a bug: a message was"
+ + " dropped internally, the client-side will encounter
a crucial issue: this message will never"
+ + " be consumed until the consumer is restarted or the
topic is unloaded.");
+ }
return;
}