> On Sept. 11, 2015, 1:44 a.m., Jason Gustafson wrote:
> > clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java, line 
> > 140
> > <https://reviews.apache.org/r/36858/diff/9/?file=1066895#file1066895line140>
> >
> >     This line is puzzling me a little bit. Wouldn't we want to use the 
> > oldest request for this check? If we use the last sent request, then we 
> > might miss older requests that had already timed out, which would tend to 
> > delay timeout detection. Maybe I'm missing something?
> 
> Mayuresh Gharat wrote:
>     The in-flight requests are sent one after another. The reason we use 
> lastSent() is that if the last sent request has timed out, then all older 
> requests must have timed out as well.
> 
> Jason Gustafson wrote:
>     Ok, so the guarantee this provides is that each request will wait at 
> least the timeout to be fulfilled, but some could wait longer. This treats 
> the timeout setting as a minimum. The downside is that it will take longer to 
> detect a node failure since we always have to wait for the latest request to 
> time out (which will delay the timeout for older requests). Is that right? 
> Alternatively, if you used the oldest request, the timeout would be treated 
> as a maximum: no request would wait longer than the timeout, but some could 
> be cancelled sooner. In general, this would lead to earlier failure 
> detection. I'm not sure I see the benefit of using the minimum approach, but 
> it might be worthwhile documenting the tradeoff if you haven't already done 
> so somewhere.
> 
> Mayuresh Gharat wrote:
>     Hi Jason,
>     
>     Now I understand what you are saying. That's an excellent catch. Although 
> the time difference between subsequent requests will be minimal and my 
> current patch will detect the timeout and do a metadata update, I completely 
> agree with you: we should be peeking at the tail element of the queue rather 
> than the head element. I will upload a new patch for it. Thanks a lot for this.
>     
>     
>     Actually, thinking further, we should be doing the same thing when we are 
> iterating over completedSends() in NetworkClient.handleCompletedSends(). 
> Although it works the current way, it's kind of confusing if we don't keep the 
> iteration consistent. What do you think about this?

Seems like handleCompletedSends might be correct since the next Send will be at 
the front and the code doesn't allow more than one pending Send. It's a little 
confusing because completing the Send just means successfully transmitting it 
to the network layer. Does that seem right?
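
For reference, the head-versus-tail tradeoff discussed in the quoted thread
can be sketched as below. This is only an illustration under stated
assumptions, not the code from the patch: TimeoutCheckSketch, send(),
newestTimedOut() and oldestTimedOut() are hypothetical names, and the sketch
assumes new requests are added at the head of a per-node deque so that the
tail holds the oldest request.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical stand-in for a per-node in-flight queue; not the real Kafka class.
class TimeoutCheckSketch {
    // Send timestamps in ms. Assumption: new requests go to the head,
    // so the tail holds the oldest request still awaiting a response.
    private final Deque<Long> sendTimesMs = new ArrayDeque<>();

    void send(long nowMs) {
        sendTimesMs.addFirst(nowMs);
    }

    // Timeout as a minimum: fires only once the newest request has expired.
    boolean newestTimedOut(long nowMs, long timeoutMs) {
        Long newest = sendTimesMs.peekFirst();
        return newest != null && nowMs - newest > timeoutMs;
    }

    // Timeout as a maximum: fires as soon as the oldest request has expired.
    boolean oldestTimedOut(long nowMs, long timeoutMs) {
        Long oldest = sendTimesMs.peekLast();
        return oldest != null && nowMs - oldest > timeoutMs;
    }

    public static void main(String[] args) {
        TimeoutCheckSketch node = new TimeoutCheckSketch();
        node.send(0L);    // oldest request
        node.send(900L);  // newest request
        long timeoutMs = 1000L;
        System.out.println(node.oldestTimedOut(1500L, timeoutMs)); // true
        System.out.println(node.newestTimedOut(1500L, timeoutMs)); // false
        System.out.println(node.newestTimedOut(2000L, timeoutMs)); // true
    }
}
```

With a 1000 ms timeout, the oldest-request check reports the node at 1500 ms,
while the newest-request check does not fire until after 1900 ms, which is
the delayed failure detection described above.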


- Jason


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/36858/#review98526
-----------------------------------------------------------


On Sept. 11, 2015, 9:54 p.m., Mayuresh Gharat wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/36858/
> -----------------------------------------------------------
> 
> (Updated Sept. 11, 2015, 9:54 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-2120
>     https://issues.apache.org/jira/browse/KAFKA-2120
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> Solved compile error
> 
> 
> Addressed Jason's comments for KIP-19
> 
> 
> Addressed Jun's comments
> 
> 
> Addressed Jason's comments about the default values for requestTimeout
> 
> 
> checkpoint
> 
> 
> Addressed Joel's concerns. Also tried to include Jun's feedback.
> 
> 
> Fixed a minor comment
> 
> 
> Solved unittest issue
> 
> 
> Addressed Jun's comments regarding NetworkClient
> 
> 
> Addressed Jun's comments about disconnect() in Selector
> 
> 
> changed logging level to debug
> 
> 
> Addressed Joel's comments to break out early from the loop while aborting 
> expired batches
> 
> 
> Addressed Jun's comments
> 
> 
> Addressed Jason's concern about iterating over timed-out requests in 
> getNodesWithTimedOutRequest()
> 
> 
> Diffs
> -----
> 
>   clients/src/main/java/org/apache/kafka/clients/ClientRequest.java 
> dc8f0f115bcda893c95d17c0a57be8d14518d034 
>   clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java 
> 7d24c6f5dd2b63b96584f3aa8922a1d048dc1ae4 
>   clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java 
> 15d00d4e484bb5d51a9ae6857ed6e024a2cc1820 
>   clients/src/main/java/org/apache/kafka/clients/KafkaClient.java 
> f46c0d9b5eb73887c62a0e09c96e9d8c964c709d 
>   clients/src/main/java/org/apache/kafka/clients/NetworkClient.java 
> 1302f359791b657b6f7c1ca1bd419ded9b01c67d 
>   clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java 
> b9a2d4e2bc565f0ee72b27791afe5c894af262f1 
>   clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 
> 5763bac6cfe667bfbabc5f160f35fb85f9b158e3 
>   
> clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
>  9517d9d0cd480d5ba1d12f1fde7963e60528d2f8 
>   clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java 
> 804d569498396d431880641041fc9292076452cb 
>   clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java 
> 06f00a99a73a288df9afa8c1d4abe3580fa968a6 
>   
> clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java
>  4cb1e50d6c4ed55241aeaef1d3af09def5274103 
>   
> clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
>  a152bd7697dca55609a9ec4cfe0a82c10595fbc3 
>   
> clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java
>  06182db1c3a5da85648199b4c0c98b80ea7c6c0c 
>   
> clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java 
> d2e64f7cd8bf56e433a210905b2874f71eee9ea0 
>   clients/src/main/java/org/apache/kafka/common/network/Selectable.java 
> 70e74bd6aa629c430b2850ca40c97df0b16e5d75 
>   clients/src/main/java/org/apache/kafka/common/network/Selector.java 
> 4aa5cbb86ce6e1bf8f6769147ee2a6452c855c74 
>   clients/src/test/java/org/apache/kafka/clients/MockClient.java 
> e5815f56bdf8e2d980f2bc36b831ed234c0ac781 
>   clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java 
> 69c93c3adf674b1640534c3d7410fcaafaf2232c 
>   
> clients/src/test/java/org/apache/kafka/clients/producer/internals/BufferPoolTest.java
>  2c693824fa53db1e38766b8c66a0ef42ef9d0f3a 
>   
> clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
>  5b2e4ffaeab7127648db608c179703b27b577414 
>   
> clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
>  aa44991777a855f4b7f4f7bf17107c69393ff8ff 
>   clients/src/test/java/org/apache/kafka/common/network/SSLSelectorTest.java 
> df1205c935bee9a30a50816dbade64d6014b1ef2 
>   clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java 
> 3a684d98b05cadfb25c6f7f9a038ef1f6697edbf 
>   clients/src/test/java/org/apache/kafka/test/MockSelector.java 
> f83fd9b794a3bd191121a22bcb40fd6ec31d83b5 
>   core/src/main/scala/kafka/controller/ControllerChannelManager.scala 
> da1cff07f7f76dcfa5a805718febcccd4ed5f578 
>   core/src/main/scala/kafka/server/KafkaConfig.scala 
> 1e8b2331486ffe55bfcc0919e48e12aad23b7d3c 
>   core/src/main/scala/kafka/server/KafkaServer.scala 
> 30406ce809caaac56aca1f30c235b35962d55a50 
>   core/src/main/scala/kafka/tools/ProducerPerformance.scala 
> 46a68e97b8bcc8821f21e4220ce9b3acedc5dafe 
>   core/src/main/scala/kafka/utils/NetworkClientBlockingOps.scala 
> ad10721de844725f27a116611209992cea61b088 
>   core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala 
> 1198df02ddd7727269e84a751ba99520f6d5584a 
>   core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala 
> 5b4f2db4607ae6d17696c1140f1a771ce75c80e0 
>   core/src/test/scala/unit/kafka/utils/TestUtils.scala 
> 09b8444c2add87f0f70dbb182e892977a6b5c243 
> 
> Diff: https://reviews.apache.org/r/36858/diff/
> 
> 
> Testing
> -------
> 
> 
> Thanks,
> 
> Mayuresh Gharat
> 
>
