fvaleri commented on code in PR #14121:
URL: https://github.com/apache/kafka/pull/14121#discussion_r1277749103
##########
examples/src/main/java/kafka/examples/Consumer.java:
##########
@@ -157,9 +157,4 @@ public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
Utils.printOut("Assigned partitions: %s", partitions);
}
-
- @Override
- public void onPartitionsLost(Collection<TopicPartition> partitions) {
- Utils.printOut("Lost partitions: %s", partitions);
Review Comment:
Hi @dajac, the default implementation of onPartitionsLost calls
onPartitionsRevoked so that any cleanup logic can still run (e.g. committing
pending offsets before losing partition ownership). This also means that,
without the override, "revoked" is logged instead of "lost".
https://github.com/fvaleri/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRebalanceListener.java#L197-L199
For this reason, I thought we could simply rely on the default implementation
without overriding it. However, I now see that it is too late to save the
offsets by the time onPartitionsLost is called, since these partitions are
probably owned by other consumers already.
From ConsumerRebalanceListener javadoc:
```java
public void onPartitionsLost(Collection<TopicPartition> partitions) {
    // do not need to save the offsets since these partitions are probably
    // owned by other consumers already
}
```
I guess the default onPartitionsLost implementation is there to cover some
corner case where onPartitionsRevoked may not be triggered, but where we may
still need to do some other cleanup. Is that correct? Maybe we can keep the
override and add an appropriate comment. WDYT?
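
To make the difference concrete, here is a minimal sketch of the two options. It deliberately avoids the Kafka dependency: `Listener` is a hypothetical stand-in for ConsumerRebalanceListener that only mirrors the default delegation described above, partitions are plain strings, and the class names are made up for illustration.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

class RebalanceListenerSketch {

    // Hypothetical stand-in for ConsumerRebalanceListener (assumption: only
    // the default delegation from "lost" to "revoked" is mirrored here).
    interface Listener {
        void onPartitionsRevoked(Collection<String> partitions);

        void onPartitionsAssigned(Collection<String> partitions);

        default void onPartitionsLost(Collection<String> partitions) {
            onPartitionsRevoked(partitions);
        }
    }

    // Option 1: no override, so a lost partition goes through the revoked path.
    static class DefaultingListener implements Listener {
        final List<String> log = new ArrayList<>();

        @Override
        public void onPartitionsRevoked(Collection<String> partitions) {
            // A real listener would typically commit pending offsets here.
            log.add("Revoked partitions: " + partitions);
        }

        @Override
        public void onPartitionsAssigned(Collection<String> partitions) {
            log.add("Assigned partitions: " + partitions);
        }
    }

    // Option 2: keep the override and document why no offsets are saved.
    static class OverridingListener extends DefaultingListener {
        @Override
        public void onPartitionsLost(Collection<String> partitions) {
            // Too late to save offsets: these partitions are probably
            // owned by other consumers already.
            log.add("Lost partitions: " + partitions);
        }
    }

    public static void main(String[] args) {
        List<String> lost = List.of("my-topic-0");

        DefaultingListener defaulting = new DefaultingListener();
        defaulting.onPartitionsLost(lost);
        System.out.println(defaulting.log);

        OverridingListener overriding = new OverridingListener();
        overriding.onPartitionsLost(lost);
        System.out.println(overriding.log);
    }
}
```

With the first option the lost partitions show up under the "Revoked" message and run the revoke cleanup. With the second they are logged as "Lost" and the revoke path is skipped.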