Repository: flink
Updated Branches:
  refs/heads/release-1.6 702f77355 -> aa25b4b32


http://git-wip-us.apache.org/repos/asf/flink/blob/aa25b4b3/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSourceFactoryTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSourceFactoryTest.java b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSourceFactoryTest.java
deleted file mode 100644
index b976e14..0000000
--- a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSourceFactoryTest.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka;
-
-import org.apache.flink.api.common.serialization.DeserializationSchema;
-import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
-import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
-import org.apache.flink.table.api.TableSchema;
-import org.apache.flink.table.descriptors.KafkaValidator;
-import org.apache.flink.table.sources.RowtimeAttributeDescriptor;
-import org.apache.flink.types.Row;
-
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Properties;
-
-/**
- * Test for {@link Kafka09TableSource} created by {@link Kafka09TableSourceFactory}.
- */
-public class Kafka09TableSourceFactoryTest extends KafkaTableSourceFactoryTestBase {
-
-       @Override
-       protected String getKafkaVersion() {
-               return KafkaValidator.CONNECTOR_VERSION_VALUE_09;
-       }
-
-       @Override
-       @SuppressWarnings("unchecked")
-       protected Class<FlinkKafkaConsumerBase<Row>> getExpectedFlinkKafkaConsumer() {
-               return (Class) FlinkKafkaConsumer09.class;
-       }
-
-       @Override
-       protected KafkaTableSource getExpectedKafkaTableSource(
-                       TableSchema schema,
-                       Optional<String> proctimeAttribute,
-                       List<RowtimeAttributeDescriptor> rowtimeAttributeDescriptors,
-                       Map<String, String> fieldMapping,
-                       String topic,
-                       Properties properties,
-                       DeserializationSchema<Row> deserializationSchema,
-                       StartupMode startupMode,
-                       Map<KafkaTopicPartition, Long> specificStartupOffsets) {
-
-               return new Kafka09TableSource(
-                       schema,
-                       proctimeAttribute,
-                       rowtimeAttributeDescriptors,
-                       Optional.of(fieldMapping),
-                       topic,
-                       properties,
-                       deserializationSchema,
-                       startupMode,
-                       specificStartupOffsets
-               );
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/aa25b4b3/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSourceSinkFactoryTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSourceSinkFactoryTest.java b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSourceSinkFactoryTest.java
new file mode 100644
index 0000000..a6c8bd4
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSourceSinkFactoryTest.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
+import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.descriptors.KafkaValidator;
+import org.apache.flink.table.sources.RowtimeAttributeDescriptor;
+import org.apache.flink.types.Row;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+
+/**
+ * Test for {@link Kafka09TableSource} and {@link Kafka09TableSink} created
+ * by {@link Kafka09TableSourceSinkFactory}.
+ */
+public class Kafka09TableSourceSinkFactoryTest extends KafkaTableSourceSinkFactoryTestBase {
+
+       @Override
+       protected String getKafkaVersion() {
+               return KafkaValidator.CONNECTOR_VERSION_VALUE_09;
+       }
+
+       @Override
+       @SuppressWarnings("unchecked")
+       protected Class<FlinkKafkaConsumerBase<Row>> getExpectedFlinkKafkaConsumer() {
+               return (Class) FlinkKafkaConsumer09.class;
+       }
+
+       @Override
+       protected Class<?> getExpectedFlinkKafkaProducer() {
+               return FlinkKafkaProducer09.class;
+       }
+
+       @Override
+       protected KafkaTableSource getExpectedKafkaTableSource(
+                       TableSchema schema,
+                       Optional<String> proctimeAttribute,
+                       List<RowtimeAttributeDescriptor> rowtimeAttributeDescriptors,
+                       Map<String, String> fieldMapping,
+                       String topic,
+                       Properties properties,
+                       DeserializationSchema<Row> deserializationSchema,
+                       StartupMode startupMode,
+                       Map<KafkaTopicPartition, Long> specificStartupOffsets) {
+
+               return new Kafka09TableSource(
+                       schema,
+                       proctimeAttribute,
+                       rowtimeAttributeDescriptors,
+                       Optional.of(fieldMapping),
+                       topic,
+                       properties,
+                       deserializationSchema,
+                       startupMode,
+                       specificStartupOffsets
+               );
+       }
+
+       @Override
+       protected KafkaTableSink getExpectedKafkaTableSink(
+                       TableSchema schema,
+                       String topic,
+                       Properties properties,
+                       FlinkKafkaPartitioner<Row> partitioner,
+                       SerializationSchema<Row> serializationSchema) {
+
+               return new Kafka09TableSink(
+                       schema,
+                       topic,
+                       properties,
+                       partitioner,
+                       serializationSchema
+               );
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/aa25b4b3/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSink.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSink.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSink.java
index ec27398..231eddd 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSink.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSink.java
@@ -29,7 +29,10 @@ import java.util.Properties;
 
 /**
  * Base class for {@link KafkaTableSink} that serializes data in JSON format.
+ *
+ * @deprecated Use table descriptors instead of implementation-specific classes.
  */
+@Deprecated
 @Internal
 public abstract class KafkaJsonTableSink extends KafkaTableSink {
 
@@ -39,7 +42,9 @@ public abstract class KafkaJsonTableSink extends KafkaTableSink {
         * @param topic topic in Kafka to which table is written
         * @param properties properties to connect to Kafka
         * @param partitioner Kafka partitioner
+        * @deprecated Use table descriptors instead of implementation-specific classes.
         */
+       @Deprecated
        public KafkaJsonTableSink(String topic, Properties properties, FlinkKafkaPartitioner<Row> partitioner) {
                super(topic, properties, partitioner);
        }
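
The @deprecated notes above point to table descriptors as the replacement for format-specific sinks such as KafkaJsonTableSink. A rough sketch of the descriptor-based equivalent follows; it assumes the Kafka, Json and Schema descriptor classes and the connect()/registerTableSink() methods available around this release, so treat the exact names as illustrative rather than authoritative:

    // Illustrative fragment only: declares a JSON-encoded Kafka sink through descriptors
    // instead of instantiating a JSON-specific sink class. Topic, fields and connection
    // properties are made up for the example.
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

    tableEnv
        .connect(new Kafka()
            .version("0.9")                     // resolved by the Kafka 0.9 table factory
            .topic("example-output")
            .property("bootstrap.servers", "localhost:9092"))
        .withFormat(new Json().deriveSchema())  // replaces the JSON-specific sink class
        .withSchema(new Schema()
            .field("user", Types.STRING)
            .field("clicks", Types.LONG))
        .inAppendMode()
        .registerTableSink("exampleSink");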

http://git-wip-us.apache.org/repos/asf/flink/blob/aa25b4b3/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java
index 687df58..7853bb7 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java
@@ -23,12 +23,17 @@ import org.apache.flink.api.common.serialization.SerializationSchema;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
 import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
 import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
+import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.sinks.AppendStreamTableSink;
 import org.apache.flink.table.util.TableConnectorUtil;
 import org.apache.flink.types.Row;
 import org.apache.flink.util.Preconditions;
 
+import java.util.Arrays;
+import java.util.Objects;
+import java.util.Optional;
 import java.util.Properties;
 
 /**
@@ -40,27 +45,59 @@ import java.util.Properties;
 @Internal
 public abstract class KafkaTableSink implements AppendStreamTableSink<Row> {
 
+       // TODO make all attributes final and mandatory once we drop support for format-specific table sinks
+
+       /** The schema of the table. */
+       private final Optional<TableSchema> schema;
+
+       /** The Kafka topic to write to. */
        protected final String topic;
+
+       /** Properties for the Kafka producer. */
        protected final Properties properties;
-       protected SerializationSchema<Row> serializationSchema;
+
+       /** Serialization schema for encoding records to Kafka. */
+       protected Optional<SerializationSchema<Row>> serializationSchema;
+
+       /** Partitioner to select Kafka partition for each item. */
        protected final FlinkKafkaPartitioner<Row> partitioner;
+
+       // legacy variables
        protected String[] fieldNames;
        protected TypeInformation[] fieldTypes;
 
+       protected KafkaTableSink(
+                       TableSchema schema,
+                       String topic,
+                       Properties properties,
+                       FlinkKafkaPartitioner<Row> partitioner,
+                       SerializationSchema<Row> serializationSchema) {
+               this.schema = Optional.of(Preconditions.checkNotNull(schema, "Schema must not be null."));
+               this.topic = Preconditions.checkNotNull(topic, "Topic must not be null.");
+               this.properties = Preconditions.checkNotNull(properties, "Properties must not be null.");
+               this.partitioner = Preconditions.checkNotNull(partitioner, "Partitioner must not be null.");
+               this.serializationSchema = Optional.of(Preconditions.checkNotNull(
+                       serializationSchema, "Serialization schema must not be null."));
+       }
+
        /**
         * Creates KafkaTableSink.
         *
         * @param topic                 Kafka topic to write to.
-        * @param properties            Properties for the Kafka consumer.
+        * @param properties            Properties for the Kafka producer.
         * @param partitioner           Partitioner to select Kafka partition for each item
+        * @deprecated Use table descriptors instead of implementation-specific classes.
         */
+       @Deprecated
        public KafkaTableSink(
                        String topic,
                        Properties properties,
                        FlinkKafkaPartitioner<Row> partitioner) {
+               this.schema = Optional.empty();
                this.topic = Preconditions.checkNotNull(topic, "topic");
                this.properties = Preconditions.checkNotNull(properties, "properties");
                this.partitioner = Preconditions.checkNotNull(partitioner, "partitioner");
+               this.serializationSchema = Optional.empty();
        }
 
        /**
@@ -72,8 +109,9 @@ public abstract class KafkaTableSink implements AppendStreamTableSink<Row> {
         * @param partitioner         Partitioner to select Kafka partition.
         * @return The version-specific Kafka producer
         */
-       protected abstract FlinkKafkaProducerBase<Row> createKafkaProducer(
-               String topic, Properties properties,
+       protected abstract SinkFunction<Row> createKafkaProducer(
+               String topic,
+               Properties properties,
                SerializationSchema<Row> serializationSchema,
                FlinkKafkaPartitioner<Row> partitioner);
 
@@ -82,40 +120,57 @@ public abstract class KafkaTableSink implements AppendStreamTableSink<Row> {
         *
         * @param rowSchema the schema of the row to serialize.
         * @return Instance of serialization schema
+        * @deprecated Use the constructor to pass a serialization schema instead.
         */
-       protected abstract SerializationSchema<Row> createSerializationSchema(RowTypeInfo rowSchema);
+       @Deprecated
+       protected SerializationSchema<Row> createSerializationSchema(RowTypeInfo rowSchema) {
+               throw new UnsupportedOperationException("This method only exists for backwards compatibility.");
+       }
 
        /**
         * Create a deep copy of this sink.
         *
         * @return Deep copy of this sink
         */
-       protected abstract KafkaTableSink createCopy();
+       @Deprecated
+       protected KafkaTableSink createCopy() {
+               throw new UnsupportedOperationException("This method only exists for backwards compatibility.");
+       }
 
        @Override
        public void emitDataStream(DataStream<Row> dataStream) {
-               FlinkKafkaProducerBase<Row> kafkaProducer = createKafkaProducer(topic, properties, serializationSchema, partitioner);
-               // always enable flush on checkpoint to achieve at-least-once if query runs with checkpointing enabled.
-               kafkaProducer.setFlushOnCheckpoint(true);
+               SinkFunction<Row> kafkaProducer = createKafkaProducer(
+                       topic,
+                       properties,
+                       serializationSchema.orElseThrow(() -> new IllegalStateException("No serialization schema defined.")),
+                       partitioner);
                dataStream.addSink(kafkaProducer).name(TableConnectorUtil.generateRuntimeName(this.getClass(), fieldNames));
        }
 
        @Override
        public TypeInformation<Row> getOutputType() {
-               return new RowTypeInfo(getFieldTypes());
+               return schema
+                       .map(TableSchema::toRowType)
+                       .orElseGet(() -> new RowTypeInfo(getFieldTypes()));
        }
 
        public String[] getFieldNames() {
-               return fieldNames;
+               return schema.map(TableSchema::getColumnNames).orElse(fieldNames);
        }
 
        @Override
        public TypeInformation<?>[] getFieldTypes() {
-               return fieldTypes;
+               return schema.map(TableSchema::getTypes).orElse(fieldTypes);
        }
 
        @Override
        public KafkaTableSink configure(String[] fieldNames, TypeInformation<?>[] fieldTypes) {
+               if (schema.isPresent()) {
+                       // a fixed schema is defined so reconfiguration is not supported
+                       throw new UnsupportedOperationException("Reconfiguration of this sink is not supported.");
+               }
+
+               // legacy code
                KafkaTableSink copy = createCopy();
                copy.fieldNames = Preconditions.checkNotNull(fieldNames, "fieldNames");
                copy.fieldTypes = Preconditions.checkNotNull(fieldTypes, "fieldTypes");
@@ -123,8 +178,39 @@ public abstract class KafkaTableSink implements AppendStreamTableSink<Row> {
                        "Number of provided field names and types does not match.");
 
                RowTypeInfo rowSchema = new RowTypeInfo(fieldTypes, fieldNames);
-               copy.serializationSchema = createSerializationSchema(rowSchema);
+               copy.serializationSchema = Optional.of(createSerializationSchema(rowSchema));
 
                return copy;
        }
+
+       @Override
+       public boolean equals(Object o) {
+               if (this == o) {
+                       return true;
+               }
+               if (o == null || getClass() != o.getClass()) {
+                       return false;
+               }
+               KafkaTableSink that = (KafkaTableSink) o;
+               return Objects.equals(schema, that.schema) &&
+                       Objects.equals(topic, that.topic) &&
+                       Objects.equals(properties, that.properties) &&
+                       Objects.equals(serializationSchema, that.serializationSchema) &&
+                       Objects.equals(partitioner, that.partitioner) &&
+                       Arrays.equals(fieldNames, that.fieldNames) &&
+                       Arrays.equals(fieldTypes, that.fieldTypes);
+       }
+
+       @Override
+       public int hashCode() {
+               int result = Objects.hash(
+                       schema,
+                       topic,
+                       properties,
+                       serializationSchema,
+                       partitioner);
+               result = 31 * result + Arrays.hashCode(fieldNames);
+               result = 31 * result + Arrays.hashCode(fieldTypes);
+               return result;
+       }
 }
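
With the changes above, a version-specific sink now passes everything through the new protected constructor and implements createKafkaProducer() with the SinkFunction return type. Below is a minimal sketch of that pattern (hypothetical class name; the FlinkKafkaProducer09 constructor used and the setFlushOnCheckpoint() call mirror the removed emitDataStream() logic but are assumptions here, not the actual Kafka09TableSink from this change):

    import org.apache.flink.api.common.serialization.SerializationSchema;
    import org.apache.flink.streaming.api.functions.sink.SinkFunction;
    import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
    import org.apache.flink.table.api.TableSchema;
    import org.apache.flink.types.Row;

    import java.util.Properties;

    // Sketch of a version-specific sink on top of the new KafkaTableSink base class.
    public class SketchKafka09TableSink extends KafkaTableSink {

        public SketchKafka09TableSink(
                TableSchema schema,
                String topic,
                Properties properties,
                FlinkKafkaPartitioner<Row> partitioner,
                SerializationSchema<Row> serializationSchema) {
            super(schema, topic, properties, partitioner, serializationSchema);
        }

        @Override
        protected SinkFunction<Row> createKafkaProducer(
                String topic,
                Properties properties,
                SerializationSchema<Row> serializationSchema,
                FlinkKafkaPartitioner<Row> partitioner) {
            // assumed 0.9 producer constructor; flush-on-checkpoint keeps the
            // at-least-once behavior that emitDataStream() used to enforce
            FlinkKafkaProducer09<Row> producer =
                new FlinkKafkaProducer09<>(topic, serializationSchema, properties, partitioner);
            producer.setFlushOnCheckpoint(true);
            return producer;
        }
    }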

http://git-wip-us.apache.org/repos/asf/flink/blob/aa25b4b3/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceFactory.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceFactory.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceFactory.java
deleted file mode 100644
index d7e42f5..0000000
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceFactory.java
+++ /dev/null
@@ -1,251 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka;
-
-import org.apache.flink.api.common.serialization.DeserializationSchema;
-import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
-import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
-import org.apache.flink.table.api.TableException;
-import org.apache.flink.table.api.TableSchema;
-import org.apache.flink.table.descriptors.DescriptorProperties;
-import org.apache.flink.table.descriptors.KafkaValidator;
-import org.apache.flink.table.descriptors.SchemaValidator;
-import org.apache.flink.table.factories.DeserializationSchemaFactory;
-import org.apache.flink.table.factories.StreamTableSourceFactory;
-import org.apache.flink.table.factories.TableFactoryService;
-import org.apache.flink.table.sources.RowtimeAttributeDescriptor;
-import org.apache.flink.table.sources.StreamTableSource;
-import org.apache.flink.types.Row;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Properties;
-
-import static org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_PROPERTY_VERSION;
-import static org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_TYPE;
-import static org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_VERSION;
-import static org.apache.flink.table.descriptors.FormatDescriptorValidator.FORMAT;
-import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_PROPERTIES;
-import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_PROPERTIES_KEY;
-import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_PROPERTIES_VALUE;
-import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_SPECIFIC_OFFSETS;
-import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_SPECIFIC_OFFSETS_OFFSET;
-import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_SPECIFIC_OFFSETS_PARTITION;
-import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_STARTUP_MODE;
-import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_TOPIC;
-import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_TYPE_VALUE_KAFKA;
-import static org.apache.flink.table.descriptors.RowtimeValidator.ROWTIME_TIMESTAMPS_CLASS;
-import static org.apache.flink.table.descriptors.RowtimeValidator.ROWTIME_TIMESTAMPS_FROM;
-import static org.apache.flink.table.descriptors.RowtimeValidator.ROWTIME_TIMESTAMPS_SERIALIZED;
-import static org.apache.flink.table.descriptors.RowtimeValidator.ROWTIME_TIMESTAMPS_TYPE;
-import static org.apache.flink.table.descriptors.RowtimeValidator.ROWTIME_WATERMARKS_CLASS;
-import static org.apache.flink.table.descriptors.RowtimeValidator.ROWTIME_WATERMARKS_DELAY;
-import static org.apache.flink.table.descriptors.RowtimeValidator.ROWTIME_WATERMARKS_SERIALIZED;
-import static org.apache.flink.table.descriptors.RowtimeValidator.ROWTIME_WATERMARKS_TYPE;
-import static org.apache.flink.table.descriptors.SchemaValidator.SCHEMA;
-import static org.apache.flink.table.descriptors.SchemaValidator.SCHEMA_FROM;
-import static org.apache.flink.table.descriptors.SchemaValidator.SCHEMA_NAME;
-import static org.apache.flink.table.descriptors.SchemaValidator.SCHEMA_PROCTIME;
-import static org.apache.flink.table.descriptors.SchemaValidator.SCHEMA_TYPE;
-import static org.apache.flink.table.descriptors.StreamTableDescriptorValidator.UPDATE_MODE;
-import static org.apache.flink.table.descriptors.StreamTableDescriptorValidator.UPDATE_MODE_VALUE_APPEND;
-
-/**
- * Factory for creating configured instances of {@link KafkaTableSource}.
- */
-public abstract class KafkaTableSourceFactory implements 
StreamTableSourceFactory<Row> {
-
-       @Override
-       public Map<String, String> requiredContext() {
-               Map<String, String> context = new HashMap<>();
-               context.put(UPDATE_MODE(), UPDATE_MODE_VALUE_APPEND()); // 
append mode
-               context.put(CONNECTOR_TYPE(), CONNECTOR_TYPE_VALUE_KAFKA); // 
kafka
-               context.put(CONNECTOR_VERSION(), kafkaVersion()); // version
-               context.put(CONNECTOR_PROPERTY_VERSION(), "1"); // backwards 
compatibility
-               return context;
-       }
-
-       @Override
-       public List<String> supportedProperties() {
-               List<String> properties = new ArrayList<>();
-
-               // kafka
-               properties.add(CONNECTOR_TOPIC);
-               properties.add(CONNECTOR_PROPERTIES);
-               properties.add(CONNECTOR_PROPERTIES + ".#." + 
CONNECTOR_PROPERTIES_KEY);
-               properties.add(CONNECTOR_PROPERTIES + ".#." + 
CONNECTOR_PROPERTIES_VALUE);
-               properties.add(CONNECTOR_STARTUP_MODE);
-               properties.add(CONNECTOR_SPECIFIC_OFFSETS + ".#." + 
CONNECTOR_SPECIFIC_OFFSETS_PARTITION);
-               properties.add(CONNECTOR_SPECIFIC_OFFSETS + ".#." + 
CONNECTOR_SPECIFIC_OFFSETS_OFFSET);
-
-               // schema
-               properties.add(SCHEMA() + ".#." + SCHEMA_TYPE());
-               properties.add(SCHEMA() + ".#." + SCHEMA_NAME());
-               properties.add(SCHEMA() + ".#." + SCHEMA_FROM());
-
-               // time attributes
-               properties.add(SCHEMA() + ".#." + SCHEMA_PROCTIME());
-               properties.add(SCHEMA() + ".#." + ROWTIME_TIMESTAMPS_TYPE());
-               properties.add(SCHEMA() + ".#." + ROWTIME_TIMESTAMPS_FROM());
-               properties.add(SCHEMA() + ".#." + ROWTIME_TIMESTAMPS_CLASS());
-               properties.add(SCHEMA() + ".#." + 
ROWTIME_TIMESTAMPS_SERIALIZED());
-               properties.add(SCHEMA() + ".#." + ROWTIME_WATERMARKS_TYPE());
-               properties.add(SCHEMA() + ".#." + ROWTIME_WATERMARKS_CLASS());
-               properties.add(SCHEMA() + ".#." + 
ROWTIME_WATERMARKS_SERIALIZED());
-               properties.add(SCHEMA() + ".#." + ROWTIME_WATERMARKS_DELAY());
-
-               // format wildcard
-               properties.add(FORMAT() + ".*");
-
-               return properties;
-       }
-
-       @Override
-       public StreamTableSource<Row> createStreamTableSource(Map<String, 
String> properties) {
-               final DescriptorProperties params = new 
DescriptorProperties(true);
-               params.putProperties(properties);
-
-               // validate
-               // allow Kafka timestamps to be used, watermarks can not be 
received from source
-               new SchemaValidator(true, supportsKafkaTimestamps(), 
false).validate(params);
-               new KafkaValidator().validate(params);
-
-               // deserialization schema using format discovery
-               final DeserializationSchemaFactory<?> formatFactory = 
TableFactoryService.find(
-                       DeserializationSchemaFactory.class,
-                       properties,
-                       this.getClass().getClassLoader());
-               @SuppressWarnings("unchecked")
-               final DeserializationSchema<Row> deserializationSchema = 
(DeserializationSchema<Row>) formatFactory
-                       .createDeserializationSchema(properties);
-
-               // schema
-               final TableSchema schema = params.getTableSchema(SCHEMA());
-
-               // proctime
-               final Optional<String> proctimeAttribute = 
SchemaValidator.deriveProctimeAttribute(params);
-
-               // rowtime
-               final List<RowtimeAttributeDescriptor> rowtimeAttributes = 
SchemaValidator.deriveRowtimeAttributes(params);
-
-               // field mapping
-               final Map<String, String> fieldMapping = 
SchemaValidator.deriveFieldMapping(params, Optional.of(schema));
-
-               // properties
-               final Properties kafkaProperties = new Properties();
-               final List<Map<String, String>> propsList = 
params.getFixedIndexedProperties(
-                       CONNECTOR_PROPERTIES,
-                       Arrays.asList(CONNECTOR_PROPERTIES_KEY, 
CONNECTOR_PROPERTIES_VALUE));
-               propsList.forEach(kv -> kafkaProperties.put(
-                       params.getString(kv.get(CONNECTOR_PROPERTIES_KEY)),
-                       params.getString(kv.get(CONNECTOR_PROPERTIES_VALUE))
-               ));
-
-               // topic
-               final String topic = params.getString(CONNECTOR_TOPIC);
-
-               // startup mode
-               final Map<KafkaTopicPartition, Long> specificOffsets = new 
HashMap<>();
-               final StartupMode startupMode = params
-                       .getOptionalString(CONNECTOR_STARTUP_MODE)
-                       .map(modeString -> {
-                               switch (modeString) {
-                                       case 
KafkaValidator.CONNECTOR_STARTUP_MODE_VALUE_EARLIEST:
-                                               return StartupMode.EARLIEST;
-
-                                       case 
KafkaValidator.CONNECTOR_STARTUP_MODE_VALUE_LATEST:
-                                               return StartupMode.LATEST;
-
-                                       case 
KafkaValidator.CONNECTOR_STARTUP_MODE_VALUE_GROUP_OFFSETS:
-                                               return 
StartupMode.GROUP_OFFSETS;
-
-                                       case 
KafkaValidator.CONNECTOR_STARTUP_MODE_VALUE_SPECIFIC_OFFSETS:
-                                               final List<Map<String, String>> 
offsetList = params.getFixedIndexedProperties(
-                                                       
CONNECTOR_SPECIFIC_OFFSETS,
-                                                       
Arrays.asList(CONNECTOR_SPECIFIC_OFFSETS_PARTITION, 
CONNECTOR_SPECIFIC_OFFSETS_OFFSET));
-                                               offsetList.forEach(kv -> {
-                                                       final int partition = 
params.getInt(kv.get(CONNECTOR_SPECIFIC_OFFSETS_PARTITION));
-                                                       final long offset = 
params.getLong(kv.get(CONNECTOR_SPECIFIC_OFFSETS_OFFSET));
-                                                       final 
KafkaTopicPartition topicPartition = new KafkaTopicPartition(topic, partition);
-                                                       
specificOffsets.put(topicPartition, offset);
-                                               });
-                                               return 
StartupMode.SPECIFIC_OFFSETS;
-                                       default:
-                                               throw new 
TableException("Unsupported startup mode. Validator should have checked that.");
-                               }
-                       }).orElse(StartupMode.GROUP_OFFSETS);
-
-               return createKafkaTableSource(
-                       schema,
-                       proctimeAttribute,
-                       rowtimeAttributes,
-                       fieldMapping,
-                       topic,
-                       kafkaProperties,
-                       deserializationSchema,
-                       startupMode,
-                       specificOffsets);
-       }
-
-       // 
--------------------------------------------------------------------------------------------
-       // For version-specific factories
-       // 
--------------------------------------------------------------------------------------------
-
-       /**
-        * Returns the Kafka version.
-        */
-       protected abstract String kafkaVersion();
-
-       /**
-        * True if the Kafka source supports Kafka timestamps, false otherwise.
-        *
-        * @return True if the Kafka source supports Kafka timestamps, false 
otherwise.
-        */
-       protected abstract boolean supportsKafkaTimestamps();
-
-       /**
-        * Constructs the version-specific Kafka table source.
-        *
-        * @param schema                      Schema of the produced table.
-        * @param proctimeAttribute           Field name of the processing time 
attribute.
-        * @param rowtimeAttributeDescriptors Descriptor for a rowtime attribute
-        * @param fieldMapping                Mapping for the fields of the 
table schema to
-        *                                    fields of the physical returned 
type.
-        * @param topic                       Kafka topic to consume.
-        * @param properties                  Properties for the Kafka consumer.
-        * @param deserializationSchema       Deserialization schema for 
decoding records from Kafka.
-        * @param startupMode                 Startup mode for the contained 
consumer.
-        * @param specificStartupOffsets      Specific startup offsets; only 
relevant when startup
-        *                                    mode is {@link 
StartupMode#SPECIFIC_OFFSETS}.
-        */
-       protected abstract KafkaTableSource createKafkaTableSource(
-               TableSchema schema,
-               Optional<String> proctimeAttribute,
-               List<RowtimeAttributeDescriptor> rowtimeAttributeDescriptors,
-               Map<String, String> fieldMapping,
-               String topic, Properties properties,
-               DeserializationSchema<Row> deserializationSchema,
-               StartupMode startupMode,
-               Map<KafkaTopicPartition, Long> specificStartupOffsets);
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/aa25b4b3/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryBase.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryBase.java
new file mode 100644
index 0000000..3307994
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryBase.java
@@ -0,0 +1,330 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
+import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.descriptors.DescriptorProperties;
+import org.apache.flink.table.descriptors.KafkaValidator;
+import org.apache.flink.table.descriptors.SchemaValidator;
+import org.apache.flink.table.factories.DeserializationSchemaFactory;
+import org.apache.flink.table.factories.SerializationSchemaFactory;
+import org.apache.flink.table.factories.StreamTableSinkFactory;
+import org.apache.flink.table.factories.StreamTableSourceFactory;
+import org.apache.flink.table.factories.TableFactoryService;
+import org.apache.flink.table.sinks.StreamTableSink;
+import org.apache.flink.table.sources.RowtimeAttributeDescriptor;
+import org.apache.flink.table.sources.StreamTableSource;
+import org.apache.flink.types.Row;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+
+import static org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_PROPERTY_VERSION;
+import static org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_TYPE;
+import static org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_VERSION;
+import static org.apache.flink.table.descriptors.FormatDescriptorValidator.FORMAT;
+import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_PROPERTIES;
+import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_PROPERTIES_KEY;
+import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_PROPERTIES_VALUE;
+import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_SPECIFIC_OFFSETS;
+import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_SPECIFIC_OFFSETS_OFFSET;
+import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_SPECIFIC_OFFSETS_PARTITION;
+import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_STARTUP_MODE;
+import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_TOPIC;
+import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_TYPE_VALUE_KAFKA;
+import static org.apache.flink.table.descriptors.RowtimeValidator.ROWTIME_TIMESTAMPS_CLASS;
+import static org.apache.flink.table.descriptors.RowtimeValidator.ROWTIME_TIMESTAMPS_FROM;
+import static org.apache.flink.table.descriptors.RowtimeValidator.ROWTIME_TIMESTAMPS_SERIALIZED;
+import static org.apache.flink.table.descriptors.RowtimeValidator.ROWTIME_TIMESTAMPS_TYPE;
+import static org.apache.flink.table.descriptors.RowtimeValidator.ROWTIME_WATERMARKS_CLASS;
+import static org.apache.flink.table.descriptors.RowtimeValidator.ROWTIME_WATERMARKS_DELAY;
+import static org.apache.flink.table.descriptors.RowtimeValidator.ROWTIME_WATERMARKS_SERIALIZED;
+import static org.apache.flink.table.descriptors.RowtimeValidator.ROWTIME_WATERMARKS_TYPE;
+import static org.apache.flink.table.descriptors.SchemaValidator.SCHEMA;
+import static org.apache.flink.table.descriptors.SchemaValidator.SCHEMA_FROM;
+import static org.apache.flink.table.descriptors.SchemaValidator.SCHEMA_NAME;
+import static org.apache.flink.table.descriptors.SchemaValidator.SCHEMA_PROCTIME;
+import static org.apache.flink.table.descriptors.SchemaValidator.SCHEMA_TYPE;
+import static org.apache.flink.table.descriptors.StreamTableDescriptorValidator.UPDATE_MODE;
+import static org.apache.flink.table.descriptors.StreamTableDescriptorValidator.UPDATE_MODE_VALUE_APPEND;
+
+/**
+ * Factory for creating configured instances of {@link KafkaTableSource} and {@link KafkaTableSink}.
+ */
+public abstract class KafkaTableSourceSinkFactoryBase implements
+               StreamTableSourceFactory<Row>,
+               StreamTableSinkFactory<Row> {
+
+       @Override
+       public Map<String, String> requiredContext() {
+               Map<String, String> context = new HashMap<>();
+               context.put(UPDATE_MODE(), UPDATE_MODE_VALUE_APPEND()); // append mode
+               context.put(CONNECTOR_TYPE(), CONNECTOR_TYPE_VALUE_KAFKA); // kafka
+               context.put(CONNECTOR_VERSION(), kafkaVersion()); // version
+               context.put(CONNECTOR_PROPERTY_VERSION(), "1"); // backwards compatibility
+               return context;
+       }
+
+       @Override
+       public List<String> supportedProperties() {
+               List<String> properties = new ArrayList<>();
+
+               // kafka
+               properties.add(CONNECTOR_TOPIC);
+               properties.add(CONNECTOR_PROPERTIES);
+               properties.add(CONNECTOR_PROPERTIES + ".#." + 
CONNECTOR_PROPERTIES_KEY);
+               properties.add(CONNECTOR_PROPERTIES + ".#." + 
CONNECTOR_PROPERTIES_VALUE);
+               properties.add(CONNECTOR_STARTUP_MODE);
+               properties.add(CONNECTOR_SPECIFIC_OFFSETS + ".#." + 
CONNECTOR_SPECIFIC_OFFSETS_PARTITION);
+               properties.add(CONNECTOR_SPECIFIC_OFFSETS + ".#." + 
CONNECTOR_SPECIFIC_OFFSETS_OFFSET);
+
+               // schema
+               properties.add(SCHEMA() + ".#." + SCHEMA_TYPE());
+               properties.add(SCHEMA() + ".#." + SCHEMA_NAME());
+               properties.add(SCHEMA() + ".#." + SCHEMA_FROM());
+
+               // time attributes
+               properties.add(SCHEMA() + ".#." + SCHEMA_PROCTIME());
+               properties.add(SCHEMA() + ".#." + ROWTIME_TIMESTAMPS_TYPE());
+               properties.add(SCHEMA() + ".#." + ROWTIME_TIMESTAMPS_FROM());
+               properties.add(SCHEMA() + ".#." + ROWTIME_TIMESTAMPS_CLASS());
+               properties.add(SCHEMA() + ".#." + 
ROWTIME_TIMESTAMPS_SERIALIZED());
+               properties.add(SCHEMA() + ".#." + ROWTIME_WATERMARKS_TYPE());
+               properties.add(SCHEMA() + ".#." + ROWTIME_WATERMARKS_CLASS());
+               properties.add(SCHEMA() + ".#." + 
ROWTIME_WATERMARKS_SERIALIZED());
+               properties.add(SCHEMA() + ".#." + ROWTIME_WATERMARKS_DELAY());
+
+               // format wildcard
+               properties.add(FORMAT() + ".*");
+
+               return properties;
+       }
+
+       @Override
+       public StreamTableSource<Row> createStreamTableSource(Map<String, String> properties) {
+               final DescriptorProperties descriptorProperties = getValidatedProperties(properties);
+
+               final TableSchema schema = descriptorProperties.getTableSchema(SCHEMA());
+               final String topic = descriptorProperties.getString(CONNECTOR_TOPIC);
+               final StartupOptions startupOptions = getStartupOptions(descriptorProperties, topic);
+
+               return createKafkaTableSource(
+                       schema,
+                       SchemaValidator.deriveProctimeAttribute(descriptorProperties),
+                       SchemaValidator.deriveRowtimeAttributes(descriptorProperties),
+                       SchemaValidator.deriveFieldMapping(descriptorProperties, Optional.of(schema)),
+                       topic,
+                       getKafkaProperties(descriptorProperties),
+                       getDeserializationSchema(properties),
+                       startupOptions.startupMode,
+                       startupOptions.specificOffsets);
+       }
+
+       @Override
+       public StreamTableSink<Row> createStreamTableSink(Map<String, String> properties) {
+               final DescriptorProperties descriptorProperties = getValidatedProperties(properties);
+
+               final TableSchema schema = descriptorProperties.getTableSchema(SCHEMA());
+               final String topic = descriptorProperties.getString(CONNECTOR_TOPIC);
+               final Optional<String> proctime = SchemaValidator.deriveProctimeAttribute(descriptorProperties);
+               final List<RowtimeAttributeDescriptor> rowtimeAttributeDescriptors =
+                       SchemaValidator.deriveRowtimeAttributes(descriptorProperties);
+
+               // see also FLINK-9870
+               if (proctime.isPresent() || !rowtimeAttributeDescriptors.isEmpty() ||
+                               checkForCustomFieldMapping(descriptorProperties, schema)) {
+                       throw new TableException("Time attributes and custom field mappings are not supported yet.");
+               }
+
+               return createKafkaTableSink(
+                       schema,
+                       topic,
+                       getKafkaProperties(descriptorProperties),
+                       getFlinkKafkaPartitioner(),
+                       getSerializationSchema(properties));
+       }
+
+       // 
--------------------------------------------------------------------------------------------
+       // For version-specific factories
+       // 
--------------------------------------------------------------------------------------------
+
+       /**
+        * Returns the Kafka version.
+        */
+       protected abstract String kafkaVersion();
+
+       /**
+        * True if the Kafka source supports Kafka timestamps, false otherwise.
+        *
+        * @return True if the Kafka source supports Kafka timestamps, false 
otherwise.
+        */
+       protected abstract boolean supportsKafkaTimestamps();
+
+       /**
+        * Constructs the version-specific Kafka table source.
+        *
+        * @param schema                      Schema of the produced table.
+        * @param proctimeAttribute           Field name of the processing time 
attribute.
+        * @param rowtimeAttributeDescriptors Descriptor for a rowtime attribute
+        * @param fieldMapping                Mapping for the fields of the 
table schema to
+        *                                    fields of the physical returned 
type.
+        * @param topic                       Kafka topic to consume.
+        * @param properties                  Properties for the Kafka consumer.
+        * @param deserializationSchema       Deserialization schema for 
decoding records from Kafka.
+        * @param startupMode                 Startup mode for the contained 
consumer.
+        * @param specificStartupOffsets      Specific startup offsets; only 
relevant when startup
+        *                                    mode is {@link 
StartupMode#SPECIFIC_OFFSETS}.
+        */
+       protected abstract KafkaTableSource createKafkaTableSource(
+               TableSchema schema,
+               Optional<String> proctimeAttribute,
+               List<RowtimeAttributeDescriptor> rowtimeAttributeDescriptors,
+               Map<String, String> fieldMapping,
+               String topic,
+               Properties properties,
+               DeserializationSchema<Row> deserializationSchema,
+               StartupMode startupMode,
+               Map<KafkaTopicPartition, Long> specificStartupOffsets);
+
+       /**
+        * Constructs the version-specific Kafka table sink.
+        *
+        * @param schema      Schema of the produced table.
+        * @param topic       Kafka topic to write to.
+        * @param properties  Properties for the Kafka producer.
+        * @param partitioner Partitioner to select Kafka partition for each item.
+        */
+       protected abstract KafkaTableSink createKafkaTableSink(
+               TableSchema schema,
+               String topic,
+               Properties properties,
+               FlinkKafkaPartitioner<Row> partitioner,
+               SerializationSchema<Row> serializationSchema);
+
+       // 
--------------------------------------------------------------------------------------------
+       // Helper methods
+       // 
--------------------------------------------------------------------------------------------
+
+       private DescriptorProperties getValidatedProperties(Map<String, String> 
properties) {
+               final DescriptorProperties descriptorProperties = new 
DescriptorProperties(true);
+               descriptorProperties.putProperties(properties);
+
+               // allow Kafka timestamps to be used, watermarks can not be 
received from source
+               new SchemaValidator(true, supportsKafkaTimestamps(), 
false).validate(descriptorProperties);
+               new KafkaValidator().validate(descriptorProperties);
+
+               return descriptorProperties;
+       }
+
+       private DeserializationSchema<Row> getDeserializationSchema(Map<String, 
String> properties) {
+               @SuppressWarnings("unchecked")
+               final DeserializationSchemaFactory<Row> formatFactory = 
TableFactoryService.find(
+                       DeserializationSchemaFactory.class,
+                       properties,
+                       this.getClass().getClassLoader());
+               return formatFactory.createDeserializationSchema(properties);
+       }
+
+       private SerializationSchema<Row> getSerializationSchema(Map<String, 
String> properties) {
+               @SuppressWarnings("unchecked")
+               final SerializationSchemaFactory<Row> formatFactory = 
TableFactoryService.find(
+                       SerializationSchemaFactory.class,
+                       properties,
+                       this.getClass().getClassLoader());
+               return formatFactory.createSerializationSchema(properties);
+       }
+
+       private Properties getKafkaProperties(DescriptorProperties 
descriptorProperties) {
+               final Properties kafkaProperties = new Properties();
+               final List<Map<String, String>> propsList = 
descriptorProperties.getFixedIndexedProperties(
+                       CONNECTOR_PROPERTIES,
+                       Arrays.asList(CONNECTOR_PROPERTIES_KEY, 
CONNECTOR_PROPERTIES_VALUE));
+               propsList.forEach(kv -> kafkaProperties.put(
+                       
descriptorProperties.getString(kv.get(CONNECTOR_PROPERTIES_KEY)),
+                       
descriptorProperties.getString(kv.get(CONNECTOR_PROPERTIES_VALUE))
+               ));
+               return kafkaProperties;
+       }
+
+       private StartupOptions getStartupOptions(
+                       DescriptorProperties descriptorProperties,
+                       String topic) {
+               final Map<KafkaTopicPartition, Long> specificOffsets = new 
HashMap<>();
+               final StartupMode startupMode = descriptorProperties
+                       .getOptionalString(CONNECTOR_STARTUP_MODE)
+                       .map(modeString -> {
+                               switch (modeString) {
+                                       case 
KafkaValidator.CONNECTOR_STARTUP_MODE_VALUE_EARLIEST:
+                                               return StartupMode.EARLIEST;
+
+                                       case 
KafkaValidator.CONNECTOR_STARTUP_MODE_VALUE_LATEST:
+                                               return StartupMode.LATEST;
+
+                                       case 
KafkaValidator.CONNECTOR_STARTUP_MODE_VALUE_GROUP_OFFSETS:
+                                               return 
StartupMode.GROUP_OFFSETS;
+
+                                       case 
KafkaValidator.CONNECTOR_STARTUP_MODE_VALUE_SPECIFIC_OFFSETS:
+                                               final List<Map<String, String>> 
offsetList = descriptorProperties.getFixedIndexedProperties(
+                                                       
CONNECTOR_SPECIFIC_OFFSETS,
+                                                       
Arrays.asList(CONNECTOR_SPECIFIC_OFFSETS_PARTITION, 
CONNECTOR_SPECIFIC_OFFSETS_OFFSET));
+                                               offsetList.forEach(kv -> {
+                                                       final int partition = 
descriptorProperties.getInt(kv.get(CONNECTOR_SPECIFIC_OFFSETS_PARTITION));
+                                                       final long offset = 
descriptorProperties.getLong(kv.get(CONNECTOR_SPECIFIC_OFFSETS_OFFSET));
+                                                       final 
KafkaTopicPartition topicPartition = new KafkaTopicPartition(topic, partition);
+                                                       
specificOffsets.put(topicPartition, offset);
+                                               });
+                                               return 
StartupMode.SPECIFIC_OFFSETS;
+                                       default:
+                                               throw new 
TableException("Unsupported startup mode. Validator should have checked that.");
+                               }
+                       }).orElse(StartupMode.GROUP_OFFSETS);
+               final StartupOptions options = new StartupOptions();
+               options.startupMode = startupMode;
+               options.specificOffsets = specificOffsets;
+               return options;
+       }
+
+       private FlinkKafkaPartitioner<Row> getFlinkKafkaPartitioner() {
+               // we don't support custom partitioner so far
+               return new FlinkFixedPartitioner<>();
+       }
+
+       private boolean checkForCustomFieldMapping(DescriptorProperties 
descriptorProperties, TableSchema schema) {
+               final Map<String, String> fieldMapping = 
SchemaValidator.deriveFieldMapping(descriptorProperties, Optional.of(schema));
+               return fieldMapping.size() != schema.getColumnNames().length ||
+                       !fieldMapping.entrySet().stream().allMatch(mapping -> 
mapping.getKey().equals(mapping.getValue()));
+       }
+
+       private static class StartupOptions {
+               private StartupMode startupMode;
+               private Map<KafkaTopicPartition, Long> specificOffsets;
+       }
+}
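
A version-specific factory plugs into this base class by implementing the four extension points above; the Kafka09TableSourceSinkFactoryTest earlier in this commit shows the constructor arguments the concrete classes expect. An illustrative sketch follows (imports as in that test file; the real Kafka09TableSourceSinkFactory in this change is authoritative):

    // Sketch of a version-specific factory; discovered through the META-INF/services
    // entry for org.apache.flink.table.factories.TableFactory (service file name assumed).
    public class SketchKafka09TableSourceSinkFactory extends KafkaTableSourceSinkFactoryBase {

        @Override
        protected String kafkaVersion() {
            return KafkaValidator.CONNECTOR_VERSION_VALUE_09;
        }

        @Override
        protected boolean supportsKafkaTimestamps() {
            // Kafka added message timestamps in 0.10, so the 0.9 source cannot use them
            return false;
        }

        @Override
        protected KafkaTableSource createKafkaTableSource(
                TableSchema schema,
                Optional<String> proctimeAttribute,
                List<RowtimeAttributeDescriptor> rowtimeAttributeDescriptors,
                Map<String, String> fieldMapping,
                String topic,
                Properties properties,
                DeserializationSchema<Row> deserializationSchema,
                StartupMode startupMode,
                Map<KafkaTopicPartition, Long> specificStartupOffsets) {
            return new Kafka09TableSource(
                schema,
                proctimeAttribute,
                rowtimeAttributeDescriptors,
                Optional.of(fieldMapping),
                topic,
                properties,
                deserializationSchema,
                startupMode,
                specificStartupOffsets);
        }

        @Override
        protected KafkaTableSink createKafkaTableSink(
                TableSchema schema,
                String topic,
                Properties properties,
                FlinkKafkaPartitioner<Row> partitioner,
                SerializationSchema<Row> serializationSchema) {
            return new Kafka09TableSink(schema, topic, properties, partitioner, serializationSchema);
        }
    }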

http://git-wip-us.apache.org/repos/asf/flink/blob/aa25b4b3/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkFixedPartitioner.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkFixedPartitioner.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkFixedPartitioner.java
index 6e83ddd..7e0d1fb 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkFixedPartitioner.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkFixedPartitioner.java
@@ -74,4 +74,14 @@ public class FlinkFixedPartitioner<T> extends FlinkKafkaPartitioner<T> {
 
                return partitions[parallelInstanceId % partitions.length];
        }
+
+       @Override
+       public boolean equals(Object o) {
+               return this == o || o instanceof FlinkFixedPartitioner;
+       }
+
+       @Override
+       public int hashCode() {
+               return FlinkFixedPartitioner.class.hashCode();
+       }
 }
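
The new equals()/hashCode() make two FlinkFixedPartitioner instances interchangeable, which the KafkaTableSink.equals() added above relies on when comparing sinks; the partitioning behavior itself is unchanged. A small sketch of what the contract now guarantees (values below are chosen only for illustration):

    // Any two fixed partitioners compare equal and hash alike.
    FlinkFixedPartitioner<Row> a = new FlinkFixedPartitioner<>();
    FlinkFixedPartitioner<Row> b = new FlinkFixedPartitioner<>();
    assert a.equals(b) && a.hashCode() == b.hashCode();

    // Partition selection stays partitions[parallelInstanceId % partitions.length]:
    // with 3 sink subtasks and 2 partitions, subtasks 0, 1, 2 write to partitions 0, 1, 0.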

http://git-wip-us.apache.org/repos/asf/flink/blob/aa25b4b3/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java
 
b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java
index a87c622..946b6eb5 100644
--- 
a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java
+++ 
b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java
@@ -44,7 +44,11 @@ import static org.mockito.Mockito.when;
 
 /**
  * Abstract test base for all Kafka table sink tests.
+ *
+ * @deprecated Ensures backwards compatibility with Flink 1.5. Can be removed once we
+ *             drop support for format-specific table sinks.
  */
+@Deprecated
 public abstract class KafkaTableSinkTestBase {
 
        private static final String TOPIC = "testTopic";
@@ -94,7 +98,8 @@ public abstract class KafkaTableSinkTestBase {
        protected abstract Class<? extends FlinkKafkaProducerBase> getProducerClass();
 
        private KafkaTableSink createTableSink() {
-               return createTableSink(TOPIC, PROPERTIES, PARTITIONER);
+               KafkaTableSink sink = createTableSink(TOPIC, PROPERTIES, PARTITIONER);
+               return sink.configure(FIELD_NAMES, FIELD_TYPES);
        }
 
        private static Properties createSinkProperties() {

http://git-wip-us.apache.org/repos/asf/flink/blob/aa25b4b3/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceFactoryTestBase.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceFactoryTestBase.java
 
b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceFactoryTestBase.java
deleted file mode 100644
index 96f1607..0000000
--- 
a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceFactoryTestBase.java
+++ /dev/null
@@ -1,196 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka;
-
-import org.apache.flink.api.common.JobExecutionResult;
-import org.apache.flink.api.common.serialization.DeserializationSchema;
-import org.apache.flink.streaming.api.datastream.DataStreamSource;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
-import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
-import org.apache.flink.table.api.TableSchema;
-import org.apache.flink.table.api.Types;
-import org.apache.flink.table.descriptors.DescriptorProperties;
-import org.apache.flink.table.descriptors.Kafka;
-import org.apache.flink.table.descriptors.Rowtime;
-import org.apache.flink.table.descriptors.Schema;
-import org.apache.flink.table.descriptors.TestTableDescriptor;
-import org.apache.flink.table.factories.StreamTableSourceFactory;
-import org.apache.flink.table.factories.TableFactoryService;
-import org.apache.flink.table.factories.utils.TestDeserializationSchema;
-import org.apache.flink.table.factories.utils.TestTableFormat;
-import org.apache.flink.table.sources.RowtimeAttributeDescriptor;
-import org.apache.flink.table.sources.TableSource;
-import org.apache.flink.table.sources.tsextractors.ExistingField;
-import org.apache.flink.table.sources.wmstrategies.AscendingTimestamps;
-import org.apache.flink.types.Row;
-import org.apache.flink.util.TestLogger;
-
-import org.junit.Test;
-
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Properties;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-/**
- * Abstract test base for {@link KafkaTableSourceFactory}.
- */
-public abstract class KafkaTableSourceFactoryTestBase extends TestLogger {
-
-       private static final String TOPIC = "myTopic";
-       private static final int PARTITION_0 = 0;
-       private static final long OFFSET_0 = 100L;
-       private static final int PARTITION_1 = 1;
-       private static final long OFFSET_1 = 123L;
-       private static final String FRUIT_NAME = "fruit-name";
-       private static final String NAME = "name";
-       private static final String COUNT = "count";
-       private static final String TIME = "time";
-       private static final String EVENT_TIME = "event-time";
-       private static final String PROC_TIME = "proc-time";
-       private static final Properties KAFKA_PROPERTIES = new Properties();
-       static {
-               KAFKA_PROPERTIES.setProperty("zookeeper.connect", "dummy");
-               KAFKA_PROPERTIES.setProperty("group.id", "dummy");
-       }
-
-       @Test
-       @SuppressWarnings("unchecked")
-       public void testTableSource() {
-
-               // prepare parameters for Kafka table source
-
-               final TableSchema schema = TableSchema.builder()
-                       .field(FRUIT_NAME, Types.STRING())
-                       .field(COUNT, Types.DECIMAL())
-                       .field(EVENT_TIME, Types.SQL_TIMESTAMP())
-                       .field(PROC_TIME, Types.SQL_TIMESTAMP())
-                       .build();
-
-               final List<RowtimeAttributeDescriptor> rowtimeAttributeDescriptors = Collections.singletonList(
-                       new RowtimeAttributeDescriptor(EVENT_TIME, new ExistingField(TIME), new AscendingTimestamps()));
-
-               final Map<String, String> fieldMapping = new HashMap<>();
-               fieldMapping.put(FRUIT_NAME, NAME);
-               fieldMapping.put(COUNT, COUNT);
-
-               final Map<KafkaTopicPartition, Long> specificOffsets = new HashMap<>();
-               specificOffsets.put(new KafkaTopicPartition(TOPIC, PARTITION_0), OFFSET_0);
-               specificOffsets.put(new KafkaTopicPartition(TOPIC, PARTITION_1), OFFSET_1);
-
-               final TestDeserializationSchema deserializationSchema = new TestDeserializationSchema(
-                       TableSchema.builder()
-                               .field(NAME, Types.STRING())
-                               .field(COUNT, Types.DECIMAL())
-                               .field(TIME, Types.SQL_TIMESTAMP())
-                               .build()
-                               .toRowType()
-               );
-
-               final StartupMode startupMode = StartupMode.SPECIFIC_OFFSETS;
-
-               final KafkaTableSource expected = getExpectedKafkaTableSource(
-                       schema,
-                       Optional.of(PROC_TIME),
-                       rowtimeAttributeDescriptors,
-                       fieldMapping,
-                       TOPIC,
-                       KAFKA_PROPERTIES,
-                       deserializationSchema,
-                       startupMode,
-                       specificOffsets);
-
-               // construct table source using descriptors and table source factory
-
-               final Map<Integer, Long> offsets = new HashMap<>();
-               offsets.put(PARTITION_0, OFFSET_0);
-               offsets.put(PARTITION_1, OFFSET_1);
-
-               final TestTableDescriptor testDesc = new TestTableDescriptor(
-                               new Kafka()
-                                       .version(getKafkaVersion())
-                                       .topic(TOPIC)
-                                       .properties(KAFKA_PROPERTIES)
-                                       .startFromSpecificOffsets(offsets))
-                       .withFormat(new TestTableFormat())
-                       .withSchema(
-                               new Schema()
-                                       .field(FRUIT_NAME, Types.STRING()).from(NAME)
-                                       .field(COUNT, Types.DECIMAL()) // no from so it must match with the input
-                                       .field(EVENT_TIME, Types.SQL_TIMESTAMP()).rowtime(
-                                               new Rowtime().timestampsFromField(TIME).watermarksPeriodicAscending())
-                                       .field(PROC_TIME, Types.SQL_TIMESTAMP()).proctime())
-                       .inAppendMode();
-               final DescriptorProperties descriptorProperties = new DescriptorProperties(true);
-               testDesc.addProperties(descriptorProperties);
-               final Map<String, String> propertiesMap = descriptorProperties.asMap();
-
-               final TableSource<?> actualSource = TableFactoryService.find(StreamTableSourceFactory.class, testDesc)
-                       .createStreamTableSource(propertiesMap);
-
-               assertEquals(expected, actualSource);
-
-               // test Kafka consumer
-               final KafkaTableSource actualKafkaSource = (KafkaTableSource) actualSource;
-               final StreamExecutionEnvironmentMock mock = new StreamExecutionEnvironmentMock();
-               actualKafkaSource.getDataStream(mock);
-               assertTrue(getExpectedFlinkKafkaConsumer().isAssignableFrom(mock.function.getClass()));
-       }
-
-       private static class StreamExecutionEnvironmentMock extends StreamExecutionEnvironment {
-
-               public SourceFunction<?> function;
-
-               @Override
-               public <OUT> DataStreamSource<OUT> addSource(SourceFunction<OUT> function) {
-                       this.function = function;
-                       return super.addSource(function);
-               }
-
-               @Override
-               public JobExecutionResult execute(String jobName) {
-                       throw new UnsupportedOperationException();
-               }
-       }
-
-       // --------------------------------------------------------------------------------------------
-       // For version-specific tests
-       // --------------------------------------------------------------------------------------------
-
-       protected abstract String getKafkaVersion();
-
-       protected abstract Class<FlinkKafkaConsumerBase<Row>> getExpectedFlinkKafkaConsumer();
-
-       protected abstract KafkaTableSource getExpectedKafkaTableSource(
-               TableSchema schema,
-               Optional<String> proctimeAttribute,
-               List<RowtimeAttributeDescriptor> rowtimeAttributeDescriptors,
-               Map<String, String> fieldMapping,
-               String topic, Properties properties,
-               DeserializationSchema<Row> deserializationSchema,
-               StartupMode startupMode,
-               Map<KafkaTopicPartition, Long> specificStartupOffsets);
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/aa25b4b3/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryTestBase.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryTestBase.java
 
b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryTestBase.java
new file mode 100644
index 0000000..d8e8f7d
--- /dev/null
+++ 
b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryTestBase.java
@@ -0,0 +1,299 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.operators.ChainingStrategy;
+import org.apache.flink.streaming.api.transformations.StreamTransformation;
+import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
+import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.api.Types;
+import org.apache.flink.table.descriptors.DescriptorProperties;
+import org.apache.flink.table.descriptors.Kafka;
+import org.apache.flink.table.descriptors.Rowtime;
+import org.apache.flink.table.descriptors.Schema;
+import org.apache.flink.table.descriptors.TestTableDescriptor;
+import org.apache.flink.table.factories.StreamTableSinkFactory;
+import org.apache.flink.table.factories.StreamTableSourceFactory;
+import org.apache.flink.table.factories.TableFactoryService;
+import org.apache.flink.table.factories.utils.TestDeserializationSchema;
+import org.apache.flink.table.factories.utils.TestSerializationSchema;
+import org.apache.flink.table.factories.utils.TestTableFormat;
+import org.apache.flink.table.sinks.TableSink;
+import org.apache.flink.table.sources.RowtimeAttributeDescriptor;
+import org.apache.flink.table.sources.TableSource;
+import org.apache.flink.table.sources.tsextractors.ExistingField;
+import org.apache.flink.table.sources.wmstrategies.AscendingTimestamps;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Abstract test base for {@link KafkaTableSourceSinkFactoryBase}.
+ */
+public abstract class KafkaTableSourceSinkFactoryTestBase extends TestLogger {
+
+       private static final String TOPIC = "myTopic";
+       private static final int PARTITION_0 = 0;
+       private static final long OFFSET_0 = 100L;
+       private static final int PARTITION_1 = 1;
+       private static final long OFFSET_1 = 123L;
+       private static final String FRUIT_NAME = "fruit-name";
+       private static final String NAME = "name";
+       private static final String COUNT = "count";
+       private static final String TIME = "time";
+       private static final String EVENT_TIME = "event-time";
+       private static final String PROC_TIME = "proc-time";
+       private static final Properties KAFKA_PROPERTIES = new Properties();
+       static {
+               KAFKA_PROPERTIES.setProperty("zookeeper.connect", "dummy");
+               KAFKA_PROPERTIES.setProperty("group.id", "dummy");
+               KAFKA_PROPERTIES.setProperty("bootstrap.servers", "dummy");
+       }
+
+       private static final Map<Integer, Long> OFFSETS = new HashMap<>();
+       static {
+               OFFSETS.put(PARTITION_0, OFFSET_0);
+               OFFSETS.put(PARTITION_1, OFFSET_1);
+       }
+
+       @Test
+       @SuppressWarnings("unchecked")
+       public void testTableSource() {
+
+               // prepare parameters for Kafka table source
+
+               final TableSchema schema = TableSchema.builder()
+                       .field(FRUIT_NAME, Types.STRING())
+                       .field(COUNT, Types.DECIMAL())
+                       .field(EVENT_TIME, Types.SQL_TIMESTAMP())
+                       .field(PROC_TIME, Types.SQL_TIMESTAMP())
+                       .build();
+
+               final List<RowtimeAttributeDescriptor> rowtimeAttributeDescriptors = Collections.singletonList(
+                       new RowtimeAttributeDescriptor(EVENT_TIME, new ExistingField(TIME), new AscendingTimestamps()));
+
+               final Map<String, String> fieldMapping = new HashMap<>();
+               fieldMapping.put(FRUIT_NAME, NAME);
+               fieldMapping.put(COUNT, COUNT);
+
+               final Map<KafkaTopicPartition, Long> specificOffsets = new HashMap<>();
+               specificOffsets.put(new KafkaTopicPartition(TOPIC, PARTITION_0), OFFSET_0);
+               specificOffsets.put(new KafkaTopicPartition(TOPIC, PARTITION_1), OFFSET_1);
+
+               final TestDeserializationSchema deserializationSchema = new TestDeserializationSchema(
+                       TableSchema.builder()
+                               .field(NAME, Types.STRING())
+                               .field(COUNT, Types.DECIMAL())
+                               .field(TIME, Types.SQL_TIMESTAMP())
+                               .build()
+                               .toRowType()
+               );
+
+               final KafkaTableSource expected = getExpectedKafkaTableSource(
+                       schema,
+                       Optional.of(PROC_TIME),
+                       rowtimeAttributeDescriptors,
+                       fieldMapping,
+                       TOPIC,
+                       KAFKA_PROPERTIES,
+                       deserializationSchema,
+                       StartupMode.SPECIFIC_OFFSETS,
+                       specificOffsets);
+
+               // construct table source using descriptors and table source factory
+
+               final TestTableDescriptor testDesc = new TestTableDescriptor(
+                               new Kafka()
+                                       .version(getKafkaVersion())
+                                       .topic(TOPIC)
+                                       .properties(KAFKA_PROPERTIES)
+                                       .startFromSpecificOffsets(OFFSETS))
+                       .withFormat(new TestTableFormat())
+                       .withSchema(
+                               new Schema()
+                                       .field(FRUIT_NAME, Types.STRING()).from(NAME)
+                                       .field(COUNT, Types.DECIMAL()) // no from, so it must match the input
+                                       .field(EVENT_TIME, Types.SQL_TIMESTAMP()).rowtime(
+                                               new Rowtime().timestampsFromField(TIME).watermarksPeriodicAscending())
+                                       .field(PROC_TIME, Types.SQL_TIMESTAMP()).proctime())
+                       .inAppendMode();
+
+               final Map<String, String> propertiesMap = DescriptorProperties.toJavaMap(testDesc);
+               final TableSource<?> actualSource = TableFactoryService.find(StreamTableSourceFactory.class, propertiesMap)
+                       .createStreamTableSource(propertiesMap);
+
+               assertEquals(expected, actualSource);
+
+               // test Kafka consumer
+               final KafkaTableSource actualKafkaSource = (KafkaTableSource) actualSource;
+               final StreamExecutionEnvironmentMock mock = new StreamExecutionEnvironmentMock();
+               actualKafkaSource.getDataStream(mock);
+               assertTrue(getExpectedFlinkKafkaConsumer().isAssignableFrom(mock.sourceFunction.getClass()));
+       }
+
+       /**
+        * This test can be unified with the corresponding source test once we have fixed FLINK-9870.
+        */
+       @Test
+       public void testTableSink() {
+               // prepare parameters for Kafka table sink
+
+               final TableSchema schema = TableSchema.builder()
+                       .field(FRUIT_NAME, Types.STRING())
+                       .field(COUNT, Types.DECIMAL())
+                       .field(EVENT_TIME, Types.SQL_TIMESTAMP())
+                       .build();
+
+               final KafkaTableSink expected = getExpectedKafkaTableSink(
+                       schema,
+                       TOPIC,
+                       KAFKA_PROPERTIES,
+                       new FlinkFixedPartitioner<>(), // a custom partitioner is not supported yet
+                       new TestSerializationSchema(schema.toRowType()));
+
+               // construct table sink using descriptors and table sink factory
+
+               final TestTableDescriptor testDesc = new TestTableDescriptor(
+                               new Kafka()
+                                       .version(getKafkaVersion())
+                                       .topic(TOPIC)
+                                       .properties(KAFKA_PROPERTIES)
+                                       .startFromSpecificOffsets(OFFSETS)) // test that these are accepted although not needed
+                       .withFormat(new TestTableFormat())
+                       .withSchema(
+                               new Schema()
+                                       .field(FRUIT_NAME, Types.STRING())
+                                       .field(COUNT, Types.DECIMAL())
+                                       .field(EVENT_TIME, Types.SQL_TIMESTAMP()))
+                       .inAppendMode();
+
+               final Map<String, String> propertiesMap = DescriptorProperties.toJavaMap(testDesc);
+               final TableSink<?> actualSink = TableFactoryService.find(StreamTableSinkFactory.class, propertiesMap)
+                       .createStreamTableSink(propertiesMap);
+
+               assertEquals(expected, actualSink);
+
+               // test Kafka producer
+               final KafkaTableSink actualKafkaSink = (KafkaTableSink) actualSink;
+               final DataStreamMock streamMock = new DataStreamMock(new StreamExecutionEnvironmentMock(), schema.toRowType());
+               actualKafkaSink.emitDataStream(streamMock);
+               assertTrue(getExpectedFlinkKafkaProducer().isAssignableFrom(streamMock.sinkFunction.getClass()));
+       }
+
+       private static class StreamExecutionEnvironmentMock extends StreamExecutionEnvironment {
+
+               public SourceFunction<?> sourceFunction;
+
+               @Override
+               public <OUT> DataStreamSource<OUT> addSource(SourceFunction<OUT> sourceFunction) {
+                       this.sourceFunction = sourceFunction;
+                       return super.addSource(sourceFunction);
+               }
+
+               @Override
+               public JobExecutionResult execute(String jobName) {
+                       throw new UnsupportedOperationException();
+               }
+       }
+
+       private static class DataStreamMock extends DataStream<Row> {
+
+               public SinkFunction<?> sinkFunction;
+
+               public DataStreamMock(StreamExecutionEnvironment environment, TypeInformation<Row> outType) {
+                       super(environment, new StreamTransformationMock("name", outType, 1));
+               }
+
+               @Override
+               public DataStreamSink<Row> addSink(SinkFunction<Row> sinkFunction) {
+                       this.sinkFunction = sinkFunction;
+                       return super.addSink(sinkFunction);
+               }
+       }
+
+       private static class StreamTransformationMock extends StreamTransformation<Row> {
+
+               public StreamTransformationMock(String name, TypeInformation<Row> outputType, int parallelism) {
+                       super(name, outputType, parallelism);
+               }
+
+               @Override
+               public void setChainingStrategy(ChainingStrategy strategy) {
+                       // do nothing
+               }
+
+               @Override
+               public Collection<StreamTransformation<?>> getTransitivePredecessors() {
+                       return null;
+               }
+       }
+
+       // --------------------------------------------------------------------------------------------
+       // For version-specific tests
+       // --------------------------------------------------------------------------------------------
+
+       protected abstract String getKafkaVersion();
+
+       protected abstract Class<FlinkKafkaConsumerBase<Row>> getExpectedFlinkKafkaConsumer();
+
+       protected abstract Class<?> getExpectedFlinkKafkaProducer();
+
+       protected abstract KafkaTableSource getExpectedKafkaTableSource(
+               TableSchema schema,
+               Optional<String> proctimeAttribute,
+               List<RowtimeAttributeDescriptor> rowtimeAttributeDescriptors,
+               Map<String, String> fieldMapping,
+               String topic,
+               Properties properties,
+               DeserializationSchema<Row> deserializationSchema,
+               StartupMode startupMode,
+               Map<KafkaTopicPartition, Long> specificStartupOffsets);
+
+       protected abstract KafkaTableSink getExpectedKafkaTableSink(
+               TableSchema schema,
+               String topic,
+               Properties properties,
+               FlinkKafkaPartitioner<Row> partitioner,
+               SerializationSchema<Row> serializationSchema);
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/aa25b4b3/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/factories/utils/TestSerializationSchema.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/factories/utils/TestSerializationSchema.scala
 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/factories/utils/TestSerializationSchema.scala
index ab613a9..a7eaa48 100644
--- 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/factories/utils/TestSerializationSchema.scala
+++ 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/factories/utils/TestSerializationSchema.scala
@@ -19,12 +19,26 @@
 package org.apache.flink.table.factories.utils
 
 import org.apache.flink.api.common.serialization.SerializationSchema
+import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.types.Row
 
 /**
   * Serialization schema for testing purposes.
   */
-class TestSerializationSchema extends SerializationSchema[Row] {
+class TestSerializationSchema(val typeInfo: TypeInformation[Row]) extends SerializationSchema[Row] {
 
   override def serialize(element: Row): Array[Byte] = throw new UnsupportedOperationException()
+
+  def canEqual(other: Any): Boolean = other.isInstanceOf[TestSerializationSchema]
+
+  override def equals(other: Any): Boolean = other match {
+    case that: TestSerializationSchema =>
+      (that canEqual this) &&
+        typeInfo == that.typeInfo
+    case _ => false
+  }
+
+  override def hashCode(): Int = {
+    31 * typeInfo.hashCode()
+  }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/aa25b4b3/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/factories/utils/TestTableFormatFactory.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/factories/utils/TestTableFormatFactory.scala
 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/factories/utils/TestTableFormatFactory.scala
index 475cff9..39c268e 100644
--- 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/factories/utils/TestTableFormatFactory.scala
+++ 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/factories/utils/TestTableFormatFactory.scala
@@ -20,9 +20,9 @@ package org.apache.flink.table.factories.utils
 
 import java.util
 
-import org.apache.flink.api.common.serialization.DeserializationSchema
+import org.apache.flink.api.common.serialization.{DeserializationSchema, SerializationSchema}
 import org.apache.flink.table.descriptors.{DescriptorProperties, FormatDescriptorValidator, SchemaValidator}
-import org.apache.flink.table.factories.{DeserializationSchemaFactory, TableFormatFactoryServiceTest}
+import org.apache.flink.table.factories.{DeserializationSchemaFactory, SerializationSchemaFactory, TableFormatFactoryServiceTest}
 import org.apache.flink.types.Row
 
 /**
@@ -31,7 +31,9 @@ import org.apache.flink.types.Row
  * It has the same context as [[TestAmbiguousTableFormatFactory]] and both support COMMON_PATH.
   * This format does not support SPECIAL_PATH but supports schema derivation.
   */
-class TestTableFormatFactory extends DeserializationSchemaFactory[Row] {
+class TestTableFormatFactory
+  extends DeserializationSchemaFactory[Row]
+  with SerializationSchemaFactory[Row] {
 
   override def requiredContext(): util.Map[String, String] = {
     val context = new util.HashMap[String, String]()
@@ -62,4 +64,14 @@ class TestTableFormatFactory extends DeserializationSchemaFactory[Row] {
     val schema = SchemaValidator.deriveFormatFields(props)
     new TestDeserializationSchema(schema.toRowType)
   }
+
+  override def createSerializationSchema(
+      properties: util.Map[String, String])
+    : SerializationSchema[Row] = {
+
+    val props = new DescriptorProperties(true)
+    props.putProperties(properties)
+    val schema = SchemaValidator.deriveFormatFields(props)
+    new TestSerializationSchema(schema.toRowType)
+  }
 }
