[ https://issues.apache.org/jira/browse/KAFKA-2168?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14553332#comment-14553332 ]

Ewen Cheslack-Postava commented on KAFKA-2168:
----------------------------------------------

For option 1, it's probably worth pointing out that we already have some
finer-grained synchronization (in metrics and metadata, since those are shared
by many other components; the producer doesn't synchronize at the level of
KafkaProducer at all, only in its internals). So we're already double locking
in many cases.
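A minimal sketch of option 1, assuming the layers are collapsed into a single hypothetical class (`UnlockedSelectSketch` and `SharedMetadata` are invented names, not Kafka code). The coarse lock is released around the blocking `Selector.select()` call, while the shared component keeps its own monitor, so work done under both is locked twice.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.Selector;
import java.util.concurrent.locks.ReentrantLock;

public class UnlockedSelectSketch {
    // Hypothetical shared component with its own fine-grained lock,
    // in the spirit of the internally synchronized metadata/metrics.
    static final class SharedMetadata {
        private long version = 0;
        synchronized void bump() { version++; }
        synchronized long version() { return version; }
    }

    private final ReentrantLock lock = new ReentrantLock();  // coarse consumer-level lock
    private final Selector selector;
    final SharedMetadata metadata = new SharedMetadata();
    private long committed = -1;  // guarded by lock

    UnlockedSelectSketch() {
        try {
            selector = Selector.open();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    void poll(long timeoutMs) throws IOException {
        lock.lock();
        try {
            metadata.bump();  // double locking: consumer lock + the component's own monitor
        } finally {
            lock.unlock();
        }
        selector.select(timeoutMs);  // the blocking call runs with NO consumer lock held
        lock.lock();
        try {
            // ...process completed I/O under the lock (omitted)...
        } finally {
            lock.unlock();
        }
    }

    void commit(long offset) {
        lock.lock();
        try { committed = offset; } finally { lock.unlock(); }
    }

    long committed() {
        lock.lock();
        try { return committed; } finally { lock.unlock(); }
    }

    public static void main(String[] args) throws Exception {
        UnlockedSelectSketch consumer = new UnlockedSelectSketch();
        Thread poller = new Thread(() -> {
            try {
                consumer.poll(5_000);
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        });
        poller.start();
        Thread.sleep(100);           // give poll() time to enter select()
        long start = System.nanoTime();
        consumer.commit(42);         // does not wait for the 5 s select timeout
        long waitedMs = (System.nanoTime() - start) / 1_000_000;
        System.out.println("commit waited " + waitedMs + " ms, committed=" + consumer.committed());
        consumer.selector.wakeup();  // let the poller finish early
        poller.join();
        consumer.selector.close();
    }
}
```

The sketch sidesteps the hard part the issue points out (the lock lives in KafkaConsumer while the select call sits several layers down), and it exposes the new hazard: consumer state can change between the two locked sections, which is why compound operations need care.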

My concern with option 2 is that it's a fairly unusual approach, which makes
the code harder to understand.

Scanning through the code, there aren't that many places in KafkaConsumer where
multiple components are used together in a way that would require
synchronization. updateFetchPositions and refreshCommittedOffsets might, since
they use subscriptions + fetcher and subscriptions + coordinator together,
respectively. We'd need to be especially careful with SubscriptionState, since
some of its methods return internal collections and flags, and the subsequent
operation might need all of that processing to be synchronized to be sure
nothing is missed. For example, partition reassignment checks a flag, performs
the reassignment, and then resets the flag; we'd need to make sure that a
subscription made during that window doesn't get lost. The other case is
poll(). I thought it might be hard to reason about if some state changed while
it was executing, but I think that's not a problem as long as a few of the
steps can be synchronized, in particular partition reassignment and offset
commit.
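To make the SubscriptionState concern concrete, here is a minimal sketch built on a simplified, hypothetical subscription holder (`Subscriptions`, `subscribe`, and `reassignIfNeeded` are invented for illustration and are not Kafka's actual `SubscriptionState` API). The check-reassign-reset sequence runs under the same lock as `subscribe()`, so a subscription cannot slip in between the reassignment and the flag reset. Returning a copy rather than the internal collection avoids the other hazard mentioned above.

```java
import java.util.Set;
import java.util.TreeSet;

public class SubscriptionSketch {
    // Hypothetical, simplified stand-in for subscription state (not Kafka's API).
    static final class Subscriptions {
        private final Set<String> topics = new TreeSet<>();
        private Set<String> assigned = new TreeSet<>();
        private boolean needsReassignment = false;

        synchronized void subscribe(String topic) {
            topics.add(topic);
            needsReassignment = true;
        }

        // Check the flag, reassign, and reset the flag as ONE atomic step.
        // If these were three separately locked calls, a subscribe() landing
        // between the reassignment and the reset would have its flag cleared,
        // and its topic would never be assigned.
        synchronized boolean reassignIfNeeded() {
            if (!needsReassignment) {
                return false;
            }
            assigned = new TreeSet<>(topics);  // stand-in for a real partition assignment
            needsReassignment = false;
            return true;
        }

        synchronized Set<String> assigned() {
            return new TreeSet<>(assigned);  // copy, so callers never see internal state
        }
    }

    public static void main(String[] args) {
        Subscriptions subs = new Subscriptions();
        subs.subscribe("orders");
        subs.reassignIfNeeded();
        subs.subscribe("payments");  // arrives after the first reassignment
        // prints "true [orders, payments]"
        System.out.println(subs.reassignIfNeeded() + " " + subs.assigned());
    }
}
```

A single monitor is the simplest option here; an alternative would be a version counter that the reassignment compares before clearing the flag, but that is an additional design choice not discussed in this comment.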

By the way, I mapped out the dependencies. They fall roughly into four layers:
subscriptions + metadata at the bottom, NetworkClient above that (using only
metadata), then the coordinator and fetcher, which both use all three, and
finally KafkaConsumer at the top. But KafkaConsumer touches all of them, so it
effectively breaks the layering. Some of the logic in KafkaConsumer that
requires synchronization could still move into a lower-level component
(possibly a new one) if we rearrange some of the code.
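The layering can be checked mechanically. This hypothetical sketch hard-codes the dependency map exactly as described in this comment (it is not derived from the actual code base, and the component names are simplified) and places each component one layer above its deepest dependency. Java 9+ is assumed for `List.of`.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class LayerSketch {
    // Dependency map as described in the comment: component -> components it uses directly.
    static final Map<String, List<String>> DEPS = new LinkedHashMap<>();
    static {
        DEPS.put("SubscriptionState", List.of());
        DEPS.put("Metadata", List.of());
        DEPS.put("NetworkClient", List.of("Metadata"));
        DEPS.put("Coordinator", List.of("SubscriptionState", "Metadata", "NetworkClient"));
        DEPS.put("Fetcher", List.of("SubscriptionState", "Metadata", "NetworkClient"));
        DEPS.put("KafkaConsumer",
                List.of("SubscriptionState", "Metadata", "NetworkClient", "Coordinator", "Fetcher"));
    }

    // Layer 0 for components with no dependencies, otherwise 1 + the deepest dependency.
    static int layer(String component, Map<String, Integer> memo) {
        Integer cached = memo.get(component);
        if (cached != null) {
            return cached;
        }
        int result = 0;
        for (String dep : DEPS.get(component)) {
            result = Math.max(result, layer(dep, memo) + 1);
        }
        memo.put(component, result);
        return result;
    }

    public static void main(String[] args) {
        Map<String, Integer> memo = new HashMap<>();
        // prints, one per line: layer 0: SubscriptionState, layer 0: Metadata,
        // layer 1: NetworkClient, layer 2: Coordinator, layer 2: Fetcher, layer 3: KafkaConsumer
        for (String component : DEPS.keySet()) {
            System.out.println("layer " + layer(component, memo) + ": " + component);
        }
    }
}
```

KafkaConsumer lands in layer 3, but its dependency list contains every other component, including both layer-0 ones, which is the sense in which it cuts across the layering.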

> New consumer poll() can block other calls like position(), commit(), and 
> close() indefinitely
> ---------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-2168
>                 URL: https://issues.apache.org/jira/browse/KAFKA-2168
>             Project: Kafka
>          Issue Type: Bug
>          Components: clients, consumer
>            Reporter: Ewen Cheslack-Postava
>            Assignee: Jason Gustafson
>
> The new consumer is currently using very coarse-grained synchronization. For 
> most methods this isn't a problem since they finish quickly once the lock is 
> acquired, but poll() might run for a long time (and commonly will since 
> polling with long timeouts is a normal use case). This means any operations 
> invoked from another thread may block until the poll() call completes.
> Some example use cases where this can be a problem:
> * A shutdown hook is registered to trigger shutdown and invokes close(). It 
> gets invoked from another thread and blocks indefinitely.
> * User wants to manage offset commit themselves in a background thread. If 
> the commit policy is not purely time-based, it's not currently possible to 
> make sure the call to commit() will be processed promptly.
> Two possible solutions to this:
> 1. Make sure a lock is not held during the actual select call. Since we have 
> multiple layers (KafkaConsumer -> NetworkClient -> Selector -> nio Selector) 
> this is probably hard to make work cleanly since locking is currently only 
> performed at the KafkaConsumer level and we'd want it unlocked around a 
> single line of code in Selector.
> 2. Wake up the selector before synchronizing for certain operations. This 
> would require some additional coordination to make sure the caller of 
> wakeup() is able to acquire the lock promptly (instead of, e.g., the poll() 
> thread being woken up and then promptly reacquiring the lock with a 
> subsequent long poll() call).



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
