> On April 7, 2014, 6:03 p.m., Guozhang Wang wrote: > > clients/src/main/java/kafka/clients/consumer/Consumer.java, line 41 > > <https://reviews.apache.org/r/19731/diff/1/?file=538460#file538460line41> > > > > Just wanted to confirm again that we agree to expose TopicPartition to > > users? Though it is already in common package, it currently only used > > internally.
Right. After writing down some examples using the APIs, I'm convinced that we need to expose TopicPartition. Take a look at the examples to see if you can think of a better solution. > On April 7, 2014, 6:03 p.m., Guozhang Wang wrote: > > clients/src/main/java/kafka/clients/consumer/Consumer.java, line 45 > > <https://reviews.apache.org/r/19731/diff/1/?file=538460#file538460line45> > > > > Does this mean subscribe(TopicPartition) followed by an > > unsubscribe(String) will also throw an error? That's correct. I think it says that it only works in conjunction with subscribe(topics), so yes that will throw some exception. > On April 7, 2014, 6:03 p.m., Guozhang Wang wrote: > > clients/src/main/java/kafka/clients/consumer/Consumer.java, line 54 > > <https://reviews.apache.org/r/19731/diff/1/?file=538460#file538460line54> > > > > Ditto as above. Here is what it says - It is an error to unsubscribe from a partition that was never subscribed to using {@link #subscribe(TopicPartition...) subscribe(partitions)} > On April 7, 2014, 6:03 p.m., Guozhang Wang wrote: > > clients/src/main/java/kafka/clients/consumer/Consumer.java, line 77 > > <https://reviews.apache.org/r/19731/diff/1/?file=538460#file538460line77> > > > > "If no offsets are specified", how does this mean as map values? Or > > should we say sth like "If the specified offset is negative.." That comment was stale given that we also have commit() and commitAsync(). Removed it. > On April 7, 2014, 6:03 p.m., Guozhang Wang wrote: > > clients/src/main/java/kafka/clients/consumer/Consumer.java, line 92 > > <https://reviews.apache.org/r/19731/diff/1/?file=538460#file538460line92> > > > > Not sure if we have discussed details about how to implement async > > commit in the new consumer? It just means the consumer will not wait for the OffsetCommitResponse before returning from commitAsync(). We can discuss details on the wiki, but looks like we have to add this API. I can imagine several applications that would not want to commit synchronously. > On April 7, 2014, 6:03 p.m., Guozhang Wang wrote: > > clients/src/main/java/kafka/clients/consumer/ConsumerConfig.java, line 55 > > <https://reviews.apache.org/r/19731/diff/1/?file=538461#file538461line55> > > > > ENABLE_AUTO_COMMIT_CONFIG to be consistent with ENABLE_JMX_CONFIG Well, I removed it and added the corresponding metrics reporter configs to stay consistent with the producer. > On April 7, 2014, 6:03 p.m., Guozhang Wang wrote: > > clients/src/main/java/kafka/clients/consumer/ConsumerConfig.java, line 112 > > <https://reviews.apache.org/r/19731/diff/1/?file=538461#file538461line112> > > > > fetch.buffer.bytes I think we might not need this config, but we can revisit when we discuss the memory management > On April 7, 2014, 6:03 p.m., Guozhang Wang wrote: > > clients/src/main/java/kafka/clients/consumer/ConsumerRecord.java, line 22 > > <https://reviews.apache.org/r/19731/diff/1/?file=538463#file538463line22> > > > > Suggest add the following functions: > > > > compressionType() I'm not so sure that it will make sense. The consumer record is always the original raw record and is always handed out after the decompression anyways. Thoughts? > On April 7, 2014, 6:03 p.m., Guozhang Wang wrote: > > clients/src/main/java/kafka/common/TopicPartitionOffset.java, line 13 > > <https://reviews.apache.org/r/19731/diff/1/?file=538466#file538466line13> > > > > This class is currently only used in user customized callback function. > > So do we really need to provide this class? Nope. After the latest API changes I made, this class can be deleted. > On April 7, 2014, 6:03 p.m., Guozhang Wang wrote: > > clients/src/main/java/kafka/clients/consumer/KafkaConsumer.java, line 337 > > <https://reviews.apache.org/r/19731/diff/1/?file=538464#file538464line337> > > > > Two constructors KafkaConsumer(ConsumerConfig config) and > > KafkaConsumer(ConsumerConfig config, ConsumerRebalanceCallback callback) > > missing. The latter is there, I added the former > On April 7, 2014, 6:03 p.m., Guozhang Wang wrote: > > clients/src/main/java/kafka/clients/consumer/ConsumerRebalanceCallback.java, > > lines 26-47 > > <https://reviews.apache.org/r/19731/diff/1/?file=538462#file538462line26> > > > > Have we considered the following case: in a poll() function the > > consumer realized a rebalance is triggered, and hence call > > onPartitionRevoked and onPartitionAssigned, and then poll() times out, the > > user then call commit(partitions) on the old partitions. That's a dangerous use of commit(), I think we discussed that the co-ordinator will reject requests to commit offsets for partitions that the consumer id doesn't own. I think the API docs can be updated to specify that. - Neha ----------------------------------------------------------- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/19731/#review39700 ----------------------------------------------------------- On March 27, 2014, 4:16 p.m., Neha Narkhede wrote: > > ----------------------------------------------------------- > This is an automatically generated e-mail. To reply, visit: > https://reviews.apache.org/r/19731/ > ----------------------------------------------------------- > > (Updated March 27, 2014, 4:16 p.m.) > > > Review request for kafka. > > > Bugs: KAFKA-1328 > https://issues.apache.org/jira/browse/KAFKA-1328 > > > Repository: kafka > > > Description > ------- > > Added license headers > > > Cleaned javadoc for ConsumerConfig > > > Fixed minor indentation in ConsumerConfig > > > Improve docs on ConsumerConfig > > > 1. Added ClientUtils 2. Added basic constructor implementation for > KafkaConsumer > > > Improved MockConsumer > > > Chris's feedback and also consumer rewind example code > > > Added commit() and commitAsync() APIs to the consumer and updated docs and > examples to reflect that > > > 1. Added consumer usage examples to javadoc 2. Changed signature of APIs that > accept or return offsets from list of offsets to map of offsets > > > Improved example for using ConsumerRebalanceCallback > > > Improved example for using ConsumerRebalanceCallback > > > Included Jun's review comments and renamed positions to seek. Also included > position() > > > Changes to javadoc for positions() > > > Changed the javadoc for ConsumerRebalanceCallback > > > Changing unsubscribe to also take in var args for topic list > > > Incorporated first round of feedback from Jay, Pradeep and Mattijs on the > mailing list > > > Updated configs > > > Javadoc for consumer complete > > > Completed docs for Consumer and ConsumerRebalanceCallback. Added MockConsumer > > > Added the initial interfaces and related documentation for the consumer. More > docs required to complete the public API > > > Diffs > ----- > > clients/src/main/java/kafka/clients/consumer/Consumer.java PRE-CREATION > clients/src/main/java/kafka/clients/consumer/ConsumerConfig.java > PRE-CREATION > clients/src/main/java/kafka/clients/consumer/ConsumerRebalanceCallback.java > PRE-CREATION > clients/src/main/java/kafka/clients/consumer/ConsumerRecord.java > PRE-CREATION > clients/src/main/java/kafka/clients/consumer/KafkaConsumer.java > PRE-CREATION > clients/src/main/java/kafka/clients/consumer/MockConsumer.java PRE-CREATION > clients/src/main/java/kafka/common/TopicPartitionOffset.java PRE-CREATION > clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java > 1ff9174870a8c9cd97eb6655416edd4124377b0e > clients/src/main/java/org/apache/kafka/common/utils/ClientUtils.java > PRE-CREATION > > clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerExampleTest.java > PRE-CREATION > > Diff: https://reviews.apache.org/r/19731/diff/ > > > Testing > ------- > > > Thanks, > > Neha Narkhede > >