----------------------------------------------------------- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/24411/#review50095 -----------------------------------------------------------
Thank you for rebasing. clients/src/main/java/org/apache/kafka/clients/NetworkClient.java <https://reviews.apache.org/r/24411/#comment87645> We probably don't need to abort here - the IllegalStateException technically should be like an assert. i.e., the producer should probably be closed at that point anyway. (I think we currently don't do that - i.e., the producer continues to function but I don't think that is correct.) clients/src/main/java/org/apache/kafka/clients/NetworkClient.java <https://reviews.apache.org/r/24411/#comment88284> Reader method - can omit? clients/src/main/java/org/apache/kafka/clients/NetworkClient.java <https://reviews.apache.org/r/24411/#comment88285> Reader method - you mean if ()...? clients/src/main/java/org/apache/kafka/clients/NetworkClient.java <https://reviews.apache.org/r/24411/#comment87644> Since this is pretty much identical to what we do for metadata requests, we really should make this method a little more general and pass in request-specific functionality (metadata request/tx coordinator metadata). clients/src/main/java/org/apache/kafka/clients/NetworkClient.java <https://reviews.apache.org/r/24411/#comment88044> Can we do this rather in the begin phase? i.e., very similar to how we wait on metadata on send? That way we also don't have to account for the timeout adding to poll's time. clients/src/main/java/org/apache/kafka/clients/producer/AbortTransactionException.java <https://reviews.apache.org/r/24411/#comment88058> License header. clients/src/main/java/org/apache/kafka/clients/producer/AbortTransactionException.java <https://reviews.apache.org/r/24411/#comment88059> In such places, it would help to add a brief comment above that describes this exception. i.e., when this is thrown. It is somewhat clear from the name, but it helps to clarify (as you have done for InvalidTransactionStatusException) clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java <https://reviews.apache.org/r/24411/#comment88170> txRequestTimeoutMs is for the send timeout and txTimeout further down is for the expiration timeout right? Can we make this a bit clearer - right now it is a bit confusing to have two timeouts that are similarly named. clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java <https://reviews.apache.org/r/24411/#comment88161> Rather than triggering a tx coordinator lookup here, shall we just do that when we begin/commit/abort a transaction - i.e, if the txCoordinator is unknown at that point. It's not a huge deal, but I'm a bit wary of constructors that start some activity - we actually start the io thread in the constructor - that is not ideal IMHO but okay (since currently the io thread won't be doing anything at this point). But what if the sender tries to modify some state in KafkaProducer that is yet to be constructed/initialized - i.e., in general my preference is that constructors be used only to initialize state as far as possible. i.e., to not start the io thread in the constructor, but only on an initial send (if the thread has not been started already). clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java <https://reviews.apache.org/r/24411/#comment88171> Would infinite expiration time be a better default? An alternative would be to get rid of this altogether and always insist on the client providing an explicit expiration period. clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java <https://reviews.apache.org/r/24411/#comment88287> Reader method - you mean if ()...? clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java <https://reviews.apache.org/r/24411/#comment88288> Reader method - you mean if ()...? clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java <https://reviews.apache.org/r/24411/#comment88294> Repetitive - so how about a maybeAbortTransaction method? clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java <https://reviews.apache.org/r/24411/#comment88293> Need to abort here as well. clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java <https://reviews.apache.org/r/24411/#comment88295> to make it clearer: "timeout for sending..." clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java <https://reviews.apache.org/r/24411/#comment88248> Should we just reject further sends into the accumulator while a flush is in progress? That way, I think we can avoid the need for a reset flush. clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionContext.java <https://reviews.apache.org/r/24411/#comment88298> Assuming you will remove these... clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionContext.java <https://reviews.apache.org/r/24411/#comment88299> Can just do a pendingMessages.clear at the end. clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionContext.java <https://reviews.apache.org/r/24411/#comment88053> Temporary/earlier version? - Joel Koshy On Aug. 6, 2014, 7:51 p.m., Raul Castro Fernandez wrote: > > ----------------------------------------------------------- > This is an automatically generated e-mail. To reply, visit: > https://reviews.apache.org/r/24411/ > ----------------------------------------------------------- > > (Updated Aug. 6, 2014, 7:51 p.m.) > > > Review request for kafka. > > > Bugs: KAFKA-1524 > https://issues.apache.org/jira/browse/KAFKA-1524 > > > Repository: kafka > > > Description > ------- > > KAFKA-1524; transactional producer > > > Diffs > ----- > > clients/src/main/java/org/apache/kafka/clients/NetworkClient.java > eea270abb16f40c9f3b47c4ea96be412fb4fdc8b > > clients/src/main/java/org/apache/kafka/clients/producer/AbortTransactionException.java > PRE-CREATION > > clients/src/main/java/org/apache/kafka/clients/producer/InvalidTransactionStatusException.java > PRE-CREATION > clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java > f58b8508d3f813a51015abed772c704390887d7e > clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java > c0f1d57e0feb894d9f246058cd0396461afe3225 > clients/src/main/java/org/apache/kafka/clients/producer/Producer.java > 36e8398416036cab84faad1f07159e5adefd8086 > clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java > f9de4af426449cceca12a8de9a9f54a6241d28d8 > > clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java > c5d470011d334318d5ee801021aadd0c000974a6 > > clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java > dd0af8aee98abed5d4a0dc50989e37888bb353fe > > clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java > 8ebe7ed82c9384b71ce0cc3ddbef2c2325363ab9 > > clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionContext.java > PRE-CREATION > > clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionControl.java > PRE-CREATION > > clients/src/main/java/org/apache/kafka/common/errors/TransactionCoordinatorNotAvailableException.java > PRE-CREATION > > clients/src/main/java/org/apache/kafka/common/errors/TransactionFailedException.java > PRE-CREATION > clients/src/main/java/org/apache/kafka/common/record/Compressor.java > 0323f5f7032dceb49d820c17a41b78c56591ffc4 > clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java > 040e5b91005edb8f015afdfa76fd94e0bf3cb4ca > clients/src/main/java/org/apache/kafka/common/record/Record.java > 10df9fd8d3f4ec8c277650fa7eab269f3ea30d85 > > clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java > 0762b35abba0551f23047348c5893bb8c9acff14 > clients/src/test/java/org/apache/kafka/clients/producer/SenderTest.java > ef2ca65cabe97b909f17b62027a1bb06827e88fe > > clients/src/test/java/org/apache/kafka/clients/producer/TransactionContextTest.java > PRE-CREATION > clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java > 94a11121e207d5cf94dbc94443a8aa7edf387782 > > Diff: https://reviews.apache.org/r/24411/diff/ > > > Testing > ------- > > > Thanks, > > Raul Castro Fernandez > >
