[https://issues.apache.org/jira/browse/FLINK-25126?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17452962#comment-17452962]
Fabian Paul commented on FLINK-25126:
-------------------------------------
[~tonyboo9527] thanks for the investigation. I had a look and think the problem
is a slightly different one. I suspect that somewhere in your Kafka logs a
commit failed and was retried. In that case, the old KafkaProducer is reused
and a new transactional id is set [1].
Unfortunately, if the commit fails [2] we do not reset the `inTransaction`
variable, so during recycling the producer appears to still have an open
transaction and the checkState fails. I am preparing a fix to always reset the
`inTransaction` variable.
[1]
https://github.com/apache/flink/blob/bbfe6521c2dfe9c48b810e7266e8dc3e5a501f21/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/FlinkKafkaInternalProducer.java#L162
[2]
https://github.com/apache/flink/blob/bbfe6521c2dfe9c48b810e7266e8dc3e5a501f21/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/FlinkKafkaInternalProducer.java#L97
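The failure mode can be sketched with a stripped-down, hypothetical model of the producer state (the class and method bodies below are illustrative only, not the real FlinkKafkaInternalProducer; only the `inTransaction` flag and the checkState-style guard mirror the linked code):

```java
// Minimal sketch of the state bug described above: a failed commit leaves
// inTransaction == true, so reusing the producer with a new transactional id
// trips the state check. Resetting the flag in a finally block (as in the
// proposed fix) makes the producer reusable after a failed commit.
public class InTransactionSketch {

    static class Producer {
        boolean inTransaction;
        String transactionalId = "prefix-0";

        void beginTransaction() {
            inTransaction = true;
        }

        // Sketch of commitTransaction(): with the fix, the flag is always
        // reset, even when the broker-side commit throws.
        void commitTransaction(boolean brokerFails) {
            try {
                if (brokerFails) {
                    throw new IllegalStateException("commit failed on broker");
                }
            } finally {
                inTransaction = false; // the fix: always reset the flag
            }
        }

        // Mirrors the Preconditions.checkState guard in setTransactionId [1].
        void setTransactionId(String id) {
            if (inTransaction) {
                throw new IllegalStateException("transaction still open");
            }
            transactionalId = id;
        }
    }

    public static void main(String[] args) {
        Producer p = new Producer();
        p.beginTransaction();
        try {
            p.commitTransaction(true); // simulate a failed commit
        } catch (IllegalStateException expected) {
            // retried/recycled path continues below
        }
        // Without the finally-reset, this call would throw; with it, the
        // recycled producer accepts a new transactional id.
        p.setTransactionId("prefix-1");
        System.out.println("reused producer id = " + p.transactionalId);
    }
}
```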
> Kafka connector tries to commit aborted transaction in batch mode
> -----------------------------------------------------------------
>
> Key: FLINK-25126
> URL: https://issues.apache.org/jira/browse/FLINK-25126
> Project: Flink
> Issue Type: Bug
> Components: Connectors / Kafka
> Affects Versions: 1.14.0, 1.15.0
> Environment: SET 'execution.runtime-mode' = 'batch';
> CREATE TABLE ka15 (
> name String,
> cnt bigint
> ) WITH (
> 'connector' = 'kafka',
> 'topic' = 'shifang8888',
> 'properties.bootstrap.servers' = 'flinkx1:9092',
> 'properties.transaction.timeout.ms' = '800000',
> 'properties.max.block.ms' = '300000',
> 'value.format' = 'json',
> 'sink.parallelism' = '2',
> 'sink.delivery-guarantee' = 'exactly-once',
> 'sink.transactional-id-prefix' = 'dtstack9999');
>
> insert into ka15 SELECT
> name,
> cnt
> FROM
> (VALUES ('Bob',100), ('Alice',100), ('Greg',100), ('Bob',100)) AS
> NameTable(name,cnt);
> Reporter: eric yu
> Assignee: Fabian Paul
> Priority: Blocker
> Fix For: 1.15.0, 1.14.1
>
>
> A Flink SQL task submitted by the SQL client will fail.
> This is the SQL:
> SET 'execution.runtime-mode' = 'batch';
> CREATE TABLE ka15 (
> name String,
> cnt bigint
> ) WITH (
> 'connector' = 'kafka',
> 'topic' = 'shifang8888',
> 'properties.bootstrap.servers' = 'flinkx1:9092',
> 'properties.transaction.timeout.ms' = '800000',
> 'properties.max.block.ms' = '300000',
> 'value.format' = 'json',
> 'sink.parallelism' = '2',
> 'sink.delivery-guarantee' = 'exactly-once',
> 'sink.transactional-id-prefix' = 'dtstack9999');
>
> insert into ka15 SELECT
> name,
> cnt
> FROM
> (VALUES ('Bob',100), ('Alice',100), ('Greg',100), ('Bob',100)) AS
> NameTable(name,cnt);
>
> this is the error:
> Caused by: java.lang.IllegalStateException
> at org.apache.flink.util.Preconditions.checkState(Preconditions.java:177)
> at
> org.apache.flink.connector.kafka.sink.FlinkKafkaInternalProducer.setTransactionId(FlinkKafkaInternalProducer.java:164)
> at
> org.apache.flink.connector.kafka.sink.KafkaCommitter.getRecoveryProducer(KafkaCommitter.java:144)
> at
> org.apache.flink.connector.kafka.sink.KafkaCommitter.lambda$commit$0(KafkaCommitter.java:76)
> at java.util.Optional.orElseGet(Optional.java:267)
> at
> org.apache.flink.connector.kafka.sink.KafkaCommitter.commit(KafkaCommitter.java:76)
> ... 14 more
>
>
> I found the reason why the Kafka commit failed: while the downstream
> operator CommitterOperator was committing the transaction, the upstream
> operator SinkOperator had already closed and aborted the transaction being
> committed by the CommitterOperator. This occurs when execution.runtime-mode
> is batch.
--
This message was sent by Atlassian Jira
(v8.20.1#820001)