> On May 7, 2014, 4:50 p.m., Jun Rao wrote: > > clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecord.java, > > lines 37-38 > > <https://reviews.apache.org/r/19731/diff/7-8/?file=555628#file555628line37> > > > > partitionId can probably be just partition to be consistent with what's > > in ProducerRecord.
It's probably going to require a change on the producer. See my comment on this rb previously - "The returned object from partition() is TopicPartition on purpose. I realized that returning partition id from this API is useless since all other APIs in the consumer accept TopicPartition. The constructor parameter can be renamed to partitionId" > On May 7, 2014, 4:50 p.m., Jun Rao wrote: > > clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java, > > lines 98-104 > > <https://reviews.apache.org/r/19731/diff/9/?file=574802#file574802line98> > > > > If we can't think of a usage of this api, perhaps we should just remove > > it. The typically usage is that we want to seek to a previously committed > > offset. However, knowing the current fetch offset is of little use. Please refer to the mailing list discussion on the requirement for this API. The subject was something like "New consumer API discussion". > On May 7, 2014, 4:50 p.m., Jun Rao wrote: > > clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecord.java, > > lines 108-109 > > <https://reviews.apache.org/r/19731/diff/9/?file=574805#file574805line108> > > > > The issue with not exposing a nextOffset() api is that users have to > > figure out the next offset themselves, which is not natural. The common > > usage is the app finishes consuming a record and want to commit the next > > offset (not the current offset) after the consumed record. Having a > > nextOffset() will allow us to explain this to the user better in the api. Let's discuss this more explicitly on the mailing list. I'm not opposed to exposing the API if most people feel the need for it. Make sure you give this feedback on the API discussion thread as well. I can make this change after the initial patch is in. > On May 7, 2014, 4:50 p.m., Jun Rao wrote: > > clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java, > > lines 53-63 > > <https://reviews.apache.org/r/19731/diff/9/?file=574807#file574807line53> > > > > The returned offset should be the next offset. See the comment on > > exposing nextOffset(). Not really. processedOffsets here stores offsets of records for which the consumer has finished processing. So record.offset() seems correct right? > On May 7, 2014, 4:50 p.m., Jun Rao wrote: > > clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecord.java, > > lines 74-86 > > <https://reviews.apache.org/r/19731/diff/9/?file=574805#file574805line74> > > > > Perhaps we just need to combine the two into one api > > topicAndPartition(). We could definitely add a topicAndPartition() API. > On May 7, 2014, 4:50 p.m., Jun Rao wrote: > > clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java, > > lines 87-88 > > <https://reviews.apache.org/r/19731/diff/9/?file=574802#file574802line87> > > > > The Future thing doesn't work well in this case. This is because the > > caller thread is also the one that does the polling. If the caller calls > > future.get, it will block forever since there won't be any polling so that > > we can get the response. So, we will likely have to make a separate > > blocking api. Hmm.. even if we expose it as a separate API, it seems the problem you mentioned will not go away. Probably adding a callback is a better approach. - Neha ----------------------------------------------------------- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/19731/#review42405 ----------------------------------------------------------- On May 5, 2014, 6:35 p.m., Neha Narkhede wrote: > > ----------------------------------------------------------- > This is an automatically generated e-mail. To reply, visit: > https://reviews.apache.org/r/19731/ > ----------------------------------------------------------- > > (Updated May 5, 2014, 6:35 p.m.) > > > Review request for kafka. > > > Bugs: KAFKA-1328 > https://issues.apache.org/jira/browse/KAFKA-1328 > > > Repository: kafka > > > Description > ------- > > Review comments from Jun and Guozhang > > > Checked in ConsumerRecordMetadata > > > Fixed the javadoc usage examples in KafkaConsumer to match the API changes > > > Changed the signature of poll to return Map<String,ConsumerRecordMetadata> to > organize the ConsumerRecords around topic and then optionally around > partition. This will serve the group management as well as custom partition > subscription use cases > > > 1. Changed the signature of poll() to return Map<String, > List<ConsumerRecord>> 2. Changed ConsumerRecord to throw an exception if an > error is detected for the partition. For example, if a single large message > is larger than the total memory just for that partition, we don't want poll() > to throw an exception since that will affect the processing of the remaining > partitions as well > > > Fixed MockConsumer to make subscribe(topics) and subscribe(partitions) > mutually exclusive > > > Changed the package to org.apache.kafka.clients.consumer from > kafka.clients.consumer > > > Changed the package to org.apache.kafka.clients.consumer from > kafka.clients.consumer > > > 1. Removed the commitAsync() APIs 2. Changed the commit() APIs to return a > Future > > > Fixed configs to match the producer side configs for metrics > > > Renamed AUTO_COMMIT_ENABLE_CONFIG to ENABLE_AUTO_COMMIT_CONFIG > > > Addressing review comments from Tim and Guozhang > > > Rebasing after producer side config cleanup > > > Added license headers > > > Cleaned javadoc for ConsumerConfig > > > Fixed minor indentation in ConsumerConfig > > > Improve docs on ConsumerConfig > > > 1. Added ClientUtils 2. Added basic constructor implementation for > KafkaConsumer > > > Improved MockConsumer > > > Chris's feedback and also consumer rewind example code > > > Added commit() and commitAsync() APIs to the consumer and updated docs and > examples to reflect that > > > 1. Added consumer usage examples to javadoc 2. Changed signature of APIs that > accept or return offsets from list of offsets to map of offsets > > > Improved example for using ConsumerRebalanceCallback > > > Improved example for using ConsumerRebalanceCallback > > > Included Jun's review comments and renamed positions to seek. Also included > position() > > > Changes to javadoc for positions() > > > Changed the javadoc for ConsumerRebalanceCallback > > > Changing unsubscribe to also take in var args for topic list > > > Incorporated first round of feedback from Jay, Pradeep and Mattijs on the > mailing list > > > Updated configs > > > Javadoc for consumer complete > > > Completed docs for Consumer and ConsumerRebalanceCallback. Added MockConsumer > > > Added the initial interfaces and related documentation for the consumer. More > docs required to complete the public API > > > Diffs > ----- > > clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java > PRE-CREATION > clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java > PRE-CREATION > > clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRebalanceCallback.java > PRE-CREATION > clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecord.java > PRE-CREATION > > clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecordMetadata.java > PRE-CREATION > clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java > PRE-CREATION > clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java > PRE-CREATION > clients/src/main/java/org/apache/kafka/clients/consumer/OffsetMetadata.java > PRE-CREATION > > clients/src/main/java/org/apache/kafka/clients/consumer/internals/FutureOffsetMetadata.java > PRE-CREATION > clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java > a6423f4b37a57f0290e2048b764de1218470f4f7 > clients/src/main/java/org/apache/kafka/common/utils/ClientUtils.java > PRE-CREATION > > clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerExampleTest.java > PRE-CREATION > > Diff: https://reviews.apache.org/r/19731/diff/ > > > Testing > ------- > > > Thanks, > > Neha Narkhede > >