This is an automated email from the ASF dual-hosted git repository.

aljoscha pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit c4e74b57c80fb5f2d22756535eeac60e428f1645
Author: yanghua <[email protected]>
AuthorDate: Sat Oct 13 11:48:51 2018 +0800

    [FLINK-9697] Rename KafkaTableSink to KafkaTableSinkBase
---
 .../connectors/kafka/Kafka010JsonTableSink.java          |  6 +++---
 .../streaming/connectors/kafka/Kafka010TableSink.java    |  2 +-
 .../connectors/kafka/Kafka010JsonTableSinkTest.java      |  4 ++--
 .../streaming/connectors/kafka/Kafka011TableSink.java    |  2 +-
 .../streaming/connectors/kafka/Kafka08JsonTableSink.java |  8 ++++----
 .../streaming/connectors/kafka/Kafka08TableSink.java     |  2 +-
 .../connectors/kafka/Kafka08JsonTableSinkTest.java       |  4 ++--
 .../streaming/connectors/kafka/Kafka09JsonTableSink.java |  8 ++++----
 .../streaming/connectors/kafka/Kafka09TableSink.java     |  2 +-
 .../connectors/kafka/Kafka09JsonTableSinkTest.java       |  4 ++--
 .../streaming/connectors/kafka/KafkaJsonTableSink.java   |  4 ++--
 .../{KafkaTableSink.java => KafkaTableSinkBase.java}     | 16 ++++++++--------
 ...SinkTestBase.java => KafkaTableSinkBaseTestBase.java} | 14 +++++++-------
 13 files changed, 38 insertions(+), 38 deletions(-)

diff --git a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSink.java b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSink.java
index 8471908..b9a5350 100644
--- a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSink.java
+++ b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSink.java
@@ -28,7 +28,7 @@ import java.util.Optional;
 import java.util.Properties;
 
 /**
- * Kafka 0.10 {@link KafkaTableSink} that serializes data in JSON format.
+ * Kafka 0.10 {@link KafkaTableSinkBase} that serializes data in JSON format.
  *
 * @deprecated Use the {@link org.apache.flink.table.descriptors.Kafka} descriptor together
 *             with descriptors for schema and format instead. Descriptors allow for
@@ -39,7 +39,7 @@ import java.util.Properties;
 public class Kafka010JsonTableSink extends KafkaJsonTableSink {
 
        /**
-        * Creates {@link KafkaTableSink} to write table rows as JSON-encoded records to a Kafka 0.10
+        * Creates {@link KafkaTableSinkBase} to write table rows as JSON-encoded records to a Kafka 0.10
         * topic with fixed partition assignment.
         *
        * <p>Each parallel TableSink instance will write its rows to a single Kafka partition.</p>
@@ -60,7 +60,7 @@ public class Kafka010JsonTableSink extends KafkaJsonTableSink {
        }
 
        /**
-        * Creates {@link KafkaTableSink} to write table rows as JSON-encoded records to a Kafka 0.10
+        * Creates {@link KafkaTableSinkBase} to write table rows as JSON-encoded records to a Kafka 0.10
         * topic with custom partition assignment.
         *
         * @param topic topic in Kafka to which table is written
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSink.java b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSink.java
index 1d408b8..79aad7c 100644
--- a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSink.java
+++ b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSink.java
@@ -31,7 +31,7 @@ import java.util.Properties;
  * Kafka 0.10 table sink for writing data into Kafka.
  */
 @Internal
-public class Kafka010TableSink extends KafkaTableSink {
+public class Kafka010TableSink extends KafkaTableSinkBase {
 
        public Kafka010TableSink(
                        TableSchema schema,
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSinkTest.java b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSinkTest.java
index 9208f65..4575f87 100644
--- a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSinkTest.java
+++ b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSinkTest.java
@@ -32,10 +32,10 @@ import java.util.Properties;
  *             drop support for format-specific table sinks.
  */
 @Deprecated
-public class Kafka010JsonTableSinkTest extends KafkaTableSinkTestBase {
+public class Kafka010JsonTableSinkTest extends KafkaTableSinkBaseTestBase {
 
        @Override
-       protected KafkaTableSink createTableSink(
+       protected KafkaTableSinkBase createTableSink(
                        String topic,
                        Properties properties,
                        FlinkKafkaPartitioner<Row> partitioner) {
diff --git a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSink.java b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSink.java
index 8d81a5b..304b26b 100644
--- a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSink.java
+++ b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSink.java
@@ -33,7 +33,7 @@ import java.util.Properties;
  * Kafka 0.11 table sink for writing data into Kafka.
  */
 @Internal
-public class Kafka011TableSink extends KafkaTableSink {
+public class Kafka011TableSink extends KafkaTableSinkBase {
 
        public Kafka011TableSink(
                        TableSchema schema,
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSink.java b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSink.java
index 189a9fd..b7474e2 100644
--- a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSink.java
+++ b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSink.java
@@ -30,7 +30,7 @@ import java.util.Optional;
 import java.util.Properties;
 
 /**
- * Kafka 0.8 {@link KafkaTableSink} that serializes data in JSON format.
+ * Kafka 0.8 {@link KafkaTableSinkBase} that serializes data in JSON format.
  *
 * @deprecated Use the {@link org.apache.flink.table.descriptors.Kafka} descriptor together
 *             with descriptors for schema and format instead. Descriptors allow for
@@ -41,7 +41,7 @@ import java.util.Properties;
 public class Kafka08JsonTableSink extends KafkaJsonTableSink {
 
        /**
-        * Creates {@link KafkaTableSink} to write table rows as JSON-encoded records to a Kafka 0.8
+        * Creates {@link KafkaTableSinkBase} to write table rows as JSON-encoded records to a Kafka 0.8
         * topic with fixed partition assignment.
         *
        * <p>Each parallel TableSink instance will write its rows to a single Kafka partition.</p>
@@ -62,7 +62,7 @@ public class Kafka08JsonTableSink extends KafkaJsonTableSink {
        }
 
        /**
-        * Creates {@link KafkaTableSink} to write table rows as JSON-encoded records to a Kafka 0.8
+        * Creates {@link KafkaTableSinkBase} to write table rows as JSON-encoded records to a Kafka 0.8
         * topic with custom partition assignment.
         *
         * @param topic topic in Kafka to which table is written
@@ -76,7 +76,7 @@ public class Kafka08JsonTableSink extends KafkaJsonTableSink {
        }
 
        /**
-        * Creates {@link KafkaTableSink} to write table rows as JSON-encoded records to a Kafka 0.8
+        * Creates {@link KafkaTableSinkBase} to write table rows as JSON-encoded records to a Kafka 0.8
         * topic with custom partition assignment.
         *
         * @param topic topic in Kafka to which table is written
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSink.java b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSink.java
index 146cfc9..90c4258 100644
--- a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSink.java
+++ b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSink.java
@@ -31,7 +31,7 @@ import java.util.Properties;
  * Kafka 0.8 table sink for writing data into Kafka.
  */
 @Internal
-public class Kafka08TableSink extends KafkaTableSink {
+public class Kafka08TableSink extends KafkaTableSinkBase {
 
        public Kafka08TableSink(
                        TableSchema schema,
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSinkTest.java b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSinkTest.java
index fc46ad4..aa5fa16 100644
--- a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSinkTest.java
+++ b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSinkTest.java
@@ -32,10 +32,10 @@ import java.util.Properties;
  *             drop support for format-specific table sinks.
  */
 @Deprecated
-public class Kafka08JsonTableSinkTest extends KafkaTableSinkTestBase {
+public class Kafka08JsonTableSinkTest extends KafkaTableSinkBaseTestBase {
 
        @Override
-       protected KafkaTableSink createTableSink(
+       protected KafkaTableSinkBase createTableSink(
                        String topic,
                        Properties properties,
                        FlinkKafkaPartitioner<Row> partitioner) {
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSink.java b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSink.java
index 3363459..cd27f83 100644
--- a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSink.java
+++ b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSink.java
@@ -30,7 +30,7 @@ import java.util.Optional;
 import java.util.Properties;
 
 /**
- * Kafka 0.9 {@link KafkaTableSink} that serializes data in JSON format.
+ * Kafka 0.9 {@link KafkaTableSinkBase} that serializes data in JSON format.
  *
 * @deprecated Use the {@link org.apache.flink.table.descriptors.Kafka} descriptor together
 *             with descriptors for schema and format instead. Descriptors allow for
@@ -41,7 +41,7 @@ import java.util.Properties;
 public class Kafka09JsonTableSink extends KafkaJsonTableSink {
 
        /**
-        * Creates {@link KafkaTableSink} to write table rows as JSON-encoded records to a Kafka 0.9
+        * Creates {@link KafkaTableSinkBase} to write table rows as JSON-encoded records to a Kafka 0.9
         * topic with fixed partition assignment.
         *
        * <p>Each parallel TableSink instance will write its rows to a single Kafka partition.</p>
@@ -62,7 +62,7 @@ public class Kafka09JsonTableSink extends KafkaJsonTableSink {
        }
 
        /**
-        * Creates {@link KafkaTableSink} to write table rows as JSON-encoded records to a Kafka 0.9
+        * Creates {@link KafkaTableSinkBase} to write table rows as JSON-encoded records to a Kafka 0.9
         * topic with custom partition assignment.
         *
         * @param topic topic in Kafka to which table is written
@@ -76,7 +76,7 @@ public class Kafka09JsonTableSink extends KafkaJsonTableSink {
        }
 
        /**
-        * Creates {@link KafkaTableSink} to write table rows as JSON-encoded records to a Kafka 0.9
+        * Creates {@link KafkaTableSinkBase} to write table rows as JSON-encoded records to a Kafka 0.9
         * topic with custom partition assignment.
         *
         * @param topic topic in Kafka to which table is written
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSink.java b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSink.java
index 6e38aad..657d75a 100644
--- a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSink.java
+++ b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSink.java
@@ -31,7 +31,7 @@ import java.util.Properties;
  * Kafka 0.9 table sink for writing data into Kafka.
  */
 @Internal
-public class Kafka09TableSink extends KafkaTableSink {
+public class Kafka09TableSink extends KafkaTableSinkBase {
 
        public Kafka09TableSink(
                        TableSchema schema,
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSinkTest.java b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSinkTest.java
index 97b5c7d..29cfa93 100644
--- a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSinkTest.java
+++ b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSinkTest.java
@@ -32,10 +32,10 @@ import java.util.Properties;
  *             drop support for format-specific table sinks.
  */
 @Deprecated
-public class Kafka09JsonTableSinkTest extends KafkaTableSinkTestBase {
+public class Kafka09JsonTableSinkTest extends KafkaTableSinkBaseTestBase {
 
        @Override
-       protected KafkaTableSink createTableSink(
+       protected KafkaTableSinkBase createTableSink(
                        String topic,
                        Properties properties,
                        FlinkKafkaPartitioner<Row> partitioner) {
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSink.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSink.java
index 231eddd..d84eb89 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSink.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSink.java
@@ -28,13 +28,13 @@ import org.apache.flink.types.Row;
 import java.util.Properties;
 
 /**
- * Base class for {@link KafkaTableSink} that serializes data in JSON format.
+ * Base class for {@link KafkaTableSinkBase} that serializes data in JSON format.
  *
 * @deprecated Use table descriptors instead of implementation-specific classes.
  */
 @Deprecated
 @Internal
-public abstract class KafkaJsonTableSink extends KafkaTableSink {
+public abstract class KafkaJsonTableSink extends KafkaTableSinkBase {
 
        /**
         * Creates KafkaJsonTableSink.
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkBase.java
similarity index 94%
rename from flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java
rename to flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkBase.java
index a85d536..acd10cc 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkBase.java
@@ -43,7 +43,7 @@ import java.util.Properties;
 * override {@link #createKafkaProducer(String, Properties, SerializationSchema, Optional)}}.
  */
 @Internal
-public abstract class KafkaTableSink implements AppendStreamTableSink<Row> {
+public abstract class KafkaTableSinkBase implements AppendStreamTableSink<Row> {
 
        // TODO make all attributes final and mandatory once we drop support for format-specific table sinks
 
@@ -66,7 +66,7 @@ public abstract class KafkaTableSink implements AppendStreamTableSink<Row> {
        protected String[] fieldNames;
        protected TypeInformation[] fieldTypes;
 
-       protected KafkaTableSink(
+       protected KafkaTableSinkBase(
                        TableSchema schema,
                        String topic,
                        Properties properties,
@@ -81,7 +81,7 @@ public abstract class KafkaTableSink implements AppendStreamTableSink<Row> {
        }
 
        /**
-        * Creates KafkaTableSink.
+        * Creates KafkaTableSinkBase.
         *
         * @param topic                 Kafka topic to write to.
         * @param properties            Properties for the Kafka producer.
@@ -89,7 +89,7 @@ public abstract class KafkaTableSink implements AppendStreamTableSink<Row> {
         * @deprecated Use table descriptors instead of implementation-specific classes.
         */
        @Deprecated
-       public KafkaTableSink(
+       public KafkaTableSinkBase(
                        String topic,
                        Properties properties,
                        FlinkKafkaPartitioner<Row> partitioner) {
@@ -133,7 +133,7 @@ public abstract class KafkaTableSink implements AppendStreamTableSink<Row> {
         * @return Deep copy of this sink
         */
        @Deprecated
-       protected KafkaTableSink createCopy() {
+       protected KafkaTableSinkBase createCopy() {
                throw new UnsupportedOperationException("This method only exists for backwards compatibility.");
        }
 
@@ -164,14 +164,14 @@ public abstract class KafkaTableSink implements AppendStreamTableSink<Row> {
        }
 
        @Override
-       public KafkaTableSink configure(String[] fieldNames, TypeInformation<?>[] fieldTypes) {
+       public KafkaTableSinkBase configure(String[] fieldNames, TypeInformation<?>[] fieldTypes) {
                if (schema.isPresent()) {
                        // a fixed schema is defined so reconfiguration is not supported
                        throw new UnsupportedOperationException("Reconfiguration of this sink is not supported.");
                }
 
                // legacy code
-               KafkaTableSink copy = createCopy();
+               KafkaTableSinkBase copy = createCopy();
                copy.fieldNames = Preconditions.checkNotNull(fieldNames, "fieldNames");
                copy.fieldTypes = Preconditions.checkNotNull(fieldTypes, "fieldTypes");
                Preconditions.checkArgument(fieldNames.length == fieldTypes.length,
@@ -191,7 +191,7 @@ public abstract class KafkaTableSink implements AppendStreamTableSink<Row> {
                if (o == null || getClass() != o.getClass()) {
                        return false;
                }
-               KafkaTableSink that = (KafkaTableSink) o;
+               KafkaTableSinkBase that = (KafkaTableSinkBase) o;
                return Objects.equals(schema, that.schema) &&
                        Objects.equals(topic, that.topic) &&
                        Objects.equals(properties, that.properties) &&
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkBaseTestBase.java
similarity index 90%
rename from flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java
rename to flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkBaseTestBase.java
index b4bb89d..dbbb10f 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkBaseTestBase.java
@@ -50,7 +50,7 @@ import static org.mockito.Mockito.when;
  *             drop support for format-specific table sinks.
  */
 @Deprecated
-public abstract class KafkaTableSinkTestBase {
+public abstract class KafkaTableSinkBaseTestBase {
 
        private static final String TOPIC = "testTopic";
        private static final String[] FIELD_NAMES = new String[] {"field1", "field2"};
@@ -64,7 +64,7 @@ public abstract class KafkaTableSinkTestBase {
                DataStream dataStream = mock(DataStream.class);
                when(dataStream.addSink(any(SinkFunction.class))).thenReturn(mock(DataStreamSink.class));
 
-               KafkaTableSink kafkaTableSink = spy(createTableSink());
+               KafkaTableSinkBase kafkaTableSink = spy(createTableSink());
                kafkaTableSink.emitDataStream(dataStream);
 
                // verify correct producer class
@@ -80,8 +80,8 @@ public abstract class KafkaTableSinkTestBase {
 
        @Test
        public void testConfiguration() {
-               KafkaTableSink kafkaTableSink = createTableSink();
-               KafkaTableSink newKafkaTableSink = kafkaTableSink.configure(FIELD_NAMES, FIELD_TYPES);
+               KafkaTableSinkBase kafkaTableSink = createTableSink();
+               KafkaTableSinkBase newKafkaTableSink = kafkaTableSink.configure(FIELD_NAMES, FIELD_TYPES);
                assertNotSame(kafkaTableSink, newKafkaTableSink);
 
                assertArrayEquals(FIELD_NAMES, newKafkaTableSink.getFieldNames());
@@ -89,7 +89,7 @@ public abstract class KafkaTableSinkTestBase {
                assertEquals(new RowTypeInfo(FIELD_TYPES), newKafkaTableSink.getOutputType());
        }
 
-       protected abstract KafkaTableSink createTableSink(
+       protected abstract KafkaTableSinkBase createTableSink(
                String topic,
                Properties properties,
                FlinkKafkaPartitioner<Row> partitioner);
@@ -98,8 +98,8 @@ public abstract class KafkaTableSinkTestBase {
 
        protected abstract Class<? extends FlinkKafkaProducerBase> getProducerClass();
 
-       private KafkaTableSink createTableSink() {
-               KafkaTableSink sink = createTableSink(TOPIC, PROPERTIES, PARTITIONER);
+       private KafkaTableSinkBase createTableSink() {
+               KafkaTableSinkBase sink = createTableSink(TOPIC, PROPERTIES, PARTITIONER);
                return sink.configure(FIELD_NAMES, FIELD_TYPES);
        }
 
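Note for anyone carrying a custom sink against this rename: the subclassing pattern is unchanged, only the base class name differs. Below is a minimal sketch of a version-specific subclass. The constructor parameters beyond (schema, topic, properties) and the createKafkaProducer return type are not fully visible in the hunks above, so they are assumptions inferred from the javadoc quoted in the diff; MyKafkaTableSink is a placeholder name, and the 0.10 producer is used purely as an example.

// Illustrative sketch only -- not part of this commit. Signatures beyond
// what the hunks show are assumptions inferred from the quoted javadoc
// ("override createKafkaProducer(String, Properties, SerializationSchema,
// Optional)"); MyKafkaTableSink is a hypothetical placeholder.
import java.util.Optional;
import java.util.Properties;

import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010;
import org.apache.flink.streaming.connectors.kafka.KafkaTableSinkBase;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.types.Row;

public class MyKafkaTableSink extends KafkaTableSinkBase {

    public MyKafkaTableSink(
            TableSchema schema,
            String topic,
            Properties properties,
            Optional<FlinkKafkaPartitioner<Row>> partitioner,  // assumed parameter
            SerializationSchema<Row> serializationSchema) {    // assumed parameter
        super(schema, topic, properties, partitioner, serializationSchema);
    }

    @Override
    protected SinkFunction<Row> createKafkaProducer(           // return type assumed
            String topic,
            Properties properties,
            SerializationSchema<Row> serializationSchema,
            Optional<FlinkKafkaPartitioner<Row>> partitioner) {
        // Each Kafka version plugs in its own producer here; 0.10 shown as an example.
        return new FlinkKafkaProducer010<>(
                topic, serializationSchema, properties, partitioner.orElse(null));
    }
}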
