[
https://issues.apache.org/jira/browse/FLINK-9846?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16552806#comment-16552806
]
ASF GitHub Bot commented on FLINK-9846:
---------------------------------------
Github user twalthr commented on a diff in the pull request:
https://github.com/apache/flink/pull/6387#discussion_r204390897
--- Diff:
flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSink.java
---
@@ -58,14 +65,23 @@ public Kafka010JsonTableSink(String topic, Properties
properties) {
* @param topic topic in Kafka to which table is written
* @param properties properties to connect to Kafka
* @param partitioner Kafka partitioner
+ * @deprecated Use table descriptors instead of implementation-specific
classes.
*/
+ @Deprecated
public Kafka010JsonTableSink(String topic, Properties properties,
FlinkKafkaPartitioner<Row> partitioner) {
super(topic, properties, partitioner);
}
@Override
protected FlinkKafkaProducerBase<Row> createKafkaProducer(String topic,
Properties properties, SerializationSchema<Row> serializationSchema,
FlinkKafkaPartitioner<Row> partitioner) {
- return new FlinkKafkaProducer010<>(topic, serializationSchema,
properties, partitioner);
+ final FlinkKafkaProducerBase<Row> kafkaProducer = new
FlinkKafkaProducer010<>(
+ topic,
+ serializationSchema,
+ properties,
+ partitioner);
+ // always enable flush on checkpoint to achieve at-least-once
if query runs with checkpointing enabled.
--- End diff --
Good point! It was explicitly set in the table source but recently changed
in FLINK-5728. Will drop this.
> Add a Kafka table sink factory
> ------------------------------
>
> Key: FLINK-9846
> URL: https://issues.apache.org/jira/browse/FLINK-9846
> Project: Flink
> Issue Type: New Feature
> Components: Table API & SQL
> Reporter: Timo Walther
> Assignee: Timo Walther
> Priority: Major
> Labels: pull-request-available
>
> FLINK-8866 implements a unified way of creating sinks and using the format
> discovery for searching for formats (FLINK-8858). It is now possible to add a
> Kafka table sink factory for streaming environment that uses the new
> interfaces.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)