[
https://issues.apache.org/jira/browse/FLINK-11654?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16884267#comment-16884267
]
Jiangjie Qin commented on FLINK-11654:
--------------------------------------
[~knaufk] That makes sense. In fact, the {{transactional.id}} config is already
supported by Kafka producers, so we can simply add it to the current prefix.
This config can be set via the {{producerProperties}} config. There are still
one or two constructors of {{FlinkKafkaProducer}} that do not take
{{producerProperties}}; we can keep the behavior as-is in those cases.
If we do that, the behavior would be:
# When the user did not provide {{producerProperties}} or did not specify a
{{transactional.id}} in the {{producerProperties}}, the behavior will be the
same as the current behavior.
# When the user provided a {{transactional.id}} in the {{producerProperties}},
that {{transactional.id}} will be part of the prefix. If the user-provided
{{transactional.id}} config is unique, the eventually used
{{transactional.id}} will also be unique.
I think this is a good approach. It is fully backwards compatible and does not
introduce any new API.
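For illustration, a minimal sketch of how a user could opt in under this
proposal. The constructor and serialization wrapper follow the documented
{{FlinkKafkaProducer}} API; the broker address and the {{transactional.id}}
value are made-up examples, and the prefixing behavior itself is the proposed
change, not what current releases do.
{code:java}
import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;

Properties producerProperties = new Properties();
producerProperties.setProperty("bootstrap.servers", "broker:9092");
// Unique per job; under this proposal the value becomes part of the
// transactional.id prefix, so distinct jobs no longer clash.
producerProperties.setProperty("transactional.id", "my-job-a");

FlinkKafkaProducer<String> producer = new FlinkKafkaProducer<>(
	"finding-commands-dev-1",
	new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()),
	producerProperties,
	FlinkKafkaProducer.Semantic.EXACTLY_ONCE);
{code}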
> Multiple transactional KafkaProducers writing to same cluster have clashing
> transaction IDs
> -------------------------------------------------------------------------------------------
>
> Key: FLINK-11654
> URL: https://issues.apache.org/jira/browse/FLINK-11654
> Project: Flink
> Issue Type: Bug
> Components: Connectors / Kafka
> Affects Versions: 1.7.1
> Reporter: Jürgen Kreileder
> Priority: Major
> Fix For: 1.9.0
>
>
> We run multiple jobs on a cluster which write a lot to the same Kafka topic
> from identically named sinks. When the EXACTLY_ONCE semantic is enabled for
> the KafkaProducers, we run into a lot of ProducerFencedExceptions and all
> jobs go into a restart cycle.
> Example exception from the Kafka log:
>
> {code:java}
> [2019-02-18 18:05:28,485] ERROR [ReplicaManager broker=1] Error processing append operation on partition finding-commands-dev-1-0 (kafka.server.ReplicaManager)
> org.apache.kafka.common.errors.ProducerFencedException: Producer's epoch is no longer valid. There is probably another producer with a newer epoch. 483 (request epoch), 484 (server epoch)
> {code}
> The reason for this is the way FlinkKafkaProducer initializes the
> TransactionalIdsGenerator: the generated IDs are only guaranteed to be unique
> within a single job, but they can clash between different jobs (and clusters).
>
>
> {code:java}
> --- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
> +++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
> @@ -819,6 +819,7 @@ public class FlinkKafkaProducer<IN>
>  		nextTransactionalIdHintState = context.getOperatorStateStore().getUnionListState(
>  			NEXT_TRANSACTIONAL_ID_HINT_DESCRIPTOR);
>  		transactionalIdsGenerator = new TransactionalIdsGenerator(
> +			// the prefix probably should include job id and maybe cluster id
>  			getRuntimeContext().getTaskName() + "-" + ((StreamingRuntimeContext) getRuntimeContext()).getOperatorUniqueID(),
>  			getRuntimeContext().getIndexOfThisSubtask(),
>  			getRuntimeContext().getNumberOfParallelSubtasks(),
> {code}
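>
> To make the clash concrete, here is a minimal sketch (with hypothetical task
> name and operator ID values) of how two jobs with identical topologies derive
> the same prefix:
>
> {code:java}
> // The operator unique ID is a deterministic hash of the job topology, so two
> // jobs deploying the same sink under the same task name compute the same value.
> String taskName = "Sink: finding-commands-sink";              // identical in job A and job B
> String operatorUniqueId = "cbc357ccb763df2852fee8c4fc7d55f2"; // identical hash in both jobs
>
> String prefixJobA = taskName + "-" + operatorUniqueId;
> String prefixJobB = taskName + "-" + operatorUniqueId;
>
> // Identical prefixes mean both jobs draw transactional.ids from the same pool
> // on the shared Kafka cluster, which leads to ProducerFencedExceptions.
> assert prefixJobA.equals(prefixJobB);
> {code}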
>
>
--
This message was sent by Atlassian JIRA
(v7.6.14#76016)