http://git-wip-us.apache.org/repos/asf/kafka-site/blob/2d621da3/0101/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html ---------------------------------------------------------------------- diff --git a/0101/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html b/0101/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html new file mode 100644 index 0000000..3840111 --- /dev/null +++ b/0101/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html @@ -0,0 +1,1554 @@ +<!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd"> +<!-- NewPage --> +<html lang="en"> +<head> +<!-- Generated by javadoc (version 1.7.0_79) on Tue Oct 04 12:37:07 PDT 2016 --> +<title>KafkaConsumer (kafka 0.10.1.0 API)</title> +<meta name="date" content="2016-10-04"> +<link rel="stylesheet" type="text/css" href="../../../../../stylesheet.css" title="Style"> +</head> +<body> +<script type="text/javascript"><!-- + if (location.href.indexOf('is-external=true') == -1) { + parent.document.title="KafkaConsumer (kafka 0.10.1.0 API)"; + } +//--> +</script> +<noscript> +<div>JavaScript is disabled on your browser.</div> +</noscript> +<!-- ========= START OF TOP NAVBAR ======= --> +<div class="topNav"><a name="navbar_top"> +<!-- --> +</a><a href="#skip-navbar_top" title="Skip navigation links"></a><a name="navbar_top_firstrow"> +<!-- --> +</a> +<ul class="navList" title="Navigation"> +<li><a href="../../../../../overview-summary.html">Overview</a></li> +<li><a href="package-summary.html">Package</a></li> +<li class="navBarCell1Rev">Class</li> +<li><a href="package-tree.html">Tree</a></li> +<li><a href="../../../../../deprecated-list.html">Deprecated</a></li> +<li><a href="../../../../../index-all.html">Index</a></li> +<li><a href="../../../../../help-doc.html">Help</a></li> +</ul> +</div> +<div class="subNav"> +<ul class="navList"> +<li><a href="../../../../../org/apache/kafka/clients/consumer/InvalidOffsetException.html" title="class in org.apache.kafka.clients.consumer"><span class="strong">Prev Class</span></a></li> +<li><a href="../../../../../org/apache/kafka/clients/consumer/MockConsumer.html" title="class in org.apache.kafka.clients.consumer"><span class="strong">Next Class</span></a></li> +</ul> +<ul class="navList"> +<li><a href="../../../../../index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html" target="_top">Frames</a></li> +<li><a href="KafkaConsumer.html" target="_top">No Frames</a></li> +</ul> +<ul class="navList" id="allclasses_navbar_top"> +<li><a href="../../../../../allclasses-noframe.html">All Classes</a></li> +</ul> +<div> +<script type="text/javascript"><!-- + allClassesLink = document.getElementById("allclasses_navbar_top"); + if(window==top) { + allClassesLink.style.display = "block"; + } + else { + allClassesLink.style.display = "none"; + } + //--> +</script> +</div> +<div> +<ul class="subNavList"> +<li>Summary: </li> +<li>Nested | </li> +<li>Field | </li> +<li><a href="#constructor_summary">Constr</a> | </li> +<li><a href="#method_summary">Method</a></li> +</ul> +<ul class="subNavList"> +<li>Detail: </li> +<li>Field | </li> +<li><a href="#constructor_detail">Constr</a> | </li> +<li><a href="#method_detail">Method</a></li> +</ul> +</div> +<a name="skip-navbar_top"> +<!-- --> +</a></div> +<!-- ========= END OF TOP NAVBAR ========= --> +<!-- ======== START OF CLASS DATA ======== --> +<div class="header"> +<div class="subTitle">org.apache.kafka.clients.consumer</div> +<h2 title="Class KafkaConsumer" class="title">Class KafkaConsumer<K,V></h2> +</div> +<div class="contentContainer"> +<ul class="inheritance"> +<li><a href="http://docs.oracle.com/javase/7/docs/api/java/lang/Object.html?is-external=true" title="class or interface in java.lang">java.lang.Object</a></li> +<li> +<ul class="inheritance"> +<li>org.apache.kafka.clients.consumer.KafkaConsumer<K,V></li> +</ul> +</li> +</ul> +<div class="description"> +<ul class="blockList"> +<li class="blockList"> +<dl> +<dt>All Implemented Interfaces:</dt> +<dd><a href="http://docs.oracle.com/javase/7/docs/api/java/io/Closeable.html?is-external=true" title="class or interface in java.io">Closeable</a>, <a href="http://docs.oracle.com/javase/7/docs/api/java/lang/AutoCloseable.html?is-external=true" title="class or interface in java.lang">AutoCloseable</a>, <a href="../../../../../org/apache/kafka/clients/consumer/Consumer.html" title="interface in org.apache.kafka.clients.consumer">Consumer</a><K,V></dd> +</dl> +<hr> +<br> +<pre>public class <span class="strong">KafkaConsumer<K,V></span> +extends <a href="http://docs.oracle.com/javase/7/docs/api/java/lang/Object.html?is-external=true" title="class or interface in java.lang">Object</a> +implements <a href="../../../../../org/apache/kafka/clients/consumer/Consumer.html" title="interface in org.apache.kafka.clients.consumer">Consumer</a><K,V></pre> +<div class="block">A client that consumes records from a Kafka cluster. + <p> + This client transparently handles the failure of Kafka brokers, and transparently adapts as topic partitions + it fetches migrate within the cluster. This client also interacts with the broker to allow groups of + consumers to load balance consumption using <a href="#consumergroups">consumer groups</a>. + <p> + The consumer maintains TCP connections to the necessary brokers to fetch data. + Failure to close the consumer after use will leak these connections. + The consumer is not thread-safe. See <a href="#multithreaded">Multi-threaded Processing</a> for more details. + + <h3>Offsets and Consumer Position</h3> + Kafka maintains a numerical offset for each record in a partition. This offset acts as a unique identifier of + a record within that partition, and also denotes the position of the consumer in the partition. For example, a consumer + which is at position 5 has consumed records with offsets 0 through 4 and will next receive the record with offset 5. There + are actually two notions of position relevant to the user of the consumer: + <p> + The <a href="../../../../../org/apache/kafka/clients/consumer/KafkaConsumer.html#position(org.apache.kafka.common.TopicPartition)"><code>position</code></a> of the consumer gives the offset of the next record that will be given + out. It will be one larger than the highest offset the consumer has seen in that partition. It automatically advances + every time the consumer receives messages in a call to <a href="../../../../../org/apache/kafka/clients/consumer/KafkaConsumer.html#poll(long)"><code>poll(long)</code></a>. + <p> + The <a href="../../../../../org/apache/kafka/clients/consumer/KafkaConsumer.html#commitSync()"><code>committed position</code></a> is the last offset that has been stored securely. Should the + process fail and restart, this is the offset that the consumer will recover to. The consumer can either automatically commit + offsets periodically; or it can choose to control this committed position manually by calling one of the commit APIs + (e.g. <a href="../../../../../org/apache/kafka/clients/consumer/KafkaConsumer.html#commitSync()"><code>commitSync</code></a> and <a href="../../../../../org/apache/kafka/clients/consumer/KafkaConsumer.html#commitAsync(org.apache.kafka.clients.consumer.OffsetCommitCallback)"><code>commitAsync</code></a>). + <p> + This distinction gives the consumer control over when a record is considered consumed. It is discussed in further + detail below. + + <h3><a name="consumergroups">Consumer Groups and Topic Subscriptions</a></h3> + + Kafka uses the concept of <i>consumer groups</i> to allow a pool of processes to divide the work of consuming and + processing records. These processes can either be running on the same machine or they can be + distributed over many machines to provide scalability and fault tolerance for processing. All consumer instances + sharing the same <code>group.id</code> will be part of the same consumer group. + <p> + Each consumer in a group can dynamically set the list of topics it wants to subscribe to through one of the + <a href="../../../../../org/apache/kafka/clients/consumer/KafkaConsumer.html#subscribe(java.util.Collection,%20org.apache.kafka.clients.consumer.ConsumerRebalanceListener)"><code>subscribe</code></a> APIs. Kafka will deliver each message in the + subscribed topics to one process in each consumer group. This is achieved by balancing the partitions between all + members in the consumer group so that each partition is assigned to exactly one consumer in the group. So if there + is a topic with four partitions, and a consumer group with two processes, each process would consume from two partitions. + <p> + Membership in a consumer group is maintained dynamically: if a process fails, the partitions assigned to it will + be reassigned to other consumers in the same group. Similarly, if a new consumer joins the group, partitions will be moved + from existing consumers to the new one. This is known as <i>rebalancing</i> the group and is discussed in more + detail <a href="#failuredetection">below</a>. Group rebalancing is also used when new partitions are added + to one of the subscribed topics or when a new topic matching a <a href="../../../../../org/apache/kafka/clients/consumer/KafkaConsumer.html#subscribe(java.util.regex.Pattern,%20org.apache.kafka.clients.consumer.ConsumerRebalanceListener)"><code>subscribed regex</code></a> + is created. The group will automatically detect the new partitions through periodic metadata refreshes and + assign them to members of the group. + <p> + Conceptually you can think of a consumer group as being a single logical subscriber that happens to be made up of + multiple processes. As a multi-subscriber system, Kafka naturally supports having any number of consumer groups for a + given topic without duplicating data (additional consumers are actually quite cheap). + <p> + This is a slight generalization of the functionality that is common in messaging systems. To get semantics similar to + a queue in a traditional messaging system all processes would be part of a single consumer group and hence record + delivery would be balanced over the group like with a queue. Unlike a traditional messaging system, though, you can + have multiple such groups. To get semantics similar to pub-sub in a traditional messaging system each process would + have its own consumer group, so each process would subscribe to all the records published to the topic. + <p> + In addition, when group reassignment happens automatically, consumers can be notified through a <a href="../../../../../org/apache/kafka/clients/consumer/ConsumerRebalanceListener.html" title="interface in org.apache.kafka.clients.consumer"><code>ConsumerRebalanceListener</code></a>, + which allows them to finish necessary application-level logic such as state cleanup, manual offset + commits, etc. See <a href="#rebalancecallback">Storing Offsets Outside Kafka</a> for more details. + <p> + It is also possible for the consumer to <a href="#manualassignment">manually assign</a> specific partitions + (similar to the older "simple" consumer) using <a href="../../../../../org/apache/kafka/clients/consumer/KafkaConsumer.html#assign(java.util.Collection)"><code>assign(Collection)</code></a>. In this case, dynamic partition + assignment and consumer group coordination will be disabled. + + <h3><a name="failuredetection">Detecting Consumer Failures</a></h3> + + After subscribing to a set of topics, the consumer will automatically join the group when <a href="../../../../../org/apache/kafka/clients/consumer/KafkaConsumer.html#poll(long)"><code>poll(long)</code></a> is + invoked. The poll API is designed to ensure consumer liveness. As long as you continue to call poll, the consumer + will stay in the group and continue to receive messages from the partitions it was assigned. Underneath the covers, + the consumer sends periodic heartbeats to the server. If the consumer crashes or is unable to send heartbeats for + a duration of <code>session.timeout.ms</code>, then the consumer will be considered dead and its partitions will + be reassigned. + <p> + It is also possible that the consumer could encounter a "livelock" situation where it is continuing + to send heartbeats, but no progress is being made. To prevent the consumer from holding onto its partitions + indefinitely in this case, we provide a liveness detection mechanism using the <code>max.poll.interval.ms</code> + setting. Basically if you don't call poll at least as frequently as the configured max interval, + then the client will proactively leave the group so that another consumer can take over its partitions. When this happens, + you may see an offset commit failure (as indicated by a <a href="../../../../../org/apache/kafka/clients/consumer/CommitFailedException.html" title="class in org.apache.kafka.clients.consumer"><code>CommitFailedException</code></a> thrown from a call to <a href="../../../../../org/apache/kafka/clients/consumer/KafkaConsumer.html#commitSync()"><code>commitSync()</code></a>). + This is a safety mechanism which guarantees that only active members of the group are able to commit offsets. + So to stay in the group, you must continue to call poll. + <p> + The consumer provides two configuration settings to control the behavior of the poll loop: + <ol> + <li><code>max.poll.interval.ms</code>: By increasing the interval between expected polls, you can give + the consumer more time to handle a batch of records returned from <a href="../../../../../org/apache/kafka/clients/consumer/KafkaConsumer.html#poll(long)"><code>poll(long)</code></a>. The drawback + is that increasing this value may delay a group rebalance since the consumer will only join the rebalance + inside the call to poll. You can use this setting to bound the time to finish a rebalance, but + you risk slower progress if the consumer cannot actually call <a href="../../../../../org/apache/kafka/clients/consumer/KafkaConsumer.html#poll(long)"><code>poll</code></a> often enough.</li> + <li><code>max.poll.records</code>: Use this setting to limit the total records returned from a single + call to poll. This can make it easier to predict the maximum that must be handled within each poll + interval. By tuning this value, you may be able to reduce the poll interval, which will reduce the + impact of group rebalancing.</li> + </ol> + <p> + For use cases where message processing time varies unpredictably, neither of these options may be sufficient. + The recommended way to handle these cases is to move message processing to another thread, which allows + the consumer to continue calling <a href="../../../../../org/apache/kafka/clients/consumer/KafkaConsumer.html#poll(long)"><code>poll</code></a> while the processor is still working. Some care must be taken + to ensure that committed offsets do not get ahead of the actual position. Typically, you must disable automatic + commits and manually commit processed offsets for records only after the thread has finished handling them + (depending on the delivery semantics you need). Note also that you will need to <a href="../../../../../org/apache/kafka/clients/consumer/KafkaConsumer.html#pause(java.util.Collection)"><code>pause</code></a> + the partition so that no new records are received from poll until after thread has finished handling those + previously returned. + + <h3>Usage Examples</h3> + The consumer APIs offer flexibility to cover a variety of consumption use cases. Here are some examples to + demonstrate how to use them. + + <h4>Automatic Offset Committing</h4> + This example demonstrates a simple usage of Kafka's consumer api that relying on automatic offset committing. + <p> + <pre> + Properties props = new Properties(); + props.put("bootstrap.servers", "localhost:9092"); + props.put("group.id", "test"); + props.put("enable.auto.commit", "true"); + props.put("auto.commit.interval.ms", "1000"); + props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); + props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); + KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); + consumer.subscribe(Arrays.asList("foo", "bar")); + while (true) { + ConsumerRecords<String, String> records = consumer.poll(100); + for (ConsumerRecord<String, String> record : records) + System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value()); + } + </pre> + + The connection to the cluster is bootstrapped by specifying a list of one or more brokers to contact using the + configuration <code>bootstrap.servers</code>. This list is just used to discover the rest of the brokers in the + cluster and need not be an exhaustive list of servers in the cluster (though you may want to specify more than one in + case there are servers down when the client is connecting). + <p> + Setting <code>enable.auto.commit</code> means that offsets are committed automatically with a frequency controlled by + the config <code>auto.commit.interval.ms</code>. + <p> + In this example the consumer is subscribing to the topics <i>foo</i> and <i>bar</i> as part of a group of consumers + called <i>test</i> as configured with <code>group.id</code>. + <p> + The deserializer settings specify how to turn bytes into objects. For example, by specifying string deserializers, we + are saying that our record's key and value will just be simple strings. + + <h4>Manual Offset Control</h4> + + Instead of relying on the consumer to periodically commit consumed offsets, users can also control when records + should be considered as consumed and hence commit their offsets. This is useful when the consumption of the + are coupled with some processing logic and hence a message should not be considered as consumed until it is completed processing. + + <p> + <pre> + Properties props = new Properties(); + props.put("bootstrap.servers", "localhost:9092"); + props.put("group.id", "test"); + props.put("enable.auto.commit", "false"); + props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); + props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); + KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); + consumer.subscribe(Arrays.asList("foo", "bar")); + final int minBatchSize = 200; + List<ConsumerRecord<String, String>> buffer = new ArrayList<>(); + while (true) { + ConsumerRecords<String, String> records = consumer.poll(100); + for (ConsumerRecord<String, String> record : records) { + buffer.add(record); + } + if (buffer.size() >= minBatchSize) { + insertIntoDb(buffer); + consumer.commitSync(); + buffer.clear(); + } + } + </pre> + + In this example we will consume a batch of records and batch them up in memory. When we have enough records + batched, we will insert them into a database. If we allowed offsets to auto commit as in the previous example, records + would be considered consumed after they were returned to the user in <a href="../../../../../org/apache/kafka/clients/consumer/KafkaConsumer.html#poll(long)"><code>poll</code></a>. It would then be possible + for our process to fail after batching the records, but before they had been inserted into the database. + <p> + To avoid this, we will manually commit the offsets only after the corresponding records have been inserted into the + database. This gives us exact control of when a record is considered consumed. This raises the opposite possibility: + the process could fail in the interval after the insert into the database but before the commit (even though this + would likely just be a few milliseconds, it is a possibility). In this case the process that took over consumption + would consume from last committed offset and would repeat the insert of the last batch of data. Used in this way + Kafka provides what is often called "at-least-once" delivery guarantees, as each record will likely be delivered one + time but in failure cases could be duplicated. + <p> + <b>Note: Using automatic offset commits can also give you "at-least-once" delivery, but the requirement is that + you must consume all data returned from each call to <a href="../../../../../org/apache/kafka/clients/consumer/KafkaConsumer.html#poll(long)"><code>poll(long)</code></a> before any subsequent calls, or before + <a href="../../../../../org/apache/kafka/clients/consumer/KafkaConsumer.html#close()"><code>closing</code></a> the consumer. If you fail to do either of these, it is possible for the committed offset + to get ahead of the consumed position, which results in missing records. The advantage of using manual offset + control is that you have direct control over when a record is considered "consumed."</b> + <p> + The above example uses <a href="../../../../../org/apache/kafka/clients/consumer/KafkaConsumer.html#commitSync()"><code>commitSync</code></a> to mark all received records as committed. In some cases + you may wish to have even finer control over which records have been committed by specifying an offset explicitly. + In the example below we commit offset after we finish handling the records in each partition. + <p> + <pre> + try { + while(running) { + ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE); + for (TopicPartition partition : records.partitions()) { + List<ConsumerRecord<String, String>> partitionRecords = records.records(partition); + for (ConsumerRecord<String, String> record : partitionRecords) { + System.out.println(record.offset() + ": " + record.value()); + } + long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset(); + consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1))); + } + } + } finally { + consumer.close(); + } + </pre> + + <b>Note: The committed offset should always be the offset of the next message that your application will read.</b> + Thus, when calling <a href="../../../../../org/apache/kafka/clients/consumer/KafkaConsumer.html#commitSync(java.util.Map)"><code>commitSync(offsets)</code></a> you should add one to the offset of the last message processed. + + <h4><a name="manualassignment">Manual Partition Assignment</a></h4> + + In the previous examples, we subscribed to the topics we were interested in and let Kafka dynamically assign a + fair share of the partitions for those topics based on the active consumers in the group. However, in + some cases you may need finer control over the specific partitions that are assigned. For example: + <p> + <ul> + <li>If the process is maintaining some kind of local state associated with that partition (like a + local on-disk key-value store), then it should only get records for the partition it is maintaining on disk. + <li>If the process itself is highly available and will be restarted if it fails (perhaps using a + cluster management framework like YARN, Mesos, or AWS facilities, or as part of a stream processing framework). In + this case there is no need for Kafka to detect the failure and reassign the partition since the consuming process + will be restarted on another machine. + </ul> + <p> + To use this mode, instead of subscribing to the topic using <a href="../../../../../org/apache/kafka/clients/consumer/KafkaConsumer.html#subscribe(java.util.Collection)"><code>subscribe</code></a>, you just call + <a href="../../../../../org/apache/kafka/clients/consumer/KafkaConsumer.html#assign(java.util.Collection)"><code>assign(Collection)</code></a> with the full list of partitions that you want to consume. + + <pre> + String topic = "foo"; + TopicPartition partition0 = new TopicPartition(topic, 0); + TopicPartition partition1 = new TopicPartition(topic, 1); + consumer.assign(Arrays.asList(partition0, partition1)); + </pre> + + Once assigned, you can call <a href="../../../../../org/apache/kafka/clients/consumer/KafkaConsumer.html#poll(long)"><code>poll</code></a> in a loop, just as in the preceding examples to consume + records. The group that the consumer specifies is still used for committing offsets, but now the set of partitions + will only change with another call to <a href="../../../../../org/apache/kafka/clients/consumer/KafkaConsumer.html#assign(java.util.Collection)"><code>assign</code></a>. Manual partition assignment does + not use group coordination, so consumer failures will not cause assigned partitions to be rebalanced. Each consumer + acts independently even if it shares a groupId with another consumer. To avoid offset commit conflicts, you should + usually ensure that the groupId is unique for each consumer instance. + <p> + Note that it isn't possible to mix manual partition assignment (i.e. using <a href="../../../../../org/apache/kafka/clients/consumer/KafkaConsumer.html#assign(java.util.Collection)"><code>assign</code></a>) + with dynamic partition assignment through topic subscription (i.e. using <a href="../../../../../org/apache/kafka/clients/consumer/KafkaConsumer.html#subscribe(java.util.Collection)"><code>subscribe</code></a>). + + <h4><a name="rebalancecallback">Storing Offsets Outside Kafka</h4> + + The consumer application need not use Kafka's built-in offset storage, it can store offsets in a store of its own + choosing. The primary use case for this is allowing the application to store both the offset and the results of the + consumption in the same system in a way that both the results and offsets are stored atomically. This is not always + possible, but when it is it will make the consumption fully atomic and give "exactly once" semantics that are + stronger than the default "at-least once" semantics you get with Kafka's offset commit functionality. + <p> + Here are a couple of examples of this type of usage: + <ul> + <li>If the results of the consumption are being stored in a relational database, storing the offset in the database + as well can allow committing both the results and offset in a single transaction. Thus either the transaction will + succeed and the offset will be updated based on what was consumed or the result will not be stored and the offset + won't be updated. + <li>If the results are being stored in a local store it may be possible to store the offset there as well. For + example a search index could be built by subscribing to a particular partition and storing both the offset and the + indexed data together. If this is done in a way that is atomic, it is often possible to have it be the case that even + if a crash occurs that causes unsync'd data to be lost, whatever is left has the corresponding offset stored as well. + This means that in this case the indexing process that comes back having lost recent updates just resumes indexing + from what it has ensuring that no updates are lost. + </ul> + <p> + Each record comes with its own offset, so to manage your own offset you just need to do the following: + + <ul> + <li>Configure <code>enable.auto.commit=false</code> + <li>Use the offset provided with each <a href="../../../../../org/apache/kafka/clients/consumer/ConsumerRecord.html" title="class in org.apache.kafka.clients.consumer"><code>ConsumerRecord</code></a> to save your position. + <li>On restart restore the position of the consumer using <a href="../../../../../org/apache/kafka/clients/consumer/KafkaConsumer.html#seek(org.apache.kafka.common.TopicPartition,%20long)"><code>seek(TopicPartition, long)</code></a>. + </ul> + + <p> + This type of usage is simplest when the partition assignment is also done manually (this would be likely in the + search index use case described above). If the partition assignment is done automatically special care is + needed to handle the case where partition assignments change. This can be done by providing a + <a href="../../../../../org/apache/kafka/clients/consumer/ConsumerRebalanceListener.html" title="interface in org.apache.kafka.clients.consumer"><code>ConsumerRebalanceListener</code></a> instance in the call to <a href="../../../../../org/apache/kafka/clients/consumer/KafkaConsumer.html#subscribe(java.util.Collection,%20org.apache.kafka.clients.consumer.ConsumerRebalanceListener)"><code>subscribe(Collection, ConsumerRebalanceListener)</code></a> + and <a href="../../../../../org/apache/kafka/clients/consumer/KafkaConsumer.html#subscribe(java.util.regex.Pattern,%20org.apache.kafka.clients.consumer.ConsumerRebalanceListener)"><code>subscribe(Pattern, ConsumerRebalanceListener)</code></a>. + For example, when partitions are taken from a consumer the consumer will want to commit its offset for those partitions by + implementing <a href="../../../../../org/apache/kafka/clients/consumer/ConsumerRebalanceListener.html#onPartitionsRevoked(java.util.Collection)"><code>ConsumerRebalanceListener.onPartitionsRevoked(Collection)</code></a>. When partitions are assigned to a + consumer, the consumer will want to look up the offset for those new partitions and correctly initialize the consumer + to that position by implementing <a href="../../../../../org/apache/kafka/clients/consumer/ConsumerRebalanceListener.html#onPartitionsAssigned(java.util.Collection)"><code>ConsumerRebalanceListener.onPartitionsAssigned(Collection)</code></a>. + <p> + Another common use for <a href="../../../../../org/apache/kafka/clients/consumer/ConsumerRebalanceListener.html" title="interface in org.apache.kafka.clients.consumer"><code>ConsumerRebalanceListener</code></a> is to flush any caches the application maintains for + partitions that are moved elsewhere. + + <h4>Controlling The Consumer's Position</h4> + + In most use cases the consumer will simply consume records from beginning to end, periodically committing its + position (either automatically or manually). However Kafka allows the consumer to manually control its position, + moving forward or backwards in a partition at will. This means a consumer can re-consume older records, or skip to + the most recent records without actually consuming the intermediate records. + <p> + There are several instances where manually controlling the consumer's position can be useful. + <p> + One case is for time-sensitive record processing it may make sense for a consumer that falls far enough behind to not + attempt to catch up processing all records, but rather just skip to the most recent records. + <p> + Another use case is for a system that maintains local state as described in the previous section. In such a system + the consumer will want to initialize its position on start-up to whatever is contained in the local store. Likewise + if the local state is destroyed (say because the disk is lost) the state may be recreated on a new machine by + re-consuming all the data and recreating the state (assuming that Kafka is retaining sufficient history). + <p> + Kafka allows specifying the position using <a href="../../../../../org/apache/kafka/clients/consumer/KafkaConsumer.html#seek(org.apache.kafka.common.TopicPartition,%20long)"><code>seek(TopicPartition, long)</code></a> to specify the new position. Special + methods for seeking to the earliest and latest offset the server maintains are also available ( + <a href="../../../../../org/apache/kafka/clients/consumer/KafkaConsumer.html#seekToBeginning(java.util.Collection)"><code>seekToBeginning(Collection)</code></a> and <a href="../../../../../org/apache/kafka/clients/consumer/KafkaConsumer.html#seekToEnd(java.util.Collection)"><code>seekToEnd(Collection)</code></a> respectively). + + <h4>Consumption Flow Control</h4> + + If a consumer is assigned multiple partitions to fetch data from, it will try to consume from all of them at the same time, + effectively giving these partitions the same priority for consumption. However in some cases consumers may want to + first focus on fetching from some subset of the assigned partitions at full speed, and only start fetching other partitions + when these partitions have few or no data to consume. + + <p> + One of such cases is stream processing, where processor fetches from two topics and performs the join on these two streams. + When one of the topics is long lagging behind the other, the processor would like to pause fetching from the ahead topic + in order to get the lagging stream to catch up. Another example is bootstraping upon consumer starting up where there are + a lot of history data to catch up, the applications usually want to get the latest data on some of the topics before consider + fetching other topics. + + <p> + Kafka supports dynamic controlling of consumption flows by using <a href="../../../../../org/apache/kafka/clients/consumer/KafkaConsumer.html#pause(java.util.Collection)"><code>pause(Collection)</code></a> and <a href="../../../../../org/apache/kafka/clients/consumer/KafkaConsumer.html#resume(java.util.Collection)"><code>resume(Collection)</code></a> + to pause the consumption on the specified assigned partitions and resume the consumption + on the specified paused partitions respectively in the future <a href="../../../../../org/apache/kafka/clients/consumer/KafkaConsumer.html#poll(long)"><code>poll(long)</code></a> calls. + + <h3><a name="multithreaded">Multi-threaded Processing</a></h3> + + The Kafka consumer is NOT thread-safe. All network I/O happens in the thread of the application + making the call. It is the responsibility of the user to ensure that multi-threaded access + is properly synchronized. Un-synchronized access will result in <a href="http://docs.oracle.com/javase/7/docs/api/java/util/ConcurrentModificationException.html?is-external=true" title="class or interface in java.util"><code>ConcurrentModificationException</code></a>. + + <p> + The only exception to this rule is <a href="../../../../../org/apache/kafka/clients/consumer/KafkaConsumer.html#wakeup()"><code>wakeup()</code></a>, which can safely be used from an external thread to + interrupt an active operation. In this case, a <a href="../../../../../org/apache/kafka/common/errors/WakeupException.html" title="class in org.apache.kafka.common.errors"><code>WakeupException</code></a> will be + thrown from the thread blocking on the operation. This can be used to shutdown the consumer from another thread. + The following snippet shows the typical pattern: + + <pre> + public class KafkaConsumerRunner implements Runnable { + private final AtomicBoolean closed = new AtomicBoolean(false); + private final KafkaConsumer consumer; + + public void run() { + try { + consumer.subscribe(Arrays.asList("topic")); + while (!closed.get()) { + ConsumerRecords records = consumer.poll(10000); + // Handle new records + } + } catch (WakeupException e) { + // Ignore exception if closing + if (!closed.get()) throw e; + } finally { + consumer.close(); + } + } + + // Shutdown hook which can be called from a separate thread + public void shutdown() { + closed.set(true); + consumer.wakeup(); + } + } + </pre> + + Then in a separate thread, the consumer can be shutdown by setting the closed flag and waking up the consumer. + + <p> + <pre> + closed.set(true); + consumer.wakeup(); + </pre> + + <p> + We have intentionally avoided implementing a particular threading model for processing. This leaves several + options for implementing multi-threaded processing of records. + + <h4>1. One Consumer Per Thread</h4> + + A simple option is to give each thread its own consumer instance. Here are the pros and cons of this approach: + <ul> + <li><b>PRO</b>: It is the easiest to implement + <li><b>PRO</b>: It is often the fastest as no inter-thread co-ordination is needed + <li><b>PRO</b>: It makes in-order processing on a per-partition basis very easy to implement (each thread just + processes messages in the order it receives them). + <li><b>CON</b>: More consumers means more TCP connections to the cluster (one per thread). In general Kafka handles + connections very efficiently so this is generally a small cost. + <li><b>CON</b>: Multiple consumers means more requests being sent to the server and slightly less batching of data + which can cause some drop in I/O throughput. + <li><b>CON</b>: The number of total threads across all processes will be limited by the total number of partitions. + </ul> + + <h4>2. Decouple Consumption and Processing</h4> + + Another alternative is to have one or more consumer threads that do all data consumption and hands off + <a href="../../../../../org/apache/kafka/clients/consumer/ConsumerRecords.html" title="class in org.apache.kafka.clients.consumer"><code>ConsumerRecords</code></a> instances to a blocking queue consumed by a pool of processor threads that actually handle + the record processing. + + This option likewise has pros and cons: + <ul> + <li><b>PRO</b>: This option allows independently scaling the number of consumers and processors. This makes it + possible to have a single consumer that feeds many processor threads, avoiding any limitation on partitions. + <li><b>CON</b>: Guaranteeing order across the processors requires particular care as the threads will execute + independently an earlier chunk of data may actually be processed after a later chunk of data just due to the luck of + thread execution timing. For processing that has no ordering requirements this is not a problem. + <li><b>CON</b>: Manually committing the position becomes harder as it requires that all threads co-ordinate to ensure + that processing is complete for that partition. + </ul> + + There are many possible variations on this approach. For example each processor thread can have its own queue, and + the consumer threads can hash into these queues using the TopicPartition to ensure in-order consumption and simplify + commit.</div> +</li> +</ul> +</div> +<div class="summary"> +<ul class="blockList"> +<li class="blockList"> +<!-- ======== CONSTRUCTOR SUMMARY ======== --> +<ul class="blockList"> +<li class="blockList"><a name="constructor_summary"> +<!-- --> +</a> +<h3>Constructor Summary</h3> +<table class="overviewSummary" border="0" cellpadding="3" cellspacing="0" summary="Constructor Summary table, listing constructors, and an explanation"> +<caption><span>Constructors</span><span class="tabEnd"> </span></caption> +<tr> +<th class="colOne" scope="col">Constructor and Description</th> +</tr> +<tr class="altColor"> +<td class="colOne"><code><strong><a href="../../../../../org/apache/kafka/clients/consumer/KafkaConsumer.html#KafkaConsumer(java.util.Map)">KafkaConsumer</a></strong>(<a href="http://docs.oracle.com/javase/7/docs/api/java/util/Map.html?is-external=true" title="class or interface in java.util">Map</a><<a href="http://docs.oracle.com/javase/7/docs/api/java/lang/String.html?is-external=true" title="class or interface in java.lang">String</a>,<a href="http://docs.oracle.com/javase/7/docs/api/java/lang/Object.html?is-external=true" title="class or interface in java.lang">Object</a>> configs)</code> +<div class="block">A consumer is instantiated by providing a set of key-value pairs as configuration.</div> +</td> +</tr> +<tr class="rowColor"> +<td class="colOne"><code><strong><a href="../../../../../org/apache/kafka/clients/consumer/KafkaConsumer.html#KafkaConsumer(java.util.Map,%20org.apache.kafka.common.serialization.Deserializer,%20org.apache.kafka.common.serialization.Deserializer)">KafkaConsumer</a></strong>(<a href="http://docs.oracle.com/javase/7/docs/api/java/util/Map.html?is-external=true" title="class or interface in java.util">Map</a><<a href="http://docs.oracle.com/javase/7/docs/api/java/lang/String.html?is-external=true" title="class or interface in java.lang">String</a>,<a href="http://docs.oracle.com/javase/7/docs/api/java/lang/Object.html?is-external=true" title="class or interface in java.lang">Object</a>> configs, + <a href="../../../../../org/apache/kafka/common/serialization/Deserializer.html" title="interface in org.apache.kafka.common.serialization">Deserializer</a><<a href="../../../../../org/apache/kafka/clients/consumer/KafkaConsumer.html" title="type parameter in KafkaConsumer">K</a>> keyDeserializer, + <a href="../../../../../org/apache/kafka/common/serialization/Deserializer.html" title="interface in org.apache.kafka.common.serialization">Deserializer</a><<a href="../../../../../org/apache/kafka/clients/consumer/KafkaConsumer.html" title="type parameter in KafkaConsumer">V</a>> valueDeserializer)</code> +<div class="block">A consumer is instantiated by providing a set of key-value pairs as configuration, and a key and a value <a href="../../../../../org/apache/kafka/common/serialization/Deserializer.html" title="interface in org.apache.kafka.common.serialization"><code>Deserializer</code></a>.</div> +</td> +</tr> +<tr class="altColor"> +<td class="colOne"><code><strong><a href="../../../../../org/apache/kafka/clients/consumer/KafkaConsumer.html#KafkaConsumer(java.util.Properties)">KafkaConsumer</a></strong>(<a href="http://docs.oracle.com/javase/7/docs/api/java/util/Properties.html?is-external=true" title="class or interface in java.util">Properties</a> properties)</code> +<div class="block">A consumer is instantiated by providing a <a href="http://docs.oracle.com/javase/7/docs/api/java/util/Properties.html?is-external=true" title="class or interface in java.util"><code>Properties</code></a> object as configuration.</div> +</td> +</tr> +<tr class="rowColor"> +<td class="colOne"><code><strong><a href="../../../../../org/apache/kafka/clients/consumer/KafkaConsumer.html#KafkaConsumer(java.util.Properties,%20org.apache.kafka.common.serialization.Deserializer,%20org.apache.kafka.common.serialization.Deserializer)">KafkaConsumer</a></strong>(<a href="http://docs.oracle.com/javase/7/docs/api/java/util/Properties.html?is-external=true" title="class or interface in java.util">Properties</a> properties, + <a href="../../../../../org/apache/kafka/common/serialization/Deserializer.html" title="interface in org.apache.kafka.common.serialization">Deserializer</a><<a href="../../../../../org/apache/kafka/clients/consumer/KafkaConsumer.html" title="type parameter in KafkaConsumer">K</a>> keyDeserializer, + <a href="../../../../../org/apache/kafka/common/serialization/Deserializer.html" title="interface in org.apache.kafka.common.serialization">Deserializer</a><<a href="../../../../../org/apache/kafka/clients/consumer/KafkaConsumer.html" title="type parameter in KafkaConsumer">V</a>> valueDeserializer)</code> +<div class="block">A consumer is instantiated by providing a <a href="http://docs.oracle.com/javase/7/docs/api/java/util/Properties.html?is-external=true" title="class or interface in java.util"><code>Properties</code></a> object as configuration, and a + key and a value <a href="../../../../../org/apache/kafka/common/serialization/Deserializer.html" title="interface in org.apache.kafka.common.serialization"><code>Deserializer</code></a>.</div> +</td> +</tr> +</table> +</li> +</ul> +<!-- ========== METHOD SUMMARY =========== --> +<ul class="blockList"> +<li class="blockList"><a name="method_summary"> +<!-- --> +</a> +<h3>Method Summary</h3> +<table class="overviewSummary" border="0" cellpadding="3" cellspacing="0" summary="Method Summary table, listing methods, and an explanation"> +<caption><span>Methods</span><span class="tabEnd"> </span></caption> +<tr> +<th class="colFirst" scope="col">Modifier and Type</th> +<th class="colLast" scope="col">Method and Description</th> +</tr> +<tr class="altColor"> +<td class="colFirst"><code>void</code></td> +<td class="colLast"><code><strong><a href="../../../../../org/apache/kafka/clients/consumer/KafkaConsumer.html#assign(java.util.Collection)">assign</a></strong>(<a href="http://docs.oracle.com/javase/7/docs/api/java/util/Collection.html?is-external=true" title="class or interface in java.util">Collection</a><<a href="../../../../../org/apache/kafka/common/TopicPartition.html" title="class in org.apache.kafka.common">TopicPartition</a>> partitions)</code> +<div class="block">Manually assign a list of partition to this consumer.</div> +</td> +</tr> +<tr class="rowColor"> +<td class="colFirst"><code><a href="http://docs.oracle.com/javase/7/docs/api/java/util/Set.html?is-external=true" title="class or interface in java.util">Set</a><<a href="../../../../../org/apache/kafka/common/TopicPartition.html" title="class in org.apache.kafka.common">TopicPartition</a>></code></td> +<td class="colLast"><code><strong><a href="../../../../../org/apache/kafka/clients/consumer/KafkaConsumer.html#assignment()">assignment</a></strong>()</code> +<div class="block">Get the set of partitions currently assigned to this consumer.</div> +</td> +</tr> +<tr class="altColor"> +<td class="colFirst"><code><a href="http://docs.oracle.com/javase/7/docs/api/java/util/Map.html?is-external=true" title="class or interface in java.util">Map</a><<a href="../../../../../org/apache/kafka/common/TopicPartition.html" title="class in org.apache.kafka.common">TopicPartition</a>,<a href="http://docs.oracle.com/javase/7/docs/api/java/lang/Long.html?is-external=true" title="class or interface in java.lang">Long</a>></code></td> +<td class="colLast"><code><strong><a href="../../../../../org/apache/kafka/clients/consumer/KafkaConsumer.html#beginningOffsets(java.util.Collection)">beginningOffsets</a></strong>(<a href="http://docs.oracle.com/javase/7/docs/api/java/util/Collection.html?is-external=true" title="class or interface in java.util">Collection</a><<a href="../../../../../org/apache/kafka/common/TopicPartition.html" title="class in org.apache.kafka.common">TopicPartition</a>> partitions)</code> +<div class="block">Get the first offset for the given partitions.</div> +</td> +</tr> +<tr class="rowColor"> +<td class="colFirst"><code>void</code></td> +<td class="colLast"><code><strong><a href="../../../../../org/apache/kafka/clients/consumer/KafkaConsumer.html#close()">close</a></strong>()</code> +<div class="block">Close the consumer, waiting indefinitely for any needed cleanup.</div> +</td> +</tr> +<tr class="altColor"> +<td class="colFirst"><code>void</code></td> +<td class="colLast"><code><strong><a href="../../../../../org/apache/kafka/clients/consumer/KafkaConsumer.html#commitAsync()">commitAsync</a></strong>()</code> +<div class="block">Commit offsets returned on the last <a href="../../../../../org/apache/kafka/clients/consumer/KafkaConsumer.html#poll(long)"><code>poll()</code></a> for all the subscribed list of topics and partition.</div> +</td> +</tr> +<tr class="rowColor"> +<td class="colFirst"><code>void</code></td> +<td class="colLast"><code><strong><a href="../../../../../org/apache/kafka/clients/consumer/KafkaConsumer.html#commitAsync(java.util.Map,%20org.apache.kafka.clients.consumer.OffsetCommitCallback)">commitAsync</a></strong>(<a href="http://docs.oracle.com/javase/7/docs/api/java/util/Map.html?is-external=true" title="class or interface in java.util">Map</a><<a href="../../../../../org/apache/kafka/common/TopicPartition.html" title="class in org.apache.kafka.common">TopicPartition</a>,<a href="../../../../../org/apache/kafka/clients/consumer/OffsetAndMetadata.html" title="class in org.apache.kafka.clients.consumer">OffsetAndMetadata</a>> offsets, + <a href="../../../../../org/apache/kafka/clients/consumer/OffsetCommitCallback.html" title="interface in org.apache.kafka.clients.consumer">OffsetCommitCallback</a> callback)</code> +<div class="block">Commit the specified offsets for the specified list of topics and partitions to Kafka.</div> +</td> +</tr> +<tr class="altColor"> +<td class="colFirst"><code>void</code></td> +<td class="colLast"><code><strong><a href="../../../../../org/apache/kafka/clients/consumer/KafkaConsumer.html#commitAsync(org.apache.kafka.clients.consumer.OffsetCommitCallback)">commitAsync</a></strong>(<a href="../../../../../org/apache/kafka/clients/consumer/OffsetCommitCallback.html" title="interface in org.apache.kafka.clients.consumer">OffsetCommitCallback</a> callback)</code> +<div class="block">Commit offsets returned on the last <a href="../../../../../org/apache/kafka/clients/consumer/KafkaConsumer.html#poll(long)"><code>poll()</code></a> for the subscribed list of topics and partitions.</div> +</td> +</tr> +<tr class="rowColor"> +<td class="colFirst"><code>void</code></td> +<td class="colLast"><code><strong><a href="../../../../../org/apache/kafka/clients/consumer/KafkaConsumer.html#commitSync()">commitSync</a></strong>()</code> +<div class="block">Commit offsets returned on the last <a href="../../../../../org/apache/kafka/clients/consumer/KafkaConsumer.html#poll(long)"><code>poll()</code></a> for all the subscribed list of topics and partitions.</div> +</td> +</tr> +<tr class="altColor"> +<td class="colFirst"><code>void</code></td> +<td class="colLast"><code><strong><a href="../../../../../org/apache/kafka/clients/consumer/KafkaConsumer.html#commitSync(java.util.Map)">commitSync</a></strong>(<a href="http://docs.oracle.com/javase/7/docs/api/java/util/Map.html?is-external=true" title="class or interface in java.util">Map</a><<a href="../../../../../org/apache/kafka/common/TopicPartition.html" title="class in org.apache.kafka.common">TopicPartition</a>,<a href="../../../../../org/apache/kafka/clients/consumer/OffsetAndMetadata.html" title="class in org.apache.kafka.clients.consumer">OffsetAndMetadata</a>> offsets)</code> +<div class="block">Commit the specified offsets for the specified list of topics and partitions.</div> +</td> +</tr> +<tr class="rowColor"> +<td class="colFirst"><code><a href="../../../../../org/apache/kafka/clients/consumer/OffsetAndMetadata.html" title="class in org.apache.kafka.clients.consumer">OffsetAndMetadata</a></code></td> +<td class="colLast"><code><strong><a href="../../../../../org/apache/kafka/clients/consumer/KafkaConsumer.html#committed(org.apache.kafka.common.TopicPartition)">committed</a></strong>(<a href="../../../../../org/apache/kafka/common/TopicPartition.html" title="class in org.apache.kafka.common">TopicPartition</a> partition)</code> +<div class="block">Get the last committed offset for the given partition (whether the commit happened by this process or + another).</div> +</td> +</tr> +<tr class="altColor"> +<td class="colFirst"><code><a href="http://docs.oracle.com/javase/7/docs/api/java/util/Map.html?is-external=true" title="class or interface in java.util">Map</a><<a href="../../../../../org/apache/kafka/common/TopicPartition.html" title="class in org.apache.kafka.common">TopicPartition</a>,<a href="http://docs.oracle.com/javase/7/docs/api/java/lang/Long.html?is-external=true" title="class or interface in java.lang">Long</a>></code></td> +<td class="colLast"><code><strong><a href="../../../../../org/apache/kafka/clients/consumer/KafkaConsumer.html#endOffsets(java.util.Collection)">endOffsets</a></strong>(<a href="http://docs.oracle.com/javase/7/docs/api/java/util/Collection.html?is-external=true" title="class or interface in java.util">Collection</a><<a href="../../../../../org/apache/kafka/common/TopicPartition.html" title="class in org.apache.kafka.common">TopicPartition</a>> partitions)</code> +<div class="block">Get the last offset for the given partitions.</div> +</td> +</tr> +<tr class="rowColor"> +<td class="colFirst"><code><a href="http://docs.oracle.com/javase/7/docs/api/java/util/Map.html?is-external=true" title="class or interface in java.util">Map</a><<a href="http://docs.oracle.com/javase/7/docs/api/java/lang/String.html?is-external=true" title="class or interface in java.lang">String</a>,<a href="http://docs.oracle.com/javase/7/docs/api/java/util/List.html?is-external=true" title="class or interface in java.util">List</a><<a href="../../../../../org/apache/kafka/common/PartitionInfo.html" title="class in org.apache.kafka.common">PartitionInfo</a>>></code></td> +<td class="colLast"><code><strong><a href="../../../../../org/apache/kafka/clients/consumer/KafkaConsumer.html#listTopics()">listTopics</a></strong>()</code> +<div class="block">Get metadata about partitions for all topics that the user is authorized to view.</div> +</td> +</tr> +<tr class="altColor"> +<td class="colFirst"><code><a href="http://docs.oracle.com/javase/7/docs/api/java/util/Map.html?is-external=true" title="class or interface in java.util">Map</a><<a href="../../../../../org/apache/kafka/common/MetricName.html" title="class in org.apache.kafka.common">MetricName</a>,? extends <a href="../../../../../org/apache/kafka/common/Metric.html" title="interface in org.apache.kafka.common">Metric</a>></code></td> +<td class="colLast"><code><strong><a href="../../../../../org/apache/kafka/clients/consumer/KafkaConsumer.html#metrics()">metrics</a></strong>()</code> +<div class="block">Get the metrics kept by the consumer</div> +</td> +</tr> +<tr class="rowColor"> +<td class="colFirst"><code><a href="http://docs.oracle.com/javase/7/docs/api/java/util/Map.html?is-external=true" title="class or interface in java.util">Map</a><<a href="../../../../../org/apache/kafka/common/TopicPartition.html" title="class in org.apache.kafka.common">TopicPartition</a>,<a href="../../../../../org/apache/kafka/clients/consumer/OffsetAndTimestamp.html" title="class in org.apache.kafka.clients.consumer">OffsetAndTimestamp</a>></code></td> +<td class="colLast"><code><strong><a href="../../../../../org/apache/kafka/clients/consumer/KafkaConsumer.html#offsetsForTimes(java.util.Map)">offsetsForTimes</a></strong>(<a href="http://docs.oracle.com/javase/7/docs/api/java/util/Map.html?is-external=true" title="class or interface in java.util">Map</a><<a href="../../../../../org/apache/kafka/common/TopicPartition.html" title="class in org.apache.kafka.common">TopicPartition</a>,<a href="http://docs.oracle.com/javase/7/docs/api/java/lang/Long.html?is-external=true" title="class or interface in java.lang">Long</a>> timestampsToSearch)</code> +<div class="block">Look up the offsets for the given partitions by timestamp.</div> +</td> +</tr> +<tr class="altColor"> +<td class="colFirst"><code><a href="http://docs.oracle.com/javase/7/docs/api/java/util/List.html?is-external=true" title="class or interface in java.util">List</a><<a href="../../../../../org/apache/kafka/common/PartitionInfo.html" title="class in org.apache.kafka.common">PartitionInfo</a>></code></td> +<td class="colLast"><code><strong><a href="../../../../../org/apache/kafka/clients/consumer/KafkaConsumer.html#partitionsFor(java.lang.String)">partitionsFor</a></strong>(<a href="http://docs.oracle.com/javase/7/docs/api/java/lang/String.html?is-external=true" title="class or interface in java.lang">String</a> topic)</code> +<div class="block">Get metadata about the partitions for a given topic.</div> +</td> +</tr> +<tr class="rowColor"> +<td class="colFirst"><code>void</code></td> +<td class="colLast"><code><strong><a href="../../../../../org/apache/kafka/clients/consumer/KafkaConsumer.html#pause(java.util.Collection)">pause</a></strong>(<a href="http://docs.oracle.com/javase/7/docs/api/java/util/Collection.html?is-external=true" title="class or interface in java.util">Collection</a><<a href="../../../../../org/apache/kafka/common/TopicPartition.html" title="class in org.apache.kafka.common">TopicPartition</a>> partitions)</code> +<div class="block">Suspend fetching from the requested partitions.</div> +</td> +</tr> +<tr class="altColor"> +<td class="colFirst"><code><a href="http://docs.oracle.com/javase/7/docs/api/java/util/Set.html?is-external=true" title="class or interface in java.util">Set</a><<a href="../../../../../org/apache/kafka/common/TopicPartition.html" title="class in org.apache.kafka.common">TopicPartition</a>></code></td> +<td class="colLast"><code><strong><a href="../../../../../org/apache/kafka/clients/consumer/KafkaConsumer.html#paused()">paused</a></strong>()</code> +<div class="block">Get the set of partitions that were previously paused by a call to <a href="../../../../../org/apache/kafka/clients/consumer/KafkaConsumer.html#pause(java.util.Collection)"><code>pause(Collection)</code></a>.</div> +</td> +</tr> +<tr class="rowColor"> +<td class="colFirst"><code><a href="../../../../../org/apache/kafka/clients/consumer/ConsumerRecords.html" title="class in org.apache.kafka.clients.consumer">ConsumerRecords</a><<a href="../../../../../org/apache/kafka/clients/consumer/KafkaConsumer.html" title="type parameter in KafkaConsumer">K</a>,<a href="../../../../../org/apache/kafka/clients/consumer/KafkaConsumer.html" title="type parameter in KafkaConsumer">V</a>></code></td> +<td class="colLast"><code><strong><a href="../../../../../org/apache/kafka/clients/consumer/KafkaConsumer.html#poll(long)">poll</a></strong>(long timeout)</code> +<div class="block">Fetch data for the topics or partitions specified using one of the subscribe/assign APIs.</div> +</td> +</tr> +<tr class="altColor"> +<td class="colFirst"><code>long</code></td> +<td class="colLast"><code><strong><a href="../../../../../org/apache/kafka/clients/consumer/KafkaConsumer.html#position(org.apache.kafka.common.TopicPartition)">position</a></strong>(<a href="../../../../../org/apache/kafka/common/TopicPartition.html" title="class in org.apache.kafka.common">TopicPartition</a> partition)</code> +<div class="block">Get the offset of the <i>next record</i> that will be fetched (if a record with that offset exists).</div> +</td> +</tr> +<tr class="rowColor"> +<td class="colFirst"><code>void</code></td> +<td class="colLast"><code><strong><a href="../../../../../org/apache/kafka/clients/consumer/KafkaConsumer.html#resume(java.util.Collection)">resume</a></strong>(<a href="http://docs.oracle.com/javase/7/docs/api/java/util/Collection.html?is-external=true" title="class or interface in java.util">Collection</a><<a href="../../../../../org/apache/kafka/common/TopicPartition.html" title="class in org.apache.kafka.common">TopicPartition</a>> partitions)</code> +<div class="block">Resume specified partitions which have been paused with <a href="../../../../../org/apache/kafka/clients/consumer/KafkaConsumer.html#pause(java.util.Collection)"><code>pause(Collection)</code></a>.</div> +</td> +</tr> +<tr class="altColor"> +<td class="colFirst"><code>void</code></td> +<td class="colLast"><code><strong><a href="../../../../../org/apache/kafka/clients/consumer/KafkaConsumer.html#seek(org.apache.kafka.common.TopicPartition,%20long)">seek</a></strong>(<a href="../../../../../org/apache/kafka/common/TopicPartition.html" title="class in org.apache.kafka.common">TopicPartition</a> partition, + long offset)</code> +<div class="block">Overrides the fetch offsets that the consumer will use on the next <a href="../../../../../org/apache/kafka/clients/consumer/KafkaConsumer.html#poll(long)"><code>poll(timeout)</code></a>.</div> +</td> +</tr> +<tr class="rowColor"> +<td class="colFirst"><code>void</code></td> +<td class="colLast"><code><strong><a href="../../../../../org/apache/kafka/clients/consumer/KafkaConsumer.html#seekToBeginning(java.util.Collection)">seekToBeginning</a></strong>(<a href="http://docs.oracle.com/javase/7/docs/api/java/util/Collection.html?is-external=true" title="class or interface in java.util">Collection</a><<a href="../../../../../org/apache/kafka/common/TopicPartition.html" title="class in org.apache.kafka.common">TopicPartition</a>> partitions)</code> +<div class="block">Seek to the first offset for each of the given partitions.</div> +</td> +</tr> +<tr class="altColor"> +<td class="colFirst"><code>void</code></td> +<td class="colLast"><code><strong><a href="../../../../../org/apache/kafka/clients/consumer/KafkaConsumer.html#seekToEnd(java.util.Collection)">seekToEnd</a></strong>(<a href="http://docs.oracle.com/javase/7/docs/api/java/util/Collection.html?is-external=true" title="class or interface in java.util">Collection</a><<a href="../../../../../org/apache/kafka/common/TopicPartition.html" title="class in org.apache.kafka.common">TopicPartition</a>> partitions)</code> +<div class="block">Seek to the last offset for each of the given partitions.</div> +</td> +</tr> +<tr class="rowColor"> +<td class="colFirst"><code>void</code></td> +<td class="colLast"><code><strong><a href="../../../../../org/apache/kafka/clients/consumer/KafkaConsumer.html#subscribe(java.util.Collection)">subscribe</a></strong>(<a href="http://docs.oracle.com/javase/7/docs/api/java/util/Collection.html?is-external=true" title="class or interface in java.util">Collection</a><<a href="http://docs.oracle.com/javase/7/docs/api/java/lang/String.html?is-external=true" title="class or interface in java.lang">String</a>> topics)</code> +<div class="block">Subscribe to the given list of topics to get dynamically assigned partitions.</div> +</td> +</tr> +<tr class="altColor"> +<td class="colFirst"><code>void</code></td> +<td class="colLast"><code><strong><a href="../../../../../org/apache/kafka/clients/consumer/KafkaConsumer.html#subscribe(java.util.Collection,%20org.apache.kafka.clients.consumer.ConsumerRebalanceListener)">subscribe</a></strong>(<a href="http://docs.oracle.com/javase/7/docs/api/java/util/Collection.html?is-external=true" title="class or interface in java.util">Collection</a><<a href="http://docs.oracle.com/javase/7/docs/api/java/lang/String.html?is-external=true" title="class or interface in java.lang">String</a>> topics, + <a href="../../../../../org/apache/kafka/clients/consumer/ConsumerRebalanceListener.html" title="interface in org.apache.kafka.clients.consumer">ConsumerRebalanceListener</a> listener)</code> +<div class="block">Subscribe to the given list of topics to get dynamically + assigned partitions.</div> +</td> +</tr> +<tr class="rowColor"> +<td class="colFirst"><code>void</code></td> +<td class="colLast"><code><strong><a href="../../../../../org/apache/kafka/clients/consumer/KafkaConsumer.html#subscribe(java.util.regex.Pattern,%20org.apache.kafka.clients.consumer.ConsumerRebalanceListener)">subscribe</a></strong>(<a href="http://docs.oracle.com/javase/7/docs/api/java/util/regex/Pattern.html?is-external=true" title="class or interface in java.util.regex">Pattern</a> pattern, + <a href="../../../../../org/apache/kafka/clients/consumer/ConsumerRebalanceListener.html" title="interface in org.apache.kafka.clients.consumer">ConsumerRebalanceListener</a> listener)</code> +<div class="block">Subscribe to all topics matching specified pattern to get dynamically assigned partitions.</div> +</td> +</tr> +<tr class="altColor"> +<td class="colFirst"><code><a href="http://docs.oracle.com/javase/7/docs/api/java/util/Set.html?is-external=true" title="class or interface in java.util">Set</a><<a href="http://docs.oracle.com/javase/7/docs/api/java/lang/String.html?is-external=true" title="class or interface in java.lang">String</a>></code></td> +<td class="colLast"><code><strong><a href="../../../../../org/apache/kafka/clients/consumer/KafkaConsumer.html#subscription()">subscription</a></strong>()</code> +<div class="block">Get the current subscription.</div> +</td> +</tr> +<tr class="rowColor"> +<td class="colFirst"><code>void</code></td> +<td class="colLast"><code><strong><a href="../../../../../org/apache/kafka/clients/consumer/KafkaConsumer.html#unsubscribe()">unsubscribe</a></strong>()</code> +<div class="block">Unsubscribe from topics currently subscribed with <a href="../../../../../org/apache/kafka/clients/consumer/KafkaConsumer.html#subscribe(java.util.Collection)"><code>subscribe(Collection)</code></a>.</div> +</td> +</tr> +<tr class="altColor"> +<td class="colFirst"><code>void</code></td> +<td class="colLast"><code><strong><a href="../../../../../org/apache/kafka/clients/consumer/KafkaConsumer.html#wakeup()">wakeup</a></strong>()</code> +<div class="block">Wakeup the consumer.</div> +</td> +</tr> +</table> +<ul class="blockList"> +<li class="blockList"><a name="methods_inherited_from_class_java.lang.Object"> +<!-- --> +</a> +<h3>Methods inherited from class java.lang.<a href="http://docs.oracle.com/javase/7/docs/api/java/lang/Object.html?is-external=true" title="class or interface in java.lang">Object</a></h3> +<code><a href="http://docs.oracle.com/javase/7/docs/api/java/lang/Object.html?is-external=true#clone()" title="class or interface in java.lang">clone</a>, <a href="http://docs.oracle.com/javase/7/docs/api/java/lang/Object.html?is-external=true#equals(java.lang.Object)" title="class or interface in java.lang">equals</a>, <a href="http://docs.oracle.com/javase/7/docs/api/java/lang/Object.html?is-external=true#finalize()" title="class or interface in java.lang">finalize</a>, <a href="http://docs.oracle.com/javase/7/docs/api/java/lang/Object.html?is-external=true#getClass()" title="class or interface in java.lang">getClass</a>, <a href="http://docs.oracle.com/javase/7/docs/api/java/lang/Object.html?is-external=true#hashCode()" title="class or interface in java.lang">hashCode</a>, <a href="http://docs.oracle.com/javase/7/docs/api/java/lang/Object.html?is-external=true#notify()" title="class or interface in java.lang">notify</a>, <a href="http://docs.oracle.com/javase/7/docs/api/java/lang /Object.html?is-external=true#notifyAll()" title="class or interface in java.lang">notifyAll</a>, <a href="http://docs.oracle.com/javase/7/docs/api/java/lang/Object.html?is-external=true#toString()" title="class or interface in java.lang">toString</a>, <a href="http://docs.oracle.com/javase/7/docs/api/java/lang/Object.html?is-external=true#wait()" title="class or interface in java.lang">wait</a>, <a href="http://docs.oracle.com/javase/7/docs/api/java/lang/Object.html?is-external=true#wait(long)" title="class or interface in java.lang">wait</a>, <a href="http://docs.oracle.com/javase/7/docs/api/java/lang/Object.html?is-external=true#wait(long,%20int)" title="class or interface in java.lang">wait</a></code></li> +</ul> +</li> +</ul> +</li> +</ul> +</div> +<div class="details"> +<ul class="blockList"> +<li class="blockList"> +<!-- ========= CONSTRUCTOR DETAIL ======== --> +<ul class="blockList"> +<li class="blockList"><a name="constructor_detail"> +<!-- --> +</a> +<h3>Constructor Detail</h3> +<a name="KafkaConsumer(java.util.Map)"> +<!-- --> +</a> +<ul class="blockList"> +<li class="blockList"> +<h4>KafkaConsumer</h4> +<pre>public KafkaConsumer(<a href="http://docs.oracle.com/javase/7/docs/api/java/util/Map.html?is-external=true" title="class or interface in java.util">Map</a><<a href="http://docs.oracle.com/javase/7/docs/api/java/lang/String.html?is-external=true" title="class or interface in java.lang">String</a>,<a href="http://docs.oracle.com/javase/7/docs/api/java/lang/Object.html?is-external=true" title="class or interface in java.lang">Object</a>> configs)</pre> +<div class="block">A consumer is instantiated by providing a set of key-value pairs as configuration. Valid configuration strings + are documented <a href="http://kafka.apache.org/documentation.html#consumerconfigs" >here</a>. Values can be + either strings or objects of the appropriate type (for example a numeric configuration would accept either the + string "42" or the integer 42). + <p> + Valid configuration strings are documented at <a href="../../../../../org/apache/kafka/clients/consumer/ConsumerConfig.html" title="class in org.apache.kafka.clients.consumer"><code>ConsumerConfig</code></a></div> +<dl><dt><span class="strong">Parameters:</span></dt><dd><code>configs</code> - The consumer configs</dd></dl> +</li> +</ul> +<a name="KafkaConsumer(java.util.Map, org.apache.kafka.common.serialization.Deserializer, org.apache.kafka.common.serialization.Deserializer)"> +<!-- --> +</a> +<ul class="blockList"> +<li class="blockList"> +<h4>KafkaConsumer</h4> +<pre>public KafkaConsumer(<a href="http://docs.oracle.com/javase/7/docs/api/java/util/Map.html?is-external=true" title="class or interface in java.util">Map</a><<a href="http://docs.oracle.com/javase/7/docs/api/java/lang/String.html?is-external=true" title="class or interface in java.lang">String</a>,<a href="http://docs.oracle.com/javase/7/docs/api/java/lang/Object.html?is-external=true" title="class or interface in java.lang">Object</a>> configs, + <a href="../../../../../org/apache/kafka/common/serialization/Deserializer.html" title="interface in org.apache.kafka.common.serialization">Deserializer</a><<a href="../../../../../org/apache/kafka/clients/consumer/KafkaConsumer.html" title="type parameter in KafkaConsumer">K</a>> keyDeserializer, + <a href="../../../../../org/apache/kafka/common/serialization/Deserializer.html" title="interface in org.apache.kafka.common.serialization">Deserializer</a><<a href="../../../../../org/apache/kafka/clients/consumer/KafkaConsumer.html" title="type parameter in KafkaConsumer">V</a>> valueDeserializer)</pre> +<div class="block">A consumer is instantiated by providing a set of key-value pairs as configuration, and a key and a value <a href="../../../../../org/apache/kafka/common/serialization/Deserializer.html" title="interface in org.apache.kafka.common.serialization"><code>Deserializer</code></a>. + <p> + Valid configuration strings are documented at <a href="../../../../../org/apache/kafka/clients/consumer/ConsumerConfig.html" title="class in org.apache.kafka.clients.consumer"><code>ConsumerConfig</code></a></div> +<dl><dt><span class="strong">Parameters:</span></dt><dd><code>configs</code> - The consumer configs</dd><dd><code>keyDeserializer</code> - The deserializer for key that implements <a href="../../../../../org/apache/kafka/common/serialization/Deserializer.html" title="interface in org.apache.kafka.common.serialization"><code>Deserializer</code></a>. The configure() method + won't be called in the consumer when the deserializer is passed in directly.</dd><dd><code>valueDeserializer</code> - The deserializer for value that implements <a href="../../../../../org/apache/kafka/common/serialization/Deserializer.html" title="interface in org.apache.kafka.common.serialization"><code>Deserializer</code></a>. The configure() method + won't be called in the consumer when the deserializer is passed in directly.</dd></dl> +</li> +</ul> +<a name="KafkaConsumer(java.util.Properties)"> +<!-- --> +</a> +<ul class="blockList"> +<li class="blockList"> +<h4>KafkaConsumer</h4> +<pre>public KafkaConsumer(<a href="http://docs.oracle.com/javase/7/docs/api/java/util/Properties.html?is-external=true" title="class or interface in java.util">Properties</a> properties)</pre> +<div class="block">A consumer is instantiated by providing a <a href="http://docs.oracle.com/javase/7/docs/api/java/util/Properties.html?is-external=true" title="class or interface in java.util"><code>Properties</code></a> object as configuration. + <p> + Valid configuration strings are documented at <a href="../../../../../org/apache/kafka/clients/consumer/ConsumerConfig.html" title="class in org.apache.kafka.clients.consumer"><code>ConsumerConfig</code></a></div> +<dl><dt><span class="strong">Parameters:</span></dt><dd><code>properties</code> - The consumer configuration properties</dd></dl> +</li> +</ul> +<a name="KafkaConsumer(java.util.Properties, org.apache.kafka.common.serialization.Deserializer, org.apache.kafka.common.serialization.Deserializer)"> +<!-- --> +</a> +<ul class="blockListLast"> +<li class="blockList"> +<h4>KafkaConsumer</h4> +<pre>public KafkaConsumer(<a href="http://docs.oracle.com/javase/7/docs/api/java/util/Properties.html?is-external=true" title="class or interface in java.util">Properties</a> properties, + <a href="../../../../../org/apache/kafka/common/serialization/Deserializer.html" title="interface in org.apache.kafka.common.serialization">Deserializer</a><<a href="../../../../../org/apache/kafka/clients/consumer/KafkaConsumer.html" title="type parameter in KafkaConsumer">K</a>> keyDeserializer, + <a href="../../../../../org/apache/kafka/common/serialization/Deserializer.html" title="interface in org.apache.kafka.common.serialization">Deserializer</a><<a href="../../../../../org/apache/kafka/clients/consumer/KafkaConsumer.html" title="type parameter in KafkaConsumer">V</a>> valueDeserializer)</pre> +<div class="block">A consumer is instantiated by providing a <a href="http://docs.oracle.com/javase/7/docs/api/java/util/Properties.html?is-external=true" title="class or interface in java.util"><code>Properties</code></a> object as configuration, and a + key and a value <a href="../../../../../org/apache/kafka/common/serialization/Deserializer.html" title="interface in org.apache.kafka.common.serialization"><code>Deserializer</code></a>. + <p> + Valid configuration strings are documented at <a href="../../../../../org/apache/kafka/clients/consumer/ConsumerConfig.html" title="class in org.apache.kafka.clients.consumer"><code>ConsumerConfig</code></a></div> +<dl><dt><span class="strong">Parameters:</span></dt><dd><code>properties</code> - The consumer configuration properties</dd><dd><code>keyDeserializer</code> - The deserializer for key that implements <a href="../../../../../org/apache/kafka/common/serialization/Deserializer.html" title="interface in org.apache.kafka.common.serialization"><code>Deserializer</code></a>. The configure() method + won't be called in the consumer when the deserializer is passed in directly.</dd><dd><code>valueDeserializer</code> - The deserializer for value that implements <a href="../../../../../org/apache/kafka/common/serialization/Deserializer.html" title="interface in org.apache.kafka.common.serialization"><code>Deserializer</code></a>. The configure() method + won't be called in the consumer when the deserializer is passed in directly.</dd></dl> +</li> +</ul> +</li> +</ul> +<!-- ============ METHOD DETAIL ========== --> +<ul class="blockList"> +<li class="blockList"><a name="method_detail"> +<!-- --> +</a> +<h3>Method Detail</h3> +<a name="assignment()"> +<!-- --> +</a> +<ul class="blockList"> +<li class="blockList"> +<h4>assignment</h4> +<pre>public <a href="http://docs.oracle.com/javase/7/docs/api/java/util/Set.html?is-external=true" title="class or interface in java.util">Set</a><<a href="../../../../../org/apache/kafka/common/TopicPartition.html" title="class in org.apache.kafka.common">TopicPartition</a>> assignment()</pre> +<div class="block">Get the set of partitions currently assigned to this consumer. If subscription happened by directly assigning + partitions using <a href="../../../../../org/apache/kafka/clients/consumer/KafkaConsumer.html#assign(java.util.Collection)"><code>assign(Collection)</code></a> then this will simply return the same partitions that + were assigned. If topic subscription was used, then this will give the set of topic partitions currently assigned + to the consumer (which may be none if the assignment hasn't happened yet, or the partitions are in the + process of getting reassigned).</div> +<dl> +<dt><strong>Specified by:</strong></dt> +<dd><code><a href="../../../../../org/apache/kafka/clients/consumer/Consumer.html#assignment()">assignment</a></code> in interface <code><a href="../../../../../org/apache/kafka/clients/consumer/Consumer.html" title="interface in org.apache.kafka.clients.consumer">Consumer</a><<a href="../../../../../org/apache/kafka/clients/consumer/KafkaConsumer.html" title="type parameter in KafkaConsumer">K</a>,<a href="../../../../../org/apache/kafka/clients/consumer/KafkaConsumer.html" title="type parameter in KafkaConsumer">V</a>></code></dd> +<dt><span class="strong">Returns:</span></dt><dd>The set of partitions currently assigned to this consumer</dd><dt><span class="strong">See Also:</span></dt><dd><a href="../../../../../org/apache/kafka/clients/consumer/KafkaConsumer.html#assignment()"><code>assignment()</code></a></dd></dl> +</li> +</ul> +<a name="subscription()"> +<!-- --> +</a> +<ul class="blockList"> +<li class="blockList"> +<h4>subscription</h4> +<pre>public <a href="http://docs.oracle.com/javase/7/docs/api/java/util/Set.html?is-external=true" title="class or interface in java.util">Set</a><<a href="http://docs.oracle.com/javase/7/docs/api/java/lang/String.html?is-external=true" title="class or interface in java.lang">String</a>> subscription()</pre> +<div class="block">Get the current subscription. Will return the same topics used in the most recent call to + <a href="../../../../../org/apache/kafka/clients/consumer/KafkaConsumer.html#subscribe(java.util.Collection,%20org.apache.kafka.clients.consumer.ConsumerRebalanceListener)"><code>subscribe(Collection, ConsumerRebalanceListener)</code></a>, or an empty set if no such call has been made.</div> +<dl> +<dt><strong>Specified by:</strong></dt> +<dd><code><a href="../../../../../org/apache/kafka/clients/consumer/Consumer.html#subscription()">subscription</a></code> in interface <code><a href="../../../../../org/apache/kafka/clients/consumer/Consumer.html" title="interface in org.apache.kafka.clients.consumer">Consumer</a><<a href="../../../../../org/apache/kafka/clients/consumer/KafkaConsumer.html" title="type parameter in KafkaConsumer">K</a>,<a href="../../../../../org/apache/kafka/clients/consumer/KafkaConsumer.html" title="type parameter in KafkaConsumer">V</a>></code></dd> +<dt><span class="strong">Returns:</span></dt><dd>The set of topics currently subscribed to</dd><dt><span class="strong">See Also:</span></dt><dd><a href="../../../../../org/apache/kafka/clients/consumer/KafkaConsumer.html#subscription()"><code>subscription()</code></a></dd></dl> +</li> +</ul> +<a name="subscribe(java.util.Collection, org.apache.kafka.clients.consumer.ConsumerRebalanceListener)"> +<!-- --> +</a> +<ul class="blockList"> +<li class="blockList"> +<h4>subscribe</h4> +<pre>public void subscribe(<a href="http://docs.oracle.com/javase/7/docs/api/java/util/Collection.html?is-external=true" title="class or interface in java.util">Collection</a><<a href="http://docs.oracle.com/javase/7/docs/api/java/lang/String.html?is-external=true" title="class or interface in java.lang">String</a>> topics, + <a href="../../../../../org/apache/kafka/clients/consumer/ConsumerRebalanceListener.html" title="interface in org.apache.kafka.clients.consumer">ConsumerRebalanceListener</a> listener)</pre> +<div class="block">Subscribe to the given list of topics to get dynamically + assigned partitions. <b>Topic subscriptions are not incremental. This list will replace the current + assignment (if there is one).</b> Note that it is not possible to combine topic subscription with group management + with manual partition assignment through <a href="../../../../../org/apache/kafka/clients/consumer/KafkaConsumer.html#assign(java.util.Collection)"><code>assign(Collection)</code></a>. + + If the given list of topics is empty, it is treated the same as <a href="../../../../../org/apache/kafka/clients/consumer/KafkaConsumer.html#unsubscribe()"><code>unsubscribe()</code></a>. + + <p> + As part of group management, the consumer will keep track of the list of consumers that belong to a particular + group and will trigger a rebalance operation if one of the following events trigger - + <ul> + <li>Number of partitions change for any of the subscribed list of topics + <li>Topic is created or deleted + <li>An existing member of the consumer group dies + <li>A new member is added to an existing consumer group via the join API + </ul> + <p> + When any of these events are triggered, the provided listener will be invoked first to indicate that + the consumer's assignment has been revoked, and then again when the new assignment has been received. + Note that this listener will immediately override any listener set in a previous call to subscribe. + It is guaranteed, however, that the partitions revoked/assigned through this interface are from topics + subscribed in this call. See <a href="../../../../../org/apache/kafka/clients/consumer/ConsumerRebalanceListener.html" title="interface in org.apache.kafka.clients.consumer"><code>ConsumerRebalanceListener</code></a> for more details.</div> +<dl> +<dt><strong>Specified by:</strong></dt> +<dd><code><a href="../../../../../org/apache/kafka/clients/consumer/Consumer.html#subscribe(java.util.Collection,%20org.apache.kafka.clients.consumer.ConsumerRebalanceListener)">subscribe</a></code> in interface <code><a href="../../../../../org/apache/kafka/clients/consumer/Consumer.html" title="interface in org.apache.kafka.clients.consumer">Consumer</a><<a href="../../../../../org/apache/kafka/clients/consumer/KafkaConsumer.html" title="type parameter in KafkaConsumer">K</a>,<a href="../../../../../org/apache/kafka/clients/consumer/KafkaConsumer.html" title="type parameter in KafkaConsumer">V</a>></code></dd> +<dt><span class="strong">Parameters:</span></dt><dd><code>topics</code> - The list of topics to subscribe to</dd><dd><code>listener</code> - Non-null listener instance to get notifications on partition assignment/revocation for the + subscribed topics</dd> +<dt><span class="strong">Throws:</span></dt> +<dd><code><a href="http://docs.oracle.com/javase/7/docs/api/java/lang/IllegalArgumentException.html?is-external=true" title="class or interface in java.lang">IllegalArgumentException</a></code> - If topics is null or contains null or empty elements</dd><dt><span class="strong">See Also:</span></dt><dd><a href="../../../../../org/apache/kafka/clients/consumer/KafkaConsumer.html#subscribe(java.util.Collection,%20org.apache.kafka.clients.consumer.ConsumerRebalanceListener)"><code>subscribe(Collection, ConsumerRebalanceListener)</code></a></dd></dl> +</li> +</ul> +<a name="subscribe(java.util.Collection)"> +<!-- --> +</a> +<ul class="blockList"> +<li class="blockList"> +<h4>subscribe</h4> +<pre>public void subscribe(<a href="http://docs.oracle.com/javase/7/docs/api/java/util/Collection.html?is-external=true" title="class or interface in java.util">Collection</a><<a href="http://docs.oracle.com/javase/7/docs/api/java/lang/String.html?is-external=true" title="class or interface in java.lang">String</a>> topics)</pre> +<div class="block">Subscribe to the given list of topics to get dynamically assigned partitions. + <b>Topic subscriptions are not incremental. This list will replace the current + assignment (if there is one).</b> It is not possible to combine topic subscription with group management + with manual partition assignment through <a href="../../../../../org/apache/kafka/clients/consumer/KafkaConsumer.html#assign(java.util.Collection)"><code>assign(Collection)</code></a>. + + If the given list of topics is empty, it is treated the same as <a href="../../../../../org/apache/kafka/clients/consumer/KafkaConsumer.html#unsubscribe()"><code>unsubscribe()</code></a>. + + <p> + This is a short-hand for <a href="../../../../../org/apache/kafka/clients/consumer/KafkaConsumer.html#subscribe(java.util.Collection,%20org.apache.kafka.clients.consumer.ConsumerRebalanceListener)"><code>subscribe(Collection, ConsumerRebalanceListener)</code></a>, which + uses a noop listener. If you need the ability to either seek to particular offsets, you should prefer + <a href="../../../../../org/apache/kafka/clients/consumer/KafkaConsumer.html#subscribe(java.util.Collection,%20org.apache.kafka.clients.consumer.ConsumerRebalanceListener)"><code>subscribe(Collection, ConsumerRebalanceListener)</code></a>, since group rebalances will cause partition offsets + to be reset. You should also prefer to provide your own listener if you are doing your own offset + management since the listener gives you an opportunity to commit offsets before a rebalance finishes.</div> +<dl> +<dt><strong>Specified by:</strong></dt> +<dd><code><a href="../../../../../org/apache/kafka/clients/consumer/Consumer.html#subscribe(java.util.Collection)">subscribe</a></code> in interface <code><a href="../../../../../org/apache/kafka/clients/consumer/Consumer.html" title="interface in org.apache.kafka.clients.consumer">Consumer</a><<a href="../../../../../org/apache/kafka/clients/consumer/KafkaConsumer.html" title="type parameter in KafkaConsumer">K</a>,<a href="../../../../../org/apache/kafka/clients/consumer/KafkaConsumer.html" title="type parameter in KafkaConsumer">V</a>></code></dd> +<dt><span class="strong">Parameters:</span></dt><dd><code>topics</code> - The list of topics to subscribe to</dd> +<dt><span class="strong">Throws:</span></dt> +<dd><code><a href="http://docs.oracle.com/javase/7/docs/api/java/lang/IllegalArgumentException.html?is-external=true" title="class or interface in java.lang">IllegalArgumentException</a></code> - If topics is null or contains null or empty elements</dd><dt><span class="strong">See Also:</span></dt><dd><a href="../../../../../org/apache/kafka/clients/consumer/KafkaConsumer.html#subscribe(java.util.Collection)"><code>subscribe(Collection)</code></a></dd></dl> +</li> +</ul> +<a name="subscribe(java.util.regex.Pattern, org.apache.kafka.clients.consumer.ConsumerRebalanceListener)"> +<!-- --> +</a> +<ul class="blockList"> +<li class="blockList"> +<h4>subscribe</h4> +<pre>public void subscribe(<a href="http://docs.oracle.com/javase/7/docs/api/java/util/regex/Pattern.html?is-external=true" title="class or interface in java.util.regex">Pattern</a> pattern, + <a href="../../../../../org/apache/kafka/clients/consumer/ConsumerRebalanceListener.html" title="interface in org.apache.kafka.clients.consumer">ConsumerRebalanceListener</a> listener)</pre> +<div class="block">Subscribe to all topics matching specified pattern to get dynamically assigned partitions. The pattern matching will be done periodically against topics + existing at the time of check. + + <p> + As part of group management, the consumer will keep track of the list of consumers that + belong to a particular group and will trigger a rebalance operation if one of the + following events trigger - + <ul> + <li>Number of partitions change for any of the subscribed list of topics + <li>Topic is created or deleted + <li>An existing member of the consumer group dies + <li>A new member is added to an existing consumer group via the join API + </ul></div> +<dl> +<dt><strong>Specified by:</strong></dt> +<dd><code><a href="../../../../../org/apache/kafka/clients/consumer/Consumer.html#subscribe(java.util.regex.Pattern,%20org.apache.kafka.clients.consumer.ConsumerRebalanceListener)">subscribe</a></code> in interface <code><a href="../../../../../org/apache/kafka/clients/consumer/Consumer.html" title="interface in org.apache.kafka.clients.consumer">Consumer</a><<a href="../../../../../org/apache/kafka/clients/consumer/KafkaConsumer.html" title="type parameter in KafkaConsumer">K</a>,<a href="../../../../../org/apache/kafka/clients/consumer/KafkaConsumer.html" title="type parameter in KafkaConsumer">V</a>></code></dd> +<dt><span class="strong">Parameters:</span></dt><dd><code>pattern</code> - Pattern to subscribe to</dd><dd><code>listener</code> - Non-null listener instance to get notifications on partition assignment/revocation for the + subscribed topics</dd> +<dt><span class="strong">Throws:</span></dt> +<dd><code><a href="http://docs.oracle.com/javase/7/docs/api/java/lang/IllegalArgumentException.html?is-external=true" title="class or interface in java.lang">IllegalArgumentException</a></code> - If pattern is null</dd><dt><span class="strong">See Also:</span></dt><dd><a href="../../../../../org/apache/kafka/clients/consumer/KafkaConsumer.html#subscribe(java.util.regex.Pattern,%20org.apache.kafka.clients.consumer.ConsumerRebalanceListener)"><code>subscribe(Pattern, ConsumerRebalanceListener)</code></a></dd></dl> +</li> +</ul> +<a name="unsubscribe()"> +<!-- --> +</a> +<ul class="blockList"> +<li class="blockList"> +<h4>unsubscribe</h4> +<pre>public void unsubscribe()</pre> +<div class="block">Unsubscribe from topics currently subscribed
<TRUNCATED>