Repository: kafka
Updated Branches:
  refs/heads/trunk cbdd8218c -> 76bcccd61
KAFKA-2745: Update JavaDoc for new / updated consumer APIs

Author: Guozhang Wang <[email protected]>

Reviewers: Jeff Holoman, Jason Gustafson, Gwen Shapira

Closes #425 from guozhangwang/K2745

Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/76bcccd6
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/76bcccd6
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/76bcccd6

Branch: refs/heads/trunk
Commit: 76bcccd61f61d33a4a4d4db02043a2f0063d0d38
Parents: cbdd821
Author: Guozhang Wang <[email protected]>
Authored: Thu Nov 5 16:42:27 2015 -0800
Committer: Gwen Shapira <[email protected]>
Committed: Thu Nov 5 16:42:27 2015 -0800

----------------------------------------------------------------------
 .../kafka/clients/consumer/KafkaConsumer.java | 130 ++++++++++++-------
 1 file changed, 81 insertions(+), 49 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/kafka/blob/76bcccd6/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index f3d2e15..d99607d 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -65,10 +65,10 @@ import java.util.regex.Pattern;
  * A Kafka client that consumes records from a Kafka cluster.
  * <p>
  * It will transparently handle the failure of servers in the Kafka cluster, and transparently adapt as partitions of
- * data it subscribes to migrate within the cluster. This client also interacts with the server to allow groups of
+ * data it fetches migrate within the cluster. This client also interacts with the server to allow groups of
  * consumers to load balance consumption using consumer groups (as described below).
  * <p>
- * The consumer maintains TCP connections to the necessary brokers to fetch data for the topics it subscribes to.
+ * The consumer maintains TCP connections to the necessary brokers to fetch data.
  * Failure to close the consumer after use will leak these connections.
  * The consumer is not thread-safe. See <a href="#multithreaded">Multi-threaded Processing</a> for more details.
  *
@@ -84,19 +84,24 @@ import java.util.regex.Pattern;
  * <p>
  * The {@link #commitSync() committed position} is the last offset that has been saved securely. Should the
  * process fail and restart, this is the offset that it will recover to. The consumer can either automatically commit
- * offsets periodically, or it can choose to control this committed position manually by calling
- * {@link #commitSync() commit}.
+ * offsets periodically, or it can choose to control this committed position manually by calling
+ * {@link #commitSync() commitSync}, which will block until the offsets have been successfully committed
+ * or a fatal error occurs during the commit process, or {@link #commitAsync(OffsetCommitCallback) commitAsync}, which is
+ * non-blocking and will trigger the {@link OffsetCommitCallback} upon either a successful commit or a fatal failure.
  * <p>
  * This distinction gives the consumer control over when a record is considered consumed. It is discussed in further
  * detail below.
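
[Editor's sketch, not part of the patch: a minimal illustration of the two commit styles described above, reusing the consumer from the examples further down. The running flag and the processing step are placeholders for application logic.]

    // Non-blocking commits on each iteration, plus one blocking commit before close.
    try {
        while (running) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            // ... process the records here ...
            consumer.commitAsync(new OffsetCommitCallback() {
                public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
                    // Invoked on either a successful commit or a fatal failure.
                    if (exception != null)
                        System.err.println("Async commit failed: " + exception);
                }
            });
        }
    } finally {
        consumer.commitSync(); // blocks until the final offsets are committed or a fatal error occurs
        consumer.close();
    }
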
  *
- * <h3>Consumer Groups</h3>
+ * <h3>Consumer Groups and Topic Subscriptions</h3>
  *
  * Kafka uses the concept of <i>consumer groups</i> to allow a pool of processes to divide up the work of consuming and
  * processing records. These processes can either be running on the same machine or, as is more likely, they can be
  * distributed over many machines to provide additional scalability and fault tolerance for processing.
  * <p>
- * Each Kafka consumer must specify a consumer group that it belongs to. Kafka will deliver each message in the
+ * Each Kafka consumer is able to configure a consumer group that it belongs to, and can dynamically set the
+ * list of topics it wants to subscribe to through {@link #subscribe(List, ConsumerRebalanceListener)},
+ * or subscribe to all topics matching a certain pattern through {@link #subscribe(Pattern, ConsumerRebalanceListener)}.
+ * Kafka will deliver each message in the
  * subscribed topics to one process in each consumer group. This is achieved by balancing the partitions in the topic
  * over the consumer processes in each group. So if there is a topic with four partitions, and a consumer group with two
  * processes, each process would consume from two partitions. This group membership is maintained dynamically: if a
@@ -116,18 +121,21 @@ import java.util.regex.Pattern;
  * have multiple such groups. To get semantics similar to pub-sub in a traditional messaging system each process would
  * have its own consumer group, so each process would subscribe to all the records published to the topic.
  * <p>
- * In addition, when offsets are committed they are always committed for a given consumer group.
+ * In addition, when a group reassignment happens automatically, consumers can be notified through a {@link ConsumerRebalanceListener},
+ * which allows them to finish necessary application-level logic such as state cleanup, manual offset
+ * commits (note that offsets are always committed for a given consumer group), etc.
+ * See <a href="#rebalancecallback">Storing Offsets Outside Kafka</a> for more details.
  * <p>
- * It is also possible for the consumer to manually specify the partitions it subscribes to, which disables this dynamic
- * partition balancing.
+ * It is also possible for the consumer to manually specify the partitions that are assigned to it through {@link #assign(List)},
+ * which disables this dynamic partition assignment.
  *
  * <h3>Usage Examples</h3>
  * The consumer APIs offer flexibility to cover a variety of consumption use cases. Here are some examples to
  * demonstrate how to use them.
  *
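[Editor's sketch, not part of the patch: the three subscription styles described above, side by side. The topic names are placeholders, and only one style may be in effect per consumer instance, which is why two of them are commented out.]

    ConsumerRebalanceListener listener = new ConsumerRebalanceListener() {
        public void onPartitionsRevoked(Collection<TopicPartition> partitions) { /* e.g. commit or save offsets */ }
        public void onPartitionsAssigned(Collection<TopicPartition> partitions) { /* e.g. seek to saved positions */ }
    };

    // 1. Dynamic assignment over an explicit topic list:
    consumer.subscribe(Arrays.asList("foo", "bar"), listener);

    // 2. Dynamic assignment over all topics matching a pattern:
    // consumer.subscribe(Pattern.compile("foo.*"), listener);

    // 3. Manual assignment, no group management:
    // consumer.assign(Arrays.asList(new TopicPartition("foo", 0)));
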
- * <h4>Simple Processing</h4>
- * This example demonstrates the simplest usage of Kafka's consumer api.
- *
+ * <h4>Automatic Offset Committing</h4>
+ * This example demonstrates a simple usage of Kafka's consumer API that relies on automatic offset committing.
+ * <p>
  * <pre>
  * Properties props = new Properties();
  * props.put("bootstrap.servers", "localhost:9092");
@@ -138,7 +146,7 @@ import java.util.regex.Pattern;
  * props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
  * props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
  * KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
- * consumer.subscribe("foo", "bar");
+ * consumer.subscribe(Arrays.asList("foo", "bar"));
  * while (true) {
  *     ConsumerRecords<String, String> records = consumer.poll(100);
  *     for (ConsumerRecord<String, String> record : records)
@@ -166,8 +174,11 @@ import java.util.regex.Pattern;
  * The deserializer settings specify how to turn bytes into objects. For example, by specifying string deserializers, we
  * are saying that our record's key and value will just be simple strings.
  *
- * <h4>Controlling When Messages Are Considered Consumed</h4>
+ * <h4>Manual Offset Control</h4>
  *
+ * Instead of relying on the consumer to periodically commit consumed offsets, users can also control when messages
+ * should be considered as consumed and hence commit their offsets. This is useful when the consumption of the messages
+ * is coupled with some processing logic, and hence a message should not be considered as consumed until its processing is completed.
  * In this example we will consume a batch of records and batch them up in memory; when we have sufficient records
  * batched we will insert them into a database. If we allowed offsets to auto commit as in the previous example, messages
  * would be considered consumed after they were given out by the consumer, and it would be possible that our process
@@ -179,7 +190,7 @@ import java.util.regex.Pattern;
  * would consume from last committed offset and would repeat the insert of the last batch of data. Used in this way
  * Kafka provides what is often called "at-least-once delivery" guarantees, as each message will likely be delivered one
  * time but in failure cases could be duplicated.
- *
+ * <p>
  * <pre>
  * Properties props = new Properties();
  * props.put("bootstrap.servers", "localhost:9092");
@@ -190,7 +201,7 @@ import java.util.regex.Pattern;
  * props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
  * props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
  * KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
- * consumer.subscribe("foo", "bar");
+ * consumer.subscribe(Arrays.asList("foo", "bar"));
  * int commitInterval = 200;
  * List<ConsumerRecord<String, String>> buffer = new ArrayList<ConsumerRecord<String, String>>();
  * while (true) {
@@ -242,7 +253,7 @@ import java.util.regex.Pattern;
  * It isn't possible to mix both subscription to specific partitions (with no load balancing) and to topics (with load
  * balancing) using the same consumer instance.
  *
- * <h4>Managing Your Own Offsets</h4>
+ * <h4><a name="rebalancecallback">Storing Offsets Outside Kafka</a></h4>
  *
  * The consumer application need not use Kafka's built-in offset storage, it can store offsets in a store of its own
  * choosing. The primary use case for this is allowing the application to store both the offset and the results of the
@@ -263,19 +274,22 @@ import java.util.regex.Pattern;
  * This means that in this case the indexing process that comes back having lost recent updates just resumes indexing
  * from what it has, ensuring that no updates are lost.
  * </ul>
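
[Editor's sketch, not part of the patch: the pattern just described, storing the offset atomically together with the processed output. saveToStoreWithOffset() and readOffsetFromStore() are hypothetical application functions over a transactional store.]

    // Offsets live in the application's own store, committed atomically with the output.
    props.put("enable.auto.commit", "false");
    KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
    TopicPartition tp = new TopicPartition("foo", 0);
    consumer.assign(Arrays.asList(tp));
    consumer.seek(tp, readOffsetFromStore(tp)); // resume from the externally stored position
    while (true) {
        for (ConsumerRecord<String, String> record : consumer.poll(100)) {
            // One atomic write covers both the result and record.offset() + 1 (the next
            // position to read), so a crash can never separate the data from its offset.
            saveToStoreWithOffset(record, record.offset() + 1);
        }
    }
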
- *
+ * <p>
  * Each record comes with its own offset, so to manage your own offset you just need to do the following:
- * <ol>
+ *
+ * <ul>
  * <li>Configure <code>enable.auto.commit=false</code>
  * <li>Use the offset provided with each {@link ConsumerRecord} to save your position.
  * <li>On restart restore the position of the consumer using {@link #seek(TopicPartition, long)}.
- * </ol>
+ * </ul>
  *
+ * <p>
  * This type of usage is simplest when the partition assignment is also done manually (this would be likely in the
  * search index use case described above). If the partition assignment is done automatically special care is
  * needed to handle the case where partition assignments change. This can be done by providing a
- * {@link ConsumerRebalanceListener} instance in the call to {@link #subscribe(List, ConsumerRebalanceListener)}.
- * When partitions are taken from a consumer the consumer will want to commit its offset for those partitions by
+ * {@link ConsumerRebalanceListener} instance in the call to {@link #subscribe(List, ConsumerRebalanceListener)}
+ * or {@link #subscribe(Pattern, ConsumerRebalanceListener)}.
+ * For example, when partitions are taken from a consumer, the consumer will want to commit its offsets for those partitions by
  * implementing {@link ConsumerRebalanceListener#onPartitionsRevoked(Collection)}. When partitions are assigned to a
  * consumer, the consumer will want to look up the offset for those new partitions and correctly initialize the consumer
  * to that position by implementing {@link ConsumerRebalanceListener#onPartitionsAssigned(Collection)}.
@@ -298,12 +312,30 @@ import java.util.regex.Pattern;
  * Another use case is for a system that maintains local state as described in the previous section. In such a system
  * the consumer will want to initialize its position on start-up to whatever is contained in the local store. Likewise
  * if the local state is destroyed (say because the disk is lost) the state may be recreated on a new machine by
- * reconsuming all the data and recreating the state (assuming that Kafka is retaining sufficient history).
- *
+ * re-consuming all the data and recreating the state (assuming that Kafka is retaining sufficient history).
+ * <p>
  * Kafka allows specifying a new position using {@link #seek(TopicPartition, long)}. Special
  * methods for seeking to the earliest and latest offset the server maintains are also available (
  * {@link #seekToBeginning(TopicPartition...)} and {@link #seekToEnd(TopicPartition...)} respectively).
  *
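[Editor's sketch, not part of the patch: a rebalance listener wired up as just described; saveOffsetInExternalStore() and readOffsetFromExternalStore() are hypothetical stand-ins for the application's storage.]

    consumer.subscribe(Arrays.asList("foo"), new ConsumerRebalanceListener() {
        public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
            // About to lose these partitions: persist the current positions.
            for (TopicPartition partition : partitions)
                saveOffsetInExternalStore(partition, consumer.position(partition));
        }
        public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
            // Newly assigned partitions: resume from the externally stored positions.
            for (TopicPartition partition : partitions)
                consumer.seek(partition, readOffsetFromExternalStore(partition));
        }
    });
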
+ * <h4>Consumption Flow Control</h4>
+ *
+ * If a consumer is assigned multiple partitions to fetch data from, it will try to consume from all of them at the same time,
+ * effectively giving these partitions the same priority for consumption. However, in some cases consumers may want to
+ * first focus on fetching from some subset of the assigned partitions at full speed, and only start fetching other partitions
+ * when these partitions have little or no data to consume.
+ *
+ * <p>
+ * One such case is stream processing, where the processor fetches from two topics and performs a join on the two streams.
+ * When one of the topics lags far behind the other, the processor would like to pause fetching from the topic that is ahead
+ * in order to let the lagging stream catch up. Another example is bootstrapping upon consumer start-up, where there is
+ * a lot of historical data to catch up: the application usually wants to get the latest data on some of the topics before it considers
+ * fetching other topics.
+ *
+ * <p>
+ * Kafka supports dynamic control of consumption flows by using {@link #pause(TopicPartition...)} and {@link #resume(TopicPartition...)}
+ * to pause the consumption on the specified assigned partitions and resume the consumption
+ * on the specified paused partitions, respectively, in future {@link #poll(long)} calls.
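
[Editor's sketch, not part of the patch: pausing the partition that is ahead in the join scenario described above. The topic names are placeholders and isLaggingBehind() is a hypothetical application-level check.]

    TopicPartition ahead = new TopicPartition("clicks", 0);
    TopicPartition behind = new TopicPartition("impressions", 0);
    consumer.assign(Arrays.asList(ahead, behind));
    while (true) {
        // Let the lagging partition catch up before fetching more from the other one.
        if (isLaggingBehind(behind))
            consumer.pause(ahead);
        else
            consumer.resume(ahead);
        ConsumerRecords<String, String> records = consumer.poll(100);
        // ... join processing here ...
    }
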
  *
  * <h3><a name="multithreaded">Multi-threaded Processing</a></h3>
  *
@@ -347,6 +379,7 @@ import java.util.regex.Pattern;
  *
  * Then in a separate thread, the consumer can be shutdown by setting the closed flag and waking up the consumer.
  *
+ * <p>
  * <pre>
  *     closed.set(true);
  *     consumer.wakeup();
@@ -593,7 +626,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
     }
 
     /**
-     * The set of partitions currently assigned to this consumer. If subscription happened by directly assigning
+     * Get the set of partitions currently assigned to this consumer. If subscription happened by directly assigning
      * partitions using {@link #assign(List)} then this will simply return the same partitions that
      * were assigned. If topic subscription was used, then this will give the set of topic partitions currently assigned
      * to the consumer (which may be none if the assignment hasn't happened yet, or the partitions are in the
@@ -624,9 +657,9 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
     }
 
     /**
-     * Subscribe to the given list of topics and use the consumer's group management functionality to
-     * assign partitions. Topic subscriptions are not incremental. This list will replace the current
-     * assignment (if there is one). Note that it is not possible to combine topic subscription with group management
+     * Subscribe to the given list of topics to get dynamically
+     * assigned partitions. <b>Topic subscriptions are not incremental. This list will replace the current
+     * assignment (if there is one).</b> Note that it is not possible to combine topic subscription with group management
      * with manual partition assignment through {@link #assign(List)}.
      *
      * If the given list of topics is empty, it is treated the same as {@link #unsubscribe()}.
@@ -669,9 +702,9 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
     }
 
     /**
-     * Subscribe to the given list of topics and use the consumer's group management functionality to
-     * assign partitions. Topic subscriptions are not incremental. This list will replace the current
-     * assignment (if there is one). It is not possible to combine topic subscription with group management
+     * Subscribe to the given list of topics to get dynamically assigned partitions.
+     * <b>Topic subscriptions are not incremental. This list will replace the current
+     * assignment (if there is one).</b> It is not possible to combine topic subscription with group management
      * with manual partition assignment through {@link #assign(List)}.
      *
     * If the given list of topics is empty, it is treated the same as {@link #unsubscribe()}.
@@ -691,8 +724,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
     }
 
     /**
-     * Subscribes to topics matching specified pattern and uses the consumer's group
-     * management functionality. The pattern matching will be done periodically against topics
+     * Subscribe to all topics matching the specified pattern to get dynamically assigned partitions.
+     * The pattern matching will be done periodically against topics
      * existing at the time of check.
      * <p>
      * As part of group management, the consumer will keep track of the list of consumers that
@@ -720,7 +752,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
     }
 
     /**
-     * Unsubscribe from topics currently subscribed to
+     * Unsubscribe from all topics currently subscribed to.
      */
    public void unsubscribe() {
        acquire();
@@ -735,7 +767,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
     }
 
     /**
-     * Assign a list of partition to this consumer. This interface does not allow for incremental assignment
+     * Manually assign a list of partitions to this consumer. This interface does not allow for incremental assignment
      * and will replace the previous assignment (if there is one).
      * <p>
      * Manual topic assignment through this method does not use the consumer's group management
@@ -761,14 +793,13 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
     }
 
     /**
-     * Fetches data for the topics or partitions specified using one of the subscribe/assign APIs. It is an error to not have
+     * Fetch data for the topics or partitions specified using one of the subscribe/assign APIs. It is an error to not have
      * subscribed to any topics or partitions before polling for data.
      * <p>
-     * The offset used for fetching the data is governed by whether or not {@link #seek(TopicPartition, long)} is used.
-     * If {@link #seek(TopicPartition, long)} is used, it will use the specified offsets on startup and on every
-     * rebalance, to consume data from that offset sequentially on every poll. If not, it will use the last checkpointed
-     * offset using {@link #commitSync(Map) commit(offsets)} for the subscribed list of partitions.
-     *
+     * On each poll, the consumer will try to use the last consumed offset as the starting offset and fetch sequentially. The last
+     * consumed offset can be manually set through {@link #seek(TopicPartition, long)} or automatically set as the last committed
+     * offset for the subscribed list of partitions.
+     *
      * @param timeout The time, in milliseconds, spent waiting in poll if data is not available. If 0, returns
      *                immediately with any records available now. Must not be negative.
      * @return map of topic to records since the last fetch for the subscribed list of topics and partitions
@@ -851,7 +882,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
 
     /**
-     * Commits offsets returned on the last {@link #poll(long) poll()} for the subscribed list of topics and partitions.
+     * Commit offsets returned on the last {@link #poll(long) poll()} for all the subscribed topics and partitions.
      * <p>
     * This commits offsets only to Kafka. The offsets committed using this API will be used on the first fetch after
     * every rebalance and also on startup. As such, if you need to store offsets in anything other than Kafka, this API
@@ -873,7 +904,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
     }
 
     /**
-     * Commits the specified offsets for the specified list of topics and partitions to Kafka.
+     * Commit the specified offsets for the specified list of topics and partitions.
      * <p>
     * This commits offsets to Kafka. The offsets committed using this API will be used on the first fetch after every
     * rebalance and also on startup. As such, if you need to store offsets in anything other than Kafka, this API
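
[Editor's sketch, not part of the patch: committing explicit offsets with commitSync(Map) after processing each partition's batch. Note the committed offset should be the offset of the next record to read, hence the + 1.]

    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(100);
        for (TopicPartition partition : records.partitions()) {
            List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
            // ... process partitionRecords here ...
            long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
            // Commit the position of the next record to be read for this partition.
            consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
        }
    }
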
@@ -899,7 +930,8 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
     }
 
     /**
-     * Convenient method. Same as {@link #commitAsync(OffsetCommitCallback) commitAsync(null)}
+     * Commit offsets returned on the last {@link #poll(long) poll()} for all the subscribed topics and partitions.
+     * Same as {@link #commitAsync(OffsetCommitCallback) commitAsync(null)}.
      */
    @Override
    public void commitAsync() {
@@ -907,7 +939,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
     }
 
     /**
-     * Commits offsets returned on the last {@link #poll(long) poll()} for the subscribed list of topics and partitions.
+     * Commit offsets returned on the last {@link #poll(long) poll()} for the subscribed list of topics and partitions.
      * <p>
     * This commits offsets only to Kafka. The offsets committed using this API will be used on the first fetch after
     * every rebalance and also on startup. As such, if you need to store offsets in anything other than Kafka, this API
@@ -929,7 +961,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
     }
 
     /**
-     * Commits the specified offsets for the specified list of topics and partitions to Kafka.
+     * Commit the specified offsets for the specified list of topics and partitions to Kafka.
      * <p>
     * This commits offsets to Kafka. The offsets committed using this API will be used on the first fetch after every
     * rebalance and also on startup. As such, if you need to store offsets in anything other than Kafka, this API
@@ -1005,7 +1037,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
     }
 
     /**
-     * Returns the offset of the <i>next record</i> that will be fetched (if a record with that offset exists).
+     * Get the offset of the <i>next record</i> that will be fetched (if a record with that offset exists).
      *
      * @param partition The partition to get the position for
      * @return The offset
@@ -1034,7 +1066,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
     }
 
     /**
-     * Fetches the last committed offset for the given partition (whether the commit happened by this process or
+     * Get the last committed offset for the given partition (whether the commit happened by this process or
      * another). This offset will be used as the position for the consumer in the event of a failure.
      * <p>
     * This call may block to do a remote call if the partition in question isn't assigned to this consumer or if the
@@ -1139,7 +1171,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
     }
 
     /**
-     * Resume any partitions which have been paused with {@link #pause(TopicPartition...)}. New calls to
+     * Resume the specified partitions which have been paused with {@link #pause(TopicPartition...)}. New calls to
      * {@link #poll(long)} will return records from these partitions if there are any to be fetched.
      * If the partitions were not previously paused, this method is a no-op.
     * @param partitions The partitions which should be resumed
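
[Editor's sketch, not part of the patch: using position() and committed() together to see how far the current position has run ahead of the last commit. committed() may return null if nothing has been committed yet for the partition.]

    TopicPartition partition = new TopicPartition("foo", 0);
    OffsetAndMetadata committed = consumer.committed(partition);
    long position = consumer.position(partition);
    if (committed != null)
        System.out.println("Consumed but not yet committed: " + (position - committed.offset()));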
