[ 
https://issues.apache.org/jira/browse/FLINK-25126?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Fabian Paul updated FLINK-25126:
--------------------------------
    Summary: Kafka connector tries to commit aborted transaction in batch mode  
(was: Kafka connector tries to commit with closed producer in batch mode)

> Kafka connector tries to commit aborted transaction in batch mode
> -----------------------------------------------------------------
>
>                 Key: FLINK-25126
>                 URL: https://issues.apache.org/jira/browse/FLINK-25126
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Kafka
>    Affects Versions: 1.14.0, 1.15.0
>         Environment: SET 'execution.runtime-mode' = 'batch';
>  CREATE TABLE ka15 (
>      name String,
>      cnt bigint
>  ) WITH (
>    'connector' = 'kafka',
>    'topic' = 'shifang8888',
>   'properties.bootstrap.servers' = 'flinkx1:9092',
>   'properties.transaction.timeout.ms' = '800000',
>   'properties.max.block.ms' = '300000',
>    'value.format' = 'json',
>    'sink.parallelism' = '2',
>    'sink.delivery-guarantee' = 'exactly-once',
>    'sink.transactional-id-prefix' = 'dtstack9999');
>  
> insert into ka15 SELECT
>   name,
>   cnt
> FROM
>   (VALUES ('Bob',100), ('Alice',100), ('Greg',100), ('Bob',100)) AS 
> NameTable(name,cnt);
>            Reporter: eric yu
>            Priority: Blocker
>             Fix For: 1.15.0, 1.14.1
>
>
> flinksql task submitted by sql client will failed,
> this is the sql :
> SET 'execution.runtime-mode' = 'batch';
>  CREATE TABLE ka15 (
>      name String,
>      cnt bigint
>  ) WITH (
>    'connector' = 'kafka',
>    'topic' = 'shifang8888',
>   'properties.bootstrap.servers' = 'flinkx1:9092',
>   'properties.transaction.timeout.ms' = '800000',
>   'properties.max.block.ms' = '300000',
>    'value.format' = 'json',
>    'sink.parallelism' = '2',
>    'sink.delivery-guarantee' = 'exactly-once',
>    'sink.transactional-id-prefix' = 'dtstack9999');
>  
> insert into ka15 SELECT
>   name,
>   cnt
> FROM
>   (VALUES ('Bob',100), ('Alice',100), ('Greg',100), ('Bob',100)) AS 
> NameTable(name,cnt);
>  
> this is the error:
> Caused by: java.lang.IllegalStateException
> at org.apache.flink.util.Preconditions.checkState(Preconditions.java:177)
> at 
> org.apache.flink.connector.kafka.sink.FlinkKafkaInternalProducer.setTransactionId(FlinkKafkaInternalProducer.java:164)
> at 
> org.apache.flink.connector.kafka.sink.KafkaCommitter.getRecoveryProducer(KafkaCommitter.java:144)
> at 
> org.apache.flink.connector.kafka.sink.KafkaCommitter.lambda$commit$0(KafkaCommitter.java:76)
> at java.util.Optional.orElseGet(Optional.java:267)
> at 
> org.apache.flink.connector.kafka.sink.KafkaCommitter.commit(KafkaCommitter.java:76)
> ... 14 more
>  
>  
> i found the reason why  kafka commit failed, when downstream operator 
> CommitterOperator was commiting transaction, the upstream  operator 
> SinkOperator has closed , it will abort the transaction which  is committing 
> by CommitterOperator, this is occurs when execution.runtime-mode is batch



--
This message was sent by Atlassian Jira
(v8.20.1#820001)

Reply via email to