[ 
https://issues.apache.org/jira/browse/FLINK-14719?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17147741#comment-17147741
 ] 

Shengkai Fang commented on FLINK-14719:
---------------------------------------

As of release 1.11, `semantic` still cannot be set through configuration. Maybe we
can add this feature in release 1.12.

Although this is trivial work, I think it is necessary for users who care about
`EXACTLY_ONCE`.
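Here is a rough sketch of what such a configuration option could look like. The plain Java below maps a table option string to a semantic value. The option key `sink.semantic`, the value spellings, and the `SemanticOption` class are all assumptions made for illustration. The nested `Semantic` enum is a stand-in that only mirrors the constant names of `FlinkKafkaProducer.Semantic`; it is not the real Flink type.

```java
import java.util.Locale;
import java.util.Map;

// Hypothetical sketch: parse a configured delivery semantic from table options.
final class SemanticOption {
    // Stand-in enum; mirrors the constant names of FlinkKafkaProducer.Semantic.
    enum Semantic { EXACTLY_ONCE, AT_LEAST_ONCE, NONE }

    // Assumed option key, for illustration only.
    static final String KEY = "sink.semantic";

    // Keep today's Table API behavior when the option is absent.
    static final Semantic DEFAULT = Semantic.AT_LEAST_ONCE;

    private SemanticOption() {}

    /** Reads the semantic from the options map, falling back to DEFAULT. */
    static Semantic fromOptions(Map<String, String> options) {
        String raw = options.get(KEY);
        if (raw == null) {
            return DEFAULT;
        }
        // Case-insensitive match on the assumed value spellings.
        switch (raw.trim().toLowerCase(Locale.ROOT)) {
            case "exactly-once":
                return Semantic.EXACTLY_ONCE;
            case "at-least-once":
                return Semantic.AT_LEAST_ONCE;
            case "none":
                return Semantic.NONE;
            default:
                // Fail fast rather than silently weakening the guarantee.
                throw new IllegalArgumentException(
                        "Unsupported value for '" + KEY + "': " + raw);
        }
    }

    public static void main(String[] args) {
        System.out.println(fromOptions(Map.of(KEY, "exactly-once"))); // EXACTLY_ONCE
        System.out.println(fromOptions(Map.of()));                    // AT_LEAST_ONCE
    }
}
```

When the key is absent, the sketch falls back to `AT_LEAST_ONCE`, so existing tables keep their current behavior. An unknown value raises an error instead of quietly falling back to a weaker guarantee.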

> Making Semantic configurable in FlinkKafkaProducer to support exactly-once
> semantic in Table API
> -------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-14719
>                 URL: https://issues.apache.org/jira/browse/FLINK-14719
>             Project: Flink
>          Issue Type: Improvement
>          Components: Connectors / Kafka
>    Affects Versions: 1.8.0
>            Reporter: chaiyongqiang
>            Priority: Major
>
> Flink supports Kafka transactions through FlinkKafkaProducer and
> FlinkKafkaProducer011. With the DataStream API, this makes exactly-once
> semantics achievable. With the Table API, however, things are different.
> The createKafkaProducer method in KafkaTableSink creates the
> FlinkKafkaProducer that sends messages to the Kafka server. It looks like this:
> {code:java}
> protected SinkFunction<Row> createKafkaProducer(
>               String topic,
>               Properties properties,
>               SerializationSchema<Row> serializationSchema,
>               Optional<FlinkKafkaPartitioner<Row>> partitioner) {
>               // No Semantic argument is passed, so the four-argument constructor is used.
>               return new FlinkKafkaProducer<>(
>                       topic,
>                       new KeyedSerializationSchemaWrapper<>(serializationSchema),
>                       properties,
>                       partitioner);
>       }
> {code}
> Stepping into that FlinkKafkaProducer constructor shows that it always
> creates a producer with AT_LEAST_ONCE semantics:
> {code:java}
>       public FlinkKafkaProducer(
>               String defaultTopicId,
>               KeyedSerializationSchema<IN> serializationSchema,
>               Properties producerConfig,
>               Optional<FlinkKafkaPartitioner<IN>> customPartitioner) {
>               this(
>                       defaultTopicId,
>                       serializationSchema,
>                       producerConfig,
>                       customPartitioner,
>                       FlinkKafkaProducer.Semantic.AT_LEAST_ONCE,
>                       DEFAULT_KAFKA_PRODUCERS_POOL_SIZE);
>       }
> {code}
> This means users cannot achieve exactly-once semantics with the Table API.
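The constructor chain quoted above can be modeled in plain Java. The model shows why the default always applies and what passing the semantic through would change. `ProducerModel` and its methods are hypothetical stand-ins that only record the chosen semantic and never contact Kafka. The serializer, partitioner, and pool size are left out. For the real class, `createConfigurable` corresponds to calling the six-argument constructor quoted above with a caller-supplied semantic in place of the hard-coded `AT_LEAST_ONCE`.

```java
import java.util.Properties;

// Hypothetical, simplified model of the FlinkKafkaProducer constructor chain.
// It does not talk to Kafka; it only records which semantic ends up selected.
final class ProducerModel {
    // Stand-in enum; mirrors the constant names of FlinkKafkaProducer.Semantic.
    enum Semantic { EXACTLY_ONCE, AT_LEAST_ONCE, NONE }

    final String topic;
    final Properties config;
    final Semantic semantic;

    // Mirrors the convenience constructor: the semantic is hard-coded.
    ProducerModel(String topic, Properties config) {
        this(topic, config, Semantic.AT_LEAST_ONCE);
    }

    // Mirrors the full constructor: the caller chooses the semantic.
    ProducerModel(String topic, Properties config, Semantic semantic) {
        this.topic = topic;
        this.config = config;
        this.semantic = semantic;
    }

    // What the table sink does today: no way to pass a semantic through.
    static ProducerModel createToday(String topic, Properties props) {
        return new ProducerModel(topic, props);
    }

    // One possible fix: thread a configured semantic into the full constructor.
    static ProducerModel createConfigurable(String topic, Properties props, Semantic semantic) {
        return new ProducerModel(topic, props, semantic);
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        System.out.println(createToday("t", props).semantic); // AT_LEAST_ONCE
        System.out.println(
                createConfigurable("t", props, Semantic.EXACTLY_ONCE).semantic); // EXACTLY_ONCE
    }
}
```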



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
