This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit f6895da4f762e506c4d0fa8d6dea427094e84173
Author: JingsongLi <[email protected]>
AuthorDate: Mon Feb 10 14:58:41 2020 +0800

    [FLINK-15912][table] Add Context to TableSourceFactory and TableSinkFactory
    
    This closes #11047
---
 .../table/factories/StreamTableSinkFactory.java    | 15 +++++-
 .../table/factories/StreamTableSourceFactory.java  | 15 +++++-
 .../flink/table/factories/TableSinkFactory.java    | 49 ++++++++++++++++-
 .../factories/TableSinkFactoryContextImpl.java     | 61 ++++++++++++++++++++++
 .../flink/table/factories/TableSourceFactory.java  | 49 ++++++++++++++++-
 .../factories/TableSourceFactoryContextImpl.java   | 61 ++++++++++++++++++++++
 6 files changed, 244 insertions(+), 6 deletions(-)

diff --git 
a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/StreamTableSinkFactory.java
 
b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/StreamTableSinkFactory.java
index 86c7e82..c721424 100644
--- 
a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/StreamTableSinkFactory.java
+++ 
b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/StreamTableSinkFactory.java
@@ -19,6 +19,7 @@
 package org.apache.flink.table.factories;
 
 import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.sinks.StreamTableSink;
 import org.apache.flink.table.sinks.TableSink;
 
@@ -38,14 +39,24 @@ public interface StreamTableSinkFactory<T> extends 
TableSinkFactory<T> {
         *
         * @param properties normalized properties describing a table sink.
         * @return the configured table sink.
+        * @deprecated {@link Context} contains more information, and already 
contains table schema too.
+        * Please use {@link #createTableSink(Context)} instead.
         */
-       StreamTableSink<T> createStreamTableSink(Map<String, String> 
properties);
+       @Deprecated
+       default StreamTableSink<T> createStreamTableSink(Map<String, String> 
properties) {
+               return null;
+       }
 
        /**
         * Only create stream table sink.
         */
        @Override
        default TableSink<T> createTableSink(Map<String, String> properties) {
-               return createStreamTableSink(properties);
+               StreamTableSink<T> sink = createStreamTableSink(properties);
+               if (sink == null) {
+                       throw new ValidationException(
+                                       "Please override 
'createTableSink(Context)' method.");
+               }
+               return sink;
        }
 }
diff --git 
a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/StreamTableSourceFactory.java
 
b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/StreamTableSourceFactory.java
index b007d7b..3e392ed 100644
--- 
a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/StreamTableSourceFactory.java
+++ 
b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/StreamTableSourceFactory.java
@@ -19,6 +19,7 @@
 package org.apache.flink.table.factories;
 
 import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.sources.StreamTableSource;
 import org.apache.flink.table.sources.TableSource;
 
@@ -38,14 +39,24 @@ public interface StreamTableSourceFactory<T> extends 
TableSourceFactory<T> {
         *
         * @param properties normalized properties describing a stream table 
source.
         * @return the configured stream table source.
+        * @deprecated {@link Context} contains more information, and already 
contains table schema too.
+        * Please use {@link #createTableSource(Context)} instead.
         */
-       StreamTableSource<T> createStreamTableSource(Map<String, String> 
properties);
+       @Deprecated
+       default StreamTableSource<T> createStreamTableSource(Map<String, 
String> properties) {
+               return null;
+       }
 
        /**
         * Only create a stream table source.
         */
        @Override
        default TableSource<T> createTableSource(Map<String, String> 
properties) {
-               return createStreamTableSource(properties);
+               StreamTableSource<T> source = 
createStreamTableSource(properties);
+               if (source == null) {
+                       throw new ValidationException(
+                                       "Please override 
'createTableSource(Context)' method.");
+               }
+               return source;
        }
 }
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/TableSinkFactory.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/TableSinkFactory.java
index 05d0f34..c92ec9f 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/TableSinkFactory.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/TableSinkFactory.java
@@ -19,7 +19,9 @@
 package org.apache.flink.table.factories;
 
 import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.ObjectIdentifier;
 import org.apache.flink.table.catalog.ObjectPath;
 import org.apache.flink.table.sinks.TableSink;
 
@@ -39,8 +41,13 @@ public interface TableSinkFactory<T> extends TableFactory {
         *
         * @param properties normalized properties describing a table sink.
         * @return the configured table sink.
+        * @deprecated {@link Context} contains more information, and already 
contains table schema too.
+        * Please use {@link #createTableSink(Context)} instead.
         */
-       TableSink<T> createTableSink(Map<String, String> properties);
+       @Deprecated
+       default TableSink<T> createTableSink(Map<String, String> properties) {
+               return null;
+       }
 
        /**
         * Creates and configures a {@link TableSink} based on the given {@link 
CatalogTable} instance.
@@ -48,9 +55,49 @@ public interface TableSinkFactory<T> extends TableFactory {
         * @param tablePath path of the given {@link CatalogTable}
         * @param table {@link CatalogTable} instance.
         * @return the configured table sink.
+        * @deprecated {@link Context} contains more information, and already 
contains table schema too.
+        * Please use {@link #createTableSink(Context)} instead.
         */
+       @Deprecated
        default TableSink<T> createTableSink(ObjectPath tablePath, CatalogTable 
table) {
                return createTableSink(table.toProperties());
        }
 
+       /**
+        * Creates and configures a {@link TableSink} based on the given
+        {@link Context}.
+        *
+        * @param context context of this table sink.
+        * @return the configured table sink.
+        */
+       default TableSink<T> createTableSink(Context context) {
+               return createTableSink(
+                               context.getObjectIdentifier().toObjectPath(),
+                               context.getTable());
+       }
+
+       /**
+        * Context of table sink creation. Contains table information and
+        environment information.
+        */
+       interface Context {
+
+               /**
+                * @return full identifier of the given {@link CatalogTable}.
+                */
+               ObjectIdentifier getObjectIdentifier();
+
+               /**
+                * @return table {@link CatalogTable} instance.
+                */
+               CatalogTable getTable();
+
+               /**
+                * @return readable config of this table environment. The 
configuration gives the ability
+                * to access {@code TableConfig#getConfiguration()} which holds 
the current
+                * {@code TableEnvironment} session configurations.
+                */
+               ReadableConfig getConfiguration();
+       }
+
 }
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/TableSinkFactoryContextImpl.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/TableSinkFactoryContextImpl.java
new file mode 100644
index 0000000..e69d353
--- /dev/null
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/TableSinkFactoryContextImpl.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.factories;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Implementation of {@link TableSinkFactory.Context}.
+ */
+@Internal
+public class TableSinkFactoryContextImpl implements TableSinkFactory.Context {
+
+       private final ObjectIdentifier identifier;
+       private final CatalogTable table;
+       private final ReadableConfig config;
+
+       public TableSinkFactoryContextImpl(
+                       ObjectIdentifier identifier,
+                       CatalogTable table,
+                       ReadableConfig config) {
+               this.identifier = checkNotNull(identifier);
+               this.table = checkNotNull(table);
+               this.config = checkNotNull(config);
+       }
+
+       @Override
+       public ObjectIdentifier getObjectIdentifier() {
+               return identifier;
+       }
+
+       @Override
+       public CatalogTable getTable() {
+               return table;
+       }
+
+       @Override
+       public ReadableConfig getConfiguration() {
+               return config;
+       }
+}
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/TableSourceFactory.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/TableSourceFactory.java
index c0f97d9..8cc5bdc 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/TableSourceFactory.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/TableSourceFactory.java
@@ -19,7 +19,9 @@
 package org.apache.flink.table.factories;
 
 import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.ObjectIdentifier;
 import org.apache.flink.table.catalog.ObjectPath;
 import org.apache.flink.table.sources.TableSource;
 
@@ -39,8 +41,13 @@ public interface TableSourceFactory<T> extends TableFactory {
         *
         * @param properties normalized properties describing a table source.
         * @return the configured table source.
+        * @deprecated {@link Context} contains more information, and already 
contains table schema too.
+        * Please use {@link #createTableSource(Context)} instead.
         */
-       TableSource<T> createTableSource(Map<String, String> properties);
+       @Deprecated
+       default TableSource<T> createTableSource(Map<String, String> 
properties) {
+               return null;
+       }
 
        /**
         * Creates and configures a {@link TableSource} based on the given 
{@link CatalogTable} instance.
@@ -48,9 +55,49 @@ public interface TableSourceFactory<T> extends TableFactory {
         * @param tablePath path of the given {@link CatalogTable}
         * @param table {@link CatalogTable} instance.
         * @return the configured table source.
+        * @deprecated {@link Context} contains more information, and already 
contains table schema too.
+        * Please use {@link #createTableSource(Context)} instead.
         */
+       @Deprecated
        default TableSource<T> createTableSource(ObjectPath tablePath, 
CatalogTable table) {
                return createTableSource(table.toProperties());
        }
 
+       /**
+        * Creates and configures a {@link TableSource} based on the given
+        {@link Context}.
+        *
+        * @param context context of this table source.
+        * @return the configured table source.
+        */
+       default TableSource<T> createTableSource(Context context) {
+               return createTableSource(
+                               context.getObjectIdentifier().toObjectPath(),
+                               context.getTable());
+       }
+
+       /**
+        * Context of table source creation. Contains table information and
+        environment information.
+        */
+       interface Context {
+
+               /**
+                * @return full identifier of the given {@link CatalogTable}.
+                */
+               ObjectIdentifier getObjectIdentifier();
+
+               /**
+                * @return table {@link CatalogTable} instance.
+                */
+               CatalogTable getTable();
+
+               /**
+                * @return readable config of this table environment. The 
configuration gives the ability
+                * to access {@code TableConfig#getConfiguration()} which holds 
the current
+                * {@code TableEnvironment} session configurations.
+                */
+               ReadableConfig getConfiguration();
+       }
+
 }
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/TableSourceFactoryContextImpl.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/TableSourceFactoryContextImpl.java
new file mode 100644
index 0000000..5fcab42
--- /dev/null
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/TableSourceFactoryContextImpl.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.factories;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Implementation of {@link TableSourceFactory.Context}.
+ */
+@Internal
+public class TableSourceFactoryContextImpl implements 
TableSourceFactory.Context {
+
+       private final ObjectIdentifier identifier;
+       private final CatalogTable table;
+       private final ReadableConfig config;
+
+       public TableSourceFactoryContextImpl(
+                       ObjectIdentifier identifier,
+                       CatalogTable table,
+                       ReadableConfig config) {
+               this.identifier = checkNotNull(identifier);
+               this.table = checkNotNull(table);
+               this.config = checkNotNull(config);
+       }
+
+       @Override
+       public ObjectIdentifier getObjectIdentifier() {
+               return identifier;
+       }
+
+       @Override
+       public CatalogTable getTable() {
+               return table;
+       }
+
+       @Override
+       public ReadableConfig getConfiguration() {
+               return config;
+       }
+}

Reply via email to