[
https://issues.apache.org/jira/browse/FLINK-11654?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16885303#comment-16885303
]
Jiangjie Qin commented on FLINK-11654:
--------------------------------------
[~knaufk] That's a fair point. There might be pros and cons in either approach.
The good thing about reusing {{transactional.id}} is that there is only one
config across the board to set {{transactional.id}} so people won't miss it.
Having a new config will be more accurate, but have one more config looking
similar to {{transactional.id}}.
I am fine either way, as long as we stick to the same principle everywhere. So
if we introduce a new config, the principle would be:
# If we are going to overwrite a native external config for any purpose
(transactional.id in this case), the original config should be mapped to a new
Flink specific config name (transactional.id.prefix in this case),
# If user provided the value of the original config, a warning should be
logged (I actually prefer an exception as most people do not look at logs) and
points to the Flink specific config in (1)
If we do that, there might be a few more configs to be taken care of, e.g.
{{acks}}, {{client.id}}, {{enable.idempotence}}
> Multiple transactional KafkaProducers writing to same cluster have clashing
> transaction IDs
> -------------------------------------------------------------------------------------------
>
> Key: FLINK-11654
> URL: https://issues.apache.org/jira/browse/FLINK-11654
> Project: Flink
> Issue Type: Bug
> Components: Connectors / Kafka
> Affects Versions: 1.7.1
> Reporter: Jürgen Kreileder
> Priority: Major
> Fix For: 1.9.0
>
>
> We run multiple jobs on a cluster which write a lot to the same Kafka topic
> from identically named sinks. When EXACTLY_ONCE semantic is enabled for the
> KafkaProducers we run into a lot of ProducerFencedExceptions and all jobs go
> into a restart cycle.
> Example exception from the Kafka log:
>
> {code:java}
> [2019-02-18 18:05:28,485] ERROR [ReplicaManager broker=1] Error processing
> append operation on partition finding-commands-dev-1-0
> (kafka.server.ReplicaManager)
> org.apache.kafka.common.errors.ProducerFencedException: Producer's epoch is
> no longer valid. There is probably another producer with a newer epoch. 483
> (request epoch), 484 (server epoch)
> {code}
> The reason for this is the way FlinkKafkaProducer initializes the
> TransactionalIdsGenerator:
> The IDs are only guaranteed to be unique for a single Job. But they can clash
> between different Jobs (and Clusters).
>
>
> {code:java}
> ---
> a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
> +++
> b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
> @@ -819,6 +819,7 @@ public class FlinkKafkaProducer<IN>
> nextTransactionalIdHintState =
> context.getOperatorStateStore().getUnionListState(
> NEXT_TRANSACTIONAL_ID_HINT_DESCRIPTOR);
> transactionalIdsGenerator = new TransactionalIdsGenerator(
> + // the prefix probably should include job id and maybe cluster id
> getRuntimeContext().getTaskName() + "-" +
> ((StreamingRuntimeContext) getRuntimeContext()).getOperatorUniqueID(),
> getRuntimeContext().getIndexOfThisSubtask(),
>
> getRuntimeContext().getNumberOfParallelSubtasks(),{code}
>
>
--
This message was sent by Atlassian JIRA
(v7.6.14#76016)