[ https://issues.apache.org/jira/browse/FLINK-9846?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16552640#comment-16552640 ]
ASF GitHub Bot commented on FLINK-9846:
---------------------------------------
Github user pnowojski commented on a diff in the pull request:
https://github.com/apache/flink/pull/6387#discussion_r204346083
--- Diff: flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSink.java ---
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
+import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.types.Row;
+
+import java.util.Optional;
+import java.util.Properties;
+
+/**
+ * Kafka 0.11 table sink for writing data into Kafka.
+ */
+@Internal
+public class Kafka011TableSink extends KafkaTableSink {
+
+ /**
+ * Creates a Kafka 0.11 table sink.
+ *
+ * @param schema The schema of the table.
+ * @param topic Kafka topic to write to.
+ * @param properties Properties for the Kafka producer.
+ * @param partitioner Partitioner to select Kafka partition for each item.
+ * @param serializationSchema Serialization schema for encoding records to Kafka.
+ */
+ public Kafka011TableSink(
+ TableSchema schema,
+ String topic,
+ Properties properties,
+ FlinkKafkaPartitioner<Row> partitioner,
+ SerializationSchema<Row> serializationSchema) {
+ super(
+ schema,
+ topic,
+ properties,
+ partitioner,
+ serializationSchema);
+ }
+
+ @Override
+ protected SinkFunction<Row> createKafkaProducer(
+ String topic,
+ Properties properties,
+ SerializationSchema<Row> serializationSchema,
+ FlinkKafkaPartitioner<Row> partitioner) {
+ return new FlinkKafkaProducer011<>(
--- End diff --
Here, on the other hand, you are relying on the default semantic
(`at-least-once`) rather than passing it explicitly.
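
To illustrate the difference between inheriting the default and stating it, here is a minimal, self-contained sketch. `ProducerSketch` and `createProducer` are hypothetical stand-ins, not Flink classes; only the enum constant names mirror `FlinkKafkaProducer011.Semantic`. It is not the code from this pull request.

```java
import java.util.Objects;

// Stand-in types for illustration only; these are NOT the Flink classes.
final class ProducerSketch {

    /** Delivery guarantees; the names mirror FlinkKafkaProducer011.Semantic. */
    enum Semantic { EXACTLY_ONCE, AT_LEAST_ONCE, NONE }

    final String topic;
    final Semantic semantic;

    /** Implicit variant: the caller silently inherits whatever the default is. */
    ProducerSketch(String topic) {
        this(topic, Semantic.AT_LEAST_ONCE);
    }

    /** Explicit variant: the guarantee is visible at the call site. */
    ProducerSketch(String topic, Semantic semantic) {
        this.topic = Objects.requireNonNull(topic, "topic");
        this.semantic = Objects.requireNonNull(semantic, "semantic");
    }

    /** Hypothetical factory method that states the guarantee instead of relying on the default. */
    static ProducerSketch createProducer(String topic) {
        return new ProducerSketch(topic, Semantic.AT_LEAST_ONCE);
    }
}
```

Both variants behave the same today. The explicit one keeps behaving the same if the default ever changes, and it tells the reader which guarantee the sink was written to provide.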
> Add a Kafka table sink factory
> ------------------------------
>
> Key: FLINK-9846
> URL: https://issues.apache.org/jira/browse/FLINK-9846
> Project: Flink
> Issue Type: New Feature
> Components: Table API & SQL
> Reporter: Timo Walther
> Assignee: Timo Walther
> Priority: Major
> Labels: pull-request-available
>
> FLINK-8866 implements a unified way of creating sinks and using the format
> discovery for searching for formats (FLINK-8858). It is now possible to add a
> Kafka table sink factory for streaming environment that uses the new
> interfaces.