[ https://issues.apache.org/jira/browse/FLINK-14719?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Jark Wu closed FLINK-14719.
---------------------------
Resolution: Not A Problem
> Making Semantic configurable in FlinkKafkaProducer to support exactly-once
> semantics in Table API
> -------------------------------------------------------------------------------------------------
>
> Key: FLINK-14719
> URL: https://issues.apache.org/jira/browse/FLINK-14719
> Project: Flink
> Issue Type: Improvement
> Components: Connectors / Kafka
> Affects Versions: 1.8.0
> Reporter: chaiyongqiang
> Priority: Major
>
> Flink supports Kafka transactions with FlinkKafkaProducer and
> FlinkKafkaProducer011. With the DataStream API, this makes exactly-once
> semantics achievable. With the Table API, things are different.
> The createKafkaProducer method in KafkaTableSink creates the
> FlinkKafkaProducer that sends messages to the Kafka server. It looks like this:
> {code:java}
> protected SinkFunction<Row> createKafkaProducer(
>         String topic,
>         Properties properties,
>         SerializationSchema<Row> serializationSchema,
>         Optional<FlinkKafkaPartitioner<Row>> partitioner) {
>     return new FlinkKafkaProducer<>(
>         topic,
>         new KeyedSerializationSchemaWrapper<>(serializationSchema),
>         properties,
>         partitioner);
> }
> {code}
> Looking at this FlinkKafkaProducer constructor, we can see that it always
> creates a producer with AT_LEAST_ONCE semantics:
> {code:java}
> public FlinkKafkaProducer(
>         String defaultTopicId,
>         KeyedSerializationSchema<IN> serializationSchema,
>         Properties producerConfig,
>         Optional<FlinkKafkaPartitioner<IN>> customPartitioner) {
>     this(
>         defaultTopicId,
>         serializationSchema,
>         producerConfig,
>         customPartitioner,
>         FlinkKafkaProducer.Semantic.AT_LEAST_ONCE,
>         DEFAULT_KAFKA_PRODUCERS_POOL_SIZE);
> }
> {code}
> As a result, users cannot achieve exactly-once semantics when using the Table API.
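
The request in the description can be illustrated with a minimal, self-contained sketch. It does not use Flink classes: `Semantic`, `SketchProducer`, `SketchKafkaTableSink`, and the `sink.semantic` property key are all hypothetical stand-ins, chosen only to show how a sink factory might read the delivery guarantee from its properties instead of hard-coding AT_LEAST_ONCE. How (or whether) the real connector exposes this is not stated in the issue.

```java
import java.util.Locale;
import java.util.Properties;

// Hypothetical stand-in for FlinkKafkaProducer.Semantic (not the Flink enum).
enum Semantic { NONE, AT_LEAST_ONCE, EXACTLY_ONCE }

// Hypothetical stand-in for the producer; it only records its configuration.
final class SketchProducer {
    final String topic;
    final Semantic semantic;

    SketchProducer(String topic, Semantic semantic) {
        this.topic = topic;
        this.semantic = semantic;
    }
}

// Hypothetical stand-in for KafkaTableSink's factory method.
final class SketchKafkaTableSink {
    // Assumed property name, used for illustration only.
    static final String SEMANTIC_KEY = "sink.semantic";

    // Parse the configured semantic, falling back to AT_LEAST_ONCE,
    // which matches the default of the constructor quoted in the issue.
    static Semantic semanticFrom(Properties properties) {
        String raw = properties.getProperty(SEMANTIC_KEY);
        if (raw == null) {
            return Semantic.AT_LEAST_ONCE;
        }
        String normalized = raw.trim().toUpperCase(Locale.ROOT).replace('-', '_');
        return Semantic.valueOf(normalized);
    }

    // Pass the configured semantic through instead of hard-coding it.
    static SketchProducer createKafkaProducer(String topic, Properties properties) {
        return new SketchProducer(topic, semanticFrom(properties));
    }
}
```

With this shape, an unset property keeps the existing at-least-once behavior, while a value such as `exactly-once` selects `EXACTLY_ONCE`; an unrecognized value fails fast with an `IllegalArgumentException` from `Semantic.valueOf`.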