[FLINK-9852] [table] Expose descriptor-based sink creation and introduce update mode
This commit exposes the new unified sink creation through the table environments and the external catalog table. It introduce a new update-mode property in order to distinguish between append, retract, and upsert table sources and sinks. This commit refactors the top-level API classes a last time and adds more documentation. This commit completes the unified table sources/sinks story from an API point of view. Brief change log: - Introduction of TableEnvironment.connect() and corresponding API builder classes - Introduction of property update-mode: and update of existing connectors - External catalog support with proper source/sink discovery and API This closes #6343. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7bb07e4e Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7bb07e4e Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7bb07e4e Branch: refs/heads/release-1.6 Commit: 7bb07e4e74ed7167cd98599af1a851afa1d77252 Parents: 4862101 Author: Timo Walther <[email protected]> Authored: Fri Jul 20 08:47:53 2018 +0200 Committer: Timo Walther <[email protected]> Committed: Fri Jul 20 09:33:48 2018 +0200 ---------------------------------------------------------------------- .../kafka/Kafka010AvroTableSource.java | 6 +- .../kafka/Kafka010JsonTableSource.java | 6 +- .../kafka/Kafka011AvroTableSource.java | 6 +- .../kafka/Kafka011JsonTableSource.java | 6 +- .../kafka/Kafka08AvroTableSource.java | 6 +- .../kafka/Kafka08JsonTableSource.java | 6 +- .../kafka/Kafka09AvroTableSource.java | 6 +- .../kafka/Kafka09JsonTableSource.java | 6 +- .../connectors/kafka/KafkaAvroTableSource.java | 6 +- .../connectors/kafka/KafkaJsonTableSource.java | 4 +- .../connectors/kafka/KafkaTableSource.java | 4 +- .../kafka/KafkaTableSourceFactory.java | 3 + .../KafkaJsonTableSourceFactoryTestBase.java | 11 +- .../kafka/KafkaTableSourceFactoryTestBase.java | 11 +- .../apache/flink/table/client/config/Sink.java | 4 +- .../flink/table/client/config/Source.java | 4 +- .../flink/table/client/config/SourceSink.java | 2 +- .../client/gateway/local/EnvironmentTest.java | 14 +- .../gateway/local/ExecutionContextTest.java | 6 +- .../gateway/local/LocalExecutorITCase.java | 12 +- .../gateway/utils/TestTableSinkFactory.java | 3 + .../gateway/utils/TestTableSourceFactory.java | 3 + .../resources/test-sql-client-defaults.yaml | 3 + .../test/resources/test-sql-client-factory.yaml | 1 + ...rg.apache.flink.table.factories.TableFactory | 6 +- .../flink/table/api/BatchTableEnvironment.scala | 36 +- .../table/api/StreamTableEnvironment.scala | 26 +- .../flink/table/api/TableEnvironment.scala | 32 +- .../table/catalog/ExternalCatalogSchema.scala | 2 +- .../table/catalog/ExternalCatalogTable.scala | 335 +++++++++++++++++-- .../table/catalog/ExternalTableSourceUtil.scala | 70 ---- .../flink/table/catalog/ExternalTableUtil.scala | 102 ++++++ .../descriptors/BatchTableDescriptor.scala | 31 ++ .../BatchTableSourceDescriptor.scala | 87 ----- .../descriptors/ConnectTableDescriptor.scala | 108 ++++++ .../flink/table/descriptors/Descriptor.scala | 9 +- .../descriptors/DescriptorProperties.scala | 10 + .../descriptors/RegistrableDescriptor.scala | 49 +++ .../table/descriptors/SchematicDescriptor.scala | 35 ++ .../descriptors/StreamTableDescriptor.scala | 101 ++++++ .../StreamTableDescriptorValidator.scala | 48 +++ .../StreamTableSourceDescriptor.scala | 90 ----- .../descriptors/StreamableDescriptor.scala | 67 ++++ .../table/descriptors/TableDescriptor.scala | 20 +- .../descriptors/TableDescriptorValidator.scala | 29 -- .../table/descriptors/TableSinkDescriptor.scala | 32 -- .../descriptors/TableSourceDescriptor.scala | 57 ---- .../flink/table/factories/TableFactory.scala | 6 + .../table/factories/TableFactoryService.scala | 8 +- .../table/factories/TableFactoryUtil.scala | 82 +++++ .../table/factories/TableFormatFactory.scala | 2 + .../table/sinks/CsvAppendTableSinkFactory.scala | 45 +++ .../table/sinks/CsvBatchTableSinkFactory.scala | 38 +++ .../flink/table/sinks/CsvTableSinkFactory.scala | 112 ------- .../table/sinks/CsvTableSinkFactoryBase.scala | 96 ++++++ .../sources/CsvAppendTableSourceFactory.scala | 45 +++ .../sources/CsvBatchTableSourceFactory.scala | 38 +++ .../table/sources/CsvTableSourceFactory.scala | 139 -------- .../sources/CsvTableSourceFactoryBase.scala | 123 +++++++ .../flink/table/api/ExternalCatalogTest.scala | 20 +- .../catalog/ExternalCatalogSchemaTest.scala | 2 +- .../catalog/InMemoryExternalCatalogTest.scala | 8 +- .../table/descriptors/DescriptorTestBase.scala | 37 +- .../table/descriptors/TableDescriptorTest.scala | 110 ++++++ .../descriptors/TableSourceDescriptorTest.scala | 114 ------- .../table/runtime/utils/CommonTestData.scala | 32 +- .../table/utils/MockTableEnvironment.scala | 4 +- 67 files changed, 1669 insertions(+), 913 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/7bb07e4e/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010AvroTableSource.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010AvroTableSource.java b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010AvroTableSource.java index ebbadcf..d9d0a91 100644 --- a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010AvroTableSource.java +++ b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010AvroTableSource.java @@ -39,7 +39,7 @@ import java.util.Properties; * @deprecated Use the {@link org.apache.flink.table.descriptors.Kafka} descriptor together * with descriptors for schema and format instead. Descriptors allow for * implementation-agnostic definition of tables. See also - * {@link org.apache.flink.table.api.TableEnvironment#from(ConnectorDescriptor)}. + * {@link org.apache.flink.table.api.TableEnvironment#connect(ConnectorDescriptor)}. */ @Deprecated public class Kafka010AvroTableSource extends KafkaAvroTableSource { @@ -120,7 +120,7 @@ public class Kafka010AvroTableSource extends KafkaAvroTableSource { * @deprecated Use the {@link org.apache.flink.table.descriptors.Kafka} descriptor together * with descriptors for schema and format instead. Descriptors allow for * implementation-agnostic definition of tables. See also - * {@link org.apache.flink.table.api.TableEnvironment#from(ConnectorDescriptor)}. + * {@link org.apache.flink.table.api.TableEnvironment#connect(ConnectorDescriptor)}. */ @Deprecated public static Builder builder() { @@ -133,7 +133,7 @@ public class Kafka010AvroTableSource extends KafkaAvroTableSource { * @deprecated Use the {@link org.apache.flink.table.descriptors.Kafka} descriptor together * with descriptors for schema and format instead. Descriptors allow for * implementation-agnostic definition of tables. See also - * {@link org.apache.flink.table.api.TableEnvironment#from(ConnectorDescriptor)}. + * {@link org.apache.flink.table.api.TableEnvironment#connect(ConnectorDescriptor)}. */ @Deprecated public static class Builder extends KafkaAvroTableSource.Builder<Kafka010AvroTableSource, Kafka010AvroTableSource.Builder> { http://git-wip-us.apache.org/repos/asf/flink/blob/7bb07e4e/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSource.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSource.java b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSource.java index a5e33a1..38d9034 100644 --- a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSource.java +++ b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSource.java @@ -36,7 +36,7 @@ import java.util.Properties; * @deprecated Use the {@link org.apache.flink.table.descriptors.Kafka} descriptor together * with descriptors for schema and format instead. Descriptors allow for * implementation-agnostic definition of tables. See also - * {@link org.apache.flink.table.api.TableEnvironment#from(ConnectorDescriptor)}. + * {@link org.apache.flink.table.api.TableEnvironment#connect(ConnectorDescriptor)}. */ @Deprecated public class Kafka010JsonTableSource extends KafkaJsonTableSource { @@ -121,7 +121,7 @@ public class Kafka010JsonTableSource extends KafkaJsonTableSource { * @deprecated Use the {@link org.apache.flink.table.descriptors.Kafka} descriptor together * with descriptors for schema and format instead. Descriptors allow for * implementation-agnostic definition of tables. See also - * {@link org.apache.flink.table.api.TableEnvironment#from(ConnectorDescriptor)}. + * {@link org.apache.flink.table.api.TableEnvironment#connect(ConnectorDescriptor)}. */ @Deprecated public static Kafka010JsonTableSource.Builder builder() { @@ -134,7 +134,7 @@ public class Kafka010JsonTableSource extends KafkaJsonTableSource { * @deprecated Use the {@link org.apache.flink.table.descriptors.Kafka} descriptor together * with descriptors for schema and format instead. Descriptors allow for * implementation-agnostic definition of tables. See also - * {@link org.apache.flink.table.api.TableEnvironment#from(ConnectorDescriptor)}. + * {@link org.apache.flink.table.api.TableEnvironment#connect(ConnectorDescriptor)}. */ @Deprecated public static class Builder extends KafkaJsonTableSource.Builder<Kafka010JsonTableSource, Kafka010JsonTableSource.Builder> { http://git-wip-us.apache.org/repos/asf/flink/blob/7bb07e4e/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011AvroTableSource.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011AvroTableSource.java b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011AvroTableSource.java index b3f4e0a..fab592f 100644 --- a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011AvroTableSource.java +++ b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011AvroTableSource.java @@ -39,7 +39,7 @@ import java.util.Properties; * @deprecated Use the {@link org.apache.flink.table.descriptors.Kafka} descriptor together * with descriptors for schema and format instead. Descriptors allow for * implementation-agnostic definition of tables. See also - * {@link org.apache.flink.table.api.TableEnvironment#from(ConnectorDescriptor)}. + * {@link org.apache.flink.table.api.TableEnvironment#connect(ConnectorDescriptor)}. */ @Deprecated public class Kafka011AvroTableSource extends KafkaAvroTableSource { @@ -119,7 +119,7 @@ public class Kafka011AvroTableSource extends KafkaAvroTableSource { * @deprecated Use the {@link org.apache.flink.table.descriptors.Kafka} descriptor together * with descriptors for schema and format instead. Descriptors allow for * implementation-agnostic definition of tables. See also - * {@link org.apache.flink.table.api.TableEnvironment#from(ConnectorDescriptor)}. + * {@link org.apache.flink.table.api.TableEnvironment#connect(ConnectorDescriptor)}. */ @Deprecated public static Builder builder() { @@ -132,7 +132,7 @@ public class Kafka011AvroTableSource extends KafkaAvroTableSource { * @deprecated Use the {@link org.apache.flink.table.descriptors.Kafka} descriptor together * with descriptors for schema and format instead. Descriptors allow for * implementation-agnostic definition of tables. See also - * {@link org.apache.flink.table.api.TableEnvironment#from(ConnectorDescriptor)}. + * {@link org.apache.flink.table.api.TableEnvironment#connect(ConnectorDescriptor)}. */ @Deprecated public static class Builder extends KafkaAvroTableSource.Builder<Kafka011AvroTableSource, Kafka011AvroTableSource.Builder> { http://git-wip-us.apache.org/repos/asf/flink/blob/7bb07e4e/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011JsonTableSource.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011JsonTableSource.java b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011JsonTableSource.java index 74c5007..375eead 100644 --- a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011JsonTableSource.java +++ b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011JsonTableSource.java @@ -36,7 +36,7 @@ import java.util.Properties; * @deprecated Use the {@link org.apache.flink.table.descriptors.Kafka} descriptor together * with descriptors for schema and format instead. Descriptors allow for * implementation-agnostic definition of tables. See also - * {@link org.apache.flink.table.api.TableEnvironment#from(ConnectorDescriptor)}. + * {@link org.apache.flink.table.api.TableEnvironment#connect(ConnectorDescriptor)}. */ @Deprecated public class Kafka011JsonTableSource extends KafkaJsonTableSource { @@ -121,7 +121,7 @@ public class Kafka011JsonTableSource extends KafkaJsonTableSource { * @deprecated Use the {@link org.apache.flink.table.descriptors.Kafka} descriptor together * with descriptors for schema and format instead. Descriptors allow for * implementation-agnostic definition of tables. See also - * {@link org.apache.flink.table.api.TableEnvironment#from(ConnectorDescriptor)}. + * {@link org.apache.flink.table.api.TableEnvironment#connect(ConnectorDescriptor)}. */ @Deprecated public static Kafka011JsonTableSource.Builder builder() { @@ -134,7 +134,7 @@ public class Kafka011JsonTableSource extends KafkaJsonTableSource { * @deprecated Use the {@link org.apache.flink.table.descriptors.Kafka} descriptor together * with descriptors for schema and format instead. Descriptors allow for * implementation-agnostic definition of tables. See also - * {@link org.apache.flink.table.api.TableEnvironment#from(ConnectorDescriptor)}. + * {@link org.apache.flink.table.api.TableEnvironment#connect(ConnectorDescriptor)}. */ @Deprecated public static class Builder extends KafkaJsonTableSource.Builder<Kafka011JsonTableSource, Kafka011JsonTableSource.Builder> { http://git-wip-us.apache.org/repos/asf/flink/blob/7bb07e4e/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08AvroTableSource.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08AvroTableSource.java b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08AvroTableSource.java index 8206287..61c96bf 100644 --- a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08AvroTableSource.java +++ b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08AvroTableSource.java @@ -39,7 +39,7 @@ import java.util.Properties; * @deprecated Use the {@link org.apache.flink.table.descriptors.Kafka} descriptor together * with descriptors for schema and format instead. Descriptors allow for * implementation-agnostic definition of tables. See also - * {@link org.apache.flink.table.api.TableEnvironment#from(ConnectorDescriptor)}. + * {@link org.apache.flink.table.api.TableEnvironment#connect(ConnectorDescriptor)}. */ @Deprecated public class Kafka08AvroTableSource extends KafkaAvroTableSource { @@ -120,7 +120,7 @@ public class Kafka08AvroTableSource extends KafkaAvroTableSource { * @deprecated Use the {@link org.apache.flink.table.descriptors.Kafka} descriptor together * with descriptors for schema and format instead. Descriptors allow for * implementation-agnostic definition of tables. See also - * {@link org.apache.flink.table.api.TableEnvironment#from(ConnectorDescriptor)}. + * {@link org.apache.flink.table.api.TableEnvironment#connect(ConnectorDescriptor)}. */ @Deprecated public static Builder builder() { @@ -133,7 +133,7 @@ public class Kafka08AvroTableSource extends KafkaAvroTableSource { * @deprecated Use the {@link org.apache.flink.table.descriptors.Kafka} descriptor together * with descriptors for schema and format instead. Descriptors allow for * implementation-agnostic definition of tables. See also - * {@link org.apache.flink.table.api.TableEnvironment#from(ConnectorDescriptor)}. + * {@link org.apache.flink.table.api.TableEnvironment#connect(ConnectorDescriptor)}. */ @Deprecated public static class Builder extends KafkaAvroTableSource.Builder<Kafka08AvroTableSource, Kafka08AvroTableSource.Builder> { http://git-wip-us.apache.org/repos/asf/flink/blob/7bb07e4e/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSource.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSource.java b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSource.java index acb5783..dc5a077 100644 --- a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSource.java +++ b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSource.java @@ -36,7 +36,7 @@ import java.util.Properties; * @deprecated Use the {@link org.apache.flink.table.descriptors.Kafka} descriptor together * with descriptors for schema and format instead. Descriptors allow for * implementation-agnostic definition of tables. See also - * {@link org.apache.flink.table.api.TableEnvironment#from(ConnectorDescriptor)}. + * {@link org.apache.flink.table.api.TableEnvironment#connect(ConnectorDescriptor)}. */ @Deprecated public class Kafka08JsonTableSource extends KafkaJsonTableSource { @@ -120,7 +120,7 @@ public class Kafka08JsonTableSource extends KafkaJsonTableSource { * @deprecated Use the {@link org.apache.flink.table.descriptors.Kafka} descriptor together * with descriptors for schema and format instead. Descriptors allow for * implementation-agnostic definition of tables. See also - * {@link org.apache.flink.table.api.TableEnvironment#from(ConnectorDescriptor)}. + * {@link org.apache.flink.table.api.TableEnvironment#connect(ConnectorDescriptor)}. */ @Deprecated public static Kafka08JsonTableSource.Builder builder() { @@ -133,7 +133,7 @@ public class Kafka08JsonTableSource extends KafkaJsonTableSource { * @deprecated Use the {@link org.apache.flink.table.descriptors.Kafka} descriptor together * with descriptors for schema and format instead. Descriptors allow for * implementation-agnostic definition of tables. See also - * {@link org.apache.flink.table.api.TableEnvironment#from(ConnectorDescriptor)}. + * {@link org.apache.flink.table.api.TableEnvironment#connect(ConnectorDescriptor)}. */ @Deprecated public static class Builder extends KafkaJsonTableSource.Builder<Kafka08JsonTableSource, Kafka08JsonTableSource.Builder> { http://git-wip-us.apache.org/repos/asf/flink/blob/7bb07e4e/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09AvroTableSource.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09AvroTableSource.java b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09AvroTableSource.java index cd4bad9..4352d7e4 100644 --- a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09AvroTableSource.java +++ b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09AvroTableSource.java @@ -39,7 +39,7 @@ import java.util.Properties; * @deprecated Use the {@link org.apache.flink.table.descriptors.Kafka} descriptor together * with descriptors for schema and format instead. Descriptors allow for * implementation-agnostic definition of tables. See also - * {@link org.apache.flink.table.api.TableEnvironment#from(ConnectorDescriptor)}. + * {@link org.apache.flink.table.api.TableEnvironment#connect(ConnectorDescriptor)}. */ @Deprecated public class Kafka09AvroTableSource extends KafkaAvroTableSource { @@ -118,7 +118,7 @@ public class Kafka09AvroTableSource extends KafkaAvroTableSource { * @deprecated Use the {@link org.apache.flink.table.descriptors.Kafka} descriptor together * with descriptors for schema and format instead. Descriptors allow for * implementation-agnostic definition of tables. See also - * {@link org.apache.flink.table.api.TableEnvironment#from(ConnectorDescriptor)}. + * {@link org.apache.flink.table.api.TableEnvironment#connect(ConnectorDescriptor)}. */ @Deprecated public static Builder builder() { @@ -131,7 +131,7 @@ public class Kafka09AvroTableSource extends KafkaAvroTableSource { * @deprecated Use the {@link org.apache.flink.table.descriptors.Kafka} descriptor together * with descriptors for schema and format instead. Descriptors allow for * implementation-agnostic definition of tables. See also - * {@link org.apache.flink.table.api.TableEnvironment#from(ConnectorDescriptor)}. + * {@link org.apache.flink.table.api.TableEnvironment#connect(ConnectorDescriptor)}. */ @Deprecated public static class Builder extends KafkaAvroTableSource.Builder<Kafka09AvroTableSource, Kafka09AvroTableSource.Builder> { http://git-wip-us.apache.org/repos/asf/flink/blob/7bb07e4e/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSource.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSource.java b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSource.java index ad4e0d8..db1df3d 100644 --- a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSource.java +++ b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSource.java @@ -36,7 +36,7 @@ import java.util.Properties; * @deprecated Use the {@link org.apache.flink.table.descriptors.Kafka} descriptor together * with descriptors for schema and format instead. Descriptors allow for * implementation-agnostic definition of tables. See also - * {@link org.apache.flink.table.api.TableEnvironment#from(ConnectorDescriptor)}. + * {@link org.apache.flink.table.api.TableEnvironment#connect(ConnectorDescriptor)}. */ @Deprecated public class Kafka09JsonTableSource extends KafkaJsonTableSource { @@ -120,7 +120,7 @@ public class Kafka09JsonTableSource extends KafkaJsonTableSource { * @deprecated Use the {@link org.apache.flink.table.descriptors.Kafka} descriptor together * with descriptors for schema and format instead. Descriptors allow for * implementation-agnostic definition of tables. See also - * {@link org.apache.flink.table.api.TableEnvironment#from(ConnectorDescriptor)}. + * {@link org.apache.flink.table.api.TableEnvironment#connect(ConnectorDescriptor)}. */ @Deprecated public static Kafka09JsonTableSource.Builder builder() { @@ -133,7 +133,7 @@ public class Kafka09JsonTableSource extends KafkaJsonTableSource { * @deprecated Use the {@link org.apache.flink.table.descriptors.Kafka} descriptor together * with descriptors for schema and format instead. Descriptors allow for * implementation-agnostic definition of tables. See also - * {@link org.apache.flink.table.api.TableEnvironment#from(ConnectorDescriptor)}. + * {@link org.apache.flink.table.api.TableEnvironment#connect(ConnectorDescriptor)}. */ @Deprecated public static class Builder extends KafkaJsonTableSource.Builder<Kafka09JsonTableSource, Kafka09JsonTableSource.Builder> { http://git-wip-us.apache.org/repos/asf/flink/blob/7bb07e4e/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaAvroTableSource.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaAvroTableSource.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaAvroTableSource.java index 3e9f2b0..86fd21d 100644 --- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaAvroTableSource.java +++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaAvroTableSource.java @@ -40,7 +40,7 @@ import java.util.Properties; * @deprecated Use the {@link org.apache.flink.table.descriptors.Kafka} descriptor together * with descriptors for schema and format instead. Descriptors allow for * implementation-agnostic definition of tables. See also - * {@link org.apache.flink.table.api.TableEnvironment#from(ConnectorDescriptor)}. + * {@link org.apache.flink.table.api.TableEnvironment#connect(ConnectorDescriptor)}. */ @Deprecated @Internal @@ -56,7 +56,7 @@ public abstract class KafkaAvroTableSource extends KafkaTableSource { * @deprecated Use the {@link org.apache.flink.table.descriptors.Kafka} descriptor together * with descriptors for schema and format instead. Descriptors allow for * implementation-agnostic definition of tables. See also - * {@link org.apache.flink.table.api.TableEnvironment#from(ConnectorDescriptor)}. + * {@link org.apache.flink.table.api.TableEnvironment#connect(ConnectorDescriptor)}. */ @Deprecated protected KafkaAvroTableSource( @@ -89,7 +89,7 @@ public abstract class KafkaAvroTableSource extends KafkaTableSource { * @deprecated Use the {@link org.apache.flink.table.descriptors.Kafka} descriptor together * with descriptors for schema and format instead. Descriptors allow for * implementation-agnostic definition of tables. See also - * {@link org.apache.flink.table.api.TableEnvironment#from(ConnectorDescriptor)}. + * {@link org.apache.flink.table.api.TableEnvironment#connect(ConnectorDescriptor)}. */ @Deprecated protected abstract static class Builder<T extends KafkaAvroTableSource, B extends KafkaAvroTableSource.Builder> http://git-wip-us.apache.org/repos/asf/flink/blob/7bb07e4e/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSource.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSource.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSource.java index bd0d0de..70b286b 100644 --- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSource.java +++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSource.java @@ -39,7 +39,7 @@ import java.util.Properties; * @deprecated Use the {@link org.apache.flink.table.descriptors.Kafka} descriptor together * with descriptors for schema and format instead. Descriptors allow for * implementation-agnostic definition of tables. See also - * {@link org.apache.flink.table.api.TableEnvironment#from(ConnectorDescriptor)}. + * {@link org.apache.flink.table.api.TableEnvironment#connect(ConnectorDescriptor)}. */ @Deprecated @Internal @@ -98,7 +98,7 @@ public abstract class KafkaJsonTableSource extends KafkaTableSource { * @deprecated Use the {@link org.apache.flink.table.descriptors.Kafka} descriptor together * with descriptors for schema and format instead. Descriptors allow for * implementation-agnostic definition of tables. See also - * {@link org.apache.flink.table.api.TableEnvironment#from(ConnectorDescriptor)}. + * {@link org.apache.flink.table.api.TableEnvironment#connect(ConnectorDescriptor)}. */ @Deprecated protected abstract static class Builder<T extends KafkaJsonTableSource, B extends KafkaJsonTableSource.Builder> http://git-wip-us.apache.org/repos/asf/flink/blob/7bb07e4e/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java index 78b373b..474c22f 100644 --- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java +++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java @@ -406,7 +406,7 @@ public abstract class KafkaTableSource implements * @deprecated Use the {@link org.apache.flink.table.descriptors.Kafka} descriptor together * with descriptors for schema and format instead. Descriptors allow for * implementation-agnostic definition of tables. See also - * {@link org.apache.flink.table.api.TableEnvironment#from(ConnectorDescriptor)}. + * {@link org.apache.flink.table.api.TableEnvironment#connect(ConnectorDescriptor)}. */ @Deprecated protected abstract static class Builder<T extends KafkaTableSource, B extends KafkaTableSource.Builder> { @@ -676,7 +676,7 @@ public abstract class KafkaTableSource implements * @deprecated Use the {@link org.apache.flink.table.descriptors.Kafka} descriptor together * with descriptors for schema and format instead. Descriptors allow for * implementation-agnostic definition of tables. See also - * {@link org.apache.flink.table.api.TableEnvironment#from(ConnectorDescriptor)}. + * {@link org.apache.flink.table.api.TableEnvironment#connect(ConnectorDescriptor)}. */ @Deprecated protected abstract B builder(); http://git-wip-us.apache.org/repos/asf/flink/blob/7bb07e4e/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceFactory.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceFactory.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceFactory.java index 380d657..d7e42f5 100644 --- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceFactory.java +++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceFactory.java @@ -67,6 +67,8 @@ import static org.apache.flink.table.descriptors.SchemaValidator.SCHEMA_FROM; import static org.apache.flink.table.descriptors.SchemaValidator.SCHEMA_NAME; import static org.apache.flink.table.descriptors.SchemaValidator.SCHEMA_PROCTIME; import static org.apache.flink.table.descriptors.SchemaValidator.SCHEMA_TYPE; +import static org.apache.flink.table.descriptors.StreamTableDescriptorValidator.UPDATE_MODE; +import static org.apache.flink.table.descriptors.StreamTableDescriptorValidator.UPDATE_MODE_VALUE_APPEND; /** * Factory for creating configured instances of {@link KafkaTableSource}. @@ -76,6 +78,7 @@ public abstract class KafkaTableSourceFactory implements StreamTableSourceFactor @Override public Map<String, String> requiredContext() { Map<String, String> context = new HashMap<>(); + context.put(UPDATE_MODE(), UPDATE_MODE_VALUE_APPEND()); // append mode context.put(CONNECTOR_TYPE(), CONNECTOR_TYPE_VALUE_KAFKA); // kafka context.put(CONNECTOR_VERSION(), kafkaVersion()); // version context.put(CONNECTOR_PROPERTY_VERSION(), "1"); // backwards compatibility http://git-wip-us.apache.org/repos/asf/flink/blob/7bb07e4e/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSourceFactoryTestBase.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSourceFactoryTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSourceFactoryTestBase.java index 51017f4..20da156 100644 --- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSourceFactoryTestBase.java +++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSourceFactoryTestBase.java @@ -28,7 +28,7 @@ import org.apache.flink.table.descriptors.Json; import org.apache.flink.table.descriptors.Kafka; import org.apache.flink.table.descriptors.Rowtime; import org.apache.flink.table.descriptors.Schema; -import org.apache.flink.table.descriptors.TestTableSourceDescriptor; +import org.apache.flink.table.descriptors.TestTableDescriptor; import org.apache.flink.table.factories.StreamTableSourceFactory; import org.apache.flink.table.factories.TableFactoryService; import org.apache.flink.table.sources.TableSource; @@ -135,20 +135,21 @@ public abstract class KafkaJsonTableSourceFactoryTestBase { offsets.put(0, 100L); offsets.put(1, 123L); - final TestTableSourceDescriptor testDesc = new TestTableSourceDescriptor( + final TestTableDescriptor testDesc = new TestTableDescriptor( new Kafka() .version(version()) .topic(TOPIC) .properties(props) .startFromSpecificOffsets(offsets)) - .addFormat(format) - .addSchema( + .withFormat(format) + .withSchema( new Schema() .field("fruit-name", Types.STRING).from("name") .field("count", Types.BIG_DEC) // no from so it must match with the input .field("event-time", Types.SQL_TIMESTAMP).rowtime( new Rowtime().timestampsFromField("time").watermarksPeriodicAscending()) - .field("proc-time", Types.SQL_TIMESTAMP).proctime()); + .field("proc-time", Types.SQL_TIMESTAMP).proctime()) + .inAppendMode(); DescriptorProperties properties = new DescriptorProperties(true); testDesc.addProperties(properties); http://git-wip-us.apache.org/repos/asf/flink/blob/7bb07e4e/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceFactoryTestBase.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceFactoryTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceFactoryTestBase.java index 1e8266d..96f1607 100644 --- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceFactoryTestBase.java +++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceFactoryTestBase.java @@ -31,7 +31,7 @@ import org.apache.flink.table.descriptors.DescriptorProperties; import org.apache.flink.table.descriptors.Kafka; import org.apache.flink.table.descriptors.Rowtime; import org.apache.flink.table.descriptors.Schema; -import org.apache.flink.table.descriptors.TestTableSourceDescriptor; +import org.apache.flink.table.descriptors.TestTableDescriptor; import org.apache.flink.table.factories.StreamTableSourceFactory; import org.apache.flink.table.factories.TableFactoryService; import org.apache.flink.table.factories.utils.TestDeserializationSchema; @@ -129,20 +129,21 @@ public abstract class KafkaTableSourceFactoryTestBase extends TestLogger { offsets.put(PARTITION_0, OFFSET_0); offsets.put(PARTITION_1, OFFSET_1); - final TestTableSourceDescriptor testDesc = new TestTableSourceDescriptor( + final TestTableDescriptor testDesc = new TestTableDescriptor( new Kafka() .version(getKafkaVersion()) .topic(TOPIC) .properties(KAFKA_PROPERTIES) .startFromSpecificOffsets(offsets)) - .addFormat(new TestTableFormat()) - .addSchema( + .withFormat(new TestTableFormat()) + .withSchema( new Schema() .field(FRUIT_NAME, Types.STRING()).from(NAME) .field(COUNT, Types.DECIMAL()) // no from so it must match with the input .field(EVENT_TIME, Types.SQL_TIMESTAMP()).rowtime( new Rowtime().timestampsFromField(TIME).watermarksPeriodicAscending()) - .field(PROC_TIME, Types.SQL_TIMESTAMP()).proctime()); + .field(PROC_TIME, Types.SQL_TIMESTAMP()).proctime()) + .inAppendMode(); final DescriptorProperties descriptorProperties = new DescriptorProperties(true); testDesc.addProperties(descriptorProperties); final Map<String, String> propertiesMap = descriptorProperties.asMap(); http://git-wip-us.apache.org/repos/asf/flink/blob/7bb07e4e/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Sink.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Sink.java b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Sink.java index 0de65fb..49ac14a 100644 --- a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Sink.java +++ b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Sink.java @@ -19,14 +19,14 @@ package org.apache.flink.table.client.config; import org.apache.flink.table.descriptors.DescriptorProperties; -import org.apache.flink.table.descriptors.TableSinkDescriptor; +import org.apache.flink.table.descriptors.TableDescriptor; import java.util.Map; /** * Configuration of a table sink. */ -public class Sink extends TableSinkDescriptor { +public class Sink implements TableDescriptor { private String name; private Map<String, String> properties; http://git-wip-us.apache.org/repos/asf/flink/blob/7bb07e4e/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Source.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Source.java b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Source.java index ef80596..1eb0ad9 100644 --- a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Source.java +++ b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Source.java @@ -19,14 +19,14 @@ package org.apache.flink.table.client.config; import org.apache.flink.table.descriptors.DescriptorProperties; -import org.apache.flink.table.descriptors.TableSourceDescriptor; +import org.apache.flink.table.descriptors.TableDescriptor; import java.util.Map; /** * Configuration of a table source. */ -public class Source extends TableSourceDescriptor { +public class Source implements TableDescriptor { private String name; private Map<String, String> properties; http://git-wip-us.apache.org/repos/asf/flink/blob/7bb07e4e/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/SourceSink.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/SourceSink.java b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/SourceSink.java index bfa3c44..b6b9bea 100644 --- a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/SourceSink.java +++ b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/SourceSink.java @@ -26,7 +26,7 @@ import java.util.Map; /** * Common class for all descriptors describing a table source and sink together. */ -public class SourceSink extends TableDescriptor { +public class SourceSink implements TableDescriptor { private String name; private Map<String, String> properties; http://git-wip-us.apache.org/repos/asf/flink/blob/7bb07e4e/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/EnvironmentTest.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/EnvironmentTest.java b/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/EnvironmentTest.java index dcd94e2..6fbf29d 100644 --- a/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/EnvironmentTest.java +++ b/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/EnvironmentTest.java @@ -23,8 +23,9 @@ import org.apache.flink.table.client.gateway.utils.EnvironmentFileUtil; import org.junit.Test; -import java.util.Collections; +import java.util.HashMap; import java.util.HashSet; +import java.util.Map; import java.util.Set; import static org.junit.Assert.assertEquals; @@ -40,10 +41,17 @@ public class EnvironmentTest { @Test public void testMerging() throws Exception { - final Environment env1 = EnvironmentFileUtil.parseUnmodified(DEFAULTS_ENVIRONMENT_FILE); + final Map<String, String> replaceVars1 = new HashMap<>(); + replaceVars1.put("$VAR_UPDATE_MODE", "update-mode: append"); + final Environment env1 = EnvironmentFileUtil.parseModified( + DEFAULTS_ENVIRONMENT_FILE, + replaceVars1); + + final Map<String, String> replaceVars2 = new HashMap<>(replaceVars1); + replaceVars2.put("TableNumber1", "NewTable"); final Environment env2 = EnvironmentFileUtil.parseModified( FACTORY_ENVIRONMENT_FILE, - Collections.singletonMap("TableNumber1", "NewTable")); + replaceVars2); final Environment merged = Environment.merge(env1, env2); http://git-wip-us.apache.org/repos/asf/flink/blob/7bb07e4e/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/ExecutionContextTest.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/ExecutionContextTest.java b/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/ExecutionContextTest.java index f9bcaf3..bc29f6f 100644 --- a/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/ExecutionContextTest.java +++ b/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/ExecutionContextTest.java @@ -35,6 +35,7 @@ import org.junit.Test; import java.util.Arrays; import java.util.Collections; +import java.util.HashMap; import java.util.HashSet; import java.util.Map; @@ -112,9 +113,12 @@ public class ExecutionContextTest { } private <T> ExecutionContext<T> createExecutionContext() throws Exception { + final Map<String, String> replaceVars = new HashMap<>(); + replaceVars.put("$VAR_2", "streaming"); + replaceVars.put("$VAR_UPDATE_MODE", "update-mode: append"); final Environment env = EnvironmentFileUtil.parseModified( DEFAULTS_ENVIRONMENT_FILE, - Collections.singletonMap("$VAR_2", "streaming")); + replaceVars); final SessionContext session = new SessionContext("test-session", new Environment()); final Configuration flinkConfig = new Configuration(); return new ExecutionContext<>( http://git-wip-us.apache.org/repos/asf/flink/blob/7bb07e4e/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java b/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java index b1bf304..d8452e4 100644 --- a/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java +++ b/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java @@ -136,13 +136,12 @@ public class LocalExecutorITCase extends TestLogger { executor.getSessionProperties(session); // modify defaults - session.setSessionProperty("execution.type", "streaming"); session.setSessionProperty("execution.result-mode", "table"); final Map<String, String> actualProperties = executor.getSessionProperties(session); final Map<String, String> expectedProperties = new HashMap<>(); - expectedProperties.put("execution.type", "streaming"); + expectedProperties.put("execution.type", "batch"); expectedProperties.put("execution.time-characteristic", "event-time"); expectedProperties.put("execution.periodic-watermarks-interval", "99"); expectedProperties.put("execution.parallelism", "1"); @@ -178,6 +177,7 @@ public class LocalExecutorITCase extends TestLogger { replaceVars.put("$VAR_1", "/"); replaceVars.put("$VAR_2", "streaming"); replaceVars.put("$VAR_3", "changelog"); + replaceVars.put("$VAR_UPDATE_MODE", "update-mode: append"); final Executor executor = createModifiedExecutor(clusterClient, replaceVars); final SessionContext session = new SessionContext("test-session", new Environment()); @@ -216,6 +216,7 @@ public class LocalExecutorITCase extends TestLogger { replaceVars.put("$VAR_1", "/"); replaceVars.put("$VAR_2", "streaming"); replaceVars.put("$VAR_3", "table"); + replaceVars.put("$VAR_UPDATE_MODE", "update-mode: append"); final Executor executor = createModifiedExecutor(clusterClient, replaceVars); final SessionContext session = new SessionContext("test-session", new Environment()); @@ -253,6 +254,7 @@ public class LocalExecutorITCase extends TestLogger { replaceVars.put("$VAR_1", "/"); replaceVars.put("$VAR_2", "batch"); replaceVars.put("$VAR_3", "table"); + replaceVars.put("$VAR_UPDATE_MODE", ""); final Executor executor = createModifiedExecutor(clusterClient, replaceVars); final SessionContext session = new SessionContext("test-session", new Environment()); @@ -287,6 +289,7 @@ public class LocalExecutorITCase extends TestLogger { replaceVars.put("$VAR_0", url.getPath()); replaceVars.put("$VAR_2", "streaming"); replaceVars.put("$VAR_4", csvOutputPath); + replaceVars.put("$VAR_UPDATE_MODE", "update-mode: append"); final Executor executor = createModifiedExecutor(clusterClient, replaceVars); final SessionContext session = new SessionContext("test-session", new Environment()); @@ -333,8 +336,11 @@ public class LocalExecutorITCase extends TestLogger { } private <T> LocalExecutor createDefaultExecutor(ClusterClient<T> clusterClient) throws Exception { + final Map<String, String> replaceVars = new HashMap<>(); + replaceVars.put("$VAR_2", "batch"); + replaceVars.put("$VAR_UPDATE_MODE", ""); return new LocalExecutor( - EnvironmentFileUtil.parseModified(DEFAULTS_ENVIRONMENT_FILE, Collections.singletonMap("$VAR_2", "batch")), + EnvironmentFileUtil.parseModified(DEFAULTS_ENVIRONMENT_FILE, replaceVars), Collections.emptyList(), clusterClient.getFlinkConfiguration(), new DummyCustomCommandLine<T>(clusterClient)); http://git-wip-us.apache.org/repos/asf/flink/blob/7bb07e4e/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/utils/TestTableSinkFactory.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/utils/TestTableSinkFactory.java b/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/utils/TestTableSinkFactory.java index 4b50581..207d5d2 100644 --- a/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/utils/TestTableSinkFactory.java +++ b/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/utils/TestTableSinkFactory.java @@ -45,6 +45,8 @@ import static org.apache.flink.table.descriptors.RowtimeValidator.ROWTIME_WATERM import static org.apache.flink.table.descriptors.SchemaValidator.SCHEMA; import static org.apache.flink.table.descriptors.SchemaValidator.SCHEMA_NAME; import static org.apache.flink.table.descriptors.SchemaValidator.SCHEMA_TYPE; +import static org.apache.flink.table.descriptors.StreamTableDescriptorValidator.UPDATE_MODE; +import static org.apache.flink.table.descriptors.StreamTableDescriptorValidator.UPDATE_MODE_VALUE_APPEND; /** * Table sink factory for testing the classloading in {@link DependencyTest}. @@ -54,6 +56,7 @@ public class TestTableSinkFactory implements StreamTableSinkFactory<Row> { @Override public Map<String, String> requiredContext() { final Map<String, String> context = new HashMap<>(); + context.put(UPDATE_MODE(), UPDATE_MODE_VALUE_APPEND()); context.put(CONNECTOR_TYPE(), CONNECTOR_TYPE_VALUE); return context; } http://git-wip-us.apache.org/repos/asf/flink/blob/7bb07e4e/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/utils/TestTableSourceFactory.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/utils/TestTableSourceFactory.java b/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/utils/TestTableSourceFactory.java index a08472c..81f00e5 100644 --- a/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/utils/TestTableSourceFactory.java +++ b/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/utils/TestTableSourceFactory.java @@ -48,6 +48,8 @@ import static org.apache.flink.table.descriptors.RowtimeValidator.ROWTIME_WATERM import static org.apache.flink.table.descriptors.SchemaValidator.SCHEMA; import static org.apache.flink.table.descriptors.SchemaValidator.SCHEMA_NAME; import static org.apache.flink.table.descriptors.SchemaValidator.SCHEMA_TYPE; +import static org.apache.flink.table.descriptors.StreamTableDescriptorValidator.UPDATE_MODE; +import static org.apache.flink.table.descriptors.StreamTableDescriptorValidator.UPDATE_MODE_VALUE_APPEND; /** * Table source factory for testing the classloading in {@link DependencyTest}. @@ -57,6 +59,7 @@ public class TestTableSourceFactory implements StreamTableSourceFactory<Row> { @Override public Map<String, String> requiredContext() { final Map<String, String> context = new HashMap<>(); + context.put(UPDATE_MODE(), UPDATE_MODE_VALUE_APPEND()); context.put(CONNECTOR_TYPE(), CONNECTOR_TYPE_VALUE); return context; } http://git-wip-us.apache.org/repos/asf/flink/blob/7bb07e4e/flink-libraries/flink-sql-client/src/test/resources/test-sql-client-defaults.yaml ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-sql-client/src/test/resources/test-sql-client-defaults.yaml b/flink-libraries/flink-sql-client/src/test/resources/test-sql-client-defaults.yaml index 2a886f0..b759874 100644 --- a/flink-libraries/flink-sql-client/src/test/resources/test-sql-client-defaults.yaml +++ b/flink-libraries/flink-sql-client/src/test/resources/test-sql-client-defaults.yaml @@ -26,6 +26,7 @@ tables: - name: TableNumber1 type: source + $VAR_UPDATE_MODE schema: - name: IntegerField1 type: INT @@ -45,6 +46,7 @@ tables: comment-prefix: "#" - name: TableNumber2 type: source + $VAR_UPDATE_MODE schema: - name: IntegerField2 type: INT @@ -64,6 +66,7 @@ tables: comment-prefix: "#" - name: TableSourceSink type: both + $VAR_UPDATE_MODE schema: - name: BooleanField type: BOOLEAN http://git-wip-us.apache.org/repos/asf/flink/blob/7bb07e4e/flink-libraries/flink-sql-client/src/test/resources/test-sql-client-factory.yaml ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-sql-client/src/test/resources/test-sql-client-factory.yaml b/flink-libraries/flink-sql-client/src/test/resources/test-sql-client-factory.yaml index 01ad63d..3ce513e 100644 --- a/flink-libraries/flink-sql-client/src/test/resources/test-sql-client-factory.yaml +++ b/flink-libraries/flink-sql-client/src/test/resources/test-sql-client-factory.yaml @@ -26,6 +26,7 @@ tables: - name: TableNumber1 type: both + update-mode: append schema: - name: IntegerField1 type: INT http://git-wip-us.apache.org/repos/asf/flink/blob/7bb07e4e/flink-libraries/flink-table/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory b/flink-libraries/flink-table/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory index 4cda0ad..ece39eb 100644 --- a/flink-libraries/flink-table/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory +++ b/flink-libraries/flink-table/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory @@ -13,5 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -org.apache.flink.table.sources.CsvTableSourceFactory -org.apache.flink.table.sinks.CsvTableSinkFactory +org.apache.flink.table.sources.CsvBatchTableSourceFactory +org.apache.flink.table.sources.CsvAppendTableSourceFactory +org.apache.flink.table.sinks.CsvBatchTableSinkFactory +org.apache.flink.table.sinks.CsvAppendTableSinkFactory http://git-wip-us.apache.org/repos/asf/flink/blob/7bb07e4e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala index a239ad5..9265f0f 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala @@ -31,7 +31,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.java.io.DiscardingOutputFormat import org.apache.flink.api.java.typeutils.GenericTypeInfo import org.apache.flink.api.java.{DataSet, ExecutionEnvironment} -import org.apache.flink.table.descriptors.{BatchTableSourceDescriptor, ConnectorDescriptor} +import org.apache.flink.table.descriptors.{BatchTableDescriptor, ConnectorDescriptor} import org.apache.flink.table.explain.PlanJsonParser import org.apache.flink.table.expressions.{Expression, TimeAttribute} import org.apache.flink.table.plan.nodes.FlinkConventions @@ -126,7 +126,7 @@ abstract class BatchTableEnvironment( } // no table is registered - case None => + case _ => val newTable = new TableSourceSinkTable( Some(new BatchTableSourceTable(batchTableSource)), None) @@ -141,26 +141,21 @@ abstract class BatchTableEnvironment( } /** - * Creates a table from a descriptor that describes the source connector, the source format, - * the resulting table schema, and other properties. + * Creates a table source and/or table sink from a descriptor. * - * Descriptors allow for declaring communication to external systems in an - * implementation-agnostic way. The classpath is scanned for connectors and matching connectors - * are configured accordingly. + * Descriptors allow for declaring the communication to external systems in an + * implementation-agnostic way. The classpath is scanned for suitable table factories that match + * the desired configuration. * - * The following example shows how to read from a Kafka connector using a JSON format and - * creating a table: + * The following example shows how to read from a connector using a JSON format and + * registering a table source as "MyTable": * * {{{ * * tableEnv - * .from( - * new Kafka() - * .version("0.11") - * .topic("clicks") - * .property("zookeeper.connect", "localhost") - * .property("group.id", "click-group") - * .startFromEarliest()) + * .connect( + * new ExternalSystemXYZ() + * .version("0.11")) * .withFormat( * new Json() * .jsonSchema("{...}") @@ -169,14 +164,13 @@ abstract class BatchTableEnvironment( * new Schema() * .field("user-name", "VARCHAR").from("u_name") * .field("count", "DECIMAL") - * .field("proc-time", "TIMESTAMP").proctime()) - * .toTable() + * .registerSource("MyTable") * }}} * - * @param connectorDescriptor connector descriptor describing the source of the table + * @param connectorDescriptor connector descriptor describing the external system */ - def from(connectorDescriptor: ConnectorDescriptor): BatchTableSourceDescriptor = { - new BatchTableSourceDescriptor(this, connectorDescriptor) + def connect(connectorDescriptor: ConnectorDescriptor): BatchTableDescriptor = { + new BatchTableDescriptor(this, connectorDescriptor) } /** http://git-wip-us.apache.org/repos/asf/flink/blob/7bb07e4e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala index 33b984d..4c73032 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala @@ -37,7 +37,7 @@ import org.apache.flink.streaming.api.TimeCharacteristic import org.apache.flink.streaming.api.datastream.DataStream import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment import org.apache.flink.table.calcite.{FlinkTypeFactory, RelTimeIndicatorConverter} -import org.apache.flink.table.descriptors.{ConnectorDescriptor, StreamTableSourceDescriptor} +import org.apache.flink.table.descriptors.{ConnectorDescriptor, StreamTableDescriptor} import org.apache.flink.table.explain.PlanJsonParser import org.apache.flink.table.expressions._ import org.apache.flink.table.plan.nodes.FlinkConventions @@ -145,7 +145,7 @@ abstract class StreamTableEnvironment( } // no table is registered - case None => + case _ => val newTable = new TableSourceSinkTable( Some(new StreamTableSourceTable(streamTableSource)), None) @@ -160,20 +160,19 @@ abstract class StreamTableEnvironment( } /** - * Creates a table from a descriptor that describes the source connector, the source format, - * the resulting table schema, and other properties. + * Creates a table source and/or table sink from a descriptor. * - * Descriptors allow for declaring communication to external systems in an - * implementation-agnostic way. The classpath is scanned for connectors and matching connectors - * are configured accordingly. + * Descriptors allow for declaring the communication to external systems in an + * implementation-agnostic way. The classpath is scanned for suitable table factories that match + * the desired configuration. * * The following example shows how to read from a Kafka connector using a JSON format and - * creating a table: + * registering a table source "MyTable" in append mode: * * {{{ * * tableEnv - * .from( + * .connect( * new Kafka() * .version("0.11") * .topic("clicks") @@ -189,13 +188,14 @@ abstract class StreamTableEnvironment( * .field("user-name", "VARCHAR").from("u_name") * .field("count", "DECIMAL") * .field("proc-time", "TIMESTAMP").proctime()) - * .toTable() + * .inAppendMode() + * .registerSource("MyTable") * }}} * - * @param connectorDescriptor connector descriptor describing the source of the table + * @param connectorDescriptor connector descriptor describing the external system */ - def from(connectorDescriptor: ConnectorDescriptor): StreamTableSourceDescriptor = { - new StreamTableSourceDescriptor(this, connectorDescriptor) + def connect(connectorDescriptor: ConnectorDescriptor): StreamTableDescriptor = { + new StreamTableDescriptor(this, connectorDescriptor) } /** http://git-wip-us.apache.org/repos/asf/flink/blob/7bb07e4e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala index 6a299dd..37f6d02 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala @@ -49,7 +49,7 @@ import org.apache.flink.table.api.scala.{BatchTableEnvironment => ScalaBatchTabl import org.apache.flink.table.calcite.{FlinkPlannerImpl, FlinkRelBuilder, FlinkTypeFactory, FlinkTypeSystem} import org.apache.flink.table.catalog.{ExternalCatalog, ExternalCatalogSchema} import org.apache.flink.table.codegen.{ExpressionReducer, FunctionCodeGenerator, GeneratedFunction} -import org.apache.flink.table.descriptors.{ConnectorDescriptor, TableSourceDescriptor} +import org.apache.flink.table.descriptors.{ConnectorDescriptor, TableDescriptor} import org.apache.flink.table.expressions._ import org.apache.flink.table.functions.utils.UserDefinedFunctionUtils._ import org.apache.flink.table.functions.{AggregateFunction, ScalarFunction, TableFunction} @@ -523,26 +523,21 @@ abstract class TableEnvironment(val config: TableConfig) { } /** - * Creates a table from a descriptor that describes the source connector, the source format, - * the resulting table schema, and other properties. + * Creates a table source and/or table sink from a descriptor. * - * Descriptors allow for declaring communication to external systems in an - * implementation-agnostic way. The classpath is scanned for connectors and matching connectors - * are configured accordingly. + * Descriptors allow for declaring the communication to external systems in an + * implementation-agnostic way. The classpath is scanned for suitable table factories that match + * the desired configuration. * - * The following example shows how to read from a Kafka connector using a JSON format and - * creating table: + * The following example shows how to read from a connector using a JSON format and + * registering a table source as "MyTable": * * {{{ * * tableEnv - * .from( - * new Kafka() - * .version("0.11") - * .topic("clicks") - * .property("zookeeper.connect", "localhost") - * .property("group.id", "click-group") - * .startFromEarliest()) + * .connect( + * new ExternalSystemXYZ() + * .version("0.11")) * .withFormat( * new Json() * .jsonSchema("{...}") @@ -551,13 +546,12 @@ abstract class TableEnvironment(val config: TableConfig) { * new Schema() * .field("user-name", "VARCHAR").from("u_name") * .field("count", "DECIMAL") - * .field("proc-time", "TIMESTAMP").proctime()) - * .toTable() + * .registerSource("MyTable") * }}} * - * @param connectorDescriptor connector descriptor describing the source of the table + * @param connectorDescriptor connector descriptor describing the external system */ - def from(connectorDescriptor: ConnectorDescriptor): TableSourceDescriptor + def connect(connectorDescriptor: ConnectorDescriptor): TableDescriptor private[flink] def scanInternal(tablePath: Array[String]): Option[Table] = { require(tablePath != null && !tablePath.isEmpty, "tablePath must not be null or empty.") http://git-wip-us.apache.org/repos/asf/flink/blob/7bb07e4e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogSchema.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogSchema.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogSchema.scala index 776ddee..adac938 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogSchema.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogSchema.scala @@ -77,7 +77,7 @@ class ExternalCatalogSchema( */ override def getTable(name: String): Table = try { val externalCatalogTable = catalog.getTable(name) - ExternalTableSourceUtil.fromExternalCatalogTable(tableEnv, externalCatalogTable) + ExternalTableUtil.fromExternalCatalogTable(tableEnv, externalCatalogTable) } catch { case TableNotExistException(table, _, _) => { LOG.warn(s"Table $table does not exist in externalCatalog $catalogIdentifier") http://git-wip-us.apache.org/repos/asf/flink/blob/7bb07e4e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogTable.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogTable.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogTable.scala index c41a350..79da852 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogTable.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogTable.scala @@ -18,33 +18,326 @@ package org.apache.flink.table.catalog +import org.apache.flink.table.descriptors.DescriptorProperties.toScala +import org.apache.flink.table.descriptors.StatisticsValidator.{STATISTICS_COLUMNS, STATISTICS_ROW_COUNT, readColumnStats} +import org.apache.flink.table.descriptors.StreamTableDescriptorValidator.{UPDATE_MODE, UPDATE_MODE_VALUE_APPEND, UPDATE_MODE_VALUE_RETRACT, UPDATE_MODE_VALUE_UPSERT} import org.apache.flink.table.descriptors._ +import org.apache.flink.table.factories.TableFactory import org.apache.flink.table.plan.stats.TableStats +import scala.collection.JavaConverters._ + /** - * Defines a table in an [[ExternalCatalog]]. + * Defines a table in an [[ExternalCatalog]]. External catalog tables describe table sources + * and/or sinks for both batch and stream environments. + * + * See also [[TableFactory]] for more information about how to target suitable factories. + * + * Use [[ExternalCatalogTableBuilder]] to integrate with the normalized descriptor-based API. * - * @param connectorDesc describes the system to connect to - * @param formatDesc describes the data format of a connector - * @param schemaDesc describes the schema of the result table - * @param statisticsDesc describes the estimated statistics of the result table - * @param metadataDesc describes additional metadata of a table + * @param isBatch Flag whether this external table is intended for batch environments. + * @param isStreaming Flag whether this external table is intended for streaming environments. + * @param isSource Flag whether this external table is declared as table source. + * @param isSink Flag whether this external table is declared as table sink. + * @param properties Properties that describe the table and should match with a [[TableFactory]]. */ class ExternalCatalogTable( - connectorDesc: ConnectorDescriptor, - formatDesc: Option[FormatDescriptor], - schemaDesc: Option[Schema], - statisticsDesc: Option[Statistics], - metadataDesc: Option[Metadata]) - extends TableSourceDescriptor { - - this.connectorDescriptor = Some(connectorDesc) - this.formatDescriptor = formatDesc - this.schemaDescriptor = schemaDesc - this.statisticsDescriptor = statisticsDesc - this.metaDescriptor = metadataDesc - - // expose statistics for external table source util - override def getTableStats: Option[TableStats] = super.getTableStats + private val isBatch: Boolean, + private val isStreaming: Boolean, + private val isSource: Boolean, + private val isSink: Boolean, + private val properties: java.util.Map[String, String]) + extends TableDescriptor { + + // ---------------------------------------------------------------------------------------------- + // Legacy code + // --------------------------------------------------------------------------------------------- + + /** + * Reads table statistics from the descriptors properties. + * + * @deprecated This method exists for backwards-compatibility only. + */ + @Deprecated + @deprecated + def getTableStats: Option[TableStats] = { + val normalizedProps = new DescriptorProperties() + addProperties(normalizedProps) + val rowCount = toScala(normalizedProps.getOptionalLong(STATISTICS_ROW_COUNT)) + rowCount match { + case Some(cnt) => + val columnStats = readColumnStats(normalizedProps, STATISTICS_COLUMNS) + Some(TableStats(cnt, columnStats.asJava)) + case None => + None + } + } + + // ---------------------------------------------------------------------------------------------- + // Getters + // ---------------------------------------------------------------------------------------------- + + /** + * Returns whether this external table is declared as table source. + */ + def isTableSource: Boolean = { + isSource + } + + /** + * Returns whether this external table is declared as table sink. + */ + def isTableSink: Boolean = { + isSource + } + + /** + * Returns whether this external table is intended for batch environments. + */ + def isBatchTable: Boolean = { + isBatch + } + + /** + * Returns whether this external table is intended for stream environments. + */ + def isStreamTable: Boolean = { + isStreaming + } + + // ---------------------------------------------------------------------------------------------- + + /** + * Internal method for properties conversion. + */ + override private[flink] def addProperties(descriptorProperties: DescriptorProperties): Unit = { + descriptorProperties.putProperties(properties) + } +} + +object ExternalCatalogTable { + + /** + * Creates a builder for creating an [[ExternalCatalogTable]]. + * + * It takes [[Descriptor]]s which allow for declaring the communication to external + * systems in an implementation-agnostic way. The classpath is scanned for suitable table + * factories that match the desired configuration. + * + * Use the provided builder methods to configure the external catalog table accordingly. + * + * The following example shows how to read from a connector using a JSON format and + * declaring it as a table source: + * + * {{{ + * ExternalCatalogTable( + * new ExternalSystemXYZ() + * .version("0.11")) + * .withFormat( + * new Json() + * .jsonSchema("{...}") + * .failOnMissingField(false)) + * .withSchema( + * new Schema() + * .field("user-name", "VARCHAR").from("u_name") + * .field("count", "DECIMAL") + * .supportsStreaming() + * .asTableSource() + * }}} + * + * @param connectorDescriptor Connector descriptor describing the external system + * @return External catalog builder + */ + def builder(connectorDescriptor: ConnectorDescriptor): ExternalCatalogTableBuilder = { + new ExternalCatalogTableBuilder(connectorDescriptor) + } +} + +/** + * Builder for an [[ExternalCatalogTable]]. + * + * @param connectorDescriptor Connector descriptor describing the external system + */ +class ExternalCatalogTableBuilder(private val connectorDescriptor: ConnectorDescriptor) + extends TableDescriptor + with SchematicDescriptor[ExternalCatalogTableBuilder] + with StreamableDescriptor[ExternalCatalogTableBuilder] { + + private var isBatch: Boolean = true + private var isStreaming: Boolean = true + + private var formatDescriptor: Option[FormatDescriptor] = None + private var schemaDescriptor: Option[Schema] = None + private var statisticsDescriptor: Option[Statistics] = None + private var metadataDescriptor: Option[Metadata] = None + private var updateMode: Option[String] = None + + /** + * Specifies the format that defines how to read data from a connector. + */ + override def withFormat(format: FormatDescriptor): ExternalCatalogTableBuilder = { + formatDescriptor = Some(format) + this + } + + /** + * Specifies the resulting table schema. + */ + override def withSchema(schema: Schema): ExternalCatalogTableBuilder = { + schemaDescriptor = Some(schema) + this + } + + /** + * Declares how to perform the conversion between a dynamic table and an external connector. + * + * In append mode, a dynamic table and an external connector only exchange INSERT messages. + * + * @see See also [[inRetractMode()]] and [[inUpsertMode()]]. + */ + override def inAppendMode(): ExternalCatalogTableBuilder = { + updateMode = Some(UPDATE_MODE_VALUE_APPEND) + this + } + + /** + * Declares how to perform the conversion between a dynamic table and an external connector. + * + * In retract mode, a dynamic table and an external connector exchange ADD and RETRACT messages. + * + * An INSERT change is encoded as an ADD message, a DELETE change as a RETRACT message, and an + * UPDATE change as a RETRACT message for the updated (previous) row and an ADD message for + * the updating (new) row. + * + * In this mode, a key must not be defined as opposed to upsert mode. However, every update + * consists of two messages which is less efficient. + * + * @see See also [[inAppendMode()]] and [[inUpsertMode()]]. + */ + override def inRetractMode(): ExternalCatalogTableBuilder = { + updateMode = Some(UPDATE_MODE_VALUE_RETRACT) + this + } + + /** + * Declares how to perform the conversion between a dynamic table and an external connector. + * + * In upsert mode, a dynamic table and an external connector exchange UPSERT and DELETE messages. + * + * This mode requires a (possibly composite) unique key by which updates can be propagated. The + * external connector needs to be aware of the unique key attribute in order to apply messages + * correctly. INSERT and UPDATE changes are encoded as UPSERT messages. DELETE changes as + * DELETE messages. + * + * The main difference to a retract stream is that UPDATE changes are encoded with a single + * message and are therefore more efficient. + * + * @see See also [[inAppendMode()]] and [[inRetractMode()]]. + */ + override def inUpsertMode(): ExternalCatalogTableBuilder = { + updateMode = Some(UPDATE_MODE_VALUE_UPSERT) + this + } + + /** + * Specifies the statistics for this external table. + */ + def withStatistics(statistics: Statistics): ExternalCatalogTableBuilder = { + statisticsDescriptor = Some(statistics) + this + } + + /** + * Specifies the metadata for this external table. + */ + def withMetadata(metadata: Metadata): ExternalCatalogTableBuilder = { + metadataDescriptor = Some(metadata) + this + } + + /** + * Explicitly declares this external table for supporting only stream environments. + */ + def supportsStreaming(): ExternalCatalogTableBuilder = { + isBatch = false + isStreaming = true + this + } + + /** + * Explicitly declares this external table for supporting only batch environments. + */ + def supportsBatch(): ExternalCatalogTableBuilder = { + isBatch = false + isStreaming = true + this + } + + /** + * Explicitly declares this external table for supporting both batch and stream environments. + */ + def supportsBatchAndStreaming(): ExternalCatalogTableBuilder = { + isBatch = true + isStreaming = true + this + } + + /** + * Declares this external table as a table source and returns the + * configured [[ExternalCatalogTable]]. + * + * @return External catalog table + */ + def asTableSource(): ExternalCatalogTable = { + new ExternalCatalogTable( + isBatch, + isStreaming, + isSource = true, + isSink = false, + DescriptorProperties.toJavaMap(this)) + } + + /** + * Declares this external table as a table sink and returns the + * configured [[ExternalCatalogTable]]. + * + * @return External catalog table + */ + def asTableSink(): ExternalCatalogTable = { + new ExternalCatalogTable( + isBatch, + isStreaming, + isSource = false, + isSink = true, + DescriptorProperties.toJavaMap(this)) + } + + /** + * Declares this external table as both a table source and sink. It returns the + * configured [[ExternalCatalogTable]]. + * + * @return External catalog table + */ + def asTableSourceAndSink(): ExternalCatalogTable = { + new ExternalCatalogTable( + isBatch, + isStreaming, + isSource = true, + isSink = true, + DescriptorProperties.toJavaMap(this)) + } + + // ---------------------------------------------------------------------------------------------- + /** + * Internal method for properties conversion. + */ + override private[flink] def addProperties(properties: DescriptorProperties): Unit = { + connectorDescriptor.addProperties(properties) + formatDescriptor.foreach(_.addProperties(properties)) + schemaDescriptor.foreach(_.addProperties(properties)) + statisticsDescriptor.foreach(_.addProperties(properties)) + metadataDescriptor.foreach(_.addProperties(properties)) + updateMode.foreach(mode => properties.putString(UPDATE_MODE, mode)) + } } http://git-wip-us.apache.org/repos/asf/flink/blob/7bb07e4e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalTableSourceUtil.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalTableSourceUtil.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalTableSourceUtil.scala deleted file mode 100644 index 011cbec..0000000 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalTableSourceUtil.scala +++ /dev/null @@ -1,70 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.table.catalog - -import org.apache.flink.table.api._ -import org.apache.flink.table.descriptors.DescriptorProperties -import org.apache.flink.table.factories.{BatchTableSourceFactory, StreamTableSourceFactory, TableFactoryService} -import org.apache.flink.table.plan.schema.{BatchTableSourceTable, StreamTableSourceTable, TableSourceSinkTable, TableSourceTable} -import org.apache.flink.table.plan.stats.FlinkStatistic -import org.apache.flink.table.util.Logging - -/** - * The utility class is used to convert ExternalCatalogTable to TableSourceTable. - */ -object ExternalTableSourceUtil extends Logging { - - /** - * Converts an [[ExternalCatalogTable]] instance to a [[TableSourceTable]] instance - * - * @param externalCatalogTable the [[ExternalCatalogTable]] instance which to convert - * @return converted [[TableSourceTable]] instance from the input catalog table - */ - def fromExternalCatalogTable( - tableEnv: TableEnvironment, - externalCatalogTable: ExternalCatalogTable) - : TableSourceSinkTable[_, _] = { - val properties = new DescriptorProperties() - externalCatalogTable.addProperties(properties) - val javaMap = properties.asMap - tableEnv match { - // check for a batch table source in this batch environment - case _: BatchTableEnvironment => - val source = TableFactoryService - .find(classOf[BatchTableSourceFactory[_]], javaMap) - .createBatchTableSource(javaMap) - val sourceTable = new BatchTableSourceTable( - source, - new FlinkStatistic(externalCatalogTable.getTableStats)) - new TableSourceSinkTable(Some(sourceTable), None) - - // check for a stream table source in this streaming environment - case _: StreamTableEnvironment => - val source = TableFactoryService - .find(classOf[StreamTableSourceFactory[_]], javaMap) - .createStreamTableSource(javaMap) - val sourceTable = new StreamTableSourceTable( - source, - new FlinkStatistic(externalCatalogTable.getTableStats)) - new TableSourceSinkTable(Some(sourceTable), None) - - case _ => throw new TableException("Unsupported table environment.") - } - } -}
