This is an automated email from the ASF dual-hosted git repository.

aljoscha pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 3a727c0721d0bcf1d243af091a3b481e8dd3560f
Author: yanghua <[email protected]>
AuthorDate: Sat Oct 13 11:44:08 2018 +0800

    [FLINK-9697] Rename KafkaTableSource to KafkaTableSourceBase
---
 .../connectors/kafka/Kafka010TableSource.java      |  2 +-
 .../kafka/Kafka010TableSourceSinkFactory.java      |  4 +--
 .../kafka/Kafka010AvroTableSourceTest.java         |  2 +-
 .../kafka/Kafka010JsonTableSourceTest.java         |  2 +-
 .../kafka/Kafka010TableSourceSinkFactoryTest.java  |  4 +--
 .../connectors/kafka/Kafka011TableSource.java      |  2 +-
 .../kafka/Kafka011TableSourceSinkFactory.java      |  4 +--
 .../kafka/Kafka011AvroTableSourceTest.java         |  2 +-
 .../kafka/Kafka011JsonTableSourceTest.java         |  2 +-
 .../kafka/Kafka011TableSourceSinkFactoryTest.java  |  4 +--
 .../connectors/kafka/Kafka08TableSource.java       |  2 +-
 .../kafka/Kafka08TableSourceSinkFactory.java       |  4 +--
 .../kafka/Kafka08AvroTableSourceTest.java          |  2 +-
 .../kafka/Kafka08JsonTableSourceTest.java          |  2 +-
 .../kafka/Kafka08TableSourceSinkFactoryTest.java   |  4 +--
 .../connectors/kafka/Kafka09TableSource.java       |  2 +-
 .../kafka/Kafka09TableSourceSinkFactory.java       |  4 +--
 .../kafka/Kafka09AvroTableSourceTest.java          |  2 +-
 .../kafka/Kafka09JsonTableSourceTest.java          |  2 +-
 .../kafka/Kafka09TableSourceSinkFactoryTest.java   |  4 +--
 .../connectors/kafka/KafkaAvroTableSource.java     |  4 +--
 .../connectors/kafka/KafkaJsonTableSource.java     |  4 +--
 ...aTableSource.java => KafkaTableSourceBase.java} | 26 +++++++++---------
 .../kafka/KafkaTableSourceSinkFactoryBase.java     |  6 ++--
 .../kafka/KafkaAvroTableSourceTestBase.java        |  2 +-
 .../kafka/KafkaJsonTableSourceFactoryTestBase.java |  2 +-
 .../kafka/KafkaTableSourceBuilderTestBase.java     | 32 +++++++++++-----------
 .../kafka/KafkaTableSourceSinkFactoryTestBase.java | 12 ++++----
 28 files changed, 72 insertions(+), 72 deletions(-)

diff --git a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSource.java b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSource.java
index f657462..2016f84 100644
--- a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSource.java
+++ b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSource.java
@@ -36,7 +36,7 @@ import java.util.Properties;
  * Kafka {@link StreamTableSource} for Kafka 0.10.
  */
 @Internal
-public class Kafka010TableSource extends KafkaTableSource {
+public class Kafka010TableSource extends KafkaTableSourceBase {
 
        /**
         * Creates a Kafka 0.10 {@link StreamTableSource}.
diff --git 
a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSourceSinkFactory.java
 
b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSourceSinkFactory.java
index ecf12b2..de7d7bd 100644
--- 
a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSourceSinkFactory.java
+++ 
b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSourceSinkFactory.java
@@ -49,7 +49,7 @@ public class Kafka010TableSourceSinkFactory extends KafkaTableSourceSinkFactoryB
        }
 
        @Override
-       protected KafkaTableSource createKafkaTableSource(
+       protected KafkaTableSourceBase createKafkaTableSource(
                        TableSchema schema,
                        Optional<String> proctimeAttribute,
                        List<RowtimeAttributeDescriptor> rowtimeAttributeDescriptors,
@@ -73,7 +73,7 @@ public class Kafka010TableSourceSinkFactory extends 
KafkaTableSourceSinkFactoryB
        }
 
        @Override
-       protected KafkaTableSink createKafkaTableSink(
+       protected KafkaTableSinkBase createKafkaTableSink(
                        TableSchema schema,
                        String topic,
                        Properties properties,
diff --git 
a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010AvroTableSourceTest.java
 
b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010AvroTableSourceTest.java
index bf253c4..22547af 100644
--- 
a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010AvroTableSourceTest.java
+++ 
b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010AvroTableSourceTest.java
@@ -32,7 +32,7 @@ import org.apache.flink.types.Row;
 public class Kafka010AvroTableSourceTest extends KafkaAvroTableSourceTestBase {
 
        @Override
-       protected KafkaTableSource.Builder getBuilder() {
+       protected KafkaTableSourceBase.Builder getBuilder() {
                return Kafka010AvroTableSource.builder();
        }
 
diff --git 
a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSourceTest.java
 
b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSourceTest.java
index 087f3ed..330e55a 100644
--- 
a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSourceTest.java
+++ 
b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSourceTest.java
@@ -32,7 +32,7 @@ import org.apache.flink.types.Row;
 public class Kafka010JsonTableSourceTest extends KafkaJsonTableSourceTestBase {
 
        @Override
-       protected KafkaTableSource.Builder getBuilder() {
+       protected KafkaTableSourceBase.Builder getBuilder() {
                return Kafka010JsonTableSource.builder();
        }
 
diff --git 
a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSourceSinkFactoryTest.java
 
b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSourceSinkFactoryTest.java
index dac8a4d..1af001d 100644
--- 
a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSourceSinkFactoryTest.java
+++ 
b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSourceSinkFactoryTest.java
@@ -56,7 +56,7 @@ public class Kafka010TableSourceSinkFactoryTest extends 
KafkaTableSourceSinkFact
        }
 
        @Override
-       protected KafkaTableSource getExpectedKafkaTableSource(
+       protected KafkaTableSourceBase getExpectedKafkaTableSource(
                        TableSchema schema,
                        Optional<String> proctimeAttribute,
                        List<RowtimeAttributeDescriptor> 
rowtimeAttributeDescriptors,
@@ -81,7 +81,7 @@ public class Kafka010TableSourceSinkFactoryTest extends 
KafkaTableSourceSinkFact
        }
 
        @Override
-       protected KafkaTableSink getExpectedKafkaTableSink(
+       protected KafkaTableSinkBase getExpectedKafkaTableSink(
                        TableSchema schema,
                        String topic,
                        Properties properties,
diff --git 
a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSource.java
 
b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSource.java
index a646317..688a649 100644
--- 
a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSource.java
+++ 
b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSource.java
@@ -36,7 +36,7 @@ import java.util.Properties;
  * Kafka {@link StreamTableSource} for Kafka 0.11.
  */
 @Internal
-public class Kafka011TableSource extends KafkaTableSource {
+public class Kafka011TableSource extends KafkaTableSourceBase {
 
        /**
         * Creates a Kafka 0.11 {@link StreamTableSource}.
diff --git 
a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSourceSinkFactory.java
 
b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSourceSinkFactory.java
index e6f677f..4585dc0 100644
--- 
a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSourceSinkFactory.java
+++ 
b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSourceSinkFactory.java
@@ -49,7 +49,7 @@ public class Kafka011TableSourceSinkFactory extends 
KafkaTableSourceSinkFactoryB
        }
 
        @Override
-       protected KafkaTableSource createKafkaTableSource(
+       protected KafkaTableSourceBase createKafkaTableSource(
                        TableSchema schema,
                        Optional<String> proctimeAttribute,
                        List<RowtimeAttributeDescriptor> 
rowtimeAttributeDescriptors,
@@ -73,7 +73,7 @@ public class Kafka011TableSourceSinkFactory extends 
KafkaTableSourceSinkFactoryB
        }
 
        @Override
-       protected KafkaTableSink createKafkaTableSink(
+       protected KafkaTableSinkBase createKafkaTableSink(
                        TableSchema schema,
                        String topic,
                        Properties properties,
diff --git 
a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011AvroTableSourceTest.java
 
b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011AvroTableSourceTest.java
index aa083a2..f7bed64 100644
--- 
a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011AvroTableSourceTest.java
+++ 
b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011AvroTableSourceTest.java
@@ -32,7 +32,7 @@ import org.apache.flink.types.Row;
 public class Kafka011AvroTableSourceTest extends KafkaAvroTableSourceTestBase {
 
        @Override
-       protected KafkaTableSource.Builder getBuilder() {
+       protected KafkaTableSourceBase.Builder getBuilder() {
                return Kafka011AvroTableSource.builder();
        }
 
diff --git 
a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011JsonTableSourceTest.java
 
b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011JsonTableSourceTest.java
index 451795a..1a851c1 100644
--- 
a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011JsonTableSourceTest.java
+++ 
b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011JsonTableSourceTest.java
@@ -32,7 +32,7 @@ import org.apache.flink.types.Row;
 public class Kafka011JsonTableSourceTest extends KafkaJsonTableSourceTestBase {
 
        @Override
-       protected KafkaTableSource.Builder getBuilder() {
+       protected KafkaTableSourceBase.Builder getBuilder() {
                return Kafka011JsonTableSource.builder();
        }
 
diff --git 
a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSourceSinkFactoryTest.java
 
b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSourceSinkFactoryTest.java
index f461476..61ec9ef 100644
--- 
a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSourceSinkFactoryTest.java
+++ 
b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSourceSinkFactoryTest.java
@@ -56,7 +56,7 @@ public class Kafka011TableSourceSinkFactoryTest extends 
KafkaTableSourceSinkFact
        }
 
        @Override
-       protected KafkaTableSource getExpectedKafkaTableSource(
+       protected KafkaTableSourceBase getExpectedKafkaTableSource(
                        TableSchema schema,
                        Optional<String> proctimeAttribute,
                        List<RowtimeAttributeDescriptor> 
rowtimeAttributeDescriptors,
@@ -81,7 +81,7 @@ public class Kafka011TableSourceSinkFactoryTest extends 
KafkaTableSourceSinkFact
        }
 
        @Override
-       protected KafkaTableSink getExpectedKafkaTableSink(
+       protected KafkaTableSinkBase getExpectedKafkaTableSink(
                        TableSchema schema,
                        String topic,
                        Properties properties,
diff --git 
a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSource.java
 
b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSource.java
index 97c293e..b356abe 100644
--- 
a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSource.java
+++ 
b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSource.java
@@ -36,7 +36,7 @@ import java.util.Properties;
  * Kafka {@link StreamTableSource} for Kafka 0.8.
  */
 @Internal
-public class Kafka08TableSource extends KafkaTableSource {
+public class Kafka08TableSource extends KafkaTableSourceBase {
 
        /**
         * Creates a Kafka 0.8 {@link StreamTableSource}.
diff --git 
a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSourceSinkFactory.java
 
b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSourceSinkFactory.java
index aeccd4f..c1d6579 100644
--- 
a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSourceSinkFactory.java
+++ 
b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSourceSinkFactory.java
@@ -49,7 +49,7 @@ public class Kafka08TableSourceSinkFactory extends 
KafkaTableSourceSinkFactoryBa
        }
 
        @Override
-       protected KafkaTableSource createKafkaTableSource(
+       protected KafkaTableSourceBase createKafkaTableSource(
                        TableSchema schema,
                        Optional<String> proctimeAttribute,
                        List<RowtimeAttributeDescriptor> 
rowtimeAttributeDescriptors,
@@ -73,7 +73,7 @@ public class Kafka08TableSourceSinkFactory extends 
KafkaTableSourceSinkFactoryBa
        }
 
        @Override
-       protected KafkaTableSink createKafkaTableSink(
+       protected KafkaTableSinkBase createKafkaTableSink(
                        TableSchema schema,
                        String topic,
                        Properties properties,
diff --git 
a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08AvroTableSourceTest.java
 
b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08AvroTableSourceTest.java
index c7b8e8c..bc7b9b6 100644
--- 
a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08AvroTableSourceTest.java
+++ 
b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08AvroTableSourceTest.java
@@ -32,7 +32,7 @@ import org.apache.flink.types.Row;
 public class Kafka08AvroTableSourceTest extends KafkaAvroTableSourceTestBase {
 
        @Override
-       protected KafkaTableSource.Builder getBuilder() {
+       protected KafkaTableSourceBase.Builder getBuilder() {
                return Kafka08AvroTableSource.builder();
        }
 
diff --git 
a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSourceTest.java
 
b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSourceTest.java
index 4ce79dd..dba8f7d 100644
--- 
a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSourceTest.java
+++ 
b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSourceTest.java
@@ -32,7 +32,7 @@ import org.apache.flink.types.Row;
 public class Kafka08JsonTableSourceTest extends KafkaJsonTableSourceTestBase {
 
        @Override
-       protected KafkaTableSource.Builder getBuilder() {
+       protected KafkaTableSourceBase.Builder getBuilder() {
                return Kafka08JsonTableSource.builder();
        }
 
diff --git 
a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSourceSinkFactoryTest.java
 
b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSourceSinkFactoryTest.java
index ff633ec..ba9b686 100644
--- 
a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSourceSinkFactoryTest.java
+++ 
b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSourceSinkFactoryTest.java
@@ -56,7 +56,7 @@ public class Kafka08TableSourceSinkFactoryTest extends 
KafkaTableSourceSinkFacto
        }
 
        @Override
-       protected KafkaTableSource getExpectedKafkaTableSource(
+       protected KafkaTableSourceBase getExpectedKafkaTableSource(
                        TableSchema schema,
                        Optional<String> proctimeAttribute,
                        List<RowtimeAttributeDescriptor> 
rowtimeAttributeDescriptors,
@@ -81,7 +81,7 @@ public class Kafka08TableSourceSinkFactoryTest extends 
KafkaTableSourceSinkFacto
        }
 
        @Override
-       protected KafkaTableSink getExpectedKafkaTableSink(
+       protected KafkaTableSinkBase getExpectedKafkaTableSink(
                        TableSchema schema,
                        String topic,
                        Properties properties,
diff --git 
a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSource.java
 
b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSource.java
index 8f9e799..085aa67 100644
--- 
a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSource.java
+++ 
b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSource.java
@@ -36,7 +36,7 @@ import java.util.Properties;
  * Kafka {@link StreamTableSource} for Kafka 0.9.
  */
 @Internal
-public class Kafka09TableSource extends KafkaTableSource {
+public class Kafka09TableSource extends KafkaTableSourceBase {
 
        /**
         * Creates a Kafka 0.9 {@link StreamTableSource}.
diff --git 
a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSourceSinkFactory.java
 
b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSourceSinkFactory.java
index 19f5150..11d56b4 100644
--- 
a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSourceSinkFactory.java
+++ 
b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSourceSinkFactory.java
@@ -49,7 +49,7 @@ public class Kafka09TableSourceSinkFactory extends 
KafkaTableSourceSinkFactoryBa
        }
 
        @Override
-       protected KafkaTableSource createKafkaTableSource(
+       protected KafkaTableSourceBase createKafkaTableSource(
                        TableSchema schema,
                        Optional<String> proctimeAttribute,
                        List<RowtimeAttributeDescriptor> 
rowtimeAttributeDescriptors,
@@ -73,7 +73,7 @@ public class Kafka09TableSourceSinkFactory extends 
KafkaTableSourceSinkFactoryBa
        }
 
        @Override
-       protected KafkaTableSink createKafkaTableSink(
+       protected KafkaTableSinkBase createKafkaTableSink(
                        TableSchema schema,
                        String topic,
                        Properties properties,
diff --git 
a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09AvroTableSourceTest.java
 
b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09AvroTableSourceTest.java
index 6f3a566..5f5e80d 100644
--- 
a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09AvroTableSourceTest.java
+++ 
b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09AvroTableSourceTest.java
@@ -32,7 +32,7 @@ import org.apache.flink.types.Row;
 public class Kafka09AvroTableSourceTest extends KafkaAvroTableSourceTestBase {
 
        @Override
-       protected KafkaTableSource.Builder getBuilder() {
+       protected KafkaTableSourceBase.Builder getBuilder() {
                return Kafka09AvroTableSource.builder();
        }
 
diff --git 
a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSourceTest.java
 
b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSourceTest.java
index bfff7d2..1a630b1 100644
--- 
a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSourceTest.java
+++ 
b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSourceTest.java
@@ -32,7 +32,7 @@ import org.apache.flink.types.Row;
 public class Kafka09JsonTableSourceTest extends KafkaJsonTableSourceTestBase {
 
        @Override
-       protected KafkaTableSource.Builder getBuilder() {
+       protected KafkaTableSourceBase.Builder getBuilder() {
                return Kafka09JsonTableSource.builder();
        }
 
diff --git 
a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSourceSinkFactoryTest.java
 
b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSourceSinkFactoryTest.java
index d54c394..631a45f 100644
--- 
a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSourceSinkFactoryTest.java
+++ 
b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSourceSinkFactoryTest.java
@@ -56,7 +56,7 @@ public class Kafka09TableSourceSinkFactoryTest extends 
KafkaTableSourceSinkFacto
        }
 
        @Override
-       protected KafkaTableSource getExpectedKafkaTableSource(
+       protected KafkaTableSourceBase getExpectedKafkaTableSource(
                        TableSchema schema,
                        Optional<String> proctimeAttribute,
                        List<RowtimeAttributeDescriptor> 
rowtimeAttributeDescriptors,
@@ -81,7 +81,7 @@ public class Kafka09TableSourceSinkFactoryTest extends 
KafkaTableSourceSinkFacto
        }
 
        @Override
-       protected KafkaTableSink getExpectedKafkaTableSink(
+       protected KafkaTableSinkBase getExpectedKafkaTableSink(
                        TableSchema schema,
                        String topic,
                        Properties properties,
diff --git 
a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaAvroTableSource.java
 
b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaAvroTableSource.java
index 86fd21d..1e9568f 100644
--- 
a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaAvroTableSource.java
+++ 
b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaAvroTableSource.java
@@ -44,7 +44,7 @@ import java.util.Properties;
  */
 @Deprecated
 @Internal
-public abstract class KafkaAvroTableSource extends KafkaTableSource {
+public abstract class KafkaAvroTableSource extends KafkaTableSourceBase {
 
        /**
         * Creates a generic Kafka Avro {@link StreamTableSource} using a given 
{@link SpecificRecord}.
@@ -93,7 +93,7 @@ public abstract class KafkaAvroTableSource extends 
KafkaTableSource {
         */
        @Deprecated
        protected abstract static class Builder<T extends KafkaAvroTableSource, 
B extends KafkaAvroTableSource.Builder>
-               extends KafkaTableSource.Builder<T, B> {
+               extends KafkaTableSourceBase.Builder<T, B> {
 
                private Class<? extends SpecificRecordBase> avroClass;
 
diff --git 
a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSource.java
 
b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSource.java
index 70b286b..a9db979 100644
--- 
a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSource.java
+++ 
b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSource.java
@@ -43,7 +43,7 @@ import java.util.Properties;
  */
 @Deprecated
 @Internal
-public abstract class KafkaJsonTableSource extends KafkaTableSource {
+public abstract class KafkaJsonTableSource extends KafkaTableSourceBase {
 
        /**
         * Creates a generic Kafka JSON {@link StreamTableSource}.
@@ -102,7 +102,7 @@ public abstract class KafkaJsonTableSource extends 
KafkaTableSource {
         */
        @Deprecated
        protected abstract static class Builder<T extends KafkaJsonTableSource, 
B extends KafkaJsonTableSource.Builder>
-               extends KafkaTableSource.Builder<T, B> {
+               extends KafkaTableSourceBase.Builder<T, B> {
 
                private TableSchema jsonSchema;
 
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceBase.java
similarity index 97%
rename from flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java
rename to flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceBase.java
index 474c22f..04aa8c1 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceBase.java
@@ -57,7 +57,7 @@ import scala.Option;
  * override {@link #createKafkaConsumer(String, Properties, DeserializationSchema)}}.
  */
 @Internal
-public abstract class KafkaTableSource implements
+public abstract class KafkaTableSourceBase implements
                StreamTableSource<Row>,
                DefinedProctimeAttribute,
                DefinedRowtimeAttributes,
@@ -110,7 +110,7 @@ public abstract class KafkaTableSource implements
         * @param specificStartupOffsets      Specific startup offsets; only relevant when startup
         *                                    mode is {@link StartupMode#SPECIFIC_OFFSETS}.
         */
-       protected KafkaTableSource(
+       protected KafkaTableSourceBase(
                        TableSchema schema,
                        Optional<String> proctimeAttribute,
                        List<RowtimeAttributeDescriptor> rowtimeAttributeDescriptors,
@@ -141,7 +141,7 @@ public abstract class KafkaTableSource implements
         * @param properties            Properties for the Kafka consumer.
         * @param deserializationSchema Deserialization schema for decoding records from Kafka.
         */
-       protected KafkaTableSource(
+       protected KafkaTableSourceBase(
                        TableSchema schema,
                        String topic,
                        Properties properties,
@@ -225,10 +225,10 @@ public abstract class KafkaTableSource implements
                }
                // TODO force classes to be equal once we drop support for format-specific table sources
                // if (o == null || getClass() != o.getClass()) {
-               if (o == null || !(o instanceof KafkaTableSource)) {
+               if (o == null || !(o instanceof KafkaTableSourceBase)) {
                        return false;
                }
-               final KafkaTableSource that = (KafkaTableSource) o;
+               final KafkaTableSourceBase that = (KafkaTableSourceBase) o;
                return Objects.equals(schema, that.schema) &&
                        Objects.equals(proctimeAttribute, that.proctimeAttribute) &&
                        Objects.equals(rowtimeAttributeDescriptors, that.rowtimeAttributeDescriptors) &&
@@ -398,18 +398,18 @@ public abstract class KafkaTableSource implements
                        DeserializationSchema<Row> deserializationSchema);
 
        /**
-        * Abstract builder for a {@link KafkaTableSource} to be extended by builders of subclasses of
-        * KafkaTableSource.
+        * Abstract builder for a {@link KafkaTableSourceBase} to be extended by builders of subclasses of
+        * KafkaTableSourceBase.
         *
-        * @param <T> Type of the KafkaTableSource produced by the builder.
-        * @param <B> Type of the KafkaTableSource.Builder subclass.
+        * @param <T> Type of the KafkaTableSourceBase produced by the builder.
+        * @param <B> Type of the KafkaTableSourceBase.Builder subclass.
         * @deprecated Use the {@link org.apache.flink.table.descriptors.Kafka} descriptor together
         *             with descriptors for schema and format instead. Descriptors allow for
         *             implementation-agnostic definition of tables. See also
         *             {@link org.apache.flink.table.api.TableEnvironment#connect(ConnectorDescriptor)}.
         */
        @Deprecated
-       protected abstract static class Builder<T extends KafkaTableSource, B extends KafkaTableSource.Builder> {
+       protected abstract static class Builder<T extends KafkaTableSourceBase, B extends KafkaTableSourceBase.Builder> {
 
                private String topic;
 
@@ -682,11 +682,11 @@ public abstract class KafkaTableSource implements
                protected abstract B builder();
 
                /**
-                * Builds the configured {@link KafkaTableSource}.
-                * @return The configured {@link KafkaTableSource}.
+                * Builds the configured {@link KafkaTableSourceBase}.
+                * @return The configured {@link KafkaTableSourceBase}.
                 * @deprecated Use table descriptors instead of implementation-specific builders.
                 */
                @Deprecated
-               protected abstract KafkaTableSource build();
+               protected abstract KafkaTableSourceBase build();
        }
 }
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryBase.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryBase.java
index 5634331..431f946 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryBase.java
@@ -83,7 +83,7 @@ import static org.apache.flink.table.descriptors.StreamTableDescriptorValidator.
 import static org.apache.flink.table.descriptors.StreamTableDescriptorValidator.UPDATE_MODE_VALUE_APPEND;
 
 /**
- * Factory for creating configured instances of {@link KafkaTableSource}.
+ * Factory for creating configured instances of {@link KafkaTableSourceBase}.
  */
 public abstract class KafkaTableSourceSinkFactoryBase implements
                StreamTableSourceFactory<Row>,
@@ -213,7 +213,7 @@ public abstract class KafkaTableSourceSinkFactoryBase 
implements
         * @param specificStartupOffsets      Specific startup offsets; only 
relevant when startup
         *                                    mode is {@link 
StartupMode#SPECIFIC_OFFSETS}.
         */
-       protected abstract KafkaTableSource createKafkaTableSource(
+       protected abstract KafkaTableSourceBase createKafkaTableSource(
                TableSchema schema,
                Optional<String> proctimeAttribute,
                List<RowtimeAttributeDescriptor> rowtimeAttributeDescriptors,
@@ -232,7 +232,7 @@ public abstract class KafkaTableSourceSinkFactoryBase 
implements
         * @param properties  Properties for the Kafka consumer.
         * @param partitioner Partitioner to select Kafka partition for each 
item.
         */
-       protected abstract KafkaTableSink createKafkaTableSink(
+       protected abstract KafkaTableSinkBase createKafkaTableSink(
                TableSchema schema,
                String topic,
                Properties properties,
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaAvroTableSourceTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaAvroTableSourceTestBase.java
index 140ce21..089a118 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaAvroTableSourceTestBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaAvroTableSourceTestBase.java
@@ -43,7 +43,7 @@ import static org.junit.Assert.assertNull;
 public abstract class KafkaAvroTableSourceTestBase extends 
KafkaTableSourceBuilderTestBase {
 
        @Override
-       protected void configureBuilder(KafkaTableSource.Builder builder) {
+       protected void configureBuilder(KafkaTableSourceBase.Builder builder) {
                super.configureBuilder(builder);
                ((KafkaAvroTableSource.Builder) 
builder).forAvroRecordClass(SchemaRecord.class);
        }
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSourceFactoryTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSourceFactoryTestBase.java
index 51c0e7b..afb9517 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSourceFactoryTestBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSourceFactoryTestBase.java
@@ -114,7 +114,7 @@ public abstract class KafkaJsonTableSourceFactoryTestBase {
                specificOffsets.put(new KafkaTopicPartition(TOPIC, 0), 100L);
                specificOffsets.put(new KafkaTopicPartition(TOPIC, 1), 123L);
 
-               final KafkaTableSource builderSource = builder()
+               final KafkaTableSourceBase builderSource = builder()
                                
.forJsonSchema(TableSchema.fromTypeInfo(JsonRowSchemaConverter.convert(JSON_SCHEMA)))
                                .failOnMissingField(true)
                                .withTableToJsonMapping(tableJsonMapping)
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceBuilderTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceBuilderTestBase.java
index b9ae47b..16dc714 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceBuilderTestBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceBuilderTestBase.java
@@ -70,11 +70,11 @@ public abstract class KafkaTableSourceBuilderTestBase {
        @Test
        @SuppressWarnings("unchecked")
        public void testKafkaConsumer() {
-               KafkaTableSource.Builder b = getBuilder();
+               KafkaTableSourceBase.Builder b = getBuilder();
                configureBuilder(b);
 
                // assert that correct
-               KafkaTableSource observed = spy(b.build());
+               KafkaTableSourceBase observed = spy(b.build());
                StreamExecutionEnvironment env = 
mock(StreamExecutionEnvironment.class);
                
when(env.addSource(any(SourceFunction.class))).thenReturn(mock(DataStreamSource.class));
                observed.getDataStream(env);
@@ -89,10 +89,10 @@ public abstract class KafkaTableSourceBuilderTestBase {
 
        @Test
        public void testTableSchema() {
-               KafkaTableSource.Builder b = getBuilder();
+               KafkaTableSourceBase.Builder b = getBuilder();
                configureBuilder(b);
 
-               KafkaTableSource source = b.build();
+               KafkaTableSourceBase source = b.build();
 
                // check table schema
                TableSchema schema = source.getTableSchema();
@@ -113,10 +113,10 @@ public abstract class KafkaTableSourceBuilderTestBase {
 
        @Test
        public void testNoTimeAttributes() {
-               KafkaTableSource.Builder b = getBuilder();
+               KafkaTableSourceBase.Builder b = getBuilder();
                configureBuilder(b);
 
-               KafkaTableSource source = b.build();
+               KafkaTableSourceBase source = b.build();
 
                // assert no proctime
                assertNull(source.getProctimeAttribute());
@@ -127,11 +127,11 @@ public abstract class KafkaTableSourceBuilderTestBase {
 
        @Test
        public void testProctimeAttribute() {
-               KafkaTableSource.Builder b = getBuilder();
+               KafkaTableSourceBase.Builder b = getBuilder();
                configureBuilder(b);
                b.withProctimeAttribute("time1");
 
-               KafkaTableSource source = b.build();
+               KafkaTableSourceBase source = b.build();
 
                // assert correct proctime field
                assertEquals(source.getProctimeAttribute(), "time1");
@@ -143,11 +143,11 @@ public abstract class KafkaTableSourceBuilderTestBase {
 
        @Test
        public void testRowtimeAttribute() {
-               KafkaTableSource.Builder b = getBuilder();
+               KafkaTableSourceBase.Builder b = getBuilder();
                configureBuilder(b);
                b.withRowtimeAttribute("time2", new ExistingField("time2"), new 
AscendingTimestamps());
 
-               KafkaTableSource source = b.build();
+               KafkaTableSourceBase source = b.build();
 
                // assert no proctime
                assertNull(source.getProctimeAttribute());
@@ -168,13 +168,13 @@ public abstract class KafkaTableSourceBuilderTestBase {
 
        @Test
        public void testRowtimeAttribute2() {
-               KafkaTableSource.Builder b = getBuilder();
+               KafkaTableSourceBase.Builder b = getBuilder();
                configureBuilder(b);
 
                try {
                        b.withKafkaTimestampAsRowtimeAttribute("time2", new 
AscendingTimestamps());
 
-                       KafkaTableSource source = b.build();
+                       KafkaTableSourceBase source = b.build();
 
                        // assert no proctime
                        assertNull(source.getProctimeAttribute());
@@ -201,11 +201,11 @@ public abstract class KafkaTableSourceBuilderTestBase {
        @Test
        @SuppressWarnings("unchecked")
        public void testConsumerOffsets() {
-               KafkaTableSource.Builder b = getBuilder();
+               KafkaTableSourceBase.Builder b = getBuilder();
                configureBuilder(b);
 
                // test the default behavior
-               KafkaTableSource source = spy(b.build());
+               KafkaTableSourceBase source = spy(b.build());
                when(source.createKafkaConsumer(TOPIC, PROPS, null))
                                .thenReturn(mock(getFlinkKafkaConsumer()));
 
@@ -241,13 +241,13 @@ public abstract class KafkaTableSourceBuilderTestBase {
                verify(source.getKafkaConsumer(TOPIC, PROPS, 
null)).setStartFromSpecificOffsets(any(Map.class));
        }
 
-       protected abstract KafkaTableSource.Builder getBuilder();
+       protected abstract KafkaTableSourceBase.Builder getBuilder();
 
        protected abstract Class<DeserializationSchema<Row>> 
getDeserializationSchema();
 
        protected abstract Class<FlinkKafkaConsumerBase<Row>> 
getFlinkKafkaConsumer();
 
-       protected void configureBuilder(KafkaTableSource.Builder builder) {
+       protected void configureBuilder(KafkaTableSourceBase.Builder builder) {
                builder
                        .forTopic(TOPIC)
                        .withKafkaProperties(PROPS)
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryTestBase.java
index 5e9144c..135e718 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryTestBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryTestBase.java
@@ -133,7 +133,7 @@ public abstract class KafkaTableSourceSinkFactoryTestBase 
extends TestLogger {
                                .toRowType()
                );
 
-               final KafkaTableSource expected = getExpectedKafkaTableSource(
+               final KafkaTableSourceBase expected = 
getExpectedKafkaTableSource(
                        schema,
                        Optional.of(PROC_TIME),
                        rowtimeAttributeDescriptors,
@@ -172,7 +172,7 @@ public abstract class KafkaTableSourceSinkFactoryTestBase 
extends TestLogger {
                assertEquals(expected, actualSource);
 
                // test Kafka consumer
-               final KafkaTableSource actualKafkaSource = (KafkaTableSource) 
actualSource;
+               final KafkaTableSourceBase actualKafkaSource = 
(KafkaTableSourceBase) actualSource;
                final StreamExecutionEnvironmentMock mock = new 
StreamExecutionEnvironmentMock();
                actualKafkaSource.getDataStream(mock);
                
assertTrue(getExpectedFlinkKafkaConsumer().isAssignableFrom(mock.sourceFunction.getClass()));
@@ -191,7 +191,7 @@ public abstract class KafkaTableSourceSinkFactoryTestBase 
extends TestLogger {
                        .field(EVENT_TIME, Types.SQL_TIMESTAMP())
                        .build();
 
-               final KafkaTableSink expected = getExpectedKafkaTableSink(
+               final KafkaTableSinkBase expected = getExpectedKafkaTableSink(
                        schema,
                        TOPIC,
                        KAFKA_PROPERTIES,
@@ -222,7 +222,7 @@ public abstract class KafkaTableSourceSinkFactoryTestBase 
extends TestLogger {
                assertEquals(expected, actualSink);
 
                // test Kafka producer
-               final KafkaTableSink actualKafkaSink = (KafkaTableSink) 
actualSink;
+               final KafkaTableSinkBase actualKafkaSink = (KafkaTableSinkBase) 
actualSink;
                final DataStreamMock streamMock = new DataStreamMock(new 
StreamExecutionEnvironmentMock(), schema.toRowType());
                actualKafkaSink.emitDataStream(streamMock);
                
assertTrue(getExpectedFlinkKafkaProducer().isAssignableFrom(streamMock.sinkFunction.getClass()));
@@ -286,7 +286,7 @@ public abstract class KafkaTableSourceSinkFactoryTestBase 
extends TestLogger {
 
        protected abstract Class<?> getExpectedFlinkKafkaProducer();
 
-       protected abstract KafkaTableSource getExpectedKafkaTableSource(
+       protected abstract KafkaTableSourceBase getExpectedKafkaTableSource(
                TableSchema schema,
                Optional<String> proctimeAttribute,
                List<RowtimeAttributeDescriptor> rowtimeAttributeDescriptors,
@@ -297,7 +297,7 @@ public abstract class KafkaTableSourceSinkFactoryTestBase 
extends TestLogger {
                StartupMode startupMode,
                Map<KafkaTopicPartition, Long> specificStartupOffsets);
 
-       protected abstract KafkaTableSink getExpectedKafkaTableSink(
+       protected abstract KafkaTableSinkBase getExpectedKafkaTableSink(
                TableSchema schema,
                String topic,
                Properties properties,

Reply via email to