BewareMyPower edited a comment on issue #12087:
URL: https://github.com/apache/pulsar/issues/12087#issuecomment-927912570
@eolivelli Thanks for your feedback! Here are my answers.
> `default void cleanup() {}`
> It is not clear to me when this method will be called, when we are closing
> the `Consumer` ?
> Because it has not reference to the `EntryContext`, so it is not related
> to `convert`
Since `PayloadConverter#convert` returns an `Iterable` of messages,
`ConsumerImpl` calls `cleanup()` once after iterating over them:
```java
for (Message<T> msg : converter.convert(entryContext, payload, schema)) {
    // Do something with msg
}
converter.cleanup();
```
For example, if we want to collect the total bytes of each conversion and
expose that figure externally, we might implement the converter as follows:
```java
private long totalBytes = 0;

@Override
public <T> Iterable<Message<T>> convert(EntryContext context,
        MessagePayload payload, Schema<T> schema) {
    /* ... */
    return () -> new Iterator<Message<T>>() {
        /* ... */
        @Override
        public Message<T> next() {
            final Message<T> msg = isBatch
                    ? context.newSingleMessage(index, numMessages,
                            payload, true, schema)
                    : context.newMessage(payload, schema);
            totalBytes += msg.getData().length;
            return msg;
        }
    };
}

@Override
public void cleanup() {
    recordStatistics(totalBytes); // pseudo code
}
```
I'm not sure whether there's a better name.
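To make the call order concrete, here is a minimal self-contained sketch of the same pattern. It does not use the Pulsar API: `ToyConverter`, `ByteCountingConverter`, `reportedBytes`, and `consume` are hypothetical stand-ins, and a plain `List<byte[]>` plays the role of the entry.

```java
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

class CleanupDemo {
    // Hypothetical stand-in for the proposed converter interface (not the Pulsar API).
    interface ToyConverter {
        Iterable<byte[]> convert(List<byte[]> entry);

        default void cleanup() {}
    }

    static class ByteCountingConverter implements ToyConverter {
        private long totalBytes = 0; // bytes handed out since the last cleanup()
        long reportedBytes = 0;      // stands in for recordStatistics(totalBytes)

        @Override
        public Iterable<byte[]> convert(List<byte[]> entry) {
            return () -> new Iterator<byte[]>() {
                private int index = 0;

                @Override
                public boolean hasNext() {
                    return index < entry.size();
                }

                @Override
                public byte[] next() {
                    if (!hasNext()) {
                        throw new NoSuchElementException();
                    }
                    byte[] msg = entry.get(index++);
                    totalBytes += msg.length; // counted only when the message is consumed
                    return msg;
                }
            };
        }

        @Override
        public void cleanup() {
            reportedBytes += totalBytes;
            totalBytes = 0;
        }
    }

    // Mirrors the loop shown for ConsumerImpl: iterate first, then call cleanup() once.
    static long consume(ByteCountingConverter converter, List<byte[]> entry) {
        for (byte[] msg : converter.convert(entry)) {
            // Do something with msg
        }
        converter.cleanup();
        return converter.reportedBytes;
    }

    public static void main(String[] args) {
        List<byte[]> entry = List.of(
                "ab".getBytes(StandardCharsets.UTF_8),
                "cde".getBytes(StandardCharsets.UTF_8));
        System.out.println(consume(new ByteCountingConverter(), entry)); // prints 5
    }
}
```

Because the byte count is updated inside `next()`, the total is only complete once the loop has finished, which is why the hook runs after the iteration rather than inside `convert`.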
Regarding the `newSingleMessage` API design, you're right that the
`numMessages` parameter should be removed.
> if EntryContext is a wrapper for an Entry maybe we can call it EntryWrapper
No, it's only a wrapper for the entry's metadata and does not include the
payload, so I think the name `Context` is better.
The key point of the latest design is to **avoid building the batch**.
If the converter built a batch, `ConsumerImpl` would still need to
deserialize that batch into individual messages, so the
`payload -> batch -> messages` conversion is unnecessary. This PR prefers
`payload -> Iterable<Message>`, where the `Iterable` tracks its own progress
internally:
1. If the entry is a batch, call `newSingleMessage` in each iteration.
2. Otherwise, call `newMessage` to convert the entry payload to a message.
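
The two cases above can be illustrated with a small self-contained sketch. Everything in it is an assumption made for illustration: the 4-byte length-prefixed framing is a toy format rather than Pulsar's wire format, `String` stands in for `Message<T>`, and `encodeBatch`, `convert`, and `drain` are hypothetical helpers. The message count is passed in as it would be if it came from the entry metadata.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

class LazyPayloadDemo {
    // Toy framing (assumption): each batched message is a 4-byte length plus its bytes.
    static ByteBuffer encodeBatch(String... values) {
        int size = 0;
        for (String v : values) {
            size += Integer.BYTES + v.getBytes(StandardCharsets.UTF_8).length;
        }
        ByteBuffer buf = ByteBuffer.allocate(size);
        for (String v : values) {
            byte[] bytes = v.getBytes(StandardCharsets.UTF_8);
            buf.putInt(bytes.length).put(bytes);
        }
        buf.flip();
        return buf;
    }

    // payload -> Iterable<message>: one message is decoded per next() call,
    // so no intermediate batch object is ever built.
    static Iterable<String> convert(ByteBuffer payload, boolean isBatch, int numMessages) {
        return () -> new Iterator<String>() {
            private final ByteBuffer view = payload.duplicate(); // own read position
            private final int total = isBatch ? numMessages : 1;
            private int index = 0; // progress kept inside the iterator

            @Override
            public boolean hasNext() {
                return index < total;
            }

            @Override
            public String next() {
                if (!hasNext()) {
                    throw new NoSuchElementException();
                }
                index++;
                // Batch: read the next length-prefixed message. Otherwise: the whole payload.
                int length = isBatch ? view.getInt() : view.remaining();
                byte[] data = new byte[length];
                view.get(data);
                return new String(data, StandardCharsets.UTF_8);
            }
        };
    }

    static List<String> drain(Iterable<String> messages) {
        List<String> out = new ArrayList<>();
        for (String m : messages) {
            out.add(m);
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(drain(convert(encodeBatch("a", "bb", "ccc"), true, 3)));
        ByteBuffer single = ByteBuffer.wrap("single".getBytes(StandardCharsets.UTF_8));
        System.out.println(drain(convert(single, false, 1)));
    }
}
```

Each iterator reads from its own `duplicate()` of the buffer, so the caller's payload position is left untouched and the same payload can be iterated again.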