[FLINK-9846] [table] Add a Kafka table sink factory

Adds a Kafka table sink factory with format discovery. This enables the
SQL Client to write Avro and JSON data to Kafka. The functionality is still
limited by FLINK-9870, so time attributes cannot yet be used in the output.
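
For illustration, a minimal sketch of how the new sink factory could be exercised
through the descriptor API that the deprecation notes below point to. Topic name,
schema fields, and connection properties are made up for this example; the
descriptor calls follow the Flink 1.6 Table API and are not part of this change:

    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.api.TableEnvironment;
    import org.apache.flink.table.api.java.StreamTableEnvironment;
    import org.apache.flink.table.descriptors.Json;
    import org.apache.flink.table.descriptors.Kafka;
    import org.apache.flink.table.descriptors.Schema;

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

    tableEnv
        .connect(
            new Kafka()
                .version("0.10")                                  // resolved by Kafka010TableSourceSinkFactory
                .topic("output-topic")                            // illustrative topic name
                .property("bootstrap.servers", "localhost:9092")) // illustrative broker address
        .withFormat(
            new Json().deriveSchema())                            // JSON format discovered via format factories
        .withSchema(
            new Schema()
                .field("user", Types.STRING)
                .field("amount", Types.LONG))                     // no time attributes in the sink schema (FLINK-9870)
        .inAppendMode()
        .registerTableSink("KafkaSink");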

Changes:
- Decouple the Kafka sink from formats and deprecate the old classes
- Add a Kafka table sink factory
- Generalize the existing KafkaTableSourceFactory tests to also cover sinks

This closes #6387.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/aa25b4b3
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/aa25b4b3
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/aa25b4b3

Branch: refs/heads/release-1.6
Commit: aa25b4b324b4c025fe9e58e081677faf0ab50a7d
Parents: 702f773
Author: Timo Walther <[email protected]>
Authored: Mon Jul 23 08:12:00 2018 +0200
Committer: Timo Walther <[email protected]>
Committed: Mon Jul 23 18:19:53 2018 +0200

----------------------------------------------------------------------
 .../connectors/kafka/Kafka010JsonTableSink.java |  19 +-
 .../connectors/kafka/Kafka010TableSink.java     |  61 ++++
 .../kafka/Kafka010TableSourceFactory.java       |  72 ----
 .../kafka/Kafka010TableSourceSinkFactory.java   |  90 +++++
 ...rg.apache.flink.table.factories.TableFactory |   2 +-
 .../kafka/Kafka010JsonTableSinkTest.java        |   4 +
 .../kafka/Kafka010TableSourceFactoryTest.java   |  74 -----
 .../Kafka010TableSourceSinkFactoryTest.java     |  99 ++++++
 .../connectors/kafka/Kafka011TableSink.java     |  64 ++++
 .../connectors/kafka/Kafka011TableSource.java   |   3 +-
 .../kafka/Kafka011TableSourceFactory.java       |  72 ----
 .../kafka/Kafka011TableSourceSinkFactory.java   |  90 +++++
 ...rg.apache.flink.table.factories.TableFactory |   2 +-
 .../kafka/Kafka011TableSourceFactoryTest.java   |  74 -----
 .../Kafka011TableSourceSinkFactoryTest.java     |  99 ++++++
 .../connectors/kafka/Kafka08JsonTableSink.java  |  19 +-
 .../connectors/kafka/Kafka08TableSink.java      |  61 ++++
 .../connectors/kafka/Kafka08TableSource.java    |   3 +-
 .../kafka/Kafka08TableSourceFactory.java        |  72 ----
 .../kafka/Kafka08TableSourceSinkFactory.java    |  90 +++++
 ...rg.apache.flink.table.factories.TableFactory |   2 +-
 .../kafka/Kafka08JsonTableSinkTest.java         |   4 +
 .../kafka/Kafka08TableSourceFactoryTest.java    |  74 -----
 .../Kafka08TableSourceSinkFactoryTest.java      |  99 ++++++
 .../connectors/kafka/Kafka09JsonTableSink.java  |  19 +-
 .../connectors/kafka/Kafka09TableSink.java      |  61 ++++
 .../connectors/kafka/Kafka09TableSource.java    |   3 +-
 .../kafka/Kafka09TableSourceFactory.java        |  72 ----
 .../kafka/Kafka09TableSourceSinkFactory.java    |  90 +++++
 ...rg.apache.flink.table.factories.TableFactory |   2 +-
 .../kafka/Kafka09JsonTableSinkTest.java         |   4 +
 .../kafka/Kafka09TableSourceFactoryTest.java    |  74 -----
 .../Kafka09TableSourceSinkFactoryTest.java      |  99 ++++++
 .../connectors/kafka/KafkaJsonTableSink.java    |   5 +
 .../connectors/kafka/KafkaTableSink.java        | 112 ++++++-
 .../kafka/KafkaTableSourceFactory.java          | 251 --------------
 .../kafka/KafkaTableSourceSinkFactoryBase.java  | 330 +++++++++++++++++++
 .../partitioner/FlinkFixedPartitioner.java      |  10 +
 .../kafka/KafkaTableSinkTestBase.java           |   7 +-
 .../kafka/KafkaTableSourceFactoryTestBase.java  | 196 -----------
 .../KafkaTableSourceSinkFactoryTestBase.java    | 299 +++++++++++++++++
 .../utils/TestSerializationSchema.scala         |  16 +-
 .../utils/TestTableFormatFactory.scala          |  18 +-
 43 files changed, 1852 insertions(+), 1065 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/aa25b4b3/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSink.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSink.java
 
b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSink.java
index ef33cd5..2ad3142 100644
--- 
a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSink.java
+++ 
b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSink.java
@@ -18,18 +18,23 @@
 
 package org.apache.flink.streaming.connectors.kafka;
 
-import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.serialization.SerializationSchema;
 import 
org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
 import 
org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
+import org.apache.flink.table.descriptors.ConnectorDescriptor;
 import org.apache.flink.types.Row;
 
 import java.util.Properties;
 
 /**
  * Kafka 0.10 {@link KafkaTableSink} that serializes data in JSON format.
+ *
+ * @deprecated Use the {@link org.apache.flink.table.descriptors.Kafka} 
descriptor together
+ *             with descriptors for schema and format instead. Descriptors 
allow for
+ *             implementation-agnostic definition of tables. See also
+ *             {@link 
org.apache.flink.table.api.TableEnvironment#connect(ConnectorDescriptor)}.
  */
-@PublicEvolving
+@Deprecated
 public class Kafka010JsonTableSink extends KafkaJsonTableSink {
 
        /**
@@ -46,7 +51,9 @@ public class Kafka010JsonTableSink extends KafkaJsonTableSink 
{
         *
         * @param topic topic in Kafka to which table is written
         * @param properties properties to connect to Kafka
+        * @deprecated Use table descriptors instead of implementation-specific 
classes.
         */
+       @Deprecated
        public Kafka010JsonTableSink(String topic, Properties properties) {
                super(topic, properties, new FlinkFixedPartitioner<>());
        }
@@ -58,14 +65,20 @@ public class Kafka010JsonTableSink extends 
KafkaJsonTableSink {
         * @param topic topic in Kafka to which table is written
         * @param properties properties to connect to Kafka
         * @param partitioner Kafka partitioner
+        * @deprecated Use table descriptors instead of implementation-specific 
classes.
         */
+       @Deprecated
        public Kafka010JsonTableSink(String topic, Properties properties, 
FlinkKafkaPartitioner<Row> partitioner) {
                super(topic, properties, partitioner);
        }
 
        @Override
        protected FlinkKafkaProducerBase<Row> createKafkaProducer(String topic, 
Properties properties, SerializationSchema<Row> serializationSchema, 
FlinkKafkaPartitioner<Row> partitioner) {
-               return new FlinkKafkaProducer010<>(topic, serializationSchema, 
properties, partitioner);
+               return new FlinkKafkaProducer010<>(
+                       topic,
+                       serializationSchema,
+                       properties,
+                       partitioner);
        }
 
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/aa25b4b3/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSink.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSink.java
 
b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSink.java
new file mode 100644
index 0000000..a8c6553
--- /dev/null
+++ 
b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSink.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import 
org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.types.Row;
+
+import java.util.Properties;
+
+/**
+ * Kafka 0.10 table sink for writing data into Kafka.
+ */
+@Internal
+public class Kafka010TableSink extends KafkaTableSink {
+
+       public Kafka010TableSink(
+                       TableSchema schema,
+                       String topic,
+                       Properties properties,
+                       FlinkKafkaPartitioner<Row> partitioner,
+                       SerializationSchema<Row> serializationSchema) {
+               super(
+                       schema,
+                       topic,
+                       properties,
+                       partitioner,
+                       serializationSchema);
+       }
+
+       @Override
+       protected FlinkKafkaProducerBase<Row> createKafkaProducer(
+                       String topic,
+                       Properties properties,
+                       SerializationSchema<Row> serializationSchema,
+                       FlinkKafkaPartitioner<Row> partitioner) {
+               return new FlinkKafkaProducer010<>(
+                       topic,
+                       serializationSchema,
+                       properties,
+                       partitioner);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/aa25b4b3/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSourceFactory.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSourceFactory.java
 
b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSourceFactory.java
deleted file mode 100644
index 4a86016..0000000
--- 
a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSourceFactory.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka;
-
-import org.apache.flink.api.common.serialization.DeserializationSchema;
-import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
-import 
org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
-import org.apache.flink.table.api.TableSchema;
-import org.apache.flink.table.descriptors.KafkaValidator;
-import org.apache.flink.table.sources.RowtimeAttributeDescriptor;
-import org.apache.flink.types.Row;
-
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Properties;
-
-/**
- * Factory for creating configured instances of {@link Kafka010TableSource}.
- */
-public class Kafka010TableSourceFactory extends KafkaTableSourceFactory {
-
-       @Override
-       protected String kafkaVersion() {
-               return KafkaValidator.CONNECTOR_VERSION_VALUE_010;
-       }
-
-       @Override
-       protected boolean supportsKafkaTimestamps() {
-               return true;
-       }
-
-       @Override
-       protected KafkaTableSource createKafkaTableSource(
-                       TableSchema schema,
-                       Optional<String> proctimeAttribute,
-                       List<RowtimeAttributeDescriptor> 
rowtimeAttributeDescriptors,
-                       Map<String, String> fieldMapping,
-                       String topic,
-                       Properties properties,
-                       DeserializationSchema<Row> deserializationSchema,
-                       StartupMode startupMode,
-                       Map<KafkaTopicPartition, Long> specificStartupOffsets) {
-
-               return new Kafka010TableSource(
-                       schema,
-                       proctimeAttribute,
-                       rowtimeAttributeDescriptors,
-                       Optional.of(fieldMapping),
-                       topic,
-                       properties,
-                       deserializationSchema,
-                       startupMode,
-                       specificStartupOffsets);
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/aa25b4b3/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSourceSinkFactory.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSourceSinkFactory.java
 
b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSourceSinkFactory.java
new file mode 100644
index 0000000..0cf9499
--- /dev/null
+++ 
b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSourceSinkFactory.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
+import 
org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
+import 
org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.descriptors.KafkaValidator;
+import org.apache.flink.table.sources.RowtimeAttributeDescriptor;
+import org.apache.flink.types.Row;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+
+/**
+ * Factory for creating configured instances of {@link Kafka010TableSource}.
+ */
+public class Kafka010TableSourceSinkFactory extends 
KafkaTableSourceSinkFactoryBase {
+
+       @Override
+       protected String kafkaVersion() {
+               return KafkaValidator.CONNECTOR_VERSION_VALUE_010;
+       }
+
+       @Override
+       protected boolean supportsKafkaTimestamps() {
+               return true;
+       }
+
+       @Override
+       protected KafkaTableSource createKafkaTableSource(
+                       TableSchema schema,
+                       Optional<String> proctimeAttribute,
+                       List<RowtimeAttributeDescriptor> 
rowtimeAttributeDescriptors,
+                       Map<String, String> fieldMapping,
+                       String topic,
+                       Properties properties,
+                       DeserializationSchema<Row> deserializationSchema,
+                       StartupMode startupMode,
+                       Map<KafkaTopicPartition, Long> specificStartupOffsets) {
+
+               return new Kafka010TableSource(
+                       schema,
+                       proctimeAttribute,
+                       rowtimeAttributeDescriptors,
+                       Optional.of(fieldMapping),
+                       topic,
+                       properties,
+                       deserializationSchema,
+                       startupMode,
+                       specificStartupOffsets);
+       }
+
+       @Override
+       protected KafkaTableSink createKafkaTableSink(
+                       TableSchema schema,
+                       String topic,
+                       Properties properties,
+                       FlinkKafkaPartitioner<Row> partitioner,
+                       SerializationSchema<Row> serializationSchema) {
+
+               return new Kafka010TableSink(
+                       schema,
+                       topic,
+                       properties,
+                       partitioner,
+                       serializationSchema);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/aa25b4b3/flink-connectors/flink-connector-kafka-0.10/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-0.10/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory
 
b/flink-connectors/flink-connector-kafka-0.10/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory
index 21f5707..9bb0363 100644
--- 
a/flink-connectors/flink-connector-kafka-0.10/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory
+++ 
b/flink-connectors/flink-connector-kafka-0.10/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory
@@ -13,4 +13,4 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-org.apache.flink.streaming.connectors.kafka.Kafka010TableSourceFactory
+org.apache.flink.streaming.connectors.kafka.Kafka010TableSourceSinkFactory

http://git-wip-us.apache.org/repos/asf/flink/blob/aa25b4b3/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSinkTest.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSinkTest.java
 
b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSinkTest.java
index af562c6..339420c 100644
--- 
a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSinkTest.java
+++ 
b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSinkTest.java
@@ -27,7 +27,11 @@ import java.util.Properties;
 
 /**
  * Tests for the {@link Kafka010JsonTableSink}.
+ *
+ * @deprecated Ensures backwards compatibility with Flink 1.5. Can be removed 
once we
+ *             drop support for format-specific table sinks.
  */
+@Deprecated
 public class Kafka010JsonTableSinkTest extends KafkaTableSinkTestBase {
 
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/aa25b4b3/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSourceFactoryTest.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSourceFactoryTest.java
 
b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSourceFactoryTest.java
deleted file mode 100644
index ff3b0b0..0000000
--- 
a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSourceFactoryTest.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka;
-
-import org.apache.flink.api.common.serialization.DeserializationSchema;
-import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
-import 
org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
-import org.apache.flink.table.api.TableSchema;
-import org.apache.flink.table.descriptors.KafkaValidator;
-import org.apache.flink.table.sources.RowtimeAttributeDescriptor;
-import org.apache.flink.types.Row;
-
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Properties;
-
-/**
- * Test for {@link Kafka010TableSource} created by {@link 
Kafka010TableSourceFactory}.
- */
-public class Kafka010TableSourceFactoryTest extends 
KafkaTableSourceFactoryTestBase {
-
-       @Override
-       protected String getKafkaVersion() {
-               return KafkaValidator.CONNECTOR_VERSION_VALUE_010;
-       }
-
-       @Override
-       @SuppressWarnings("unchecked")
-       protected Class<FlinkKafkaConsumerBase<Row>> 
getExpectedFlinkKafkaConsumer() {
-               return (Class) FlinkKafkaConsumer010.class;
-       }
-
-       @Override
-       protected KafkaTableSource getExpectedKafkaTableSource(
-                       TableSchema schema,
-                       Optional<String> proctimeAttribute,
-                       List<RowtimeAttributeDescriptor> 
rowtimeAttributeDescriptors,
-                       Map<String, String> fieldMapping,
-                       String topic,
-                       Properties properties,
-                       DeserializationSchema<Row> deserializationSchema,
-                       StartupMode startupMode,
-                       Map<KafkaTopicPartition, Long> specificStartupOffsets) {
-
-               return new Kafka010TableSource(
-                       schema,
-                       proctimeAttribute,
-                       rowtimeAttributeDescriptors,
-                       Optional.of(fieldMapping),
-                       topic,
-                       properties,
-                       deserializationSchema,
-                       startupMode,
-                       specificStartupOffsets
-               );
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/aa25b4b3/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSourceSinkFactoryTest.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSourceSinkFactoryTest.java
 
b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSourceSinkFactoryTest.java
new file mode 100644
index 0000000..cc198c9
--- /dev/null
+++ 
b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSourceSinkFactoryTest.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
+import 
org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
+import 
org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.descriptors.KafkaValidator;
+import org.apache.flink.table.sources.RowtimeAttributeDescriptor;
+import org.apache.flink.types.Row;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+
+/**
+ * Test for {@link Kafka010TableSource} and {@link Kafka010TableSink} created
+ * by {@link Kafka010TableSourceSinkFactory}.
+ */
+public class Kafka010TableSourceSinkFactoryTest extends 
KafkaTableSourceSinkFactoryTestBase {
+
+       @Override
+       protected String getKafkaVersion() {
+               return KafkaValidator.CONNECTOR_VERSION_VALUE_010;
+       }
+
+       @Override
+       @SuppressWarnings("unchecked")
+       protected Class<FlinkKafkaConsumerBase<Row>> 
getExpectedFlinkKafkaConsumer() {
+               return (Class) FlinkKafkaConsumer010.class;
+       }
+
+       @Override
+       protected Class<?> getExpectedFlinkKafkaProducer() {
+               return FlinkKafkaProducer010.class;
+       }
+
+       @Override
+       protected KafkaTableSource getExpectedKafkaTableSource(
+                       TableSchema schema,
+                       Optional<String> proctimeAttribute,
+                       List<RowtimeAttributeDescriptor> 
rowtimeAttributeDescriptors,
+                       Map<String, String> fieldMapping,
+                       String topic,
+                       Properties properties,
+                       DeserializationSchema<Row> deserializationSchema,
+                       StartupMode startupMode,
+                       Map<KafkaTopicPartition, Long> specificStartupOffsets) {
+
+               return new Kafka010TableSource(
+                       schema,
+                       proctimeAttribute,
+                       rowtimeAttributeDescriptors,
+                       Optional.of(fieldMapping),
+                       topic,
+                       properties,
+                       deserializationSchema,
+                       startupMode,
+                       specificStartupOffsets
+               );
+       }
+
+       @Override
+       protected KafkaTableSink getExpectedKafkaTableSink(
+                       TableSchema schema,
+                       String topic,
+                       Properties properties,
+                       FlinkKafkaPartitioner<Row> partitioner,
+                       SerializationSchema<Row> serializationSchema) {
+
+               return new Kafka010TableSink(
+                       schema,
+                       topic,
+                       properties,
+                       partitioner,
+                       serializationSchema
+               );
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/aa25b4b3/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSink.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSink.java
 
b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSink.java
new file mode 100644
index 0000000..22c6da1
--- /dev/null
+++ 
b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSink.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import 
org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
+import 
org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.types.Row;
+
+import java.util.Optional;
+import java.util.Properties;
+
+/**
+ * Kafka 0.11 table sink for writing data into Kafka.
+ */
+@Internal
+public class Kafka011TableSink extends KafkaTableSink {
+
+       public Kafka011TableSink(
+                       TableSchema schema,
+                       String topic,
+                       Properties properties,
+                       FlinkKafkaPartitioner<Row> partitioner,
+                       SerializationSchema<Row> serializationSchema) {
+               super(
+                       schema,
+                       topic,
+                       properties,
+                       partitioner,
+                       serializationSchema);
+       }
+
+       @Override
+       protected SinkFunction<Row> createKafkaProducer(
+                       String topic,
+                       Properties properties,
+                       SerializationSchema<Row> serializationSchema,
+                       FlinkKafkaPartitioner<Row> partitioner) {
+               return new FlinkKafkaProducer011<>(
+                       topic,
+                       new 
KeyedSerializationSchemaWrapper<>(serializationSchema),
+                       properties,
+                       Optional.of(partitioner));
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/aa25b4b3/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSource.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSource.java
 
b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSource.java
index 85f5669..a646317 100644
--- 
a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSource.java
+++ 
b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSource.java
@@ -58,7 +58,8 @@ public class Kafka011TableSource extends KafkaTableSource {
                        Optional<String> proctimeAttribute,
                        List<RowtimeAttributeDescriptor> 
rowtimeAttributeDescriptors,
                        Optional<Map<String, String>> fieldMapping,
-                       String topic, Properties properties,
+                       String topic,
+                       Properties properties,
                        DeserializationSchema<Row> deserializationSchema,
                        StartupMode startupMode,
                        Map<KafkaTopicPartition, Long> specificStartupOffsets) {

http://git-wip-us.apache.org/repos/asf/flink/blob/aa25b4b3/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSourceFactory.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSourceFactory.java
 
b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSourceFactory.java
deleted file mode 100644
index b1e3929..0000000
--- 
a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSourceFactory.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka;
-
-import org.apache.flink.api.common.serialization.DeserializationSchema;
-import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
-import 
org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
-import org.apache.flink.table.api.TableSchema;
-import org.apache.flink.table.descriptors.KafkaValidator;
-import org.apache.flink.table.sources.RowtimeAttributeDescriptor;
-import org.apache.flink.types.Row;
-
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Properties;
-
-/**
- * Factory for creating configured instances of {@link Kafka011TableSource}.
- */
-public class Kafka011TableSourceFactory extends KafkaTableSourceFactory {
-
-       @Override
-       protected String kafkaVersion() {
-               return KafkaValidator.CONNECTOR_VERSION_VALUE_011;
-       }
-
-       @Override
-       protected boolean supportsKafkaTimestamps() {
-               return true;
-       }
-
-       @Override
-       protected KafkaTableSource createKafkaTableSource(
-                       TableSchema schema,
-                       Optional<String> proctimeAttribute,
-                       List<RowtimeAttributeDescriptor> 
rowtimeAttributeDescriptors,
-                       Map<String, String> fieldMapping,
-                       String topic,
-                       Properties properties,
-                       DeserializationSchema<Row> deserializationSchema,
-                       StartupMode startupMode,
-                       Map<KafkaTopicPartition, Long> specificStartupOffsets) {
-
-               return new Kafka011TableSource(
-                       schema,
-                       proctimeAttribute,
-                       rowtimeAttributeDescriptors,
-                       Optional.of(fieldMapping),
-                       topic,
-                       properties,
-                       deserializationSchema,
-                       startupMode,
-                       specificStartupOffsets);
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/aa25b4b3/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSourceSinkFactory.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSourceSinkFactory.java
 
b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSourceSinkFactory.java
new file mode 100644
index 0000000..c26df42
--- /dev/null
+++ 
b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSourceSinkFactory.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
+import 
org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
+import 
org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.descriptors.KafkaValidator;
+import org.apache.flink.table.sources.RowtimeAttributeDescriptor;
+import org.apache.flink.types.Row;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+
+/**
+ * Factory for creating configured instances of {@link Kafka011TableSource}.
+ */
+public class Kafka011TableSourceSinkFactory extends 
KafkaTableSourceSinkFactoryBase {
+
+       @Override
+       protected String kafkaVersion() {
+               return KafkaValidator.CONNECTOR_VERSION_VALUE_011;
+       }
+
+       @Override
+       protected boolean supportsKafkaTimestamps() {
+               return true;
+       }
+
+       @Override
+       protected KafkaTableSource createKafkaTableSource(
+                       TableSchema schema,
+                       Optional<String> proctimeAttribute,
+                       List<RowtimeAttributeDescriptor> 
rowtimeAttributeDescriptors,
+                       Map<String, String> fieldMapping,
+                       String topic,
+                       Properties properties,
+                       DeserializationSchema<Row> deserializationSchema,
+                       StartupMode startupMode,
+                       Map<KafkaTopicPartition, Long> specificStartupOffsets) {
+
+               return new Kafka011TableSource(
+                       schema,
+                       proctimeAttribute,
+                       rowtimeAttributeDescriptors,
+                       Optional.of(fieldMapping),
+                       topic,
+                       properties,
+                       deserializationSchema,
+                       startupMode,
+                       specificStartupOffsets);
+       }
+
+       @Override
+       protected KafkaTableSink createKafkaTableSink(
+                       TableSchema schema,
+                       String topic,
+                       Properties properties,
+                       FlinkKafkaPartitioner<Row> partitioner,
+                       SerializationSchema<Row> serializationSchema) {
+
+               return new Kafka011TableSink(
+                       schema,
+                       topic,
+                       properties,
+                       partitioner,
+                       serializationSchema);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/aa25b4b3/flink-connectors/flink-connector-kafka-0.11/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-0.11/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory
 
b/flink-connectors/flink-connector-kafka-0.11/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory
index c056097..b59b4a7 100644
--- 
a/flink-connectors/flink-connector-kafka-0.11/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory
+++ 
b/flink-connectors/flink-connector-kafka-0.11/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory
@@ -13,4 +13,4 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-org.apache.flink.streaming.connectors.kafka.Kafka011TableSourceFactory
+org.apache.flink.streaming.connectors.kafka.Kafka011TableSourceSinkFactory

http://git-wip-us.apache.org/repos/asf/flink/blob/aa25b4b3/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSourceFactoryTest.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSourceFactoryTest.java
 
b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSourceFactoryTest.java
deleted file mode 100644
index abaa490..0000000
--- 
a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSourceFactoryTest.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka;
-
-import org.apache.flink.api.common.serialization.DeserializationSchema;
-import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
-import 
org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
-import org.apache.flink.table.api.TableSchema;
-import org.apache.flink.table.descriptors.KafkaValidator;
-import org.apache.flink.table.sources.RowtimeAttributeDescriptor;
-import org.apache.flink.types.Row;
-
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Properties;
-
-/**
- * Test for {@link Kafka011TableSource} created by {@link 
Kafka011TableSourceFactory}.
- */
-public class Kafka011TableSourceFactoryTest extends 
KafkaTableSourceFactoryTestBase {
-
-       @Override
-       protected String getKafkaVersion() {
-               return KafkaValidator.CONNECTOR_VERSION_VALUE_011;
-       }
-
-       @Override
-       @SuppressWarnings("unchecked")
-       protected Class<FlinkKafkaConsumerBase<Row>> 
getExpectedFlinkKafkaConsumer() {
-               return (Class) FlinkKafkaConsumer011.class;
-       }
-
-       @Override
-       protected KafkaTableSource getExpectedKafkaTableSource(
-                       TableSchema schema,
-                       Optional<String> proctimeAttribute,
-                       List<RowtimeAttributeDescriptor> 
rowtimeAttributeDescriptors,
-                       Map<String, String> fieldMapping,
-                       String topic,
-                       Properties properties,
-                       DeserializationSchema<Row> deserializationSchema,
-                       StartupMode startupMode,
-                       Map<KafkaTopicPartition, Long> specificStartupOffsets) {
-
-               return new Kafka011TableSource(
-                       schema,
-                       proctimeAttribute,
-                       rowtimeAttributeDescriptors,
-                       Optional.of(fieldMapping),
-                       topic,
-                       properties,
-                       deserializationSchema,
-                       startupMode,
-                       specificStartupOffsets
-               );
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/aa25b4b3/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSourceSinkFactoryTest.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSourceSinkFactoryTest.java
 
b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSourceSinkFactoryTest.java
new file mode 100644
index 0000000..996c508
--- /dev/null
+++ 
b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSourceSinkFactoryTest.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
+import 
org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
+import 
org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.descriptors.KafkaValidator;
+import org.apache.flink.table.sources.RowtimeAttributeDescriptor;
+import org.apache.flink.types.Row;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+
+/**
+ * Test for {@link Kafka011TableSource} and {@link Kafka011TableSink} created
+ * by {@link Kafka011TableSourceSinkFactory}.
+ */
+public class Kafka011TableSourceSinkFactoryTest extends 
KafkaTableSourceSinkFactoryTestBase {
+
+       @Override
+       protected String getKafkaVersion() {
+               return KafkaValidator.CONNECTOR_VERSION_VALUE_011;
+       }
+
+       @Override
+       @SuppressWarnings("unchecked")
+       protected Class<FlinkKafkaConsumerBase<Row>> 
getExpectedFlinkKafkaConsumer() {
+               return (Class) FlinkKafkaConsumer011.class;
+       }
+
+       @Override
+       protected Class<?> getExpectedFlinkKafkaProducer() {
+               return FlinkKafkaProducer011.class;
+       }
+
+       @Override
+       protected KafkaTableSource getExpectedKafkaTableSource(
+                       TableSchema schema,
+                       Optional<String> proctimeAttribute,
+                       List<RowtimeAttributeDescriptor> 
rowtimeAttributeDescriptors,
+                       Map<String, String> fieldMapping,
+                       String topic,
+                       Properties properties,
+                       DeserializationSchema<Row> deserializationSchema,
+                       StartupMode startupMode,
+                       Map<KafkaTopicPartition, Long> specificStartupOffsets) {
+
+               return new Kafka011TableSource(
+                       schema,
+                       proctimeAttribute,
+                       rowtimeAttributeDescriptors,
+                       Optional.of(fieldMapping),
+                       topic,
+                       properties,
+                       deserializationSchema,
+                       startupMode,
+                       specificStartupOffsets
+               );
+       }
+
+       @Override
+       protected KafkaTableSink getExpectedKafkaTableSink(
+                       TableSchema schema,
+                       String topic,
+                       Properties properties,
+                       FlinkKafkaPartitioner<Row> partitioner,
+                       SerializationSchema<Row> serializationSchema) {
+
+               return new Kafka011TableSink(
+                       schema,
+                       topic,
+                       properties,
+                       partitioner,
+                       serializationSchema
+               );
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/aa25b4b3/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSink.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSink.java
 
b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSink.java
index c60288d..45588cd 100644
--- 
a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSink.java
+++ 
b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSink.java
@@ -18,20 +18,25 @@
 
 package org.apache.flink.streaming.connectors.kafka;
 
-import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.serialization.SerializationSchema;
 import 
org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
 import 
org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaDelegatePartitioner;
 import 
org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
 import 
org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
+import org.apache.flink.table.descriptors.ConnectorDescriptor;
 import org.apache.flink.types.Row;
 
 import java.util.Properties;
 
 /**
  * Kafka 0.8 {@link KafkaTableSink} that serializes data in JSON format.
+ *
+ * @deprecated Use the {@link org.apache.flink.table.descriptors.Kafka} 
descriptor together
+ *             with descriptors for schema and format instead. Descriptors 
allow for
+ *             implementation-agnostic definition of tables. See also
+ *             {@link 
org.apache.flink.table.api.TableEnvironment#connect(ConnectorDescriptor)}.
  */
-@PublicEvolving
+@Deprecated
 public class Kafka08JsonTableSink extends KafkaJsonTableSink {
 
        /**
@@ -48,7 +53,9 @@ public class Kafka08JsonTableSink extends KafkaJsonTableSink {
         *
         * @param topic topic in Kafka to which table is written
         * @param properties properties to connect to Kafka
+        * @deprecated Use table descriptors instead of implementation-specific 
classes.
         */
+       @Deprecated
        public Kafka08JsonTableSink(String topic, Properties properties) {
                super(topic, properties, new FlinkFixedPartitioner<>());
        }
@@ -60,7 +67,9 @@ public class Kafka08JsonTableSink extends KafkaJsonTableSink {
         * @param topic topic in Kafka to which table is written
         * @param properties properties to connect to Kafka
         * @param partitioner Kafka partitioner
+        * @deprecated Use table descriptors instead of implementation-specific 
classes.
         */
+       @Deprecated
        public Kafka08JsonTableSink(String topic, Properties properties, 
FlinkKafkaPartitioner<Row> partitioner) {
                super(topic, properties, partitioner);
        }
@@ -84,7 +93,11 @@ public class Kafka08JsonTableSink extends KafkaJsonTableSink 
{
 
        @Override
        protected FlinkKafkaProducerBase<Row> createKafkaProducer(String topic, 
Properties properties, SerializationSchema<Row> serializationSchema, 
FlinkKafkaPartitioner<Row> partitioner) {
-               return new FlinkKafkaProducer08<>(topic, serializationSchema, 
properties, partitioner);
+               return new FlinkKafkaProducer08<>(
+                       topic,
+                       serializationSchema,
+                       properties,
+                       partitioner);
        }
 
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/aa25b4b3/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSink.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSink.java
 
b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSink.java
new file mode 100644
index 0000000..c34de13
--- /dev/null
+++ 
b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSink.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.types.Row;
+
+import java.util.Properties;
+
+/**
+ * Kafka 0.8 table sink for writing data into Kafka.
+ */
+@Internal
+public class Kafka08TableSink extends KafkaTableSink {
+
+       public Kafka08TableSink(
+                       TableSchema schema,
+                       String topic,
+                       Properties properties,
+                       FlinkKafkaPartitioner<Row> partitioner,
+                       SerializationSchema<Row> serializationSchema) {
+               super(
+                       schema,
+                       topic,
+                       properties,
+                       partitioner,
+                       serializationSchema);
+       }
+
+       @Override
+       protected FlinkKafkaProducerBase<Row> createKafkaProducer(
+                       String topic,
+                       Properties properties,
+                       SerializationSchema<Row> serializationSchema,
+                       FlinkKafkaPartitioner<Row> partitioner) {
+               return new FlinkKafkaProducer08<>(
+                       topic,
+                       serializationSchema,
+                       properties,
+                       partitioner);
+       }
+}

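The version-specific sink above only contributes the versioned producer; topic, properties, serialization schema, and partitioner handling are shared. The sketch below illustrates that template-method split under stated assumptions: it is not the literal KafkaTableSink base-class code (additional configuration and copy logic is omitted), and the class name is invented for the example.

import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
import org.apache.flink.types.Row;

import java.util.Properties;

// Simplified sketch of the split between a common Kafka table sink and its
// version-specific subclasses. Fields and the emission body are illustrative
// assumptions, not a copy of KafkaTableSink.
abstract class VersionedKafkaTableSinkSketch {

	protected String topic;
	protected Properties properties;
	protected SerializationSchema<Row> serializationSchema;
	protected FlinkKafkaPartitioner<Row> partitioner;

	// Kafka08TableSink, Kafka09TableSink, etc. only implement this hook.
	protected abstract FlinkKafkaProducerBase<Row> createKafkaProducer(
		String topic,
		Properties properties,
		SerializationSchema<Row> serializationSchema,
		FlinkKafkaPartitioner<Row> partitioner);

	// The shared emission path builds the producer once and attaches it to the stream.
	public void emitDataStream(DataStream<Row> dataStream) {
		FlinkKafkaProducerBase<Row> producer =
			createKafkaProducer(topic, properties, serializationSchema, partitioner);
		dataStream.addSink(producer);
	}
}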
http://git-wip-us.apache.org/repos/asf/flink/blob/aa25b4b3/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSource.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSource.java
 
b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSource.java
index 1a025b8..97c293e 100644
--- 
a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSource.java
+++ 
b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSource.java
@@ -58,7 +58,8 @@ public class Kafka08TableSource extends KafkaTableSource {
                        Optional<String> proctimeAttribute,
                        List<RowtimeAttributeDescriptor> rowtimeAttributeDescriptors,
                        Optional<Map<String, String>> fieldMapping,
-                       String topic, Properties properties,
+                       String topic,
+                       Properties properties,
                        DeserializationSchema<Row> deserializationSchema,
                        StartupMode startupMode,
                        Map<KafkaTopicPartition, Long> specificStartupOffsets) {

http://git-wip-us.apache.org/repos/asf/flink/blob/aa25b4b3/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSourceFactory.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSourceFactory.java
 
b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSourceFactory.java
deleted file mode 100644
index cd33751..0000000
--- 
a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSourceFactory.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka;
-
-import org.apache.flink.api.common.serialization.DeserializationSchema;
-import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
-import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
-import org.apache.flink.table.api.TableSchema;
-import org.apache.flink.table.descriptors.KafkaValidator;
-import org.apache.flink.table.sources.RowtimeAttributeDescriptor;
-import org.apache.flink.types.Row;
-
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Properties;
-
-/**
- * Factory for creating configured instances of {@link Kafka08TableSource}.
- */
-public class Kafka08TableSourceFactory extends KafkaTableSourceFactory {
-
-       @Override
-       protected String kafkaVersion() {
-               return KafkaValidator.CONNECTOR_VERSION_VALUE_08;
-       }
-
-       @Override
-       protected boolean supportsKafkaTimestamps() {
-               return false;
-       }
-
-       @Override
-       protected KafkaTableSource createKafkaTableSource(
-                       TableSchema schema,
-                       Optional<String> proctimeAttribute,
-                       List<RowtimeAttributeDescriptor> rowtimeAttributeDescriptors,
-                       Map<String, String> fieldMapping,
-                       String topic,
-                       Properties properties,
-                       DeserializationSchema<Row> deserializationSchema,
-                       StartupMode startupMode,
-                       Map<KafkaTopicPartition, Long> specificStartupOffsets) {
-
-               return new Kafka08TableSource(
-                       schema,
-                       proctimeAttribute,
-                       rowtimeAttributeDescriptors,
-                       Optional.of(fieldMapping),
-                       topic,
-                       properties,
-                       deserializationSchema,
-                       startupMode,
-                       specificStartupOffsets);
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/aa25b4b3/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSourceSinkFactory.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSourceSinkFactory.java
 
b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSourceSinkFactory.java
new file mode 100644
index 0000000..3e93b6f
--- /dev/null
+++ 
b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSourceSinkFactory.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
+import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.descriptors.KafkaValidator;
+import org.apache.flink.table.sources.RowtimeAttributeDescriptor;
+import org.apache.flink.types.Row;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+
+/**
+ * Factory for creating configured instances of {@link Kafka08TableSource} and {@link Kafka08TableSink}.
+ */
+public class Kafka08TableSourceSinkFactory extends KafkaTableSourceSinkFactoryBase {
+
+       @Override
+       protected String kafkaVersion() {
+               return KafkaValidator.CONNECTOR_VERSION_VALUE_08;
+       }
+
+       @Override
+       protected boolean supportsKafkaTimestamps() {
+               return false;
+       }
+
+       @Override
+       protected KafkaTableSource createKafkaTableSource(
+                       TableSchema schema,
+                       Optional<String> proctimeAttribute,
+                       List<RowtimeAttributeDescriptor> rowtimeAttributeDescriptors,
+                       Map<String, String> fieldMapping,
+                       String topic,
+                       Properties properties,
+                       DeserializationSchema<Row> deserializationSchema,
+                       StartupMode startupMode,
+                       Map<KafkaTopicPartition, Long> specificStartupOffsets) {
+
+               return new Kafka08TableSource(
+                       schema,
+                       proctimeAttribute,
+                       rowtimeAttributeDescriptors,
+                       Optional.of(fieldMapping),
+                       topic,
+                       properties,
+                       deserializationSchema,
+                       startupMode,
+                       specificStartupOffsets);
+       }
+
+       @Override
+       protected KafkaTableSink createKafkaTableSink(
+                       TableSchema schema,
+                       String topic,
+                       Properties properties,
+                       FlinkKafkaPartitioner<Row> partitioner,
+                       SerializationSchema<Row> serializationSchema) {
+
+               return new Kafka08TableSink(
+                       schema,
+                       topic,
+                       properties,
+                       partitioner,
+                       serializationSchema);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/aa25b4b3/flink-connectors/flink-connector-kafka-0.8/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-0.8/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory
 
b/flink-connectors/flink-connector-kafka-0.8/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory
index b83bb3f..f2e1c3f 100644
--- 
a/flink-connectors/flink-connector-kafka-0.8/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory
+++ 
b/flink-connectors/flink-connector-kafka-0.8/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory
@@ -13,4 +13,4 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-org.apache.flink.streaming.connectors.kafka.Kafka08TableSourceFactory
+org.apache.flink.streaming.connectors.kafka.Kafka08TableSourceSinkFactory

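Switching this service entry is what makes the new class discoverable: table factories are located through Java's standard ServiceLoader mechanism and then matched against the normalized connector and format properties via each factory's requiredContext() and supportedProperties(). The listing below is only an illustration of that lookup (the class name ListTableFactories is made up for the example); with this module on the classpath it should print Kafka08TableSourceSinkFactory among the registered factories.

import org.apache.flink.table.factories.TableFactory;

import java.util.ServiceLoader;

// Lists every TableFactory registered through
// META-INF/services/org.apache.flink.table.factories.TableFactory on the classpath.
public class ListTableFactories {

	public static void main(String[] args) {
		for (TableFactory factory : ServiceLoader.load(TableFactory.class)) {
			System.out.println(factory.getClass().getName());
		}
	}
}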
http://git-wip-us.apache.org/repos/asf/flink/blob/aa25b4b3/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSinkTest.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSinkTest.java
 
b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSinkTest.java
index 53da9f6..32bd3b6 100644
--- 
a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSinkTest.java
+++ 
b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSinkTest.java
@@ -27,7 +27,11 @@ import java.util.Properties;
 
 /**
  * Tests for the {@link Kafka08JsonTableSink}.
+ *
+ * @deprecated Ensures backwards compatibility with Flink 1.5. Can be removed once we
+ *             drop support for format-specific table sinks.
  */
+@Deprecated
 public class Kafka08JsonTableSinkTest extends KafkaTableSinkTestBase {
 
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/aa25b4b3/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSourceFactoryTest.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSourceFactoryTest.java
 
b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSourceFactoryTest.java
deleted file mode 100644
index d939d88..0000000
--- 
a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSourceFactoryTest.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka;
-
-import org.apache.flink.api.common.serialization.DeserializationSchema;
-import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
-import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
-import org.apache.flink.table.api.TableSchema;
-import org.apache.flink.table.descriptors.KafkaValidator;
-import org.apache.flink.table.sources.RowtimeAttributeDescriptor;
-import org.apache.flink.types.Row;
-
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Properties;
-
-/**
- * Test for {@link Kafka08TableSource} created by {@link Kafka08TableSourceFactory}.
- */
-public class Kafka08TableSourceFactoryTest extends KafkaTableSourceFactoryTestBase {
-
-       @Override
-       protected String getKafkaVersion() {
-               return KafkaValidator.CONNECTOR_VERSION_VALUE_08;
-       }
-
-       @Override
-       @SuppressWarnings("unchecked")
-       protected Class<FlinkKafkaConsumerBase<Row>> getExpectedFlinkKafkaConsumer() {
-               return (Class) FlinkKafkaConsumer08.class;
-       }
-
-       @Override
-       protected KafkaTableSource getExpectedKafkaTableSource(
-                       TableSchema schema,
-                       Optional<String> proctimeAttribute,
-                       List<RowtimeAttributeDescriptor> rowtimeAttributeDescriptors,
-                       Map<String, String> fieldMapping,
-                       String topic,
-                       Properties properties,
-                       DeserializationSchema<Row> deserializationSchema,
-                       StartupMode startupMode,
-                       Map<KafkaTopicPartition, Long> specificStartupOffsets) {
-
-               return new Kafka08TableSource(
-                       schema,
-                       proctimeAttribute,
-                       rowtimeAttributeDescriptors,
-                       Optional.of(fieldMapping),
-                       topic,
-                       properties,
-                       deserializationSchema,
-                       startupMode,
-                       specificStartupOffsets
-               );
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/aa25b4b3/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSourceSinkFactoryTest.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSourceSinkFactoryTest.java
 
b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSourceSinkFactoryTest.java
new file mode 100644
index 0000000..b67501e
--- /dev/null
+++ 
b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSourceSinkFactoryTest.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
+import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.descriptors.KafkaValidator;
+import org.apache.flink.table.sources.RowtimeAttributeDescriptor;
+import org.apache.flink.types.Row;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+
+/**
+ * Test for {@link Kafka08TableSource} and {@link Kafka08TableSink} created
+ * by {@link Kafka08TableSourceSinkFactory}.
+ */
+public class Kafka08TableSourceSinkFactoryTest extends KafkaTableSourceSinkFactoryTestBase {
+
+       @Override
+       protected String getKafkaVersion() {
+               return KafkaValidator.CONNECTOR_VERSION_VALUE_08;
+       }
+
+       @Override
+       @SuppressWarnings("unchecked")
+       protected Class<FlinkKafkaConsumerBase<Row>> getExpectedFlinkKafkaConsumer() {
+               return (Class) FlinkKafkaConsumer08.class;
+       }
+
+       @Override
+       protected Class<?> getExpectedFlinkKafkaProducer() {
+               return FlinkKafkaProducer08.class;
+       }
+
+       @Override
+       protected KafkaTableSource getExpectedKafkaTableSource(
+                       TableSchema schema,
+                       Optional<String> proctimeAttribute,
+                       List<RowtimeAttributeDescriptor> rowtimeAttributeDescriptors,
+                       Map<String, String> fieldMapping,
+                       String topic,
+                       Properties properties,
+                       DeserializationSchema<Row> deserializationSchema,
+                       StartupMode startupMode,
+                       Map<KafkaTopicPartition, Long> specificStartupOffsets) {
+
+               return new Kafka08TableSource(
+                       schema,
+                       proctimeAttribute,
+                       rowtimeAttributeDescriptors,
+                       Optional.of(fieldMapping),
+                       topic,
+                       properties,
+                       deserializationSchema,
+                       startupMode,
+                       specificStartupOffsets
+               );
+       }
+
+       @Override
+       protected KafkaTableSink getExpectedKafkaTableSink(
+                       TableSchema schema,
+                       String topic,
+                       Properties properties,
+                       FlinkKafkaPartitioner<Row> partitioner,
+                       SerializationSchema<Row> serializationSchema) {
+
+               return new Kafka08TableSink(
+                       schema,
+                       topic,
+                       properties,
+                       partitioner,
+                       serializationSchema
+               );
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/aa25b4b3/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSink.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSink.java
 
b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSink.java
index 95ce4e6..b3cc0aa 100644
--- 
a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSink.java
+++ 
b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSink.java
@@ -18,20 +18,25 @@
 
 package org.apache.flink.streaming.connectors.kafka;
 
-import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.serialization.SerializationSchema;
 import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
 import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaDelegatePartitioner;
 import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
 import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
+import org.apache.flink.table.descriptors.ConnectorDescriptor;
 import org.apache.flink.types.Row;
 
 import java.util.Properties;
 
 /**
  * Kafka 0.9 {@link KafkaTableSink} that serializes data in JSON format.
+ *
+ * @deprecated Use the {@link org.apache.flink.table.descriptors.Kafka} descriptor together
+ *             with descriptors for schema and format instead. Descriptors allow for
+ *             implementation-agnostic definition of tables. See also
+ *             {@link org.apache.flink.table.api.TableEnvironment#connect(ConnectorDescriptor)}.
  */
-@PublicEvolving
+@Deprecated
 public class Kafka09JsonTableSink extends KafkaJsonTableSink {
 
        /**
@@ -48,7 +53,9 @@ public class Kafka09JsonTableSink extends KafkaJsonTableSink {
         *
         * @param topic topic in Kafka to which table is written
         * @param properties properties to connect to Kafka
+        * @deprecated Use table descriptors instead of implementation-specific classes.
         */
+       @Deprecated
        public Kafka09JsonTableSink(String topic, Properties properties) {
                super(topic, properties, new FlinkFixedPartitioner<>());
        }
@@ -60,7 +67,9 @@ public class Kafka09JsonTableSink extends KafkaJsonTableSink {
         * @param topic topic in Kafka to which table is written
         * @param properties properties to connect to Kafka
         * @param partitioner Kafka partitioner
+        * @deprecated Use table descriptors instead of implementation-specific classes.
         */
+       @Deprecated
        public Kafka09JsonTableSink(String topic, Properties properties, FlinkKafkaPartitioner<Row> partitioner) {
                super(topic, properties, partitioner);
        }
@@ -84,7 +93,11 @@ public class Kafka09JsonTableSink extends KafkaJsonTableSink {
 
        @Override
        protected FlinkKafkaProducerBase<Row> createKafkaProducer(String topic, Properties properties, SerializationSchema<Row> serializationSchema, FlinkKafkaPartitioner<Row> partitioner) {
-               return new FlinkKafkaProducer09<>(topic, serializationSchema, properties, partitioner);
+               return new FlinkKafkaProducer09<>(
+                       topic,
+                       serializationSchema,
+                       properties,
+                       partitioner);
        }
 
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/aa25b4b3/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSink.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSink.java
 
b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSink.java
new file mode 100644
index 0000000..8c349d7
--- /dev/null
+++ 
b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSink.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.types.Row;
+
+import java.util.Properties;
+
+/**
+ * Kafka 0.9 table sink for writing data into Kafka.
+ */
+@Internal
+public class Kafka09TableSink extends KafkaTableSink {
+
+       public Kafka09TableSink(
+                       TableSchema schema,
+                       String topic,
+                       Properties properties,
+                       FlinkKafkaPartitioner<Row> partitioner,
+                       SerializationSchema<Row> serializationSchema) {
+               super(
+                       schema,
+                       topic,
+                       properties,
+                       partitioner,
+                       serializationSchema);
+       }
+
+       @Override
+       protected FlinkKafkaProducerBase<Row> createKafkaProducer(
+                       String topic,
+                       Properties properties,
+                       SerializationSchema<Row> serializationSchema,
+                       FlinkKafkaPartitioner<Row> partitioner) {
+               return new FlinkKafkaProducer09<>(
+                       topic,
+                       serializationSchema,
+                       properties,
+                       partitioner);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/aa25b4b3/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSource.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSource.java
 
b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSource.java
index 18bc1c4..8f9e799 100644
--- 
a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSource.java
+++ 
b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSource.java
@@ -58,7 +58,8 @@ public class Kafka09TableSource extends KafkaTableSource {
                        Optional<String> proctimeAttribute,
                        List<RowtimeAttributeDescriptor> rowtimeAttributeDescriptors,
                        Optional<Map<String, String>> fieldMapping,
-                       String topic, Properties properties,
+                       String topic,
+                       Properties properties,
                        DeserializationSchema<Row> deserializationSchema,
                        StartupMode startupMode,
                        Map<KafkaTopicPartition, Long> specificStartupOffsets) {

http://git-wip-us.apache.org/repos/asf/flink/blob/aa25b4b3/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSourceFactory.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSourceFactory.java
 
b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSourceFactory.java
deleted file mode 100644
index 14c52fd..0000000
--- 
a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSourceFactory.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka;
-
-import org.apache.flink.api.common.serialization.DeserializationSchema;
-import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
-import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
-import org.apache.flink.table.api.TableSchema;
-import org.apache.flink.table.descriptors.KafkaValidator;
-import org.apache.flink.table.sources.RowtimeAttributeDescriptor;
-import org.apache.flink.types.Row;
-
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Properties;
-
-/**
- * Factory for creating configured instances of {@link Kafka09TableSource}.
- */
-public class Kafka09TableSourceFactory extends KafkaTableSourceFactory {
-
-       @Override
-       protected String kafkaVersion() {
-               return KafkaValidator.CONNECTOR_VERSION_VALUE_09;
-       }
-
-       @Override
-       protected boolean supportsKafkaTimestamps() {
-               return false;
-       }
-
-       @Override
-       protected KafkaTableSource createKafkaTableSource(
-                       TableSchema schema,
-                       Optional<String> proctimeAttribute,
-                       List<RowtimeAttributeDescriptor> rowtimeAttributeDescriptors,
-                       Map<String, String> fieldMapping,
-                       String topic,
-                       Properties properties,
-                       DeserializationSchema<Row> deserializationSchema,
-                       StartupMode startupMode,
-                       Map<KafkaTopicPartition, Long> specificStartupOffsets) {
-
-               return new Kafka09TableSource(
-                       schema,
-                       proctimeAttribute,
-                       rowtimeAttributeDescriptors,
-                       Optional.of(fieldMapping),
-                       topic,
-                       properties,
-                       deserializationSchema,
-                       startupMode,
-                       specificStartupOffsets);
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/aa25b4b3/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSourceSinkFactory.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSourceSinkFactory.java
 
b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSourceSinkFactory.java
new file mode 100644
index 0000000..9958b4e
--- /dev/null
+++ 
b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSourceSinkFactory.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
+import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.descriptors.KafkaValidator;
+import org.apache.flink.table.sources.RowtimeAttributeDescriptor;
+import org.apache.flink.types.Row;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+
+/**
+ * Factory for creating configured instances of {@link Kafka09TableSource} and {@link Kafka09TableSink}.
+ */
+public class Kafka09TableSourceSinkFactory extends KafkaTableSourceSinkFactoryBase {
+
+       @Override
+       protected String kafkaVersion() {
+               return KafkaValidator.CONNECTOR_VERSION_VALUE_09;
+       }
+
+       @Override
+       protected boolean supportsKafkaTimestamps() {
+               return false;
+       }
+
+       @Override
+       protected KafkaTableSource createKafkaTableSource(
+                       TableSchema schema,
+                       Optional<String> proctimeAttribute,
+                       List<RowtimeAttributeDescriptor> rowtimeAttributeDescriptors,
+                       Map<String, String> fieldMapping,
+                       String topic,
+                       Properties properties,
+                       DeserializationSchema<Row> deserializationSchema,
+                       StartupMode startupMode,
+                       Map<KafkaTopicPartition, Long> specificStartupOffsets) {
+
+               return new Kafka09TableSource(
+                       schema,
+                       proctimeAttribute,
+                       rowtimeAttributeDescriptors,
+                       Optional.of(fieldMapping),
+                       topic,
+                       properties,
+                       deserializationSchema,
+                       startupMode,
+                       specificStartupOffsets);
+       }
+
+       @Override
+       protected KafkaTableSink createKafkaTableSink(
+                       TableSchema schema,
+                       String topic,
+                       Properties properties,
+                       FlinkKafkaPartitioner<Row> partitioner,
+                       SerializationSchema<Row> serializationSchema) {
+
+               return new Kafka09TableSink(
+                       schema,
+                       topic,
+                       properties,
+                       partitioner,
+                       serializationSchema);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/aa25b4b3/flink-connectors/flink-connector-kafka-0.9/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-0.9/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory
 
b/flink-connectors/flink-connector-kafka-0.9/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory
index fb14ddb..2625873 100644
--- 
a/flink-connectors/flink-connector-kafka-0.9/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory
+++ 
b/flink-connectors/flink-connector-kafka-0.9/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory
@@ -13,4 +13,4 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-org.apache.flink.streaming.connectors.kafka.Kafka09TableSourceFactory
+org.apache.flink.streaming.connectors.kafka.Kafka09TableSourceSinkFactory

http://git-wip-us.apache.org/repos/asf/flink/blob/aa25b4b3/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSinkTest.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSinkTest.java
 
b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSinkTest.java
index 610e048..79f251b 100644
--- 
a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSinkTest.java
+++ 
b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSinkTest.java
@@ -27,7 +27,11 @@ import java.util.Properties;
 
 /**
  * Tests for the {@link Kafka09JsonTableSink}.
+ *
+ * @deprecated Ensures backwards compatibility with Flink 1.5. Can be removed once we
+ *             drop support for format-specific table sinks.
  */
+@Deprecated
 public class Kafka09JsonTableSinkTest extends KafkaTableSinkTestBase {
 
        @Override
