fvaleri commented on code in PR #14121:
URL: https://github.com/apache/kafka/pull/14121#discussion_r1277749103
##########
examples/src/main/java/kafka/examples/Consumer.java:
##########
@@ -157,9 +157,4 @@ public void onPartitionsRevoked(Collection<TopicPartition>
partitions) {
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
Utils.printOut("Assigned partitions: %s", partitions);
}
-
- @Override
- public void onPartitionsLost(Collection<TopicPartition> partitions) {
- Utils.printOut("Lost partitions: %s", partitions);
Review Comment:
Hi @dajac, the default implementation of onPartitionsLost calls
onPartitionsRevoked so that any cleanup logic can be executed (i.e. commit
pending offsets before losing the partition ownership). This also means that
"revoked" is logged instead of "lost".
https://github.com/fvaleri/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRebalanceListener.java#L197-L199
For this reason, I thought we could simply use the default implementation
without overriding, but now I see that it is too late to save the offsets when
onPartitionsLost is called, since these partitions are probably owned by other
consumers already.
From the javadoc we have:
```sh
public void onPartitionsLost(Collection<TopicPartition> partitions) {
// do not need to save the offsets since these partitions are probably
owned by other consumers already
}
```
The default onPartitionsLost implementation is there to cover the case where
partitions are reassigned before we have a chance to revoke them gracefully
(i.e. in case of session timeout).Maybe we can leave it adding an appropriate
comment. Wdyt?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]