-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/19731/#review40856
-----------------------------------------------------------


1. I am not sure about the use of varargs (ellipsis). It makes passing a set of 
items from a collection to the api a bit harder. Suppose that you have a list 
of topics stored in

ArrayList<String> topics;

If you want to subscribe to all topics in one call, you will have to do: 

String[] topicArray = new String[topics.size()];
consumer.subscribe(topics.toArray(topicArray));

See the comment on the example below. Also, passing in varargs and getting 
back a map is a bit odd. Varargs also don't make it obvious that the partitions 
in the input should be unique. It would probably be more natural to pass in a 
Set instead.
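
To make the comparison concrete, here is a rough sketch of the two call sites 
(the Set-based subscribe() overload is hypothetical and not in the current 
patch; consumer is an already constructed KafkaConsumer):

// With the varargs signature in the current patch:
consumer.subscribe(topics.toArray(new String[topics.size()]));

// With a hypothetical Set-based overload, the call site is simpler and the
// uniqueness requirement is explicit in the type (java.util.HashSet):
consumer.subscribe(new HashSet<String>(topics));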



clients/src/main/java/kafka/common/TopicPartitionOffset.java
<https://reviews.apache.org/r/19731/#comment75074>

    Apache license header.



clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java
<https://reviews.apache.org/r/19731/#comment75057>

    What's the error when unsubscribing from a wrong topic/partition? Do we 
throw an unchecked exception?



clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java
<https://reviews.apache.org/r/19731/#comment75058>

    Some apps want to collate by topic and others may want to collate by 
partition. It seems simpler to just return a list of ConsumerRecord. Another 
alternative is to return a <partition, record> map, which seems to match the 
rest of the apis better.
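
For example, collating a flat list by topic is a small loop on the app side 
(sketch only; it assumes ConsumerRecord exposes a topic() accessor, plus the 
usual java.util collection imports):

Map<String, List<ConsumerRecord>> byTopic = new HashMap<String, List<ConsumerRecord>>();
for (ConsumerRecord record : records) {
    // Group by topic; grouping by partition would be the same loop keyed
    // on a TopicPartition instead.
    List<ConsumerRecord> group = byTopic.get(record.topic());
    if (group == null) {
        group = new ArrayList<ConsumerRecord>();
        byTopic.put(record.topic(), group);
    }
    group.add(record);
}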



clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java
<https://reviews.apache.org/r/19731/#comment74106>

    Each poll may not return data for all subscribed partitions. So, when 
calling commit(), do we expect to commit the offset of the last record returned 
so far for each subscribed partition, across all previous polls?
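
To illustrate the question (sketch, assuming poll() takes a timeout in 
milliseconds and the consumer is subscribed to partitions p0 and p1):

Map<String, List<ConsumerRecord>> first = consumer.poll(100);   // happens to return only p0 data
Map<String, List<ConsumerRecord>> second = consumer.poll(100);  // happens to return only p1 data
consumer.commit();  // does this commit the last offsets seen for both p0 and p1,
                    // or only for the partitions returned by the most recent poll?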



clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java
<https://reviews.apache.org/r/19731/#comment75064>

    Since we have a committed() api, the returned offset value in commit() 
seems redundant. Perhaps it's cleaner to just return Future<Map<TopicPartition, 
errorCode>>?
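
In other words, something roughly like the following (hypothetical shape; 
Short is used for the error code only for illustration):

// commit() would only report per-partition error codes; the committed offsets
// themselves can always be fetched via the committed() api.
Future<Map<TopicPartition, Short>> commit();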



clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java
<https://reviews.apache.org/r/19731/#comment75062>

    We should make it clear that calling get() on the future will block until 
the commit request completes.
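
E.g. a line like this in the javadoc example would make it explicit (sketch, 
assuming commit() returns a Future over OffsetMetadata, which is how I read 
the current patch):

Future<OffsetMetadata> future = consumer.commit();
// get() blocks the calling thread until the commit request completes,
// and surfaces a failed commit as an ExecutionException.
OffsetMetadata result = future.get();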



clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java
<https://reviews.apache.org/r/19731/#comment75063>

    Should we make this a batch api to match seek()?



clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
<https://reviews.apache.org/r/19731/#comment75072>

    It's probably reasonable to reset the offset based on an arbitrary 
timestamp using the getOffsetBefore() api. The granularity is coarse right now, 
but can be improved in the future.



clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
<https://reviews.apache.org/r/19731/#comment75073>

    Do we need both this and metadata.fetch.backoff.ms?



clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecord.java
<https://reviews.apache.org/r/19731/#comment75060>

    Should we also have an api for nextOffset()?



clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecord.java
<https://reviews.apache.org/r/19731/#comment75059>

    It seems more natural to return an error code than to throw an exception.



clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
<https://reviews.apache.org/r/19731/#comment75071>

    The description of threading is a bit confusing. KafkaConsumer doesn't own 
any threads; all calls are made in the caller's thread. It seems the easiest 
way to use the api is to call the KafkaConsumer api from a single thread and 
hand the consumed records off to a separate thread pool for parallel 
consumption if needed.
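
Something along these lines might be worth spelling out in the javadoc (a 
sketch only; it assumes poll() takes a timeout in ms and returns Map<String, 
List<ConsumerRecord>>, and running/process() are application code):

ExecutorService workers = Executors.newFixedThreadPool(8);
// All KafkaConsumer calls happen on this single caller thread ...
while (running) {
    Map<String, List<ConsumerRecord>> records = consumer.poll(100);
    for (final List<ConsumerRecord> batch : records.values()) {
        // ... while the actual record processing is handed off to the pool.
        workers.submit(new Runnable() {
            public void run() {
                process(batch);
            }
        });
    }
}
workers.shutdown();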



clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
<https://reviews.apache.org/r/19731/#comment75066>

    commitAsync() no longer exists.



clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
<https://reviews.apache.org/r/19731/#comment75067>

    The most common use case would be that the app wants auto rebalancing but 
wants to commit offsets manually. So, we need to call commit() in 
partitionRevoked() and call seek() with the committed offsets in 
partitionAssigned(). This is actually the default behavior of the 
ConsumerRebalanceCallback, so I am not sure this example covers a common use 
case.
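
I.e., the example I'd expect to see is roughly the following (sketch only; the 
callback, committed() and seek() signatures here are approximations, not the 
exact ones in the patch):

public class ManualCommitCallback implements ConsumerRebalanceCallback {
    // Flush manually tracked offsets before ownership of the partitions moves away.
    public void onPartitionsRevoked(Consumer consumer, Collection<TopicPartition> partitions) {
        consumer.commit();
    }
    // Resume each newly assigned partition from its last committed offset.
    public void onPartitionsAssigned(Consumer consumer, Collection<TopicPartition> partitions) {
        Map<TopicPartition, Long> committed = consumer.committed(partitions);
        consumer.seek(committed);
    }
}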



clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
<https://reviews.apache.org/r/19731/#comment75068>

    See the earlier comment on varargs (ellipsis). Passing in partitions as 
varargs is a bit tricky.



clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
<https://reviews.apache.org/r/19731/#comment75069>

    We should commit the offset before close.
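
I.e. the example should end with something like (sketch):

consumer.commit();   // make sure the offsets of the last processed records are committed
consumer.close();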



clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
<https://reviews.apache.org/r/19731/#comment75070>

    We should commit the offset before close.



clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
<https://reviews.apache.org/r/19731/#comment75065>

    It's not clear to me how position() will be used. Could we add an example?
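
For instance, a snippet like this would help (hypothetical; it assumes 
position() returns the offset of the next record to be fetched for the given 
partition, and offsetStore is application code):

TopicPartition tp = new TopicPartition("my-topic", 0);
long nextOffset = consumer.position(tp);
offsetStore.save(tp, nextOffset);   // checkpoint the fetch position externally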



clients/src/main/java/org/apache/kafka/clients/consumer/OffsetMetadata.java
<https://reviews.apache.org/r/19731/#comment75061>

    It seems more natural to return an error code than to throw an exception.
    


- Jun Rao


On April 13, 2014, 2:12 a.m., Neha Narkhede wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/19731/
> -----------------------------------------------------------
> 
> (Updated April 13, 2014, 2:12 a.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1328
>     https://issues.apache.org/jira/browse/KAFKA-1328
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> Fixed the javadoc usage examples in KafkaConsumer to match the API changes
> 
> 
> Changed the signature of poll to return Map<String,ConsumerRecordMetadata> to 
> organize the ConsumerRecords around topic and then optionally around 
> partition. This will serve the group management as well as custom partition 
> subscription use cases
> 
> 
> 1. Changed the signature of poll() to return Map<String, List<ConsumerRecord>>
> 2. Changed ConsumerRecord to throw an exception if an error is detected for 
> the partition. For example, if a single large message is larger than the total 
> memory just for that partition, we don't want poll() to throw an exception 
> since that will affect the processing of the remaining partitions as well
> 
> 
> Fixed MockConsumer to make subscribe(topics) and subscribe(partitions) 
> mutually exclusive
> 
> 
> Changed the package to org.apache.kafka.clients.consumer from 
> kafka.clients.consumer
> 
> 
> Changed the package to org.apache.kafka.clients.consumer from 
> kafka.clients.consumer
> 
> 
> 1. Removed the commitAsync() APIs
> 2. Changed the commit() APIs to return a Future
> 
> 
> Fixed configs to match the producer side configs for metrics
> 
> 
> Renamed AUTO_COMMIT_ENABLE_CONFIG to ENABLE_AUTO_COMMIT_CONFIG
> 
> 
> Addressing review comments from Tim and Guozhang
> 
> 
> Rebasing after producer side config cleanup
> 
> 
> Added license headers
> 
> 
> Cleaned javadoc for ConsumerConfig
> 
> 
> Fixed minor indentation in ConsumerConfig
> 
> 
> Improve docs on ConsumerConfig
> 
> 
> 1. Added ClientUtils
> 2. Added basic constructor implementation for KafkaConsumer
> 
> 
> Improved MockConsumer
> 
> 
> Chris's feedback and also consumer rewind example code
> 
> 
> Added commit() and commitAsync() APIs to the consumer and updated docs and 
> examples to reflect that
> 
> 
> 1. Added consumer usage examples to javadoc
> 2. Changed signature of APIs that accept or return offsets from list of 
> offsets to map of offsets
> 
> 
> Improved example for using ConsumerRebalanceCallback
> 
> 
> Improved example for using ConsumerRebalanceCallback
> 
> 
> Included Jun's review comments and renamed positions to seek. Also included 
> position()
> 
> 
> Changes to javadoc for positions()
> 
> 
> Changed the javadoc for ConsumerRebalanceCallback
> 
> 
> Changing unsubscribe to also take in var args for topic list
> 
> 
> Incorporated first round of feedback from Jay, Pradeep and Mattijs on the 
> mailing list
> 
> 
> Updated configs
> 
> 
> Javadoc for consumer complete
> 
> 
> Completed docs for Consumer and ConsumerRebalanceCallback. Added MockConsumer
> 
> 
> Added the initial interfaces and related documentation for the consumer. More 
> docs required to complete the public API
> 
> 
> Diffs
> -----
> 
>   clients/src/main/java/kafka/common/TopicPartitionOffset.java PRE-CREATION 
>   clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java 
> PRE-CREATION 
>   clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java 
> PRE-CREATION 
>   
> clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRebalanceCallback.java
>  PRE-CREATION 
>   clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecord.java 
> PRE-CREATION 
>   clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 
> PRE-CREATION 
>   clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java 
> PRE-CREATION 
>   clients/src/main/java/org/apache/kafka/clients/consumer/OffsetMetadata.java 
> PRE-CREATION 
>   
> clients/src/main/java/org/apache/kafka/clients/consumer/internals/FutureOffsetMetadata.java
>  PRE-CREATION 
>   clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java 
> a6423f4b37a57f0290e2048b764de1218470f4f7 
>   clients/src/main/java/org/apache/kafka/common/utils/ClientUtils.java 
> PRE-CREATION 
>   
> clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerExampleTest.java
>  PRE-CREATION 
> 
> Diff: https://reviews.apache.org/r/19731/diff/
> 
> 
> Testing
> -------
> 
> 
> Thanks,
> 
> Neha Narkhede
> 
>
