> On April 20, 2015, 5:30 p.m., Jay Kreps wrote: > > clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java, > > line 526 > > <https://reviews.apache.org/r/31850/diff/9/?file=931818#file931818line526> > > > > I cleaned up this javadoc a little bit to try to simplify things. With > > docs it is always hard to get the level of focus right so that it has the > > essential information but doesn't overload the user and obscure the primary > > thing. > > > > 1. Changed "messages" to "records" (that is the terminology in the new > > clients). > > 2. I significantly shortened the section on calling from within a > > callback. I think only 0.00001% of people would ever consider this. > > 3. I cannot think of a reason why the user would care if the I/O thread > > is synchronously shutdown or not, and we don't make any promises one way or > > the other in the main close method, so let's just leave that bit out. > > > > What do you think of this: > > /** > > * This method waits up to <code>timeout</code> for the producer to > > complete the sending of all incomplete requests. > > * <p> > > * If the producer is unable to complete all requests before the > > timeout expires, this method will fail > > * any unsent and unacknowledged records immediately. > > * <p> > > * If invoked from within a {@link Callback} this method will not > > block and will be equivalent to <code>close(0, > > TimeUnit.MILLISECONDS)</code>. This is done since no further sending will > > happen while blocking the I/O thread of the producer. > > * > > * @param timeout The maximum time to wait for producer to complete > > any pending requests. The value should be > > * non-negative. Specifying a timeout of zero means > > do not wait for pending send requests to complete. > > * @param timeUnit The time unit for the <code>timeout</code> > > * @throws InterruptException If the thread is interrupted while > > blocked > > * @throws IllegalArgumentException If the <code>timeout</code> is > > negative. > > */
Looks good and it is easier to understand from user point of view. Thanks for cleaning this up. > On April 20, 2015, 5:30 p.m., Jay Kreps wrote: > > clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java, > > line 157 > > <https://reviews.apache.org/r/31850/diff/9/?file=931821#file931821line157> > > > > Read locks are very expensive. I am pretty worried about this. If we > > want to do this we need to do a pretty detailed examination of the perf > > impact. Hi Jay, I looked into the ReentrantReaderWriterLock implementation and it seems under the hood it uses CompareAndSet which should provide similar performance as atomic integer. But I agree this definitely largely depends on implementation. I modified o.a.k.clients.tools.ProducerPerformance a little bit to make it multiple threaded. The performance in following tests settings are very similar which are all ~1M messages/second when target is 10M message/sec. 1. 10 thread with latest trunk 2. 10 threads using atomic integer AtomicInteger 3. 10 threads using ReaderWriterLock When I increase the thread number to 50. It drops to about 0.82M messages/second in all cases. It seems reader lock did not introduce performance issue. - Jiangjie ----------------------------------------------------------- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/31850/#review80753 ----------------------------------------------------------- On April 16, 2015, 6:35 p.m., Jiangjie Qin wrote: > > ----------------------------------------------------------- > This is an automatically generated e-mail. To reply, visit: > https://reviews.apache.org/r/31850/ > ----------------------------------------------------------- > > (Updated April 16, 2015, 6:35 p.m.) > > > Review request for kafka. > > > Bugs: KAFKA-1660 > https://issues.apache.org/jira/browse/KAFKA-1660 > > > Repository: kafka > > > Description > ------- > > A minor fix. > > > Incorporated Guozhang's comments. > > > Modify according to the latest conclusion. > > > Patch for the finally passed KIP-15git status > > > Addressed Joel and Guozhang's comments. > > > rebased on trunk > > > Rebase on trunk > > > Addressed Joel's comments. > > > Addressed Joel's comments > > > Addressed Jay's comments > > > Diffs > ----- > > clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java > b91e2c52ed0acb1faa85915097d97bafa28c413a > clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java > 6913090af03a455452b0b5c3df78f266126b3854 > clients/src/main/java/org/apache/kafka/clients/producer/Producer.java > 5b3e75ed83a940bc922f9eca10d4008d67aa37c9 > > clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java > 0e7ab29a07301309aa8ca5b75b32b8e05ddc3a94 > > clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java > 70954cadb5c7e9a4c326afcf9d9a07db230e7db2 > > clients/src/main/java/org/apache/kafka/common/errors/InterruptException.java > fee322fa0dd9704374db4a6964246a7d2918d3e4 > clients/src/main/java/org/apache/kafka/common/serialization/Serializer.java > c2fdc23239bd2196cd912c3d121b591f21393eab > > clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java > 05e2929c0a9fc8cf2a8ebe0776ca62d5f3962d5c > core/src/test/scala/integration/kafka/api/ProducerSendTest.scala > 9811a2b2b1e9bf1beb301138f7626e12d275a8db > > Diff: https://reviews.apache.org/r/31850/diff/ > > > Testing > ------- > > Unit tests passed. > > > Thanks, > > Jiangjie Qin > >