eric yu created FLINK-25126:
-------------------------------
Summary: When 'execution.runtime-mode' = 'batch' and
'sink.delivery-guarantee' = 'exactly-once', the Kafka connector fails to commit
Key: FLINK-25126
URL: https://issues.apache.org/jira/browse/FLINK-25126
Project: Flink
Issue Type: Bug
Components: Connectors / Kafka
Affects Versions: 1.14.0
Environment: SET 'execution.runtime-mode' = 'batch';

CREATE TABLE ka15 (
    name STRING,
    cnt BIGINT
) WITH (
    'connector' = 'kafka',
    'topic' = 'shifang8888',
    'properties.bootstrap.servers' = 'flinkx1:9092',
    'properties.transaction.timeout.ms' = '800000',
    'properties.max.block.ms' = '300000',
    'value.format' = 'json',
    'sink.parallelism' = '2',
    'sink.delivery-guarantee' = 'exactly-once',
    'sink.transactional-id-prefix' = 'dtstack9999'
);

INSERT INTO ka15
SELECT
    name,
    cnt
FROM
    (VALUES ('Bob', 100), ('Alice', 100), ('Greg', 100), ('Bob', 100))
    AS NameTable(name, cnt);
Reporter: eric yu
A Flink SQL job submitted through the SQL client fails with:
Caused by: java.lang.IllegalStateException
    at org.apache.flink.util.Preconditions.checkState(Preconditions.java:177)
    at org.apache.flink.connector.kafka.sink.FlinkKafkaInternalProducer.setTransactionId(FlinkKafkaInternalProducer.java:164)
    at org.apache.flink.connector.kafka.sink.KafkaCommitter.getRecoveryProducer(KafkaCommitter.java:144)
    at org.apache.flink.connector.kafka.sink.KafkaCommitter.lambda$commit$0(KafkaCommitter.java:76)
    at java.util.Optional.orElseGet(Optional.java:267)
    at org.apache.flink.connector.kafka.sink.KafkaCommitter.commit(KafkaCommitter.java:76)
    ... 14 more
I found why the Kafka commit fails: while the downstream operator
CommitterOperator is committing the transaction, the upstream operator
SinkOperator has already closed, and on close it aborts the transaction that
the CommitterOperator is committing. This occurs when execution.runtime-mode
is batch.
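
To make the ordering problem above concrete, here is a minimal, self-contained
sketch. It is a toy model only and does not use Flink or Kafka classes:
ToyBroker, TxnState, and run(...) are hypothetical names invented for
illustration, and the sketch assumes that closing the writer aborts any
transaction that has not yet been committed. It does not reproduce the exact
checkState failure in the stack trace, only the close-before-commit ordering
described here.

```java
import java.util.HashMap;
import java.util.Map;

public class BatchCommitRaceSketch {

    enum TxnState { OPEN, PRE_COMMITTED, COMMITTED, ABORTED }

    /** Hypothetical stand-in for a transaction coordinator; not a Kafka API. */
    static final class ToyBroker {
        private final Map<String, TxnState> txns = new HashMap<>();

        void begin(String id) {
            txns.put(id, TxnState.OPEN);
        }

        void preCommit(String id) {
            txns.put(id, TxnState.PRE_COMMITTED);
        }

        /** Models what the writer does on close: abort anything not committed. */
        void abortIfNotCommitted(String id) {
            if (txns.get(id) != TxnState.COMMITTED) {
                txns.put(id, TxnState.ABORTED);
            }
        }

        /** Models the committer: only a pre-committed transaction can commit. */
        void commit(String id) {
            TxnState state = txns.get(id);
            if (state != TxnState.PRE_COMMITTED) {
                throw new IllegalStateException(
                        "cannot commit " + id + " in state " + state);
            }
            txns.put(id, TxnState.COMMITTED);
        }

        TxnState state(String id) {
            return txns.get(id);
        }
    }

    /** Runs one transaction and returns its final state for the given ordering. */
    static TxnState run(boolean writerClosesBeforeCommit) {
        ToyBroker broker = new ToyBroker();
        String id = "prefix-0-1"; // made-up transactional id
        broker.begin(id);
        broker.preCommit(id);

        if (writerClosesBeforeCommit) {
            // Ordering reported in this issue: the upstream writer closes first.
            broker.abortIfNotCommitted(id);
            try {
                broker.commit(id);
            } catch (IllegalStateException e) {
                // The commit fails because the transaction was already aborted.
            }
        } else {
            // Safe ordering: the commit completes before the writer closes.
            broker.commit(id);
            broker.abortIfNotCommitted(id);
        }
        return broker.state(id);
    }

    public static void main(String[] args) {
        System.out.println("writer closes first: " + run(true));
        System.out.println("committer finishes first: " + run(false));
    }
}
```

In this model the transaction ends up ABORTED when the writer closes first and
COMMITTED when the committer finishes first, which matches the behaviour
described for batch mode.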
--
This message was sent by Atlassian Jira
(v8.20.1#820001)