michaeljmarshall commented on code in PR #15107:
URL: https://github.com/apache/pulsar/pull/15107#discussion_r847777124


##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java:
##########
@@ -1155,16 +1156,24 @@ protected <V> MessageImpl<V> newMessage(final MessageIdImpl messageId,
     }
 
     private void executeNotifyCallback(final MessageImpl<T> message) {
+        internalPinnedExecutor.execute(() -> {
+            executeNotifyCallback0(message);
+        });
+    }
+
+    @VisibleForTesting
+    void executeNotifyCallback0(final MessageImpl<T> message) {

Review Comment:
   Instead of exposing this internal method for a test, are you able to create 
a functional test that verifies this edge case?



##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java:
##########
@@ -1419,40 +1428,38 @@ private ByteBuf processMessageChunk(ByteBuf compressedPayload, MessageMetadata m
      * Notify waiting asyncReceive request with the received message.
      *
      * @param message
+     * @return the message is enqueue incomingMessages.
      */
-    void notifyPendingReceivedCallback(final Message<T> message, Exception exception) {
-        if (pendingReceives.isEmpty()) {

Review Comment:
   Can you clarify which race you're referencing? If you're referring to the 
implementation of `hasNextPendingReceive()`, `pendingReceives` is only supposed 
to be updated on the `internalPinnedExecutor`, so the check is redundant, but 
is not necessarily a race. (We can still remove the check, but I just want to 
see if there is another race you noticed.)
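
   To illustrate the point about confinement, here is a minimal standalone 
sketch, not the actual `ConsumerImpl` code. The class `PinnedExecutorSketch`, 
its `String` messages, and its method bodies are hypothetical stand-ins. It 
assumes every read and write of `pendingReceives` is submitted to one 
single-threaded executor, in which case an emptiness check and the following 
`poll()` cannot interleave with another update.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical sketch: names mirror the discussion but this is not Pulsar code.
class PinnedExecutorSketch {
    // Single thread, so tasks run one at a time in submission order.
    private final ExecutorService pinnedExecutor = Executors.newSingleThreadExecutor();

    // Plain (non-thread-safe) queues are acceptable only because they are
    // touched exclusively from tasks running on pinnedExecutor.
    private final Queue<CompletableFuture<String>> pendingReceives = new ArrayDeque<>();
    private final Queue<String> incomingMessages = new ArrayDeque<>();

    CompletableFuture<String> receiveAsync() {
        CompletableFuture<String> result = new CompletableFuture<>();
        pinnedExecutor.execute(() -> {
            String message = incomingMessages.poll();
            if (message != null) {
                result.complete(message);      // a message was already queued
            } else {
                pendingReceives.add(result);   // park the request until one arrives
            }
        });
        return result;
    }

    void messageReceived(String message) {
        pinnedExecutor.execute(() -> {
            // No separate isEmpty() check is needed: poll() returns null when
            // the queue is empty, and no other thread can modify the queue
            // between a check and the poll.
            CompletableFuture<String> waiter = pendingReceives.poll();
            if (waiter != null) {
                waiter.complete(message);
            } else {
                incomingMessages.add(message);
            }
        });
    }

    void close() {
        pinnedExecutor.shutdown();
    }
}
```

   Under that assumption an extra `isEmpty()` check is redundant rather than 
racy. A race would only appear if some code path touched the queue from a 
thread other than the pinned one.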


