-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/30763/#review72794
-----------------------------------------------------------

clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java
<https://reviews.apache.org/r/30763/#comment118872>

    I think we have to execute the callback before we wake up the caller
    thread. Otherwise, if something went wrong in this batch, the caller
    thread might not be aware of it before it is woken up, and it could
    then put a bunch of other stuff into the producer or commit offsets.

    For example, in mirror maker:
    ...
    for (rec <- recs)
        producer.send(rec);
    producer.flush();
    consumer.commitOffsets();
    ...
    The caller thread could have already committed offsets even if
    something went wrong in the callback.


- Jiangjie Qin


On Feb. 7, 2015, 8:59 p.m., Jay Kreps wrote:
>
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/30763/
> -----------------------------------------------------------
>
> (Updated Feb. 7, 2015, 8:59 p.m.)
>
>
> Review request for kafka.
>
>
> Bugs: KAFKA-1865
>     https://issues.apache.org/jira/browse/KAFKA-1865
>
>
> Repository: kafka
>
>
> Description
> -------
>
> KAFKA-1865 Add a flush() method to the producer.
>
>
> Diffs
> -----
>
>   clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java 1fd6917c8a5131254c740abad7f7228a47e3628c
>   clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java 84530f2b948f9abd74203db48707e490dd9c81a5
>   clients/src/main/java/org/apache/kafka/clients/producer/Producer.java 17fe541588d462c68c33f6209717cc4015e9b62f
>   clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java ecfe2144d778a5d9b614df5278b9f0a15637f10b
>   clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java dd0af8aee98abed5d4a0dc50989e37888bb353fe
>   clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java 75513b0bdd439329c5771d87436ef83fda853bfb
>   clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java 83338633717cfa4ef7cf2a590b5aa6b9c8cb1dd2
>   core/src/test/scala/integration/kafka/api/ProducerSendTest.scala b15237b76def3b234924280fa3fdb25dbb0cc0dc
>   core/src/test/scala/unit/kafka/utils/TestUtils.scala 54755e8dd3f23ced313067566cd4ea867f8a496e
>
> Diff: https://reviews.apache.org/r/30763/diff/
>
>
> Testing
> -------
>
>
> Thanks,
>
> Jay Kreps
>
>
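
To make the ordering concern in the review comment above concrete, here is a minimal sketch in plain Java. It does not use the real RecordBatch or producer classes: Batch, complete(), await() and flushThenDecideCommit() are hypothetical stand-ins invented for illustration, and a CountDownLatch is assumed as the wake-up mechanism. It only shows that when the callback runs before the waiting thread is released, the waiter is guaranteed to see a failure recorded by the callback before it decides whether to commit.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;

final class CallbackOrderingSketch {

    /** Hypothetical stand-in for a producer batch; NOT the real RecordBatch. */
    static final class Batch {
        private final CountDownLatch done = new CountDownLatch(1);
        private final Runnable callback;

        Batch(Runnable callback) {
            this.callback = callback;
        }

        /** Runs the callback first, then releases any thread blocked in await(). */
        void complete() {
            try {
                callback.run();
            } finally {
                // countDown() happens-before await() returns, so everything the
                // callback wrote is visible to the woken caller thread.
                done.countDown();
            }
        }

        /** Blocks until complete() has finished running the callback. */
        void await() throws InterruptedException {
            done.await();
        }
    }

    /** Returns true if the caller would go on to commit offsets. */
    static boolean flushThenDecideCommit(boolean sendFails) throws InterruptedException {
        AtomicBoolean failed = new AtomicBoolean(false);
        Batch batch = new Batch(() -> {
            if (sendFails) {
                failed.set(true); // the callback records the failure
            }
        });

        // A separate thread plays the role of the producer's I/O thread.
        Thread sender = new Thread(batch::complete);
        sender.start();

        batch.await(); // plays the role of producer.flush()

        // Commit only if no callback reported a failure.
        return !failed.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(flushThenDecideCommit(false)); // prints "true"
        System.out.println(flushThenDecideCommit(true));  // prints "false"
    }
}
```

If complete() called done.countDown() before callback.run(), the caller could return from await() and read failed while it is still false, which is the premature offset commit described in the comment.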