Repository: kafka
Updated Branches:
  refs/heads/0.9.0 656b5f6a8 -> 552f65aa3
KAFKA-3191: Improve offset committing JavaDoc in KafkaConsumer

Added an example clarifying the correct way to use explicit offsets with commitSync().

Author: Adam Kunicki <[email protected]>

Reviewers: Jason Gustafson <[email protected]>, Ewen Cheslack-Postava <[email protected]>

Closes #850 from kunickiaj/KAFKA-3191

(cherry picked from commit 0eaede4dc95846e2b8f7452f41c58c0122e7a563)
Signed-off-by: Ewen Cheslack-Postava <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/552f65aa
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/552f65aa
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/552f65aa

Branch: refs/heads/0.9.0
Commit: 552f65aa3911b80b1121463ce292679711fae66a
Parents: 656b5f6
Author: Adam Kunicki <[email protected]>
Authored: Mon Feb 8 16:56:59 2016 -0800
Committer: Ewen Cheslack-Postava <[email protected]>
Committed: Mon Feb 8 16:57:15 2016 -0800

----------------------------------------------------------------------
 .../kafka/clients/consumer/KafkaConsumer.java   | 31 ++++++++++++++++++--
 1 file changed, 29 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/552f65aa/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index 10fd8b9..c251c15 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -217,6 +217,31 @@ import java.util.regex.Pattern;
  * }
  * </pre>
  *
+ * The above example uses {@link #commitSync() commitSync} to mark all received messages as committed. In some cases
+ * you may wish to have even finer control over which messages have been committed by specifying an offset explicitly.
+ * In the example below we commit offset after we finish handling the messages in each partition.
+ * <p>
+ * <pre>
+ *     try {
+ *         while(running) {
+ *             ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);
+ *             for (TopicPartition partition : records.partitions()) {
+ *                 List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
+ *                 for (ConsumerRecord<String, String> record : partitionRecords) {
+ *                     System.out.println(record.offset() + ": " + record.value());
+ *                 }
+ *                 long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
+ *                 consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
+ *             }
+ *         }
+ *     } finally {
+ *         consumer.close();
+ *     }
+ * </pre>
+ *
+ * <b>Note: The committed offset should always be the offset of the next message that your application will read.</b>
+ * Thus, when calling {@link #commitSync(Map) commitSync(offsets)} you should add one to the offset of the last message processed.
+ *
  * <h4>Subscribing To Specific Partitions</h4>
  *
  * In the previous examples we subscribed to the topics we were interested in and let Kafka give our particular process
@@ -919,7 +944,8 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
  * <p>
  * This commits offsets to Kafka. The offsets committed using this API will be used on the first fetch after every
  * rebalance and also on startup. As such, if you need to store offsets in anything other than Kafka, this API
- * should not be used.
+ * should not be used. The committed offset should be the next message your application will consume,
+ * i.e. lastProcessedMessageOffset + 1.
  * <p>
  * This is a synchronous commits and will block until either the commit succeeds or an unrecoverable error is
  * encountered (in which case it is thrown to the caller).
@@ -981,7 +1007,8 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
  * <p>
  * This commits offsets to Kafka. The offsets committed using this API will be used on the first fetch after every
  * rebalance and also on startup. As such, if you need to store offsets in anything other than Kafka, this API
- * should not be used.
+ * should not be used. The committed offset should be the next message your application will consume,
+ * i.e. lastProcessedMessageOffset + 1.
  * <p>
  * This is an asynchronous call and will not block. Any errors encountered are either passed to the callback
  * (if provided) or discarded.
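The rule this patch documents, commit lastProcessedMessageOffset + 1, can be checked without a broker. What follows is a minimal sketch under stated assumptions: partitions are modeled as plain ints, a partition log as a List<String> indexed by offset, and the names CommitOffsetDemo, offsetsToCommit and resumeFrom are hypothetical, not part of the Kafka client API. It shows that committing the last processed offset plus one resumes at the first unprocessed record, while committing the last processed offset itself redelivers that record after a restart.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical in-memory model of the "commit lastProcessedMessageOffset + 1" rule.
// Partitions are plain ints and a partition log is a List<String> indexed by offset;
// none of these types or methods belong to the Kafka client API.
class CommitOffsetDemo {

    // For each partition, returns the offset of the last processed record plus one,
    // i.e. the offset of the next record the application should read.
    static Map<Integer, Long> offsetsToCommit(Map<Integer, List<Long>> processedOffsets) {
        Map<Integer, Long> commits = new TreeMap<>();
        for (Map.Entry<Integer, List<Long>> entry : processedOffsets.entrySet()) {
            List<Long> offsets = entry.getValue();
            if (offsets.isEmpty()) {
                continue; // nothing processed in this partition, so nothing to commit
            }
            long lastOffset = offsets.get(offsets.size() - 1);
            commits.put(entry.getKey(), lastOffset + 1);
        }
        return commits;
    }

    // Simulates a restart: reading resumes exactly at the committed offset.
    static List<String> resumeFrom(List<String> log, long committedOffset) {
        return new ArrayList<>(log.subList((int) committedOffset, log.size()));
    }

    public static void main(String[] args) {
        List<String> log = Arrays.asList("a", "b", "c", "d", "e"); // offsets 0..4

        // Records at offsets 0, 1 and 2 of partition 0 have been processed.
        Map<Integer, List<Long>> processed = new TreeMap<>();
        processed.put(0, Arrays.asList(0L, 1L, 2L));
        long committed = offsetsToCommit(processed).get(0); // 2 + 1 = 3

        // Correct: resuming at offset 3 starts with "d", the first unprocessed record.
        System.out.println(resumeFrom(log, committed)); // prints [d, e]

        // Off by one: committing the last processed offset (2) redelivers "c".
        System.out.println(resumeFrom(log, 2)); // prints [c, d, e]
    }
}
```

In the real client the same per-partition values would be carried in a Map<TopicPartition, OffsetAndMetadata> and passed to commitSync(Map) or commitAsync, as the JavaDoc example in the diff does with Collections.singletonMap.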
