-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/24411/#review50095
-----------------------------------------------------------


Thank you for rebasing.


clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
<https://reviews.apache.org/r/24411/#comment87645>

    We probably don't need to abort here - the IllegalStateException 
technically should be like an assert. i.e., the producer should probably be 
closed at that point anyway. (I think we currently don't do that - i.e., the 
producer continues to function but I don't think that is correct.)



clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
<https://reviews.apache.org/r/24411/#comment88284>

    Reader method - can omit?



clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
<https://reviews.apache.org/r/24411/#comment88285>

    Reader method - you mean if ()...?



clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
<https://reviews.apache.org/r/24411/#comment87644>

    Since this is pretty much identical to what we do for metadata requests, we 
really should make this method a little more general and pass in 
request-specific functionality (metadata request/tx coordinator metadata).



clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
<https://reviews.apache.org/r/24411/#comment88044>

    Can we do this rather in the begin phase? i.e., very similar to how we wait 
on metadata on send? That way we also don't have to account for the timeout 
adding to poll's time.



clients/src/main/java/org/apache/kafka/clients/producer/AbortTransactionException.java
<https://reviews.apache.org/r/24411/#comment88058>

    License header.



clients/src/main/java/org/apache/kafka/clients/producer/AbortTransactionException.java
<https://reviews.apache.org/r/24411/#comment88059>

    In such places, it would help to add a brief comment above that describes 
this exception. i.e., when this is thrown. It is somewhat clear from the name, 
but it helps to clarify (as you have done for InvalidTransactionStatusException)



clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
<https://reviews.apache.org/r/24411/#comment88170>

    txRequestTimeoutMs is for the send timeout and txTimeout further down is 
for the expiration timeout right? Can we make this a bit clearer - right now it 
is a bit confusing to have two timeouts that are similarly named.



clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
<https://reviews.apache.org/r/24411/#comment88161>

    Rather than triggering a tx coordinator lookup here, shall we just do that 
when we begin/commit/abort a transaction - i.e, if the txCoordinator is unknown 
at that point. It's not a huge deal, but I'm a bit wary of constructors that 
start some activity - we actually start the io thread in the constructor - that 
is not ideal IMHO but okay (since currently the io thread won't be doing 
anything at this point). But what if the sender tries to modify some state in 
KafkaProducer that is yet to be constructed/initialized - i.e., in general my 
preference is that constructors be used only to initialize state as far as 
possible. i.e., to not start the io thread in the constructor, but only on an 
initial send (if the thread has not been started already).



clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
<https://reviews.apache.org/r/24411/#comment88171>

    Would infinite expiration time be a better default? An alternative would be 
to get rid of this altogether and always insist on the client providing an 
explicit expiration period.



clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
<https://reviews.apache.org/r/24411/#comment88287>

    Reader method - you mean if ()...?



clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
<https://reviews.apache.org/r/24411/#comment88288>

    Reader method - you mean if ()...?



clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
<https://reviews.apache.org/r/24411/#comment88294>

    Repetitive - so how about a maybeAbortTransaction method?



clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
<https://reviews.apache.org/r/24411/#comment88293>

    Need to abort here as well.



clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
<https://reviews.apache.org/r/24411/#comment88295>

    to make it clearer: "timeout for sending..."



clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
<https://reviews.apache.org/r/24411/#comment88248>

    Should we just reject further sends into the accumulator while a flush is 
in progress? That way, I think we can avoid the need for a reset flush.



clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionContext.java
<https://reviews.apache.org/r/24411/#comment88298>

    Assuming you will remove these...



clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionContext.java
<https://reviews.apache.org/r/24411/#comment88299>

    Can just do a pendingMessages.clear at the end.



clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionContext.java
<https://reviews.apache.org/r/24411/#comment88053>

    Temporary/earlier version?


- Joel Koshy


On Aug. 6, 2014, 7:51 p.m., Raul Castro Fernandez wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/24411/
> -----------------------------------------------------------
> 
> (Updated Aug. 6, 2014, 7:51 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1524
>     https://issues.apache.org/jira/browse/KAFKA-1524
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> KAFKA-1524; transactional producer
> 
> 
> Diffs
> -----
> 
>   clients/src/main/java/org/apache/kafka/clients/NetworkClient.java 
> eea270abb16f40c9f3b47c4ea96be412fb4fdc8b 
>   
> clients/src/main/java/org/apache/kafka/clients/producer/AbortTransactionException.java
>  PRE-CREATION 
>   
> clients/src/main/java/org/apache/kafka/clients/producer/InvalidTransactionStatusException.java
>  PRE-CREATION 
>   clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java 
> f58b8508d3f813a51015abed772c704390887d7e 
>   clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java 
> c0f1d57e0feb894d9f246058cd0396461afe3225 
>   clients/src/main/java/org/apache/kafka/clients/producer/Producer.java 
> 36e8398416036cab84faad1f07159e5adefd8086 
>   clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java 
> f9de4af426449cceca12a8de9a9f54a6241d28d8 
>   
> clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
>  c5d470011d334318d5ee801021aadd0c000974a6 
>   
> clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java
>  dd0af8aee98abed5d4a0dc50989e37888bb353fe 
>   
> clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java 
> 8ebe7ed82c9384b71ce0cc3ddbef2c2325363ab9 
>   
> clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionContext.java
>  PRE-CREATION 
>   
> clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionControl.java
>  PRE-CREATION 
>   
> clients/src/main/java/org/apache/kafka/common/errors/TransactionCoordinatorNotAvailableException.java
>  PRE-CREATION 
>   
> clients/src/main/java/org/apache/kafka/common/errors/TransactionFailedException.java
>  PRE-CREATION 
>   clients/src/main/java/org/apache/kafka/common/record/Compressor.java 
> 0323f5f7032dceb49d820c17a41b78c56591ffc4 
>   clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java 
> 040e5b91005edb8f015afdfa76fd94e0bf3cb4ca 
>   clients/src/main/java/org/apache/kafka/common/record/Record.java 
> 10df9fd8d3f4ec8c277650fa7eab269f3ea30d85 
>   
> clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java
>  0762b35abba0551f23047348c5893bb8c9acff14 
>   clients/src/test/java/org/apache/kafka/clients/producer/SenderTest.java 
> ef2ca65cabe97b909f17b62027a1bb06827e88fe 
>   
> clients/src/test/java/org/apache/kafka/clients/producer/TransactionContextTest.java
>  PRE-CREATION 
>   clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java 
> 94a11121e207d5cf94dbc94443a8aa7edf387782 
> 
> Diff: https://reviews.apache.org/r/24411/diff/
> 
> 
> Testing
> -------
> 
> 
> Thanks,
> 
> Raul Castro Fernandez
> 
>

Reply via email to