-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/33196/#review82448
-----------------------------------------------------------



clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
<https://reviews.apache.org/r/33196/#comment133185>

    Shall we avoid wildcards?



clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
<https://reviews.apache.org/r/33196/#comment133187>

    Avoid using wildcards.



clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
<https://reviews.apache.org/r/33196/#comment133208>

    Maybe we can combine the commitOffsetRequests and delayedTaskScheduler 
(which is only used for commit offsets for now) into one object, sth. called 
delayedOffsetCommitRequests that does the following:
    
    1. maintain a queue of delayed offset commit requests.
    
    2. whenever commit() is called, a new delayed offset commit request is 
en-queued.
    
    3. each of the delayed offset commit request has a back-off time which is 
originally set as 0. When retry-upon-error it is set to the retry backoff time.
    
    3. in consumer.poll(), check if the first element in the queue is ready 
based on backoff; If yes send it out. Do not try sending it right away when 
queue.size == 1.
    
    4. if commit() is blocking, after putting it into the queue, keep polling 
the queue until that request is completed (similar to the current code).
    
    5. only dequeue the first element when the response is returned with no 
error or all retries exhausted. After de-queuing check immediately if the next 
element is ready.
    
    6. when auto.commit is turned on, I think we can actually use a separate 
thread that periodically call async commit(). I think this does not defect the 
purpose of a single-threaded consumer such that the background thread does not 
interact with the selector at all buy only add delayed tasks into the queue.



clients/src/main/java/org/apache/kafka/clients/consumer/internals/DelayedTaskScheduler.java
<https://reviews.apache.org/r/33196/#comment133196>

    I was a bit confused by this class a bit by thinking it will execute the 
tasks that have timed out automatically, by a separate thread. 
    
    Just wondering if it is better to just replace the "scheduler" with a 
delayed task queue, which seems to be more illustrative of its purpose. Or if 
we can combine this with the delayed offset requests list: see my other comment 
above.


- Guozhang Wang


On May 4, 2015, 4:39 p.m., Ewen Cheslack-Postava wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/33196/
> -----------------------------------------------------------
> 
> (Updated May 4, 2015, 4:39 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-2123
>     https://issues.apache.org/jira/browse/KAFKA-2123
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> KAFKA-2123: Add queuing of offset commit requests.
> 
> 
> KAFKA-2123: Add scheduler for delayed tasks in new consumer, add backoff for 
> commit retries, and simplify auto commit by using delayed tasks.
> 
> 
> KAFKA-2123: Make synchronous offset commits wait for previous requests to 
> finish in order.
> 
> 
> KAFKA-2123: Remove redundant calls to ensureNotClosed
> 
> 
> Diffs
> -----
> 
>   clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java 
> 8f587bc0705b65b3ef37c86e0c25bb43ab8803de 
>   
> clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerCommitCallback.java
>  PRE-CREATION 
>   clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java 
> bdff518b732105823058e6182f445248b45dc388 
>   clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 
> d301be4709f7b112e1f3a39f3c04cfa65f00fa60 
>   clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java 
> f50da825756938c193d7f07bee953e000e2627d9 
>   
> clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
>  e55ab11df4db0b0084f841a74cbcf819caf780d5 
>   
> clients/src/main/java/org/apache/kafka/clients/consumer/internals/DelayedTask.java
>  PRE-CREATION 
>   
> clients/src/main/java/org/apache/kafka/clients/consumer/internals/DelayedTaskScheduler.java
>  PRE-CREATION 
>   
> clients/src/main/java/org/apache/kafka/common/errors/ConsumerCoordinatorNotAvailableException.java
>  PRE-CREATION 
>   
> clients/src/main/java/org/apache/kafka/common/errors/NotCoordinatorForConsumerException.java
>  PRE-CREATION 
>   
> clients/src/main/java/org/apache/kafka/common/errors/OffsetLoadInProgressException.java
>  PRE-CREATION 
>   clients/src/main/java/org/apache/kafka/common/protocol/Errors.java 
> 36aa412404ff1458c7bef0feecaaa8bc45bed9c7 
>   
> clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java
>  b06c4a73e2b4e9472cd772c8bc32bf4a29f431bb 
>   
> clients/src/test/java/org/apache/kafka/clients/consumer/internals/DelayedTaskSchedulerTest.java
>  PRE-CREATION 
>   core/src/test/scala/integration/kafka/api/ConsumerTest.scala 
> ffbdf5dc106e2a59563768280074696c76491337 
> 
> Diff: https://reviews.apache.org/r/33196/diff/
> 
> 
> Testing
> -------
> 
> 
> Thanks,
> 
> Ewen Cheslack-Postava
> 
>

Reply via email to