> On May 5, 2015, 1:39 a.m., Guozhang Wang wrote:
> > clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java,
> >  lines 186-188
> > <https://reviews.apache.org/r/33196/diff/4/?file=949010#file949010line186>
> >
> >     Maybe we can combine the commitOffsetRequests and delayedTaskScheduler 
> > (which is only used for commit offsets for now) into one object, sth. 
> > called delayedOffsetCommitRequests that does the following:
> >     
> >     1. maintain a queue of delayed offset commit requests.
> >     
> >     2. whenever commit() is called, a new delayed offset commit request is 
> > en-queued.
> >     
> >     3. each delayed offset commit request has a back-off time which 
> > is originally set to 0. On retry-upon-error it is set to the retry 
> > backoff time.
> >     
> >     4. in consumer.poll(), check if the first element in the queue is ready 
> > based on backoff; if yes, send it out. Do not try sending it right away when 
> > queue.size == 1.
> >     
> >     5. if commit() is blocking, after putting it into the queue, keep 
> > polling the queue until that request is completed (similar to the current 
> > code).
> >     
> >     6. only dequeue the first element when the response is returned with no 
> > error or all retries are exhausted. After de-queuing, check immediately if 
> > the next element is ready.
> >     
> >     7. when auto.commit is turned on, I think we can actually use a 
> > separate thread that periodically calls async commit(). I think this does 
> > not defeat the purpose of a single-threaded consumer, since the 
> > background thread does not interact with the selector at all but only adds 
> > delayed tasks to the queue.

Hmm, I understand your approach and I get that, when limited to this bug alone, 
this might seem like a simplification. But I think it makes the code more 
complicated in the long term.

Timer queues are pretty standard fare for event loops since they give you a 
single point for integrating timer events and tend to keep code simple -- 
honestly it's surprising we haven't ended up with one in the new producer yet, 
though the fact that nio doesn't include one is probably why they are a bit 
less common in Java-land. Obviously implementations can get way more 
complex than the one I added so far (e.g. it doesn't even support cancelling 
events, let alone better data structures than a priority queue), but having one 
place to schedule future events can really reduce complexity. Consider, for 
example, the timeout computation in NetworkClient.poll() and how it's grown in 
size over time. I'm pretty sure all of those could have just been scheduled 
(cancellable) events which would have made the timeout computation as trivial 
as it is in the consumer.
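
To make the idea concrete, here is a minimal sketch of a timer queue of this 
kind. It is illustrative only and is not the code in the patch: the 
add/nextTimeout/poll method names, the Entry wrapper, and the use of plain 
millisecond timestamps are all assumptions made for the example.

```java
import java.util.PriorityQueue;

// Hypothetical task interface: invoked with the current time once it is due.
interface DelayedTask {
    void run(long now);
}

// Hypothetical sketch of a timer queue backed by a priority queue.
// Not thread-safe; intended to be driven by a single polling thread.
class DelayedTaskQueue {
    private static final class Entry implements Comparable<Entry> {
        final long deadlineMs;
        final DelayedTask task;

        Entry(long deadlineMs, DelayedTask task) {
            this.deadlineMs = deadlineMs;
            this.task = task;
        }

        @Override
        public int compareTo(Entry other) {
            return Long.compare(deadlineMs, other.deadlineMs);
        }
    }

    private final PriorityQueue<Entry> tasks = new PriorityQueue<>();

    // Schedule a task to run at the absolute time atMs.
    void add(DelayedTask task, long atMs) {
        tasks.add(new Entry(atMs, task));
    }

    // How long the event loop may block before the earliest task is due.
    // Returns Long.MAX_VALUE when nothing is scheduled.
    long nextTimeout(long now) {
        Entry head = tasks.peek();
        return head == null ? Long.MAX_VALUE : Math.max(0L, head.deadlineMs - now);
    }

    // Run every task whose deadline has passed; returns how many ran.
    int poll(long now) {
        int ran = 0;
        while (!tasks.isEmpty() && tasks.peek().deadlineMs <= now) {
            tasks.poll().task.run(now);
            ran++;
        }
        return ran;
    }
}
```

With something like this, the event loop's timeout computation reduces to 
taking the minimum of the caller's timeout and nextTimeout(now), and a retry 
with backoff is just add(task, now + retryBackoffMs).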

Timer queues also can have the nice property of working across multiple layers, 
as I've used them here to manage things at both the Consumer and Coordinator 
level. We might be able to also leverage them at the NetworkClient layer to 
simplify some of that code (although care does need to be taken in where you 
may call DelayedTaskQueue.poll()).

One other issue I'm thinking about that this can help with is KAFKA-2168. In 
that issue, we need some way to wake up a poll() call and make sure another 
operation gets executed in a timely fashion. I think the timer queue could make 
this work pretty elegantly, and work whether you are calling from the thread 
you call poll() from or from some other thread: those calls would schedule a 
new timer event in 0 milliseconds (which should only require locking the 
DelayedTaskQueue), then wakeup() the NetworkClient. Any subsequent poll() call 
(either due to the simple looping of your dedicated poll() thread or from 
making a synchronous commit() call which then calls poll() itself) will 
correctly process the requested operation. This does sometimes require pulling 
out some of the state and tracking it separately (like this queue of commit 
requests), but I think in the long term it ends up keeping the code simpler 
while avoiding a lot of the logic required to take code that started out as 
blocking indefinitely and turn it into a form that can time out without 
leaving the Consumer in a bad state. See also KAFKA-1894 -- I think a lot of 
those while loops could instead be converted into:

1. modify state to indicate we are working on the operation (i.e. add it to a 
   queue)
2. add some sort of timeout to eventually kill that process
3. allow normal polling to proceed, which should eventually cause the task to 
   finish.
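
As a rough illustration of the "schedule, then wakeup()" pattern, here is a 
small self-contained sketch. It is not the consumer code: WakeableLoop is a 
hypothetical class, a bare java.nio Selector stands in for the NetworkClient, 
and a plain FIFO queue stands in for timer events that are due in 0 
milliseconds.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.Selector;
import java.util.ArrayDeque;
import java.util.Queue;

// Hypothetical event loop: any thread may schedule work, but only the
// polling thread ever runs it.
class WakeableLoop implements AutoCloseable {
    private final Selector selector;
    private final Queue<Runnable> pending = new ArrayDeque<>();

    WakeableLoop() {
        try {
            selector = Selector.open();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Safe to call from any thread: only the queue is locked, then the
    // selector is woken so a blocked (or the next) poll() returns promptly.
    void schedule(Runnable op) {
        synchronized (pending) {
            pending.add(op);
        }
        selector.wakeup();
    }

    // Call from the polling thread only. Blocks for up to timeoutMs unless
    // woken, then runs everything that was scheduled. Returns how many ran.
    int poll(long timeoutMs) {
        try {
            if (timeoutMs <= 0) {
                selector.selectNow(); // select(0) would block indefinitely
            } else {
                selector.select(timeoutMs);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        int ran = 0;
        while (true) {
            Runnable op;
            synchronized (pending) {
                op = pending.poll();
            }
            if (op == null) {
                return ran;
            }
            op.run(); // run outside the lock so ops may schedule more work
            ran++;
        }
    }

    @Override
    public void close() {
        try {
            selector.close();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Because Selector.wakeup() also affects the next select() when none is in 
progress, it does not matter whether schedule() is called before or during a 
poll(); in both cases the operation is picked up without waiting out the full 
timeout.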

Lastly, even if we went with this new approach, I don't think I'd want to use a 
separate thread for the background commit. KAFKA-2168 is still a problem with 
that approach anyway, it seems like overkill just to manage the offset commit 
process, and I think it's probably more code than managing the extra timeout 
computation since we would also need to manage the shutdown process carefully.

Thoughts?


> On May 5, 2015, 1:39 a.m., Guozhang Wang wrote:
> > clients/src/main/java/org/apache/kafka/clients/consumer/internals/DelayedTaskScheduler.java,
> >  line 21
> > <https://reviews.apache.org/r/33196/diff/4/?file=949012#file949012line21>
> >
> >     I was a bit confused by this class, thinking it would execute 
> > the tasks that have timed out automatically, by a separate thread. 
> >     
> >     Just wondering if it is better to just replace the "scheduler" with a 
> > delayed task queue, which seems to be more illustrative of its purpose. Or 
> > if we can combine this with the delayed offset requests list: see my other 
> > comment above.

Great point, it started out as a scheduler and ended up a queue. Fixed the 
naming to match.


- Ewen


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/33196/#review82448
-----------------------------------------------------------


On May 5, 2015, 5:51 a.m., Ewen Cheslack-Postava wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/33196/
> -----------------------------------------------------------
> 
> (Updated May 5, 2015, 5:51 a.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-2123
>     https://issues.apache.org/jira/browse/KAFKA-2123
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> KAFKA-2123: Add queuing of offset commit requests.
> 
> 
> KAFKA-2123: Add scheduler for delayed tasks in new consumer, add backoff for 
> commit retries, and simplify auto commit by using delayed tasks.
> 
> 
> KAFKA-2123: Make synchronous offset commits wait for previous requests to 
> finish in order.
> 
> 
> KAFKA-2123: Remove redundant calls to ensureNotClosed
> 
> 
> KAFKA-2123: Address review comments.
> 
> 
> Diffs
> -----
> 
>   clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java 
> 8f587bc0705b65b3ef37c86e0c25bb43ab8803de 
>   
> clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerCommitCallback.java
>  PRE-CREATION 
>   clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java 
> bdff518b732105823058e6182f445248b45dc388 
>   clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 
> d301be4709f7b112e1f3a39f3c04cfa65f00fa60 
>   clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java 
> f50da825756938c193d7f07bee953e000e2627d9 
>   
> clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
>  e55ab11df4db0b0084f841a74cbcf819caf780d5 
>   
> clients/src/main/java/org/apache/kafka/clients/consumer/internals/DelayedTask.java
>  PRE-CREATION 
>   
> clients/src/main/java/org/apache/kafka/clients/consumer/internals/DelayedTaskQueue.java
>  PRE-CREATION 
>   
> clients/src/main/java/org/apache/kafka/common/errors/ConsumerCoordinatorNotAvailableException.java
>  PRE-CREATION 
>   
> clients/src/main/java/org/apache/kafka/common/errors/NotCoordinatorForConsumerException.java
>  PRE-CREATION 
>   
> clients/src/main/java/org/apache/kafka/common/errors/OffsetLoadInProgressException.java
>  PRE-CREATION 
>   clients/src/main/java/org/apache/kafka/common/protocol/Errors.java 
> 36aa412404ff1458c7bef0feecaaa8bc45bed9c7 
>   
> clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java
>  b06c4a73e2b4e9472cd772c8bc32bf4a29f431bb 
>   
> clients/src/test/java/org/apache/kafka/clients/consumer/internals/DelayedTaskQueueTest.java
>  PRE-CREATION 
>   core/src/test/scala/integration/kafka/api/ConsumerTest.scala 
> ffbdf5dc106e2a59563768280074696c76491337 
> 
> Diff: https://reviews.apache.org/r/33196/diff/
> 
> 
> Testing
> -------
> 
> 
> Thanks,
> 
> Ewen Cheslack-Postava
> 
>
