BewareMyPower edited a comment on issue #12087:
URL: https://github.com/apache/pulsar/issues/12087#issuecomment-927912570


   @eolivelli Thanks for your feedback! Here are my answers.
   
   > `default void cleanup() {}`
   > It is not clear to me when this method will be called, when we are closing 
the `Consumer` ?
   > Because it has not reference to the `EntryContext`, so it is not related 
to `convert`
   
   Since `PayloadConverter#convert` returns an iterable of messages, `ConsumerImpl` calls `cleanup` once after it has iterated over all of them:
   
   ```java
   for (Message<T> msg : converter.convert(entryContext, payload, schema)) {
       // Do something with msg
   }
   converter.cleanup();
   ```
   
   For example, to collect the total number of bytes processed in each conversion and expose it externally, we might implement the converter as follows:
   
   ```java
       private long totalBytes = 0;
   
       @Override
       public <T> Iterable<Message<T>> convert(EntryContext context, MessagePayload payload, Schema<T> schema) {
           /* ... */
           return () -> new Iterator<Message<T>>() {
               /* ... */
   
               @Override
               public Message<T> next() {
                   final Message<T> msg = isBatch
                           ? context.newSingleMessage(index, numMessages, payload, true, schema)
                           : context.newMessage(payload, schema);
                   totalBytes += msg.getData().length;
                   return msg;
               }
           };
       }
   
       @Override
       public void cleanup() {
           recordStatistics(totalBytes);  // pseudo code
       }
   ```
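
To make the call order concrete, here is a minimal self-contained sketch of the same lifecycle. It does not use the Pulsar API: `SimpleConverter`, `ByteCountingConverter`, and the `recorded` list (a stand-in for `recordStatistics`) are hypothetical names, and messages are plain strings. Resetting the counter in `cleanup()` is an assumption made here so that each conversion is reported separately.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

// Hypothetical stand-in for the proposed PayloadConverter; not the real Pulsar interface.
interface SimpleConverter {
    Iterable<String> convert(List<byte[]> payloads);

    // Called once by the consumer after it has iterated over the converted messages.
    default void cleanup() {}
}

// Counts the bytes seen during one conversion and reports the total in cleanup().
class ByteCountingConverter implements SimpleConverter {
    private long totalBytes = 0;
    // Stand-in for recordStatistics(...): one entry per completed conversion.
    final List<Long> recorded = new ArrayList<>();

    @Override
    public Iterable<String> convert(List<byte[]> payloads) {
        return () -> new Iterator<String>() {
            private int index = 0;

            @Override
            public boolean hasNext() {
                return index < payloads.size();
            }

            @Override
            public String next() {
                if (!hasNext()) {
                    throw new NoSuchElementException();
                }
                byte[] data = payloads.get(index++);
                totalBytes += data.length;
                return new String(data, StandardCharsets.UTF_8);
            }
        };
    }

    @Override
    public void cleanup() {
        recorded.add(totalBytes);
        totalBytes = 0; // assumption: reset so the next conversion starts from zero
    }
}

class CleanupLifecycleDemo {
    // Mirrors the consumer-side loop: iterate over all messages, then call cleanup() once.
    static long consume(ByteCountingConverter converter, List<byte[]> payloads) {
        for (String msg : converter.convert(payloads)) {
            // Do something with msg
        }
        converter.cleanup();
        return converter.recorded.get(converter.recorded.size() - 1);
    }

    public static void main(String[] args) {
        ByteCountingConverter converter = new ByteCountingConverter();
        List<byte[]> payloads = Arrays.asList(
                "ab".getBytes(StandardCharsets.UTF_8),
                "cde".getBytes(StandardCharsets.UTF_8));
        System.out.println("bytes in this conversion: " + consume(converter, payloads));
    }
}
```

Because the counter is only updated inside `next()`, the total is complete only once the loop has finished, which is why the hook runs after iteration rather than inside `convert`.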
   
   I'm not sure whether there's a better name.
   
   Regarding the `newSingleMessage` API design, you're right that the `numMessages` parameter should be removed.
   
   > if EntryContext is a wrapper for an Entry maybe we can call it EntryWrapper
   
   No, it's only a wrapper for the Entry's metadata and does not include the payload, so I think the name `Context` is better.
   
   The key point of the latest design is to **avoid building the batch**. If the converter built a batch, `ConsumerImpl` would still need to deserialize that batch into individual messages, so the `payload -> batch -> messages` conversion is unnecessary. This PR prefers `payload -> Iterable<Message>`, where the `Iterable` tracks the current progress internally:
   
   1. If the entry is a batch, call `newSingleMessage` in each iteration.
   2. Otherwise, call `newMessage` to convert the entry payload to a message.
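
The two cases above can be sketched without any Pulsar types. In this illustrative example the batch layout (a repeated 4-byte length followed by UTF-8 bytes) is an assumption and not Pulsar's real wire format, and the class and method names are hypothetical. The branches in `next()` stand in for `newSingleMessage` and `newMessage`.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

class LazyPayloadIteration {
    // Hypothetical batch layout: repeated [4-byte length][UTF-8 bytes].
    static byte[] encodeBatch(List<String> values) {
        List<byte[]> encoded = new ArrayList<>();
        int size = 0;
        for (String value : values) {
            byte[] bytes = value.getBytes(StandardCharsets.UTF_8);
            encoded.add(bytes);
            size += Integer.BYTES + bytes.length;
        }
        ByteBuffer buf = ByteBuffer.allocate(size);
        for (byte[] bytes : encoded) {
            buf.putInt(bytes.length).put(bytes);
        }
        return buf.array();
    }

    // payload -> Iterable of messages, with no intermediate batch object.
    static Iterable<String> convert(byte[] payload, boolean isBatch, int numMessages) {
        return () -> new Iterator<String>() {
            private final ByteBuffer buf = ByteBuffer.wrap(payload);
            private int index = 0; // progress is kept inside the iterator

            @Override
            public boolean hasNext() {
                return index < (isBatch ? numMessages : 1);
            }

            @Override
            public String next() {
                if (!hasNext()) {
                    throw new NoSuchElementException();
                }
                index++;
                if (!isBatch) {
                    // Stand-in for newMessage: the whole payload is one message.
                    return new String(payload, StandardCharsets.UTF_8);
                }
                // Stand-in for newSingleMessage: decode only the next element.
                byte[] data = new byte[buf.getInt()];
                buf.get(data);
                return new String(data, StandardCharsets.UTF_8);
            }
        };
    }

    // Collects all messages; only used to demonstrate the iteration.
    static List<String> drain(Iterable<String> messages) {
        List<String> out = new ArrayList<>();
        for (String message : messages) {
            out.add(message);
        }
        return out;
    }

    public static void main(String[] args) {
        byte[] batch = encodeBatch(Arrays.asList("a", "bc", "def"));
        System.out.println(drain(convert(batch, true, 3)));
        byte[] single = "hello".getBytes(StandardCharsets.UTF_8);
        System.out.println(drain(convert(single, false, 1)));
    }
}
```

Each message is decoded only when `next()` is called, so no list of messages is materialized between the payload and the consumer.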

