This is an automated email from the ASF dual-hosted git repository. aljoscha pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 3a727c0721d0bcf1d243af091a3b481e8dd3560f Author: yanghua <[email protected]> AuthorDate: Sat Oct 13 11:44:08 2018 +0800 [FLINK-9697] Rename KafkaTableSource to KafkaTableSourceBase --- .../connectors/kafka/Kafka010TableSource.java | 2 +- .../kafka/Kafka010TableSourceSinkFactory.java | 4 +-- .../kafka/Kafka010AvroTableSourceTest.java | 2 +- .../kafka/Kafka010JsonTableSourceTest.java | 2 +- .../kafka/Kafka010TableSourceSinkFactoryTest.java | 4 +-- .../connectors/kafka/Kafka011TableSource.java | 2 +- .../kafka/Kafka011TableSourceSinkFactory.java | 4 +-- .../kafka/Kafka011AvroTableSourceTest.java | 2 +- .../kafka/Kafka011JsonTableSourceTest.java | 2 +- .../kafka/Kafka011TableSourceSinkFactoryTest.java | 4 +-- .../connectors/kafka/Kafka08TableSource.java | 2 +- .../kafka/Kafka08TableSourceSinkFactory.java | 4 +-- .../kafka/Kafka08AvroTableSourceTest.java | 2 +- .../kafka/Kafka08JsonTableSourceTest.java | 2 +- .../kafka/Kafka08TableSourceSinkFactoryTest.java | 4 +-- .../connectors/kafka/Kafka09TableSource.java | 2 +- .../kafka/Kafka09TableSourceSinkFactory.java | 4 +-- .../kafka/Kafka09AvroTableSourceTest.java | 2 +- .../kafka/Kafka09JsonTableSourceTest.java | 2 +- .../kafka/Kafka09TableSourceSinkFactoryTest.java | 4 +-- .../connectors/kafka/KafkaAvroTableSource.java | 4 +-- .../connectors/kafka/KafkaJsonTableSource.java | 4 +-- ...aTableSource.java => KafkaTableSourceBase.java} | 26 +++++++++--------- .../kafka/KafkaTableSourceSinkFactoryBase.java | 6 ++-- .../kafka/KafkaAvroTableSourceTestBase.java | 2 +- .../kafka/KafkaJsonTableSourceFactoryTestBase.java | 2 +- .../kafka/KafkaTableSourceBuilderTestBase.java | 32 +++++++++++----------- .../kafka/KafkaTableSourceSinkFactoryTestBase.java | 12 ++++---- 28 files changed, 72 insertions(+), 72 deletions(-) diff --git a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSource.java b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSource.java index f657462..2016f84 100644 --- a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSource.java +++ b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSource.java @@ -36,7 +36,7 @@ import java.util.Properties; * Kafka {@link StreamTableSource} for Kafka 0.10. */ @Internal -public class Kafka010TableSource extends KafkaTableSource { +public class Kafka010TableSource extends KafkaTableSourceBase { /** * Creates a Kafka 0.10 {@link StreamTableSource}. diff --git a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSourceSinkFactory.java b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSourceSinkFactory.java index ecf12b2..de7d7bd 100644 --- a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSourceSinkFactory.java +++ b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSourceSinkFactory.java @@ -49,7 +49,7 @@ public class Kafka010TableSourceSinkFactory extends KafkaTableSourceSinkFactoryB } @Override - protected KafkaTableSource createKafkaTableSource( + protected KafkaTableSourceBase createKafkaTableSource( TableSchema schema, Optional<String> proctimeAttribute, List<RowtimeAttributeDescriptor> rowtimeAttributeDescriptors, @@ -73,7 +73,7 @@ public class Kafka010TableSourceSinkFactory extends KafkaTableSourceSinkFactoryB } @Override - protected KafkaTableSink createKafkaTableSink( + protected KafkaTableSinkBase createKafkaTableSink( TableSchema schema, String topic, Properties properties, diff --git a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010AvroTableSourceTest.java b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010AvroTableSourceTest.java index bf253c4..22547af 100644 --- a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010AvroTableSourceTest.java +++ b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010AvroTableSourceTest.java @@ -32,7 +32,7 @@ import org.apache.flink.types.Row; public class Kafka010AvroTableSourceTest extends KafkaAvroTableSourceTestBase { @Override - protected KafkaTableSource.Builder getBuilder() { + protected KafkaTableSourceBase.Builder getBuilder() { return Kafka010AvroTableSource.builder(); } diff --git a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSourceTest.java b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSourceTest.java index 087f3ed..330e55a 100644 --- a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSourceTest.java +++ b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSourceTest.java @@ -32,7 +32,7 @@ import org.apache.flink.types.Row; public class Kafka010JsonTableSourceTest extends KafkaJsonTableSourceTestBase { @Override - protected KafkaTableSource.Builder getBuilder() { + protected KafkaTableSourceBase.Builder getBuilder() { return Kafka010JsonTableSource.builder(); } diff --git a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSourceSinkFactoryTest.java b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSourceSinkFactoryTest.java index dac8a4d..1af001d 100644 --- a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSourceSinkFactoryTest.java +++ b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSourceSinkFactoryTest.java @@ -56,7 +56,7 @@ public class Kafka010TableSourceSinkFactoryTest extends KafkaTableSourceSinkFact } @Override - protected KafkaTableSource getExpectedKafkaTableSource( + protected KafkaTableSourceBase getExpectedKafkaTableSource( TableSchema schema, Optional<String> proctimeAttribute, List<RowtimeAttributeDescriptor> rowtimeAttributeDescriptors, @@ -81,7 +81,7 @@ public class Kafka010TableSourceSinkFactoryTest extends KafkaTableSourceSinkFact } @Override - protected KafkaTableSink getExpectedKafkaTableSink( + protected KafkaTableSinkBase getExpectedKafkaTableSink( TableSchema schema, String topic, Properties properties, diff --git a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSource.java b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSource.java index a646317..688a649 100644 --- a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSource.java +++ b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSource.java @@ -36,7 +36,7 @@ import java.util.Properties; * Kafka {@link StreamTableSource} for Kafka 0.11. */ @Internal -public class Kafka011TableSource extends KafkaTableSource { +public class Kafka011TableSource extends KafkaTableSourceBase { /** * Creates a Kafka 0.11 {@link StreamTableSource}. diff --git a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSourceSinkFactory.java b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSourceSinkFactory.java index e6f677f..4585dc0 100644 --- a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSourceSinkFactory.java +++ b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSourceSinkFactory.java @@ -49,7 +49,7 @@ public class Kafka011TableSourceSinkFactory extends KafkaTableSourceSinkFactoryB } @Override - protected KafkaTableSource createKafkaTableSource( + protected KafkaTableSourceBase createKafkaTableSource( TableSchema schema, Optional<String> proctimeAttribute, List<RowtimeAttributeDescriptor> rowtimeAttributeDescriptors, @@ -73,7 +73,7 @@ public class Kafka011TableSourceSinkFactory extends KafkaTableSourceSinkFactoryB } @Override - protected KafkaTableSink createKafkaTableSink( + protected KafkaTableSinkBase createKafkaTableSink( TableSchema schema, String topic, Properties properties, diff --git a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011AvroTableSourceTest.java b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011AvroTableSourceTest.java index aa083a2..f7bed64 100644 --- a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011AvroTableSourceTest.java +++ b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011AvroTableSourceTest.java @@ -32,7 +32,7 @@ import org.apache.flink.types.Row; public class Kafka011AvroTableSourceTest extends KafkaAvroTableSourceTestBase { @Override - protected KafkaTableSource.Builder getBuilder() { + protected KafkaTableSourceBase.Builder getBuilder() { return Kafka011AvroTableSource.builder(); } diff --git a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011JsonTableSourceTest.java b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011JsonTableSourceTest.java index 451795a..1a851c1 100644 --- a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011JsonTableSourceTest.java +++ b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011JsonTableSourceTest.java @@ -32,7 +32,7 @@ import org.apache.flink.types.Row; public class Kafka011JsonTableSourceTest extends KafkaJsonTableSourceTestBase { @Override - protected KafkaTableSource.Builder getBuilder() { + protected KafkaTableSourceBase.Builder getBuilder() { return Kafka011JsonTableSource.builder(); } diff --git a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSourceSinkFactoryTest.java b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSourceSinkFactoryTest.java index f461476..61ec9ef 100644 --- a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSourceSinkFactoryTest.java +++ b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSourceSinkFactoryTest.java @@ -56,7 +56,7 @@ public class Kafka011TableSourceSinkFactoryTest extends KafkaTableSourceSinkFact } @Override - protected KafkaTableSource getExpectedKafkaTableSource( + protected KafkaTableSourceBase getExpectedKafkaTableSource( TableSchema schema, Optional<String> proctimeAttribute, List<RowtimeAttributeDescriptor> rowtimeAttributeDescriptors, @@ -81,7 +81,7 @@ public class Kafka011TableSourceSinkFactoryTest extends KafkaTableSourceSinkFact } @Override - protected KafkaTableSink getExpectedKafkaTableSink( + protected KafkaTableSinkBase getExpectedKafkaTableSink( TableSchema schema, String topic, Properties properties, diff --git a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSource.java b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSource.java index 97c293e..b356abe 100644 --- a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSource.java +++ b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSource.java @@ -36,7 +36,7 @@ import java.util.Properties; * Kafka {@link StreamTableSource} for Kafka 0.8. */ @Internal -public class Kafka08TableSource extends KafkaTableSource { +public class Kafka08TableSource extends KafkaTableSourceBase { /** * Creates a Kafka 0.8 {@link StreamTableSource}. diff --git a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSourceSinkFactory.java b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSourceSinkFactory.java index aeccd4f..c1d6579 100644 --- a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSourceSinkFactory.java +++ b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSourceSinkFactory.java @@ -49,7 +49,7 @@ public class Kafka08TableSourceSinkFactory extends KafkaTableSourceSinkFactoryBa } @Override - protected KafkaTableSource createKafkaTableSource( + protected KafkaTableSourceBase createKafkaTableSource( TableSchema schema, Optional<String> proctimeAttribute, List<RowtimeAttributeDescriptor> rowtimeAttributeDescriptors, @@ -73,7 +73,7 @@ public class Kafka08TableSourceSinkFactory extends KafkaTableSourceSinkFactoryBa } @Override - protected KafkaTableSink createKafkaTableSink( + protected KafkaTableSinkBase createKafkaTableSink( TableSchema schema, String topic, Properties properties, diff --git a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08AvroTableSourceTest.java b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08AvroTableSourceTest.java index c7b8e8c..bc7b9b6 100644 --- a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08AvroTableSourceTest.java +++ b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08AvroTableSourceTest.java @@ -32,7 +32,7 @@ import org.apache.flink.types.Row; public class Kafka08AvroTableSourceTest extends KafkaAvroTableSourceTestBase { @Override - protected KafkaTableSource.Builder getBuilder() { + protected KafkaTableSourceBase.Builder getBuilder() { return Kafka08AvroTableSource.builder(); } diff --git a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSourceTest.java b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSourceTest.java index 4ce79dd..dba8f7d 100644 --- a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSourceTest.java +++ b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSourceTest.java @@ -32,7 +32,7 @@ import org.apache.flink.types.Row; public class Kafka08JsonTableSourceTest extends KafkaJsonTableSourceTestBase { @Override - protected KafkaTableSource.Builder getBuilder() { + protected KafkaTableSourceBase.Builder getBuilder() { return Kafka08JsonTableSource.builder(); } diff --git a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSourceSinkFactoryTest.java b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSourceSinkFactoryTest.java index ff633ec..ba9b686 100644 --- a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSourceSinkFactoryTest.java +++ b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSourceSinkFactoryTest.java @@ -56,7 +56,7 @@ public class Kafka08TableSourceSinkFactoryTest extends KafkaTableSourceSinkFacto } @Override - protected KafkaTableSource getExpectedKafkaTableSource( + protected KafkaTableSourceBase getExpectedKafkaTableSource( TableSchema schema, Optional<String> proctimeAttribute, List<RowtimeAttributeDescriptor> rowtimeAttributeDescriptors, @@ -81,7 +81,7 @@ public class Kafka08TableSourceSinkFactoryTest extends KafkaTableSourceSinkFacto } @Override - protected KafkaTableSink getExpectedKafkaTableSink( + protected KafkaTableSinkBase getExpectedKafkaTableSink( TableSchema schema, String topic, Properties properties, diff --git a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSource.java b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSource.java index 8f9e799..085aa67 100644 --- a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSource.java +++ b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSource.java @@ -36,7 +36,7 @@ import java.util.Properties; * Kafka {@link StreamTableSource} for Kafka 0.9. */ @Internal -public class Kafka09TableSource extends KafkaTableSource { +public class Kafka09TableSource extends KafkaTableSourceBase { /** * Creates a Kafka 0.9 {@link StreamTableSource}. diff --git a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSourceSinkFactory.java b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSourceSinkFactory.java index 19f5150..11d56b4 100644 --- a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSourceSinkFactory.java +++ b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSourceSinkFactory.java @@ -49,7 +49,7 @@ public class Kafka09TableSourceSinkFactory extends KafkaTableSourceSinkFactoryBa } @Override - protected KafkaTableSource createKafkaTableSource( + protected KafkaTableSourceBase createKafkaTableSource( TableSchema schema, Optional<String> proctimeAttribute, List<RowtimeAttributeDescriptor> rowtimeAttributeDescriptors, @@ -73,7 +73,7 @@ public class Kafka09TableSourceSinkFactory extends KafkaTableSourceSinkFactoryBa } @Override - protected KafkaTableSink createKafkaTableSink( + protected KafkaTableSinkBase createKafkaTableSink( TableSchema schema, String topic, Properties properties, diff --git a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09AvroTableSourceTest.java b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09AvroTableSourceTest.java index 6f3a566..5f5e80d 100644 --- a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09AvroTableSourceTest.java +++ b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09AvroTableSourceTest.java @@ -32,7 +32,7 @@ import org.apache.flink.types.Row; public class Kafka09AvroTableSourceTest extends KafkaAvroTableSourceTestBase { @Override - protected KafkaTableSource.Builder getBuilder() { + protected KafkaTableSourceBase.Builder getBuilder() { return Kafka09AvroTableSource.builder(); } diff --git a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSourceTest.java b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSourceTest.java index bfff7d2..1a630b1 100644 --- a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSourceTest.java +++ b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSourceTest.java @@ -32,7 +32,7 @@ import org.apache.flink.types.Row; public class Kafka09JsonTableSourceTest extends KafkaJsonTableSourceTestBase { @Override - protected KafkaTableSource.Builder getBuilder() { + protected KafkaTableSourceBase.Builder getBuilder() { return Kafka09JsonTableSource.builder(); } diff --git a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSourceSinkFactoryTest.java b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSourceSinkFactoryTest.java index d54c394..631a45f 100644 --- a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSourceSinkFactoryTest.java +++ b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSourceSinkFactoryTest.java @@ -56,7 +56,7 @@ public class Kafka09TableSourceSinkFactoryTest extends KafkaTableSourceSinkFacto } @Override - protected KafkaTableSource getExpectedKafkaTableSource( + protected KafkaTableSourceBase getExpectedKafkaTableSource( TableSchema schema, Optional<String> proctimeAttribute, List<RowtimeAttributeDescriptor> rowtimeAttributeDescriptors, @@ -81,7 +81,7 @@ public class Kafka09TableSourceSinkFactoryTest extends KafkaTableSourceSinkFacto } @Override - protected KafkaTableSink getExpectedKafkaTableSink( + protected KafkaTableSinkBase getExpectedKafkaTableSink( TableSchema schema, String topic, Properties properties, diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaAvroTableSource.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaAvroTableSource.java index 86fd21d..1e9568f 100644 --- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaAvroTableSource.java +++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaAvroTableSource.java @@ -44,7 +44,7 @@ import java.util.Properties; */ @Deprecated @Internal -public abstract class KafkaAvroTableSource extends KafkaTableSource { +public abstract class KafkaAvroTableSource extends KafkaTableSourceBase { /** * Creates a generic Kafka Avro {@link StreamTableSource} using a given {@link SpecificRecord}. @@ -93,7 +93,7 @@ public abstract class KafkaAvroTableSource extends KafkaTableSource { */ @Deprecated protected abstract static class Builder<T extends KafkaAvroTableSource, B extends KafkaAvroTableSource.Builder> - extends KafkaTableSource.Builder<T, B> { + extends KafkaTableSourceBase.Builder<T, B> { private Class<? extends SpecificRecordBase> avroClass; diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSource.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSource.java index 70b286b..a9db979 100644 --- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSource.java +++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSource.java @@ -43,7 +43,7 @@ import java.util.Properties; */ @Deprecated @Internal -public abstract class KafkaJsonTableSource extends KafkaTableSource { +public abstract class KafkaJsonTableSource extends KafkaTableSourceBase { /** * Creates a generic Kafka JSON {@link StreamTableSource}. @@ -102,7 +102,7 @@ public abstract class KafkaJsonTableSource extends KafkaTableSource { */ @Deprecated protected abstract static class Builder<T extends KafkaJsonTableSource, B extends KafkaJsonTableSource.Builder> - extends KafkaTableSource.Builder<T, B> { + extends KafkaTableSourceBase.Builder<T, B> { private TableSchema jsonSchema; diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceBase.java similarity index 97% rename from flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java rename to flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceBase.java index 474c22f..04aa8c1 100644 --- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java +++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceBase.java @@ -57,7 +57,7 @@ import scala.Option; * override {@link #createKafkaConsumer(String, Properties, DeserializationSchema)}}. */ @Internal -public abstract class KafkaTableSource implements +public abstract class KafkaTableSourceBase implements StreamTableSource<Row>, DefinedProctimeAttribute, DefinedRowtimeAttributes, @@ -110,7 +110,7 @@ public abstract class KafkaTableSource implements * @param specificStartupOffsets Specific startup offsets; only relevant when startup * mode is {@link StartupMode#SPECIFIC_OFFSETS}. */ - protected KafkaTableSource( + protected KafkaTableSourceBase( TableSchema schema, Optional<String> proctimeAttribute, List<RowtimeAttributeDescriptor> rowtimeAttributeDescriptors, @@ -141,7 +141,7 @@ public abstract class KafkaTableSource implements * @param properties Properties for the Kafka consumer. * @param deserializationSchema Deserialization schema for decoding records from Kafka. */ - protected KafkaTableSource( + protected KafkaTableSourceBase( TableSchema schema, String topic, Properties properties, @@ -225,10 +225,10 @@ public abstract class KafkaTableSource implements } // TODO force classes to be equal once we drop support for format-specific table sources // if (o == null || getClass() != o.getClass()) { - if (o == null || !(o instanceof KafkaTableSource)) { + if (o == null || !(o instanceof KafkaTableSourceBase)) { return false; } - final KafkaTableSource that = (KafkaTableSource) o; + final KafkaTableSourceBase that = (KafkaTableSourceBase) o; return Objects.equals(schema, that.schema) && Objects.equals(proctimeAttribute, that.proctimeAttribute) && Objects.equals(rowtimeAttributeDescriptors, that.rowtimeAttributeDescriptors) && @@ -398,18 +398,18 @@ public abstract class KafkaTableSource implements DeserializationSchema<Row> deserializationSchema); /** - * Abstract builder for a {@link KafkaTableSource} to be extended by builders of subclasses of - * KafkaTableSource. + * Abstract builder for a {@link KafkaTableSourceBase} to be extended by builders of subclasses of + * KafkaTableSourceBase. * - * @param <T> Type of the KafkaTableSource produced by the builder. - * @param <B> Type of the KafkaTableSource.Builder subclass. + * @param <T> Type of the KafkaTableSourceBase produced by the builder. + * @param <B> Type of the KafkaTableSourceBase.Builder subclass. * @deprecated Use the {@link org.apache.flink.table.descriptors.Kafka} descriptor together * with descriptors for schema and format instead. Descriptors allow for * implementation-agnostic definition of tables. See also * {@link org.apache.flink.table.api.TableEnvironment#connect(ConnectorDescriptor)}. */ @Deprecated - protected abstract static class Builder<T extends KafkaTableSource, B extends KafkaTableSource.Builder> { + protected abstract static class Builder<T extends KafkaTableSourceBase, B extends KafkaTableSourceBase.Builder> { private String topic; @@ -682,11 +682,11 @@ public abstract class KafkaTableSource implements protected abstract B builder(); /** - * Builds the configured {@link KafkaTableSource}. - * @return The configured {@link KafkaTableSource}. + * Builds the configured {@link KafkaTableSourceBase}. + * @return The configured {@link KafkaTableSourceBase}. * @deprecated Use table descriptors instead of implementation-specific builders. */ @Deprecated - protected abstract KafkaTableSource build(); + protected abstract KafkaTableSourceBase build(); } } diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryBase.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryBase.java index 5634331..431f946 100644 --- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryBase.java +++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryBase.java @@ -83,7 +83,7 @@ import static org.apache.flink.table.descriptors.StreamTableDescriptorValidator. import static org.apache.flink.table.descriptors.StreamTableDescriptorValidator.UPDATE_MODE_VALUE_APPEND; /** - * Factory for creating configured instances of {@link KafkaTableSource}. + * Factory for creating configured instances of {@link KafkaTableSourceBase}. */ public abstract class KafkaTableSourceSinkFactoryBase implements StreamTableSourceFactory<Row>, @@ -213,7 +213,7 @@ public abstract class KafkaTableSourceSinkFactoryBase implements * @param specificStartupOffsets Specific startup offsets; only relevant when startup * mode is {@link StartupMode#SPECIFIC_OFFSETS}. */ - protected abstract KafkaTableSource createKafkaTableSource( + protected abstract KafkaTableSourceBase createKafkaTableSource( TableSchema schema, Optional<String> proctimeAttribute, List<RowtimeAttributeDescriptor> rowtimeAttributeDescriptors, @@ -232,7 +232,7 @@ public abstract class KafkaTableSourceSinkFactoryBase implements * @param properties Properties for the Kafka consumer. * @param partitioner Partitioner to select Kafka partition for each item. */ - protected abstract KafkaTableSink createKafkaTableSink( + protected abstract KafkaTableSinkBase createKafkaTableSink( TableSchema schema, String topic, Properties properties, diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaAvroTableSourceTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaAvroTableSourceTestBase.java index 140ce21..089a118 100644 --- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaAvroTableSourceTestBase.java +++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaAvroTableSourceTestBase.java @@ -43,7 +43,7 @@ import static org.junit.Assert.assertNull; public abstract class KafkaAvroTableSourceTestBase extends KafkaTableSourceBuilderTestBase { @Override - protected void configureBuilder(KafkaTableSource.Builder builder) { + protected void configureBuilder(KafkaTableSourceBase.Builder builder) { super.configureBuilder(builder); ((KafkaAvroTableSource.Builder) builder).forAvroRecordClass(SchemaRecord.class); } diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSourceFactoryTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSourceFactoryTestBase.java index 51c0e7b..afb9517 100644 --- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSourceFactoryTestBase.java +++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSourceFactoryTestBase.java @@ -114,7 +114,7 @@ public abstract class KafkaJsonTableSourceFactoryTestBase { specificOffsets.put(new KafkaTopicPartition(TOPIC, 0), 100L); specificOffsets.put(new KafkaTopicPartition(TOPIC, 1), 123L); - final KafkaTableSource builderSource = builder() + final KafkaTableSourceBase builderSource = builder() .forJsonSchema(TableSchema.fromTypeInfo(JsonRowSchemaConverter.convert(JSON_SCHEMA))) .failOnMissingField(true) .withTableToJsonMapping(tableJsonMapping) diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceBuilderTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceBuilderTestBase.java index b9ae47b..16dc714 100644 --- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceBuilderTestBase.java +++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceBuilderTestBase.java @@ -70,11 +70,11 @@ public abstract class KafkaTableSourceBuilderTestBase { @Test @SuppressWarnings("unchecked") public void testKafkaConsumer() { - KafkaTableSource.Builder b = getBuilder(); + KafkaTableSourceBase.Builder b = getBuilder(); configureBuilder(b); // assert that correct - KafkaTableSource observed = spy(b.build()); + KafkaTableSourceBase observed = spy(b.build()); StreamExecutionEnvironment env = mock(StreamExecutionEnvironment.class); when(env.addSource(any(SourceFunction.class))).thenReturn(mock(DataStreamSource.class)); observed.getDataStream(env); @@ -89,10 +89,10 @@ public abstract class KafkaTableSourceBuilderTestBase { @Test public void testTableSchema() { - KafkaTableSource.Builder b = getBuilder(); + KafkaTableSourceBase.Builder b = getBuilder(); configureBuilder(b); - KafkaTableSource source = b.build(); + KafkaTableSourceBase source = b.build(); // check table schema TableSchema schema = source.getTableSchema(); @@ -113,10 +113,10 @@ public abstract class KafkaTableSourceBuilderTestBase { @Test public void testNoTimeAttributes() { - KafkaTableSource.Builder b = getBuilder(); + KafkaTableSourceBase.Builder b = getBuilder(); configureBuilder(b); - KafkaTableSource source = b.build(); + KafkaTableSourceBase source = b.build(); // assert no proctime assertNull(source.getProctimeAttribute()); @@ -127,11 +127,11 @@ public abstract class KafkaTableSourceBuilderTestBase { @Test public void testProctimeAttribute() { - KafkaTableSource.Builder b = getBuilder(); + KafkaTableSourceBase.Builder b = getBuilder(); configureBuilder(b); b.withProctimeAttribute("time1"); - KafkaTableSource source = b.build(); + KafkaTableSourceBase source = b.build(); // assert correct proctime field assertEquals(source.getProctimeAttribute(), "time1"); @@ -143,11 +143,11 @@ public abstract class KafkaTableSourceBuilderTestBase { @Test public void testRowtimeAttribute() { - KafkaTableSource.Builder b = getBuilder(); + KafkaTableSourceBase.Builder b = getBuilder(); configureBuilder(b); b.withRowtimeAttribute("time2", new ExistingField("time2"), new AscendingTimestamps()); - KafkaTableSource source = b.build(); + KafkaTableSourceBase source = b.build(); // assert no proctime assertNull(source.getProctimeAttribute()); @@ -168,13 +168,13 @@ public abstract class KafkaTableSourceBuilderTestBase { @Test public void testRowtimeAttribute2() { - KafkaTableSource.Builder b = getBuilder(); + KafkaTableSourceBase.Builder b = getBuilder(); configureBuilder(b); try { b.withKafkaTimestampAsRowtimeAttribute("time2", new AscendingTimestamps()); - KafkaTableSource source = b.build(); + KafkaTableSourceBase source = b.build(); // assert no proctime assertNull(source.getProctimeAttribute()); @@ -201,11 +201,11 @@ public abstract class KafkaTableSourceBuilderTestBase { @Test @SuppressWarnings("unchecked") public void testConsumerOffsets() { - KafkaTableSource.Builder b = getBuilder(); + KafkaTableSourceBase.Builder b = getBuilder(); configureBuilder(b); // test the default behavior - KafkaTableSource source = spy(b.build()); + KafkaTableSourceBase source = spy(b.build()); when(source.createKafkaConsumer(TOPIC, PROPS, null)) .thenReturn(mock(getFlinkKafkaConsumer())); @@ -241,13 +241,13 @@ public abstract class KafkaTableSourceBuilderTestBase { verify(source.getKafkaConsumer(TOPIC, PROPS, null)).setStartFromSpecificOffsets(any(Map.class)); } - protected abstract KafkaTableSource.Builder getBuilder(); + protected abstract KafkaTableSourceBase.Builder getBuilder(); protected abstract Class<DeserializationSchema<Row>> getDeserializationSchema(); protected abstract Class<FlinkKafkaConsumerBase<Row>> getFlinkKafkaConsumer(); - protected void configureBuilder(KafkaTableSource.Builder builder) { + protected void configureBuilder(KafkaTableSourceBase.Builder builder) { builder .forTopic(TOPIC) .withKafkaProperties(PROPS) diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryTestBase.java index 5e9144c..135e718 100644 --- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryTestBase.java +++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryTestBase.java @@ -133,7 +133,7 @@ public abstract class KafkaTableSourceSinkFactoryTestBase extends TestLogger { .toRowType() ); - final KafkaTableSource expected = getExpectedKafkaTableSource( + final KafkaTableSourceBase expected = getExpectedKafkaTableSource( schema, Optional.of(PROC_TIME), rowtimeAttributeDescriptors, @@ -172,7 +172,7 @@ public abstract class KafkaTableSourceSinkFactoryTestBase extends TestLogger { assertEquals(expected, actualSource); // test Kafka consumer - final KafkaTableSource actualKafkaSource = (KafkaTableSource) actualSource; + final KafkaTableSourceBase actualKafkaSource = (KafkaTableSourceBase) actualSource; final StreamExecutionEnvironmentMock mock = new StreamExecutionEnvironmentMock(); actualKafkaSource.getDataStream(mock); assertTrue(getExpectedFlinkKafkaConsumer().isAssignableFrom(mock.sourceFunction.getClass())); @@ -191,7 +191,7 @@ public abstract class KafkaTableSourceSinkFactoryTestBase extends TestLogger { .field(EVENT_TIME, Types.SQL_TIMESTAMP()) .build(); - final KafkaTableSink expected = getExpectedKafkaTableSink( + final KafkaTableSinkBase expected = getExpectedKafkaTableSink( schema, TOPIC, KAFKA_PROPERTIES, @@ -222,7 +222,7 @@ public abstract class KafkaTableSourceSinkFactoryTestBase extends TestLogger { assertEquals(expected, actualSink); // test Kafka producer - final KafkaTableSink actualKafkaSink = (KafkaTableSink) actualSink; + final KafkaTableSinkBase actualKafkaSink = (KafkaTableSinkBase) actualSink; final DataStreamMock streamMock = new DataStreamMock(new StreamExecutionEnvironmentMock(), schema.toRowType()); actualKafkaSink.emitDataStream(streamMock); assertTrue(getExpectedFlinkKafkaProducer().isAssignableFrom(streamMock.sinkFunction.getClass())); @@ -286,7 +286,7 @@ public abstract class KafkaTableSourceSinkFactoryTestBase extends TestLogger { protected abstract Class<?> getExpectedFlinkKafkaProducer(); - protected abstract KafkaTableSource getExpectedKafkaTableSource( + protected abstract KafkaTableSourceBase getExpectedKafkaTableSource( TableSchema schema, Optional<String> proctimeAttribute, List<RowtimeAttributeDescriptor> rowtimeAttributeDescriptors, @@ -297,7 +297,7 @@ public abstract class KafkaTableSourceSinkFactoryTestBase extends TestLogger { StartupMode startupMode, Map<KafkaTopicPartition, Long> specificStartupOffsets); - protected abstract KafkaTableSink getExpectedKafkaTableSink( + protected abstract KafkaTableSinkBase getExpectedKafkaTableSink( TableSchema schema, String topic, Properties properties,
