Repository: kafka
Updated Branches:
  refs/heads/trunk 69af573b3 -> 359be3a68
KAFKA-2674: clarify onPartitionsRevoked behavior

Author: Jason Gustafson <ja...@confluent.io>

Reviewers: Guozhang Wang

Closes #467 from hachikuji/KAFKA-2674

Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/359be3a6
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/359be3a6
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/359be3a6

Branch: refs/heads/trunk
Commit: 359be3a682951fd469d690df8d9e7a5a89a9d03b
Parents: 69af573
Author: Jason Gustafson <ja...@confluent.io>
Authored: Mon Nov 9 11:17:18 2015 -0800
Committer: Guozhang Wang <wangg...@gmail.com>
Committed: Mon Nov 9 11:17:18 2015 -0800

----------------------------------------------------------------------
 .../consumer/ConsumerRebalanceListener.java | 37 +++++++++++---------
 1 file changed, 20 insertions(+), 17 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/kafka/blob/359be3a6/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRebalanceListener.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRebalanceListener.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRebalanceListener.java
index 671b6f2..8af405c 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRebalanceListener.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRebalanceListener.java
@@ -57,16 +57,17 @@ import org.apache.kafka.common.TopicPartition;
  *           this.consumer = consumer;
  *       }
  *
- *       public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
- *           // read the offsets from an external store using some custom code not described here
- *           for(TopicPartition partition: partitions)
- *              consumer.seek(partition, readOffsetFromExternalStore(partition));
- *       }
  *       public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
  *           // save the offsets in an external store using some custom code not described here
  *           for(TopicPartition partition: partitions)
  *              saveOffsetInExternalStore(consumer.position(partition));
  *       }
+ *
+ *       public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
+ *           // read the offsets from an external store using some custom code not described here
+ *           for(TopicPartition partition: partitions)
+ *              consumer.seek(partition, readOffsetFromExternalStore(partition));
+ *       }
  *   }
  * }
  * </pre>
@@ -74,6 +75,20 @@ import org.apache.kafka.common.TopicPartition;
 public interface ConsumerRebalanceListener {
 
     /**
+     * A callback method the user can implement to provide handling of offset commits to a customized store on the start
+     * of a rebalance operation. This method will be called before a rebalance operation starts and after the consumer
+     * stops fetching data. It is recommended that offsets should be committed in this callback to either Kafka or a
+     * custom offset store to prevent duplicate data.
+     * <p>
+     * For examples on usage of this API, see Usage Examples section of {@link KafkaConsumer KafkaConsumer}
+     * <p>
+     * <b>NOTE:</b> This method is only called before rebalances. It is not called prior to {@link KafkaConsumer#close()}.
+     *
+     * @param partitions The list of partitions that were assigned to the consumer on the last rebalance
+     */
+    public void onPartitionsRevoked(Collection<TopicPartition> partitions);
+
+    /**
      * A callback method the user can implement to provide handling of customized offsets on completion of a successful
      * partition re-assignment. This method will be called after an offset re-assignment completes and before the
      * consumer starts fetching data.
@@ -86,16 +101,4 @@ public interface ConsumerRebalanceListener {
      *            assigned to the consumer)
      */
     public void onPartitionsAssigned(Collection<TopicPartition> partitions);
-
-    /**
-     * A callback method the user can implement to provide handling of offset commits to a customized store on the start
-     * of a rebalance operation. This method will be called before a rebalance operation starts and after the consumer
-     * stops fetching data. It is recommended that offsets should be committed in this callback to either Kafka or a
-     * custom offset store to prevent duplicate data
-     * <p>
-     * For examples on usage of this API, see Usage Examples section of {@link KafkaConsumer KafkaConsumer}
-     *
-     * @param partitions The list of partitions that were assigned to the consumer on the last rebalance
-     */
-    public void onPartitionsRevoked(Collection<TopicPartition> partitions);
 }