[
https://issues.apache.org/jira/browse/FLINK-39723?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18085261#comment-18085261
]
Martijn Visser commented on FLINK-39723:
----------------------------------------
I don't think this is a transactional-ID reuse issue; it looks like a topic metadata
propagation race. In the weekly run
https://github.com/apache/flink-connector-kafka/actions/runs/26755629723,
the failure reproduced 3 times across Surefire
retries for the same topic. Inline MiniCluster stack trace:
{code:java}
java.lang.RuntimeException: Failed to get open transactions for topics [topics_csv_<uuid>].
    at AdminUtils.getOpenTransactionsForTopics(AdminUtils.java:115)
    at TransactionAbortStrategyImpl$2.abortTransactions(...)  # LISTING strategy
    at ExactlyOnceKafkaWriter.abortLingeringTransactions(ExactlyOnceKafkaWriter.java:331)
    at ExactlyOnceKafkaWriter.initialize(ExactlyOnceKafkaWriter.java:194)  # sink writer init
    ...
Caused by: ExecutionException: UnknownTopicOrPartitionException: This server does not host this topic-partition.
{code}
The root cause appears to be the following:
1. The test creates its topic via KafkaTableTestBase.createTestTopic(), which
calls AdminClient.createTopics(...).all().get() and returns as soon as the
controller acknowledges creation. It does *not* wait for partition
leadership/metadata to propagate.
2. The test then immediately starts an EXACTLY_ONCE sink job. During writer
initialization, ExactlyOnceKafkaWriter.abortLingeringTransactions() (LISTING
strategy) calls AdminUtils.getTopicMetadata(), which races ahead of metadata
propagation and receives UnknownTopicOrPartitionException.
3. The job runs with NoRestartBackoffTimeStrategy, so the transient error kills
the job immediately (after ~0.2s in the observed run), surfacing to the test as
"TableException: Failed to wait job finish".
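A minimal, stdlib-only sketch of the readiness check that step 1 skips: poll a condition until it holds or a deadline passes, and only then start the sink job. The class TopicReadiness, its waitUntil method and the simulated lookup counter are hypothetical. In a real test the condition would presumably call AdminClient.describeTopics(...) and check that every partition reports a leader; that Kafka call is not shown here.

{code:java}
import java.time.Duration;
import java.util.function.BooleanSupplier;

public class TopicReadiness {

    // Hypothetical helper: polls `condition` every `pollInterval` until it returns
    // true or `timeout` elapses. Returns how many polls it took.
    static int waitUntil(BooleanSupplier condition, Duration timeout, Duration pollInterval) {
        long deadline = System.nanoTime() + timeout.toNanos();
        for (int polls = 1; ; polls++) {
            if (condition.getAsBoolean()) {
                return polls;
            }
            // Overflow-safe comparison of nanoTime values.
            if (System.nanoTime() - deadline >= 0) {
                throw new IllegalStateException("Condition not met within " + timeout);
            }
            try {
                Thread.sleep(pollInterval.toMillis());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("Interrupted while waiting", e);
            }
        }
    }

    public static void main(String[] args) {
        // Simulated broker: the topic only becomes visible on the 3rd lookup,
        // standing in for metadata propagation after createTopics() has returned.
        int[] lookups = {0};
        BooleanSupplier partitionsHaveLeaders = () -> ++lookups[0] >= 3;

        int polls = waitUntil(partitionsHaveLeaders, Duration.ofSeconds(5), Duration.ofMillis(10));
        System.out.println("Topic ready after " + polls + " polls"); // prints "Topic ready after 3 polls"
        // Only at this point would the test start the EXACTLY_ONCE sink job.
    }
}
{code}

Bounding the wait with a deadline means a topic that never becomes ready fails the test with an explicit timeout instead of hanging it.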
This is the same class of readiness race that FLINK-39234 addressed
(createNewTopicAndWaitForPartitionAssignment).
> KafkaTableITCase.testExactlyOnceSink fails due to transactional ID reuse
> across test runs
> -----------------------------------------------------------------------------------------
>
> Key: FLINK-39723
> URL: https://issues.apache.org/jira/browse/FLINK-39723
> Project: Flink
> Issue Type: Bug
> Components: Connectors / Kafka
> Reporter: Aleksandr Savonin
> Assignee: Aleksandr Savonin
> Priority: Major
>
> KafkaTableITCase.testExactlyOnceSink intermittently fails in CI with
> ProducerFencedException caused by transactional producer state leaking
> between test runs on the same Kafka broker instance.
> Observed in: CI run
> https://github.com/apache/flink-connector-kafka/actions/runs/26227821749
>
> Ideas/potential fixes to think about:
> - Use randomized/unique transactional ID prefixes per test run (similar to
> how topic names already use random UUIDs)
> - Or explicitly abort any open transactions with the same ID before the
> test starts
> - Or ensure the Kafka container is fully restarted between parameterized
> variants that share transactional ID space
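For reference, the first idea in the description (a per-run transactional-ID prefix) could look roughly like this sketch, although the comment above attributes the failure to a metadata race rather than ID reuse. It assumes the Kafka table connector's sink.delivery-guarantee and sink.transactional-id-prefix options. The class and helper names are hypothetical, and the generated string is only a WITH-clause fragment, not a full DDL statement.

{code:java}
import java.util.UUID;

public class TransactionalIdPrefixes {

    // Hypothetical helper: a transactional-ID prefix unique per test run,
    // mirroring how the tests already randomize topic names with a UUID.
    static String uniquePrefix(String testName) {
        return testName + "-" + UUID.randomUUID();
    }

    // Hypothetical helper: WITH-clause options for an exactly-once Kafka table sink
    // (assumed option names, see lead-in).
    static String sinkOptions(String topic, String txPrefix) {
        return String.join(",\n",
                "'connector' = 'kafka'",
                "'topic' = '" + topic + "'",
                "'sink.delivery-guarantee' = 'exactly-once'",
                "'sink.transactional-id-prefix' = '" + txPrefix + "'");
    }

    public static void main(String[] args) {
        String prefix = uniquePrefix("testExactlyOnceSink");
        System.out.println(sinkOptions("topics_csv_" + UUID.randomUUID(), prefix));
    }
}
{code}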