shibd commented on code in PR #23235:
URL: https://github.com/apache/pulsar/pull/23235#discussion_r1741371997


##########
pip/pip-374.md:
##########
@@ -0,0 +1,71 @@
+# PIP-374: Visibility of messages in receiverQueue for the consumers
+
+# Background knowledge
+
+When a consumer connects to the broker, the broker starts dispatching 
messages based on the configured receiverQueueSize. 
+There is no observability for messages that have arrived on the consumer side if the 
user has not called the receive method. This leads to ambiguity at times, because
+the consumer application does not know whether a message was actually sent 
by the broker, lost in the network, or lost in the receiver queue.
+
+ConsumerInterceptor is a plugin interface that intercepts and possibly mutates 
messages received by the consumer.
+
+
+# Motivation
+
+* We need an event when the receiver queue is filled, because at that point a particular message is 
already in a particular consumer's receiver queue, waiting for the consumer to 
pick it up and process it. It may wait in the receiverQueue longer if consumer 
processing takes more time. It is very important to provide visibility of 
the messages that are waiting in the receiverQueue for processing.
+
+* Availability of a consumer application w.r.t any messaging system depends on 
the number of messages dispatched from the server/broker against the number of 
messages acknowledged from the consumer app. This metric defines the processing 
rate of a consumer.
+Currently, the number of acknowledged messages can be counted with a 
counter in the onAcknowledge() method of ConsumerInterceptor, but there is no way 
to capture the number of messages that have arrived at the consumer.
+
+
+What does this solve?
+* Visibility about the message in receiverQueue for the consumer.
+* Stuck consumer state visibility
+* Scale the consumers to process the spikes in producer traffic
+* Reduce the overhead of processing the redeliveries
+
+
+# Goals
+
+## In Scope
+
+The proposal will add a method to the interceptor to let users know that a 
message has been received by the consumer.
+
+Add a default method called onArrival() to ConsumerInterceptor and 
hook this method call into the internal consumer of MultiTopicConsumerImpl and 
ConsumerImpl. This way, there will be observability of the messages received 
by the consumer.
+
+
+# High Level Design
+
+* Add a default onArrival() method to the ConsumerInterceptor interface.
+* Hook this method call where the consumer receives a batch of messages at 
once (based on the configured receiverQueueSize).
+
+
+# Detailed Design
+
+## Design & Implementation Details
+
+* ConsumerInterceptor.java
+```
+default Message<T> onArrival(Consumer<T> consumer, Message<T> message) {
+    // Default implementation passes the message through unchanged.
+    return message;
+}
+
+```
+
+* Add a hook in ConsumerImpl.messageReceived that calls the onArrival method, which 
counts the number of messages received.

Review Comment:
   There is a lot of business logic in the `ConsumerImpl.messageReceived` 
method. If we need to pass a `Message` object to `onArrival`, we must ensure 
that the message has been correctly parsed.
   
   However, before the message object is correctly parsed, there is a lot of 
logic that may skip the message:
   
https://github.com/apache/pulsar/blob/d4bbf10f58771e2d43e576dc3422e502834b1de4/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java#L1349-L1423
   
   My point is that this might not match the behavior that the `onArrival` 
method name implies.
   
   Maybe we can change the method to 
`onArrival(Consumer<T> consumer, MessageId messageId)`. As soon as the message 
arrives in `messageReceived`, we can get the messageId and notify the user.
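
   To make the suggestion concrete, here is a rough sketch of how a 
messageId-based hook could be used to count arrivals. It is only an 
illustration under assumptions: `MessageId`, `Consumer`, `ArrivalInterceptor` 
and `CountingInterceptor` below are hypothetical stand-ins declared locally so 
the snippet compiles on its own. They are not the real Pulsar client types, and 
the final signature is up to the PIP.

```java
import java.util.concurrent.atomic.AtomicLong;

// Sketch only: every type in here is a local, hypothetical stand-in,
// not the real org.apache.pulsar.client.api classes.
class OnArrivalSketch {

    // Stand-in for the message identifier known before the payload is parsed.
    interface MessageId {}

    // Stand-in for the consumer handle passed to the interceptor.
    interface Consumer<T> {}

    // Hypothetical interceptor shape: the hook receives only the id,
    // so it can fire before any parsing or skip logic runs.
    interface ArrivalInterceptor<T> {
        default void onArrival(Consumer<T> consumer, MessageId messageId) {
            // No-op by default so existing interceptors are unaffected.
        }
    }

    // Example user interceptor that counts arrivals.
    static final class CountingInterceptor<T> implements ArrivalInterceptor<T> {
        private final AtomicLong arrived = new AtomicLong();

        @Override
        public void onArrival(Consumer<T> consumer, MessageId messageId) {
            arrived.incrementAndGet();
        }

        long arrivedCount() {
            return arrived.get();
        }
    }
}
```

   With a shape like this, the arrival counter can be compared against a 
counter kept in `onAcknowledge()` without requiring a fully parsed `Message`.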



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
