[ 
https://issues.apache.org/jira/browse/FLINK-14302?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16947123#comment-16947123
 ] 

Jiangjie Qin commented on FLINK-14302:
--------------------------------------

[~tonywei] Thanks for the explanation. I am still a little confused. From the 
source code, it looks like the log I mentioned should be printed at [1], but it 
also looks like this logging should never see the NOT_COORDINATOR error code.

In any case, I think it is a good idea to ensure on the Flink side we are not 
committing empty transactions. However, technically speaking, the Kafka brokers 
should not assume the behavior of the clients, because there could be 
third-party Kafka client implementations. So I think there should be a fix on the Kafka 
side as well. But that might be a protocol change to add an error field in 
{{AddPartitionsToTxnResponse}}. We can leave it as is for now.
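A minimal sketch of the Flink-side guard discussed above: skip the request 
entirely when no new partitions were added to the transaction. The class 
{{EmptyTxnGuardSketch}}, the {{TxnState}} holder and the {{flushNewPartitions}} 
method are hypothetical names used only for illustration; they are not the 
actual FlinkKafkaInternalProducer or Kafka client code, and the "request" is 
just a recorded string.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

public class EmptyTxnGuardSketch {

    /** Hypothetical stand-in for the producer's transaction bookkeeping. */
    static class TxnState {
        // Partitions written to since the last ADD_PARTITIONS_TO_TXN request.
        final Set<String> newPartitionsInTransaction = new HashSet<>();
        // Records what would have gone over the wire (illustration only).
        final List<String> sentRequests = new ArrayList<>();
    }

    /**
     * Sends ADD_PARTITIONS_TO_TXN only when there is something to register.
     * Returns true if a request was sent, false if it was skipped.
     */
    static boolean flushNewPartitions(TxnState state) {
        if (state.newPartitionsInTransaction.isEmpty()) {
            // Nothing to add: an empty request could not carry an error back.
            return false;
        }
        // Sorted copy so the recorded request text is deterministic.
        state.sentRequests.add(
            "ADD_PARTITIONS_TO_TXN" + new TreeSet<>(state.newPartitionsInTransaction));
        state.newPartitionsInTransaction.clear();
        return true;
    }
}
```

With this shape, a pre-commit on a transaction that wrote no records sends 
nothing, while a transaction that touched at least one partition sends exactly 
one request.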

[1] [https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala#L268]

> FlinkKafkaInternalProducer should not send `ADD_PARTITIONS_TO_TXN` request if 
> `newPartitionsInTransaction` is empty when enable EoS
> -----------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-14302
>                 URL: https://issues.apache.org/jira/browse/FLINK-14302
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Kafka
>            Reporter: Wei-Che Wei
>            Assignee: Wei-Che Wei
>            Priority: Major
>              Labels: pull-request-available
>          Time Spent: 10m
>  Remaining Estimate: 0h
>
> According to the investigation in this mailing list thread [1], the Kafka 
> server binds the error to the topic-partition list when it handles an 
> `AddPartitionsToTxnRequest`. So when the request body contains no 
> topic-partition, the error is not sent back to the Kafka producer client. 
> Moreover, the producer's internal API always checks whether 
> `newPartitionsInTransaction` is empty before sending an 
> ADD_PARTITIONS_TO_TXN request to the Kafka cluster. We should apply the same 
> check if we need to call it explicitly in the first commit phase of the 
> two-phase commit sink.
> [1] 
> [http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Kafka-producer-failed-with-InvalidTxnStateException-when-performing-commit-transaction-td29384.html]
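
To illustrate why an empty request hides the error, here is a simplified model 
of a response that carries errors only per topic-partition, as the description 
above says. {{PerPartitionErrorSketch}}, {{buildResponse}} and 
{{clientSeesError}} are hypothetical names, and the string-keyed map is an 
assumption made for illustration, not Kafka's actual request or response 
classes.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class PerPartitionErrorSketch {

    /**
     * Models a broker that attaches the same error code to every partition
     * named in the request. There is no top-level error field.
     */
    static Map<String, String> buildResponse(List<String> partitions, String error) {
        Map<String, String> errors = new LinkedHashMap<>();
        for (String partition : partitions) {
            errors.put(partition, error);
        }
        return errors;
    }

    /** Models a client that can only learn about errors via per-partition entries. */
    static boolean clientSeesError(Map<String, String> response) {
        for (String error : response.values()) {
            if (!"NONE".equals(error)) {
                return true;
            }
        }
        return false;
    }
}
```

In this model, a request with no partitions yields an empty map, so the client 
sees no error even though the broker produced one; a request with at least one 
partition surfaces it.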



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
