> On Oct. 27, 2014, 12:13 a.m., Guozhang Wang wrote:
> > clients/src/main/java/org/apache/kafka/clients/NetworkClient.java, line 122
> > <https://reviews.apache.org/r/26885/diff/3/?file=731636#file731636line122>
> >
> >     The comments "When connecting or connected, this handles slow/stalled 
> > connections" here are a bit misleading: after checking the code I realize 
> > connectionDelay is only triggered to determine the delay in millis after which we
> > can re-check connectivity for a node that is not connected, and hence if the
> > node is connected again while we are determining its delay, we just set it 
> > to MAX.
> >     
> >     Instead of making it general to the KafkaClient interface, shall we 
> > just add this to the code block of line 155?

It gets triggered any time NetworkClient.ready returns false for a node. The 
obvious case is that it will return "not ready" when disconnected, but it also 
does so when connecting or when connected but inFlightRequests.canSendMore() 
returns false (thus the mention of "slow/stalled connections"). The important
thing is that the value returned *is* MAX_VALUE in those latter cases because 
neither one will be resolved by polling -- they both require an external event 
(connection established/failed or outstanding request receives a response) 
which should wake up the event loop when there's something to do. That keeps us 
from polling unnecessarily. Previously there were conditions in which 
connections in these states could trigger busy waiting of the poll loop.

I don't think we can get the same effect just inlining the code because it uses 
state that's only available through ClusterConnectionStates, which is private 
to NetworkClient. The KafkaClient only exposes the higher level concept of 
"ready".

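Here is a minimal Java sketch of the behavior described above, assuming a simplified three-state connection model. `ReadinessClient`, `SketchClient`, `set`, and the per-node maps are hypothetical names chosen for illustration. They are not Kafka's actual `KafkaClient`/`NetworkClient` code. Only the reconnect backoff of a disconnected node yields a finite delay. A node that is connecting, or connected but unable to send more, reports `Long.MAX_VALUE`. The per-node state stays private behind a two-method interface, loosely mirroring how `ClusterConnectionStates` is private to `NetworkClient`.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical interface in the spirit of KafkaClient: callers see only "ready"
// and "how long until it is worth asking again", never the raw connection state.
interface ReadinessClient {
    boolean ready(int node, long nowMs);
    long connectionDelay(int node, long nowMs);
}

// Hypothetical implementation; the per-node state is private to this class.
final class SketchClient implements ReadinessClient {
    enum State { DISCONNECTED, CONNECTING, CONNECTED }

    private final long reconnectBackoffMs;
    private final Map<Integer, State> state = new HashMap<>();
    private final Map<Integer, Long> lastConnectAttemptMs = new HashMap<>();
    // Stands in for the in-flight request limit (inFlightRequests.canSendMore()).
    private final Map<Integer, Boolean> canSendMore = new HashMap<>();

    SketchClient(long reconnectBackoffMs) {
        this.reconnectBackoffMs = reconnectBackoffMs;
    }

    // Hook for this sketch only: record a state transition for a node.
    void set(int node, State s, long attemptMs, boolean sendMore) {
        state.put(node, s);
        lastConnectAttemptMs.put(node, attemptMs);
        canSendMore.put(node, sendMore);
    }

    @Override
    public boolean ready(int node, long nowMs) {
        return state.get(node) == State.CONNECTED && canSendMore.get(node);
    }

    @Override
    public long connectionDelay(int node, long nowMs) {
        State s = state.get(node);
        if (s == null) {
            return 0; // no state yet: a connection attempt can start immediately
        }
        if (s == State.DISCONNECTED) {
            // The reconnect backoff is the only time-driven condition, so it is
            // the only case that returns a finite delay.
            long waited = nowMs - lastConnectAttemptMs.get(node);
            return Math.max(reconnectBackoffMs - waited, 0);
        }
        // CONNECTING, or CONNECTED but unable to send more: the answer changes only
        // on an external event (connect completes/fails, a response arrives) that
        // wakes the poll loop anyway, so a timer-based re-check would busy-wait.
        return Long.MAX_VALUE;
    }
}
```

In this sketch a poll loop would sleep for the minimum `connectionDelay` over the nodes that are not ready. A node stuck in `CONNECTING` therefore never forces a zero timeout.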

- Ewen


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/26885/#review58575
-----------------------------------------------------------


On Oct. 23, 2014, 11:19 p.m., Ewen Cheslack-Postava wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/26885/
> -----------------------------------------------------------
> 
> (Updated Oct. 23, 2014, 11:19 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1642
>     https://issues.apache.org/jira/browse/KAFKA-1642
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> Fixes two issues with the computation of ready nodes and poll timeouts in
> Sender/RecordAccumulator:
> 
> 1. The timeout was computed incorrectly because it took into account all nodes,
> even if they had data to send such that their timeout would be 0. However, nodes
> were then filtered based on whether it was possible to send (i.e. their
> connection was still good) which could result in nothing to send and a 0
> timeout, resulting in busy looping. Instead, the timeout needs to be computed
> only using data that cannot be immediately sent, i.e. where the timeout will be
> greater than 0. This timeout is only used if, after filtering by whether
> connections are ready for sending, there is no data to be sent. Other events can
> wake the thread up earlier, e.g. a client reconnects and becomes ready again.
> 
> 2. One of the conditions indicating whether data is sendable is whether a
> timeout has expired -- either the linger time or the retry backoff. This
> condition wasn't accounting for both cases properly, always using the linger
> time. This means the retry backoff was probably not being respected.
> 
> KAFKA-1642 Compute correct poll timeout when all nodes have sendable data but 
> none can send data because they are in a connection backoff period.
> 
> 
> Addressing Jun's comments.
> 
> 
> Diffs
> -----
> 
>   clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java d304660f29246e9600efe3ddb28cfcc2b074bed3
>   clients/src/main/java/org/apache/kafka/clients/KafkaClient.java 29658d4a15f112dc0af5ce517eaab93e6f00134b
>   clients/src/main/java/org/apache/kafka/clients/NetworkClient.java eea270abb16f40c9f3b47c4ea96be412fb4fdc8b
>   clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java c5d470011d334318d5ee801021aadd0c000974a6
>   clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java 8ebe7ed82c9384b71ce0cc3ddbef2c2325363ab9
>   clients/src/test/java/org/apache/kafka/clients/MockClient.java aae8d4a1e98279470587d397cc779a9baf6fee6c
>   clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java 0762b35abba0551f23047348c5893bb8c9acff14
> 
> Diff: https://reviews.apache.org/r/26885/diff/
> 
> 
> Testing
> -------
> 
> 
> Thanks,
> 
> Ewen Cheslack-Postava
> 
>
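
The two fixes in the review request above can be sketched together in Java. This is a simplified model under stated assumptions, not the actual `Sender`/`RecordAccumulator` code. `SenderSketch`, `Batch`, `timeLeftMs`, and `plan` are hypothetical names. The sketch assumes each node has a single pending batch, and it takes a fresh batch's `lastAttemptMs` to be its creation time. Data that is not yet sendable is the only input to the ready-check timeout. Sendable data on an unready connection contributes that connection's delay instead of 0. A retried batch waits for the retry backoff rather than the linger time.

```java
import java.util.List;
import java.util.Map;

// Hypothetical, simplified model of the two fixes; not the producer's real classes.
final class SenderSketch {
    // One pending batch per node (a simplification of the per-partition deques).
    static final class Batch {
        final int node;
        final int attempts;       // 0 for a batch that has never been sent
        final long lastAttemptMs; // creation time for a fresh batch (assumption)

        Batch(int node, int attempts, long lastAttemptMs) {
            this.node = node;
            this.attempts = attempts;
            this.lastAttemptMs = lastAttemptMs;
        }
    }

    // Fix 2: a retried batch waits out the retry backoff, a fresh one the linger time.
    static long timeLeftMs(Batch b, long nowMs, long lingerMs, long retryBackoffMs) {
        long timeToWaitMs = b.attempts > 0 ? retryBackoffMs : lingerMs;
        return Math.max(timeToWaitMs - (nowMs - b.lastAttemptMs), 0);
    }

    /**
     * Adds the nodes to send to on this iteration to sendNow and returns the poll
     * timeout. connectionDelayMs maps every node whose connection is NOT ready to its
     * connection delay; nodes absent from the map are ready to send.
     */
    static long plan(List<Batch> batches, Map<Integer, Long> connectionDelayMs,
                     long nowMs, long lingerMs, long retryBackoffMs, List<Integer> sendNow) {
        long nextReadyCheckMs = Long.MAX_VALUE; // from data that cannot be sent yet
        long notReadyMs = Long.MAX_VALUE;       // from sendable data on unready connections
        for (Batch b : batches) {
            long left = timeLeftMs(b, nowMs, lingerMs, retryBackoffMs);
            if (left > 0) {
                // Fix 1: only data with a positive wait feeds the ready-check timeout.
                nextReadyCheckMs = Math.min(nextReadyCheckMs, left);
            } else if (connectionDelayMs.containsKey(b.node)) {
                // Sendable, but the connection cannot take it right now.
                notReadyMs = Math.min(notReadyMs, connectionDelayMs.get(b.node));
            } else {
                sendNow.add(b.node);
            }
        }
        // The computed timeout matters only when nothing can be sent this iteration.
        return sendNow.isEmpty() ? Math.min(nextReadyCheckMs, notReadyMs) : 0;
    }
}
```

Consider `lingerMs = 5`, `retryBackoffMs = 100` and `nowMs = 10`. Take a fresh batch for a node that is still connecting (delay `Long.MAX_VALUE`), plus a batch for another node last attempted at 0 ms with one prior attempt. The sketch yields an empty send list and a 90 ms timeout. Counting the sendable batch's zero wait would instead have produced a busy loop.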
