> On Sept. 11, 2015, 1:44 a.m., Jason Gustafson wrote:
> > clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java, line 
> > 140
> > <https://reviews.apache.org/r/36858/diff/9/?file=1066895#file1066895line140>
> >
> >     This line is puzzling me a little bit. Wouldn't we want to use the 
> > oldest request for this check? If we use the last sent request, then we 
> > might miss older requests that had already timed out, which would tend to 
> > delay timeout detection. Maybe I'm missing something?
> 
> Mayuresh Gharat wrote:
>     The in-flight requests are sent one after another. The reason we use 
> lastSent() is that if the last sent request has timed out, then all older 
> requests must have timed out as well.
> 
> Jason Gustafson wrote:
>     Ok, so the guarantee this provides is that each request will wait at 
> least the timeout to be fulfilled, but some could wait longer. This treats 
> the timeout setting as a minimum. The downside is that it will take longer to 
> detect a node failure since we always have to wait for the latest request to 
> time out (which will delay the timeout for older requests). Is that right? 
> Alternatively, if you used the oldest request, the timeout would be treated 
> as a maximum: no request would wait longer than the timeout, but some could 
> be cancelled sooner. In general, this would lead to earlier failure 
> detection. I'm not sure I see the benefit of using the minimum approach, but 
> it might be worthwhile documenting the tradeoff if you haven't already done 
> so somewhere.

Hi Jason,

Now I understand what you are saying. That's an excellent catch. Although the 
time difference between subsequent requests will be minimal and my current 
patch will still detect the timeout and do a metadata update, I completely 
agree with you: we should be peeking at the tail element of the queue rather 
than the head element. I will upload a new patch for it. Thanks a lot for this.
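
To make the trade-off concrete, here is a minimal sketch of the two checks 
being discussed. It assumes the per-node queue keeps the newest request at the 
head and the oldest at the tail. The class and method names 
(TimeoutCheckSketch, send, timedOutByNewest, timedOutByOldest) are 
hypothetical and only track send timestamps; this is not the code from the 
patch or from InFlightRequests.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical stand-in for a per-node queue of in-flight requests.
// Only send timestamps are tracked; this is NOT Kafka's InFlightRequests.
final class TimeoutCheckSketch {
    // Assumption: newest request sits at the head, oldest at the tail.
    private final Deque<Long> sendTimesMs = new ArrayDeque<>();

    // Record a request sent at the given time.
    void send(long nowMs) {
        sendTimesMs.addFirst(nowMs);
    }

    // Checks the most recently sent request (head of the queue).
    // The timeout then acts as a minimum: older requests may wait longer.
    boolean timedOutByNewest(long nowMs, long timeoutMs) {
        Long newest = sendTimesMs.peekFirst();
        return newest != null && nowMs - newest > timeoutMs;
    }

    // Checks the oldest request (tail of the queue).
    // The timeout then acts as a maximum: no request waits longer than it.
    boolean timedOutByOldest(long nowMs, long timeoutMs) {
        Long oldest = sendTimesMs.peekLast();
        return oldest != null && nowMs - oldest > timeoutMs;
    }

    public static void main(String[] args) {
        TimeoutCheckSketch queue = new TimeoutCheckSketch();
        long timeoutMs = 1000L;
        queue.send(0L);   // older request
        queue.send(900L); // newer request
        long nowMs = 1001L;
        System.out.println("oldest check: " + queue.timedOutByOldest(nowMs, timeoutMs));
        System.out.println("newest check: " + queue.timedOutByNewest(nowMs, timeoutMs));
    }
}
```

With two requests sent 900 ms apart and a 1000 ms timeout, the tail-based 
check fires as soon as the older request exceeds the timeout, while the 
head-based check does not fire until the newer request has also exceeded it.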


Actually, thinking further, we should do the same thing when we are 
iterating over completedSends() in NetworkClient.handleCompletedSends(). 
Although the current way works, it's kind of confusing if we don't keep the 
iteration consistent. What do you think about this?


- Mayuresh


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/36858/#review98526
-----------------------------------------------------------


On Sept. 11, 2015, 9:54 p.m., Mayuresh Gharat wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/36858/
> -----------------------------------------------------------
> 
> (Updated Sept. 11, 2015, 9:54 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-2120
>     https://issues.apache.org/jira/browse/KAFKA-2120
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> Solved compile error
> 
> 
> Addressed Jason's comments for KIP-19
> 
> 
> Addressed Jun's comments
> 
> 
> Addressed Jason's comments about the default values for requestTimeout
> 
> 
> checkpoint
> 
> 
> Addressed Joel's concerns. Also tried to include Jun's feedback.
> 
> 
> Fixed a minor comment
> 
> 
> Solved unit test issue
> 
> 
> Addressed Jun's comments regarding NetworkClient
> 
> 
> Addressed Jun's comments about disconnect() in Selector
> 
> 
> changed logging level to debug
> 
> 
> Addressed Joel's comments to break out early from the loop while aborting 
> expired batches
> 
> 
> Addressed Jun's comments
> 
> 
> Addressed Jason's concern about iterating over timed-out requests in 
> getNodesWithTimedOutRequest()
> 
> 
> Diffs
> -----
> 
>   clients/src/main/java/org/apache/kafka/clients/ClientRequest.java dc8f0f115bcda893c95d17c0a57be8d14518d034
>   clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java 7d24c6f5dd2b63b96584f3aa8922a1d048dc1ae4
>   clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java 15d00d4e484bb5d51a9ae6857ed6e024a2cc1820
>   clients/src/main/java/org/apache/kafka/clients/KafkaClient.java f46c0d9b5eb73887c62a0e09c96e9d8c964c709d
>   clients/src/main/java/org/apache/kafka/clients/NetworkClient.java 1302f359791b657b6f7c1ca1bd419ded9b01c67d
>   clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java b9a2d4e2bc565f0ee72b27791afe5c894af262f1
>   clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 5763bac6cfe667bfbabc5f160f35fb85f9b158e3
>   clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java 9517d9d0cd480d5ba1d12f1fde7963e60528d2f8
>   clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java 804d569498396d431880641041fc9292076452cb
>   clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java 06f00a99a73a288df9afa8c1d4abe3580fa968a6
>   clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java 4cb1e50d6c4ed55241aeaef1d3af09def5274103
>   clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java a152bd7697dca55609a9ec4cfe0a82c10595fbc3
>   clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java 06182db1c3a5da85648199b4c0c98b80ea7c6c0c
>   clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java d2e64f7cd8bf56e433a210905b2874f71eee9ea0
>   clients/src/main/java/org/apache/kafka/common/network/Selectable.java 70e74bd6aa629c430b2850ca40c97df0b16e5d75
>   clients/src/main/java/org/apache/kafka/common/network/Selector.java 4aa5cbb86ce6e1bf8f6769147ee2a6452c855c74
>   clients/src/test/java/org/apache/kafka/clients/MockClient.java e5815f56bdf8e2d980f2bc36b831ed234c0ac781
>   clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java 69c93c3adf674b1640534c3d7410fcaafaf2232c
>   clients/src/test/java/org/apache/kafka/clients/producer/internals/BufferPoolTest.java 2c693824fa53db1e38766b8c66a0ef42ef9d0f3a
>   clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java 5b2e4ffaeab7127648db608c179703b27b577414
>   clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java aa44991777a855f4b7f4f7bf17107c69393ff8ff
>   clients/src/test/java/org/apache/kafka/common/network/SSLSelectorTest.java df1205c935bee9a30a50816dbade64d6014b1ef2
>   clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java 3a684d98b05cadfb25c6f7f9a038ef1f6697edbf
>   clients/src/test/java/org/apache/kafka/test/MockSelector.java f83fd9b794a3bd191121a22bcb40fd6ec31d83b5
>   core/src/main/scala/kafka/controller/ControllerChannelManager.scala da1cff07f7f76dcfa5a805718febcccd4ed5f578
>   core/src/main/scala/kafka/server/KafkaConfig.scala 1e8b2331486ffe55bfcc0919e48e12aad23b7d3c
>   core/src/main/scala/kafka/server/KafkaServer.scala 30406ce809caaac56aca1f30c235b35962d55a50
>   core/src/main/scala/kafka/tools/ProducerPerformance.scala 46a68e97b8bcc8821f21e4220ce9b3acedc5dafe
>   core/src/main/scala/kafka/utils/NetworkClientBlockingOps.scala ad10721de844725f27a116611209992cea61b088
>   core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala 1198df02ddd7727269e84a751ba99520f6d5584a
>   core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala 5b4f2db4607ae6d17696c1140f1a771ce75c80e0
>   core/src/test/scala/unit/kafka/utils/TestUtils.scala 09b8444c2add87f0f70dbb182e892977a6b5c243
> 
> Diff: https://reviews.apache.org/r/36858/diff/
> 
> 
> Testing
> -------
> 
> 
> Thanks,
> 
> Mayuresh Gharat
> 
>
