[
https://issues.apache.org/jira/browse/FLINK-25126?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Fabian Paul updated FLINK-25126:
--------------------------------
Summary: Kafka connector tries to commit aborted transaction in batch mode
(was: Kafka connector tries to commit with closed producer in batch mode)
> Kafka connector tries to commit aborted transaction in batch mode
> -----------------------------------------------------------------
>
> Key: FLINK-25126
> URL: https://issues.apache.org/jira/browse/FLINK-25126
> Project: Flink
> Issue Type: Bug
> Components: Connectors / Kafka
> Affects Versions: 1.14.0, 1.15.0
> Environment: SET 'execution.runtime-mode' = 'batch';
> CREATE TABLE ka15 (
> name String,
> cnt bigint
> ) WITH (
> 'connector' = 'kafka',
> 'topic' = 'shifang8888',
> 'properties.bootstrap.servers' = 'flinkx1:9092',
> 'properties.transaction.timeout.ms' = '800000',
> 'properties.max.block.ms' = '300000',
> 'value.format' = 'json',
> 'sink.parallelism' = '2',
> 'sink.delivery-guarantee' = 'exactly-once',
> 'sink.transactional-id-prefix' = 'dtstack9999');
>
> insert into ka15 SELECT
> name,
> cnt
> FROM
> (VALUES ('Bob',100), ('Alice',100), ('Greg',100), ('Bob',100)) AS NameTable(name,cnt);
> Reporter: eric yu
> Priority: Blocker
> Fix For: 1.15.0, 1.14.1
>
>
> The Flink SQL job submitted by the SQL client fails.
> This is the SQL:
> SET 'execution.runtime-mode' = 'batch';
> CREATE TABLE ka15 (
> name String,
> cnt bigint
> ) WITH (
> 'connector' = 'kafka',
> 'topic' = 'shifang8888',
> 'properties.bootstrap.servers' = 'flinkx1:9092',
> 'properties.transaction.timeout.ms' = '800000',
> 'properties.max.block.ms' = '300000',
> 'value.format' = 'json',
> 'sink.parallelism' = '2',
> 'sink.delivery-guarantee' = 'exactly-once',
> 'sink.transactional-id-prefix' = 'dtstack9999');
>
> insert into ka15 SELECT
> name,
> cnt
> FROM
> (VALUES ('Bob',100), ('Alice',100), ('Greg',100), ('Bob',100)) AS NameTable(name,cnt);
>
> This is the error:
> Caused by: java.lang.IllegalStateException
>     at org.apache.flink.util.Preconditions.checkState(Preconditions.java:177)
>     at org.apache.flink.connector.kafka.sink.FlinkKafkaInternalProducer.setTransactionId(FlinkKafkaInternalProducer.java:164)
>     at org.apache.flink.connector.kafka.sink.KafkaCommitter.getRecoveryProducer(KafkaCommitter.java:144)
>     at org.apache.flink.connector.kafka.sink.KafkaCommitter.lambda$commit$0(KafkaCommitter.java:76)
>     at java.util.Optional.orElseGet(Optional.java:267)
>     at org.apache.flink.connector.kafka.sink.KafkaCommitter.commit(KafkaCommitter.java:76)
>     ... 14 more
>
>
> I found the reason why the Kafka commit fails: while the downstream CommitterOperator
> is committing the transaction, the upstream SinkOperator has already been closed, and
> closing aborts the transaction that the CommitterOperator is trying to commit. This
> occurs when execution.runtime-mode is batch. A sketch of the ordering follows below.
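> Below is a minimal, hypothetical Java sketch of that ordering. The Writer and
> Committer classes are illustrative only and are not the actual Flink connector API;
> they only model how a committable can outlive the transaction that backs it once the
> bounded writer closes in batch mode.
>
> // Hypothetical sketch: not the Flink connector implementation.
> public class BatchModeOrderingSketch {
>
>     static class Writer {
>         boolean transactionOpen;
>
>         String prepareCommit() {
>             transactionOpen = true;     // transaction opened, data flushed
>             return "dtstack9999-0";     // committable handed downstream
>         }
>
>         void close() {
>             // In batch mode the bounded writer closes once its input is
>             // exhausted; closing aborts any transaction it still owns.
>             transactionOpen = false;
>         }
>     }
>
>     static class Committer {
>         void commit(Writer writer, String committable) {
>             // The committer runs after the upstream writer has already closed,
>             // so the transaction behind the committable is gone.
>             if (!writer.transactionOpen) {
>                 throw new IllegalStateException(
>                         "transaction " + committable + " was already aborted");
>             }
>         }
>     }
>
>     public static void main(String[] args) {
>         Writer writer = new Writer();
>         String committable = writer.prepareCommit();
>         writer.close();                              // batch mode: writer closes first
>         new Committer().commit(writer, committable); // -> IllegalStateException
>     }
> }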
--
This message was sent by Atlassian Jira
(v8.20.1#820001)