> On Aug. 13, 2014, 6:47 p.m., Joel Koshy wrote:
> > clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java, line 122
> > <https://reviews.apache.org/r/24411/diff/1/?file=654039#file654039line122>
> >
> > txRequestTimeoutMs is for the send timeout and txTimeout further down
> > is for the expiration timeout right? Can we make this a bit clearer - right
> > now it is a bit confusing to have two timeouts that are similarly named.

I left txRequestTimeoutMs as is to keep the symmetry with ackTimeoutMs. I have changed txTimeout to txExpirationTime.


> On Aug. 13, 2014, 6:47 p.m., Joel Koshy wrote:
> > clients/src/main/java/org/apache/kafka/clients/NetworkClient.java, line 527
> > <https://reviews.apache.org/r/24411/diff/1/?file=654036#file654036line527>
> >
> > Can we do this rather in the begin phase? i.e., very similar to how we
> > wait on metadata on send? That way we also don't have to account for the
> > timeout adding to poll's time.

Yes, I considered that possibility. However, due to the asynchronicity, there might be a gap between the point where begin/abort/commit enqueue a txControl message and the moment it is actually sent, so I thought of delaying the check until as late as possible. Do you think this gap is small enough not to matter?


> On Aug. 13, 2014, 6:47 p.m., Joel Koshy wrote:
> > clients/src/main/java/org/apache/kafka/clients/NetworkClient.java, line 490
> > <https://reviews.apache.org/r/24411/diff/1/?file=654036#file654036line490>
> >
> > Since this is pretty much identical to what we do for metadata
> > requests, we really should make this method a little more general and pass
> > in request-specific functionality (metadata request/tx coordinator
> > metadata).

The main issue here is that the small differences between the two happen inside the condition (connectionStates.isConnected()...), so we would be pushing a fair amount of code up into the poll method. I am not sure which is better.


> On Aug. 13, 2014, 6:47 p.m., Joel Koshy wrote:
> > clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java, line 334
> > <https://reviews.apache.org/r/24411/diff/1/?file=654043#file654043line334>
> >
> > Should we just reject further sends into the accumulator while a flush
> > is in progress? That way, I think we can avoid the need for a reset flush.

I am not sure I understand this well. Maybe we can discuss it further in person?


> On Aug. 13, 2014, 6:47 p.m., Joel Koshy wrote:
> > clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java, line 141
> > <https://reviews.apache.org/r/24411/diff/1/?file=654039#file654039line141>
> >
> > Rather than triggering a tx coordinator lookup here, shall we just do
> > that when we begin/commit/abort a transaction - i.e, if the txCoordinator
> > is unknown at that point. It's not a huge deal, but I'm a bit wary of
> > constructors that start some activity - we actually start the io thread in
> > the constructor - that is not ideal IMHO but okay (since currently the io
> > thread won't be doing anything at this point). But what if the sender tries
> > to modify some state in KafkaProducer that is yet to be
> > constructed/initialized - i.e., in general my preference is that
> > constructors be used only to initialize state as far as possible. i.e., to
> > not start the io thread in the constructor, but only on an initial send (if
> > the thread has not been started already).

I agree that we should take that piece of functionality out of the constructor. However, I am not sure how to address it, as putting it in begin/abort/commit would run into the same asynchronicity problem mentioned above. I think we should discuss this further.


- Raul


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/24411/#review50095
-----------------------------------------------------------


On Aug. 6, 2014, 7:51 p.m., Raul Castro Fernandez wrote:
>
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/24411/
> -----------------------------------------------------------
>
> (Updated Aug. 6, 2014, 7:51 p.m.)
>
>
> Review request for kafka.
>
>
> Bugs: KAFKA-1524
>     https://issues.apache.org/jira/browse/KAFKA-1524
>
>
> Repository: kafka
>
>
> Description
> -------
>
> KAFKA-1524; transactional producer
>
>
> Diffs
> -----
>
>   clients/src/main/java/org/apache/kafka/clients/NetworkClient.java eea270abb16f40c9f3b47c4ea96be412fb4fdc8b
>   clients/src/main/java/org/apache/kafka/clients/producer/AbortTransactionException.java PRE-CREATION
>   clients/src/main/java/org/apache/kafka/clients/producer/InvalidTransactionStatusException.java PRE-CREATION
>   clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java f58b8508d3f813a51015abed772c704390887d7e
>   clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java c0f1d57e0feb894d9f246058cd0396461afe3225
>   clients/src/main/java/org/apache/kafka/clients/producer/Producer.java 36e8398416036cab84faad1f07159e5adefd8086
>   clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java f9de4af426449cceca12a8de9a9f54a6241d28d8
>   clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java c5d470011d334318d5ee801021aadd0c000974a6
>   clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java dd0af8aee98abed5d4a0dc50989e37888bb353fe
>   clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java 8ebe7ed82c9384b71ce0cc3ddbef2c2325363ab9
>   clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionContext.java PRE-CREATION
>   clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionControl.java PRE-CREATION
>   clients/src/main/java/org/apache/kafka/common/errors/TransactionCoordinatorNotAvailableException.java PRE-CREATION
>   clients/src/main/java/org/apache/kafka/common/errors/TransactionFailedException.java PRE-CREATION
>   clients/src/main/java/org/apache/kafka/common/record/Compressor.java 0323f5f7032dceb49d820c17a41b78c56591ffc4
>   clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java 040e5b91005edb8f015afdfa76fd94e0bf3cb4ca
>   clients/src/main/java/org/apache/kafka/common/record/Record.java 10df9fd8d3f4ec8c277650fa7eab269f3ea30d85
>   clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java 0762b35abba0551f23047348c5893bb8c9acff14
>   clients/src/test/java/org/apache/kafka/clients/producer/SenderTest.java ef2ca65cabe97b909f17b62027a1bb06827e88fe
>   clients/src/test/java/org/apache/kafka/clients/producer/TransactionContextTest.java PRE-CREATION
>   clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java 94a11121e207d5cf94dbc94443a8aa7edf387782
>
> Diff: https://reviews.apache.org/r/24411/diff/
>
>
> Testing
> -------
>
>
> Thanks,
>
> Raul Castro Fernandez
>
>
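
On the constructor comment (KafkaProducer.java, line 141): to make the "start the io thread only on an initial send" idea concrete, here is a minimal sketch of how that could look. LazyStartProducer and every member in it are hypothetical names for illustration only, not the actual KafkaProducer or Sender code, and the "io thread" here just drains an in-memory queue rather than talking to a broker.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical illustration, not the real KafkaProducer.
class LazyStartProducer {
    private final BlockingQueue<String> queue = new LinkedBlockingQueue<>();
    private final AtomicBoolean started = new AtomicBoolean(false);
    private final AtomicInteger sent = new AtomicInteger(0);
    private final Thread ioThread;
    private volatile boolean running = true;

    LazyStartProducer() {
        // The constructor only initializes state; the thread is created but not started.
        this.ioThread = new Thread(this::runLoop, "lazy-io-thread");
        this.ioThread.setDaemon(true);
    }

    // Stand-in for the sender loop: drains queued records until closed and empty.
    private void runLoop() {
        while (running || !queue.isEmpty()) {
            try {
                String record = queue.poll(10, TimeUnit.MILLISECONDS);
                if (record != null) {
                    sent.incrementAndGet();
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    void send(String record) {
        // compareAndSet lets exactly one caller start the thread, on the first send.
        if (started.compareAndSet(false, true)) {
            ioThread.start();
        }
        queue.add(record);
    }

    boolean isIoThreadStarted() {
        return started.get();
    }

    int sentCount() {
        return sent.get();
    }

    void close() {
        running = false;
        if (started.get()) {
            try {
                ioThread.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }
}
```

With this shape the object is fully constructed before the thread can observe it, which addresses the concern about the sender touching uninitialized state. It does not by itself resolve the asynchronicity gap discussed for begin/abort/commit.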
