[
https://issues.apache.org/jira/browse/FLINK-30945?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17686919#comment-17686919
]
Jingsong Lee commented on FLINK-30945:
--------------------------------------
[~vicky_papavas] Thanks for reporting! I think this needs to be improved!
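The report below suggests setting a transactionalIdPrefix per writer, which is roughly what Flink's own KafkaSink already exposes. A minimal sketch of that idea, assuming the plain Kafka connector API rather than the Table Store log sink internals (broker and topic taken from the reproduction below, prefix value chosen arbitrarily):
{code:java}
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;

// Sketch only: each job gets its own transactionalIdPrefix, so two jobs
// writing to the same topic never reuse each other's transactional ids.
KafkaSink<String> sink =
    KafkaSink.<String>builder()
        .setBootstrapServers("broker:9092")
        .setRecordSerializer(
            KafkaRecordSerializationSchema.builder()
                .setTopic("word_count_log")
                .setValueSerializationSchema(new SimpleStringSchema())
                .build())
        .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
        // Must be unique per job, e.g. derived from the job name or job id.
        .setTransactionalIdPrefix("word-count-insert-job-1")
        .build();
{code}
The Table Store log sink would presumably need to derive an equivalent per-job prefix automatically so that concurrent writers to the same topic do not fence each other.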
> FTS does not support multiple writers into the same table and topic
> -------------------------------------------------------------------
>
> Key: FLINK-30945
> URL: https://issues.apache.org/jira/browse/FLINK-30945
> Project: Flink
> Issue Type: Bug
> Components: Table Store
> Reporter: Vicky Papavasileiou
> Priority: Major
>
> When two different streaming jobs INSERT INTO the same table and Kafka topic,
> the second job is never able to make progress: its transaction is constantly
> aborted because the producer gets fenced.
> FTS should set the transactionalIdPrefix so that transactions of different
> jobs do not clash.
> {code:java}
> 2023-02-06 17:13:36,088 WARN org.apache.flink.runtime.taskmanager.Task [] - Writer -> Global Committer -> Sink: end (1/1)#0 (8cf4197af9716623c3c19e7fa3d7c071_b5c8d46f3e7b141acf271f12622e752b_0_0) switched from RUNNING to FAILED with failure cause:
> org.apache.flink.util.FlinkRuntimeException: Committing one of transactions failed, logging first encountered failure
>     at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.notifyCheckpointComplete(TwoPhaseCommitSinkFunction.java:323)
>     at org.apache.flink.table.store.connector.sink.StoreWriteOperator.notifyCheckpointComplete(StoreWriteOperator.java:175)
>     at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.notifyCheckpointComplete(StreamOperatorWrapper.java:104)
>     at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.notifyCheckpointComplete(RegularOperatorChain.java:145)
>     at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.notifyCheckpoint(SubtaskCheckpointCoordinatorImpl.java:479)
>     at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.notifyCheckpointComplete(SubtaskCheckpointCoordinatorImpl.java:413)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.notifyCheckpointComplete(StreamTask.java:1412)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$notifyCheckpointCompleteAsync$15(StreamTask.java:1353)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$notifyCheckpointOperation$18(StreamTask.java:1392)
>     at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
>     at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMail(MailboxProcessor.java:398)
>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsNonBlocking(MailboxProcessor.java:383)
>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:345)
>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:229)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:831)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:780)
>     at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935)
>     at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:914)
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550)
>     at java.lang.Thread.run(Thread.java:750)
> Caused by: org.apache.flink.table.store.shaded.org.apache.kafka.common.errors.ProducerFencedException: There is a newer producer with the same transactionalId which fences the current one.
> {code}
> Sample queries:
>
>
> {code:java}
> CREATE CATALOG table_store_catalog WITH (
>     'type' = 'table-store',
>     'warehouse' = 's3://my-bucket/table-store'
> );
> USE CATALOG table_store_catalog;
> SET 'execution.checkpointing.interval' = '10 s';
>
> CREATE TABLE word_count_kafka (
>     word STRING PRIMARY KEY NOT ENFORCED,
>     cnt BIGINT
> ) WITH (
>     'log.system' = 'kafka',
>     'kafka.bootstrap.servers' = 'broker:9092',
>     'kafka.topic' = 'word_count_log'
> );
>
> CREATE TEMPORARY TABLE word_table (
>     word STRING
> ) WITH (
>     'connector' = 'datagen',
>     'fields.word.length' = '1'
> );
> {code}
>
> And the two INSERT jobs (the same statement is submitted twice, once per job):
> {code:java}
> INSERT INTO word_count_kafka
> SELECT word, COUNT(*) FROM word_table GROUP BY word;
> {code}
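
The fencing itself is standard Kafka transactional-producer behaviour: when two producers initialize transactions with the same transactional.id, the broker bumps the epoch for that id and the older producer is rejected on its next transactional call. A rough standalone illustration with the vanilla Kafka producer API (not FTS code; broker and topic names reuse the example above):
{code:java}
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

Properties props = new Properties();
props.put("bootstrap.servers", "broker:9092");
props.put("key.serializer", StringSerializer.class.getName());
props.put("value.serializer", StringSerializer.class.getName());
// Both producers share one transactional.id, as the two INSERT jobs effectively do today.
props.put("transactional.id", "shared-transactional-id");

KafkaProducer<String, String> first = new KafkaProducer<>(props);
first.initTransactions();
first.beginTransaction();
first.send(new ProducerRecord<>("word_count_log", "a", "1"));

KafkaProducer<String, String> second = new KafkaProducer<>(props);
second.initTransactions(); // bumps the epoch for the shared id; "first" is now fenced

first.commitTransaction(); // fails with ProducerFencedException
{code}
Giving each job a distinct prefix, and therefore distinct transactional ids, avoids this entirely.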
--
This message was sent by Atlassian Jira
(v8.20.10#820010)