> On Aug. 13, 2014, 6:47 p.m., Joel Koshy wrote:
> > clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java, 
> > line 122
> > <https://reviews.apache.org/r/24411/diff/1/?file=654039#file654039line122>
> >
> >     txRequestTimeoutMs is for the send timeout and txTimeout further down 
> > is for the expiration timeout right? Can we make this a bit clearer - right 
> > now it is a bit confusing to have two timeouts that are similarly named.

I left txRequestTimeoutMs to keep the symmetry with ackTimeoutMs. I have 
changed txTimeout to txExpirationTime.


> On Aug. 13, 2014, 6:47 p.m., Joel Koshy wrote:
> > clients/src/main/java/org/apache/kafka/clients/NetworkClient.java, line 527
> > <https://reviews.apache.org/r/24411/diff/1/?file=654036#file654036line527>
> >
> >     Can we do this rather in the begin phase? i.e., very similar to how we 
> > wait on metadata on send? That way we also don't have to account for the 
> > timeout adding to poll's time.

Yes, I considered that possibility. However, because the calls are 
asynchronous, there may be a gap between the point where begin/abort/commit 
submit a txControl message and the moment it is actually sent, so I chose to 
delay the check as late as possible. Do you think this gap is small enough 
not to matter?


> On Aug. 13, 2014, 6:47 p.m., Joel Koshy wrote:
> > clients/src/main/java/org/apache/kafka/clients/NetworkClient.java, line 490
> > <https://reviews.apache.org/r/24411/diff/1/?file=654036#file654036line490>
> >
> >     Since this is pretty much identical to what we do for metadata 
> > requests, we really should make this method a little more general and pass 
> > in request-specific functionality (metadata request/tx coordinator 
> > metadata).

The main issue is that the small differences between the two occur inside 
the condition (connectionStates.isConnected()...), so generalizing would 
push a fair amount of code up into the poll method. I am not sure which is 
better.
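
To make the trade-off concrete, here is a rough sketch of what a generalized 
method could look like if the request-specific part were passed in. Every 
name in it (LookupSketch, Connections, maybeSend, the String stand-ins for 
nodes and requests) is hypothetical and not taken from the patch; it only 
illustrates the shape of the refactoring, not the actual NetworkClient code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical sketch; none of these names come from the patch under review.
class LookupSketch {

    /** Stand-in for the client's connection-state bookkeeping. */
    public interface Connections {
        boolean isConnected(String node);
        void initiateConnect(String node);
    }

    /**
     * Shared control flow: send if the node is connected, otherwise start
     * connecting. The request-specific part is passed in as buildRequest.
     */
    public static void maybeSend(Function<String, String> buildRequest,
                                 String node,
                                 Connections connections,
                                 List<String> sends) {
        if (connections.isConnected(node)) {
            sends.add(buildRequest.apply(node));
        } else {
            connections.initiateConnect(node);
        }
    }

    public static void main(String[] args) {
        List<String> sends = new ArrayList<>();
        Connections alwaysConnected = new Connections() {
            public boolean isConnected(String node) { return true; }
            public void initiateConnect(String node) { }
        };
        // The same method serves both kinds of lookup.
        maybeSend(node -> "metadata request to " + node, "node-1", alwaysConnected, sends);
        maybeSend(node -> "tx coordinator metadata request to " + node, "node-1", alwaysConnected, sends);
        System.out.println(sends);
    }
}
```

The cost is visible in the signature: whatever differs inside the connected 
branch has to be lifted into the callback or its caller.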


> On Aug. 13, 2014, 6:47 p.m., Joel Koshy wrote:
> > clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java,
> >  line 334
> > <https://reviews.apache.org/r/24411/diff/1/?file=654043#file654043line334>
> >
> >     Should we just reject further sends into the accumulator while a flush 
> > is in progress? That way, I think we can avoid the need for a reset flush.

I am not sure I fully understand this; can we discuss it further in person?
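
For reference, one possible reading of the suggestion is sketched below. 
This is only a guess at what was meant, and all names (FlushGateSketch, 
append, beginFlush, completeFlush) are hypothetical rather than the 
RecordAccumulator API: appends are refused while a flush is in progress, so 
no flush state needs to be reset afterwards.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch; not the RecordAccumulator from the patch.
class FlushGateSketch {
    private final List<String> pending = new ArrayList<>();
    private boolean flushing = false;

    /** Returns false, without buffering the record, while a flush is running. */
    public synchronized boolean append(String record) {
        if (flushing) {
            return false;
        }
        pending.add(record);
        return true;
    }

    /** Marks the start of a flush; appends are rejected from here on. */
    public synchronized void beginFlush() {
        flushing = true;
    }

    /** Hands back everything buffered before the flush and reopens the gate. */
    public synchronized List<String> completeFlush() {
        List<String> drained = new ArrayList<>(pending);
        pending.clear();
        flushing = false;
        return drained;
    }
}
```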


> On Aug. 13, 2014, 6:47 p.m., Joel Koshy wrote:
> > clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java, 
> > line 141
> > <https://reviews.apache.org/r/24411/diff/1/?file=654039#file654039line141>
> >
> >     Rather than triggering a tx coordinator lookup here, shall we just do 
> > that when we begin/commit/abort a transaction - i.e, if the txCoordinator 
> > is unknown at that point. It's not a huge deal, but I'm a bit wary of 
> > constructors that start some activity - we actually start the io thread in 
> > the constructor - that is not ideal IMHO but okay (since currently the io 
> > thread won't be doing anything at this point). But what if the sender tries 
> > to modify some state in KafkaProducer that is yet to be 
> > constructed/initialized - i.e., in general my preference is that 
> > constructors be used only to initialize state as far as possible. i.e., to 
> > not start the io thread in the constructor, but only on an initial send (if 
> > the thread has not been started already).

I agree that we should take that functionality out of the constructor. 
However, I am not sure how to address it: putting it in begin/abort/commit 
would run into the same asynchronicity problem mentioned above. I think we 
should discuss this further.
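
As a starting point for that discussion, here is a minimal sketch of the 
"constructor only initializes state" idea from the comment: the io thread is 
created in the constructor but started on the first call that needs it. The 
class and method names (LazyStartSketch, ensureStarted, begin, isStarted) 
are hypothetical and do not correspond to KafkaProducer; it does not address 
the asynchronicity concern, only the lazy start.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical sketch; names are illustrative and not from the patch.
class LazyStartSketch {
    private final Thread ioThread;
    private final AtomicBoolean started = new AtomicBoolean(false);

    public LazyStartSketch(Runnable sender) {
        // The constructor only initializes state; nothing runs yet.
        this.ioThread = new Thread(sender, "io-thread-sketch");
        this.ioThread.setDaemon(true);
    }

    /** Starts the io thread exactly once, however many callers race here. */
    private void ensureStarted() {
        if (started.compareAndSet(false, true)) {
            ioThread.start();
        }
    }

    /** Stand-in for begin/commit/abort/send: each would call ensureStarted first. */
    public void begin() {
        ensureStarted();
    }

    public boolean isStarted() {
        return started.get();
    }

    public static void main(String[] args) {
        LazyStartSketch producer = new LazyStartSketch(() -> { });
        System.out.println("started after construction: " + producer.isStarted());
        producer.begin();
        System.out.println("started after begin: " + producer.isStarted());
    }
}
```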


- Raul


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/24411/#review50095
-----------------------------------------------------------


On Aug. 6, 2014, 7:51 p.m., Raul Castro Fernandez wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/24411/
> -----------------------------------------------------------
> 
> (Updated Aug. 6, 2014, 7:51 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1524
>     https://issues.apache.org/jira/browse/KAFKA-1524
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> KAFKA-1524; transactional producer
> 
> 
> Diffs
> -----
> 
>   clients/src/main/java/org/apache/kafka/clients/NetworkClient.java eea270abb16f40c9f3b47c4ea96be412fb4fdc8b 
>   clients/src/main/java/org/apache/kafka/clients/producer/AbortTransactionException.java PRE-CREATION 
>   clients/src/main/java/org/apache/kafka/clients/producer/InvalidTransactionStatusException.java PRE-CREATION 
>   clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java f58b8508d3f813a51015abed772c704390887d7e 
>   clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java c0f1d57e0feb894d9f246058cd0396461afe3225 
>   clients/src/main/java/org/apache/kafka/clients/producer/Producer.java 36e8398416036cab84faad1f07159e5adefd8086 
>   clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java f9de4af426449cceca12a8de9a9f54a6241d28d8 
>   clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java c5d470011d334318d5ee801021aadd0c000974a6 
>   clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java dd0af8aee98abed5d4a0dc50989e37888bb353fe 
>   clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java 8ebe7ed82c9384b71ce0cc3ddbef2c2325363ab9 
>   clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionContext.java PRE-CREATION 
>   clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionControl.java PRE-CREATION 
>   clients/src/main/java/org/apache/kafka/common/errors/TransactionCoordinatorNotAvailableException.java PRE-CREATION 
>   clients/src/main/java/org/apache/kafka/common/errors/TransactionFailedException.java PRE-CREATION 
>   clients/src/main/java/org/apache/kafka/common/record/Compressor.java 0323f5f7032dceb49d820c17a41b78c56591ffc4 
>   clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java 040e5b91005edb8f015afdfa76fd94e0bf3cb4ca 
>   clients/src/main/java/org/apache/kafka/common/record/Record.java 10df9fd8d3f4ec8c277650fa7eab269f3ea30d85 
>   clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java 0762b35abba0551f23047348c5893bb8c9acff14 
>   clients/src/test/java/org/apache/kafka/clients/producer/SenderTest.java ef2ca65cabe97b909f17b62027a1bb06827e88fe 
>   clients/src/test/java/org/apache/kafka/clients/producer/TransactionContextTest.java PRE-CREATION 
>   clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java 94a11121e207d5cf94dbc94443a8aa7edf387782 
> 
> Diff: https://reviews.apache.org/r/24411/diff/
> 
> 
> Testing
> -------
> 
> 
> Thanks,
> 
> Raul Castro Fernandez
> 
>
