This is an automated email from the ASF dual-hosted git repository. jark pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit f6895da4f762e506c4d0fa8d6dea427094e84173 Author: JingsongLi <[email protected]> AuthorDate: Mon Feb 10 14:58:41 2020 +0800 [FLINK-15912][table] Add Context to TableSourceFactory and TableSinkFactory This closes #11047 --- .../table/factories/StreamTableSinkFactory.java | 15 +++++- .../table/factories/StreamTableSourceFactory.java | 15 +++++- .../flink/table/factories/TableSinkFactory.java | 49 ++++++++++++++++- .../factories/TableSinkFactoryContextImpl.java | 61 ++++++++++++++++++++++ .../flink/table/factories/TableSourceFactory.java | 49 ++++++++++++++++- .../factories/TableSourceFactoryContextImpl.java | 61 ++++++++++++++++++++++ 6 files changed, 244 insertions(+), 6 deletions(-) diff --git a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/StreamTableSinkFactory.java b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/StreamTableSinkFactory.java index 86c7e82..c721424 100644 --- a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/StreamTableSinkFactory.java +++ b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/StreamTableSinkFactory.java @@ -19,6 +19,7 @@ package org.apache.flink.table.factories; import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.table.api.ValidationException; import org.apache.flink.table.sinks.StreamTableSink; import org.apache.flink.table.sinks.TableSink; @@ -38,14 +39,24 @@ public interface StreamTableSinkFactory<T> extends TableSinkFactory<T> { * * @param properties normalized properties describing a table sink. * @return the configured table sink. + * @deprecated {@link Context} contains more information, and already contains table schema too. + * Please use {@link #createTableSink(Context)} instead. */ - StreamTableSink<T> createStreamTableSink(Map<String, String> properties); + @Deprecated + default StreamTableSink<T> createStreamTableSink(Map<String, String> properties) { + return null; + } /** * Only create stream table sink. */ @Override default TableSink<T> createTableSink(Map<String, String> properties) { - return createStreamTableSink(properties); + StreamTableSink<T> sink = createStreamTableSink(properties); + if (sink == null) { + throw new ValidationException( + "Please override 'createTableSink(Context)' method."); + } + return sink; } } diff --git a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/StreamTableSourceFactory.java b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/StreamTableSourceFactory.java index b007d7b..3e392ed 100644 --- a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/StreamTableSourceFactory.java +++ b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/StreamTableSourceFactory.java @@ -19,6 +19,7 @@ package org.apache.flink.table.factories; import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.table.api.ValidationException; import org.apache.flink.table.sources.StreamTableSource; import org.apache.flink.table.sources.TableSource; @@ -38,14 +39,24 @@ public interface StreamTableSourceFactory<T> extends TableSourceFactory<T> { * * @param properties normalized properties describing a stream table source. * @return the configured stream table source. + * @deprecated {@link Context} contains more information, and already contains table schema too. + * Please use {@link #createTableSource(Context)} instead. */ - StreamTableSource<T> createStreamTableSource(Map<String, String> properties); + @Deprecated + default StreamTableSource<T> createStreamTableSource(Map<String, String> properties) { + return null; + } /** * Only create a stream table source. */ @Override default TableSource<T> createTableSource(Map<String, String> properties) { - return createStreamTableSource(properties); + StreamTableSource<T> source = createStreamTableSource(properties); + if (source == null) { + throw new ValidationException( + "Please override 'createTableSource(Context)' method."); + } + return source; } } diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/TableSinkFactory.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/TableSinkFactory.java index 05d0f34..c92ec9f 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/TableSinkFactory.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/TableSinkFactory.java @@ -19,7 +19,9 @@ package org.apache.flink.table.factories; import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.configuration.ReadableConfig; import org.apache.flink.table.catalog.CatalogTable; +import org.apache.flink.table.catalog.ObjectIdentifier; import org.apache.flink.table.catalog.ObjectPath; import org.apache.flink.table.sinks.TableSink; @@ -39,8 +41,13 @@ public interface TableSinkFactory<T> extends TableFactory { * * @param properties normalized properties describing a table sink. * @return the configured table sink. + * @deprecated {@link Context} contains more information, and already contains table schema too. + * Please use {@link #createTableSink(Context)} instead. */ - TableSink<T> createTableSink(Map<String, String> properties); + @Deprecated + default TableSink<T> createTableSink(Map<String, String> properties) { + return null; + } /** * Creates and configures a {@link TableSink} based on the given {@link CatalogTable} instance. @@ -48,9 +55,49 @@ public interface TableSinkFactory<T> extends TableFactory { * @param tablePath path of the given {@link CatalogTable} * @param table {@link CatalogTable} instance. * @return the configured table sink. + * @deprecated {@link Context} contains more information, and already contains table schema too. + * Please use {@link #createTableSink(Context)} instead. */ + @Deprecated default TableSink<T> createTableSink(ObjectPath tablePath, CatalogTable table) { return createTableSink(table.toProperties()); } + /** + * Creates and configures a {@link TableSink} based on the given + {@link Context}. + * + * @param context context of this table sink. + * @return the configured table sink. + */ + default TableSink<T> createTableSink(Context context) { + return createTableSink( + context.getObjectIdentifier().toObjectPath(), + context.getTable()); + } + + /** + * Context of table sink creation. Contains table information and + environment information. + */ + interface Context { + + /** + * @return full identifier of the given {@link CatalogTable}. + */ + ObjectIdentifier getObjectIdentifier(); + + /** + * @return table {@link CatalogTable} instance. + */ + CatalogTable getTable(); + + /** + * @return readable config of this table environment. The configuration gives the ability + * to access {@code TableConfig#getConfiguration()} which holds the current + * {@code TableEnvironment} session configurations. + */ + ReadableConfig getConfiguration(); + } + } diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/TableSinkFactoryContextImpl.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/TableSinkFactoryContextImpl.java new file mode 100644 index 0000000..e69d353 --- /dev/null +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/TableSinkFactoryContextImpl.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.factories; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.configuration.ReadableConfig; +import org.apache.flink.table.catalog.CatalogTable; +import org.apache.flink.table.catalog.ObjectIdentifier; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * Implementation of {@link TableSinkFactory.Context}. + */ +@Internal +public class TableSinkFactoryContextImpl implements TableSinkFactory.Context { + + private final ObjectIdentifier identifier; + private final CatalogTable table; + private final ReadableConfig config; + + public TableSinkFactoryContextImpl( + ObjectIdentifier identifier, + CatalogTable table, + ReadableConfig config) { + this.identifier = checkNotNull(identifier); + this.table = checkNotNull(table); + this.config = checkNotNull(config); + } + + @Override + public ObjectIdentifier getObjectIdentifier() { + return identifier; + } + + @Override + public CatalogTable getTable() { + return table; + } + + @Override + public ReadableConfig getConfiguration() { + return config; + } +} diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/TableSourceFactory.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/TableSourceFactory.java index c0f97d9..8cc5bdc 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/TableSourceFactory.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/TableSourceFactory.java @@ -19,7 +19,9 @@ package org.apache.flink.table.factories; import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.configuration.ReadableConfig; import org.apache.flink.table.catalog.CatalogTable; +import org.apache.flink.table.catalog.ObjectIdentifier; import org.apache.flink.table.catalog.ObjectPath; import org.apache.flink.table.sources.TableSource; @@ -39,8 +41,13 @@ public interface TableSourceFactory<T> extends TableFactory { * * @param properties normalized properties describing a table source. * @return the configured table source. + * @deprecated {@link Context} contains more information, and already contains table schema too. + * Please use {@link #createTableSource(Context)} instead. */ - TableSource<T> createTableSource(Map<String, String> properties); + @Deprecated + default TableSource<T> createTableSource(Map<String, String> properties) { + return null; + } /** * Creates and configures a {@link TableSource} based on the given {@link CatalogTable} instance. @@ -48,9 +55,49 @@ public interface TableSourceFactory<T> extends TableFactory { * @param tablePath path of the given {@link CatalogTable} * @param table {@link CatalogTable} instance. * @return the configured table source. + * @deprecated {@link Context} contains more information, and already contains table schema too. + * Please use {@link #createTableSource(Context)} instead. */ + @Deprecated default TableSource<T> createTableSource(ObjectPath tablePath, CatalogTable table) { return createTableSource(table.toProperties()); } + /** + * Creates and configures a {@link TableSource} based on the given + {@link Context}. + * + * @param context context of this table source. + * @return the configured table source. + */ + default TableSource<T> createTableSource(Context context) { + return createTableSource( + context.getObjectIdentifier().toObjectPath(), + context.getTable()); + } + + /** + * Context of table source creation. Contains table information and + environment information. + */ + interface Context { + + /** + * @return full identifier of the given {@link CatalogTable}. + */ + ObjectIdentifier getObjectIdentifier(); + + /** + * @return table {@link CatalogTable} instance. + */ + CatalogTable getTable(); + + /** + * @return readable config of this table environment. The configuration gives the ability + * to access {@code TableConfig#getConfiguration()} which holds the current + * {@code TableEnvironment} session configurations. + */ + ReadableConfig getConfiguration(); + } + } diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/TableSourceFactoryContextImpl.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/TableSourceFactoryContextImpl.java new file mode 100644 index 0000000..5fcab42 --- /dev/null +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/TableSourceFactoryContextImpl.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.factories; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.configuration.ReadableConfig; +import org.apache.flink.table.catalog.CatalogTable; +import org.apache.flink.table.catalog.ObjectIdentifier; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * Implementation of {@link TableSourceFactory.Context}. + */ +@Internal +public class TableSourceFactoryContextImpl implements TableSourceFactory.Context { + + private final ObjectIdentifier identifier; + private final CatalogTable table; + private final ReadableConfig config; + + public TableSourceFactoryContextImpl( + ObjectIdentifier identifier, + CatalogTable table, + ReadableConfig config) { + this.identifier = checkNotNull(identifier); + this.table = checkNotNull(table); + this.config = checkNotNull(config); + } + + @Override + public ObjectIdentifier getObjectIdentifier() { + return identifier; + } + + @Override + public CatalogTable getTable() { + return table; + } + + @Override + public ReadableConfig getConfiguration() { + return config; + } +}
