michaeljmarshall commented on code in PR #15107:
URL: https://github.com/apache/pulsar/pull/15107#discussion_r847777124
##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java:
##########
@@ -1155,16 +1156,24 @@ protected <V> MessageImpl<V> newMessage(final
MessageIdImpl messageId,
}
private void executeNotifyCallback(final MessageImpl<T> message) {
+ internalPinnedExecutor.execute(() -> {
+ executeNotifyCallback0(message);
+ });
+ }
+
+ @VisibleForTesting
+ void executeNotifyCallback0(final MessageImpl<T> message) {
Review Comment:
Instead of exposing this internal method for a test, are you able to create
a funcational test that verifies this edge case?
##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java:
##########
@@ -1419,40 +1428,38 @@ private ByteBuf processMessageChunk(ByteBuf
compressedPayload, MessageMetadata m
* Notify waiting asyncReceive request with the received message.
*
* @param message
+ * @return the message is enqueue incomingMessages.
*/
- void notifyPendingReceivedCallback(final Message<T> message, Exception
exception) {
- if (pendingReceives.isEmpty()) {
Review Comment:
Can you clarify which race you're referencing? If you're referring to the
implementation of `hasNextPendingReceive()`, `pendingReceives` is only supposed
to be updated on the `internalPinnedExecutor`, so the check is redundant, but
is not necessarily a race. (We can still remove the check, but I just want to
see if there is another race you noticed.)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]