This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 946a41c2fe8016b39c9737b1f926bed0d7a23e95
Author: slinkydeveloper <[email protected]>
AuthorDate: Wed Sep 29 17:48:02 2021 +0200

    [FLINK-24399][table-common] Add DynamicTableSource.Context#createTypeInformation(LogicalType) and DynamicTableSink.Context#createTypeInformation(LogicalType)
    
    Signed-off-by: slinkydeveloper <[email protected]>
---
 .../table/Elasticsearch6DynamicSinkITCase.java             |  6 ++++++
 .../elasticsearch/table/Elasticsearch6DynamicSinkTest.java |  6 ++++++
 .../table/Elasticsearch7DynamicSinkITCase.java             |  6 ++++++
 .../elasticsearch/table/Elasticsearch7DynamicSinkTest.java |  6 ++++++
 .../flink/table/connector/sink/DynamicTableSink.java       |  6 ++++++
 .../flink/table/connector/source/DynamicTableSource.java   | 14 ++++++++++----
 .../apache/flink/table/filesystem/FileSystemTableSink.java |  6 ++++++
 .../runtime/connector/sink/SinkRuntimeProviderContext.java |  6 ++++++
 .../connector/source/LookupRuntimeProviderContext.java     |  6 ++++++
 .../connector/source/ScanRuntimeProviderContext.java       |  6 ++++++
 10 files changed, 64 insertions(+), 4 deletions(-)

diff --git 
a/flink-connectors/flink-connector-elasticsearch6/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6DynamicSinkITCase.java
 
b/flink-connectors/flink-connector-elasticsearch6/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6DynamicSinkITCase.java
index 77449d5..0ebc52b 100644
--- 
a/flink-connectors/flink-connector-elasticsearch6/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6DynamicSinkITCase.java
+++ 
b/flink-connectors/flink-connector-elasticsearch6/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6DynamicSinkITCase.java
@@ -35,6 +35,7 @@ import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.data.StringData;
 import org.apache.flink.table.data.TimestampData;
 import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.types.RowKind;
 import org.apache.flink.util.DockerImageVersions;
 import org.apache.flink.util.TestLogger;
@@ -385,6 +386,11 @@ public class Elasticsearch6DynamicSinkITCase extends 
TestLogger {
         }
 
         @Override
+        public TypeInformation<?> createTypeInformation(LogicalType 
consumedLogicalType) {
+            return null;
+        }
+
+        @Override
         public DynamicTableSink.DataStructureConverter 
createDataStructureConverter(
                 DataType consumedDataType) {
             return null;
diff --git 
a/flink-connectors/flink-connector-elasticsearch6/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6DynamicSinkTest.java
 
b/flink-connectors/flink-connector-elasticsearch6/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6DynamicSinkTest.java
index 6ad4f78..d1af3ec 100644
--- 
a/flink-connectors/flink-connector-elasticsearch6/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6DynamicSinkTest.java
+++ 
b/flink-connectors/flink-connector-elasticsearch6/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6DynamicSinkTest.java
@@ -33,6 +33,7 @@ import org.apache.flink.table.connector.format.EncodingFormat;
 import org.apache.flink.table.connector.sink.DynamicTableSink;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.util.TestLogger;
 
 import org.apache.http.HttpHost;
@@ -257,6 +258,11 @@ public class Elasticsearch6DynamicSinkTest extends 
TestLogger {
         }
 
         @Override
+        public TypeInformation<?> createTypeInformation(LogicalType 
consumedLogicalType) {
+            return null;
+        }
+
+        @Override
         public DynamicTableSink.DataStructureConverter 
createDataStructureConverter(
                 DataType consumedDataType) {
             return null;
diff --git 
a/flink-connectors/flink-connector-elasticsearch7/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch7DynamicSinkITCase.java
 
b/flink-connectors/flink-connector-elasticsearch7/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch7DynamicSinkITCase.java
index a74496c..fdf7a3a 100644
--- 
a/flink-connectors/flink-connector-elasticsearch7/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch7DynamicSinkITCase.java
+++ 
b/flink-connectors/flink-connector-elasticsearch7/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch7DynamicSinkITCase.java
@@ -35,6 +35,7 @@ import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.data.StringData;
 import org.apache.flink.table.data.TimestampData;
 import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.types.RowKind;
 import org.apache.flink.util.DockerImageVersions;
 import org.apache.flink.util.TestLogger;
@@ -362,6 +363,11 @@ public class Elasticsearch7DynamicSinkITCase extends 
TestLogger {
         }
 
         @Override
+        public TypeInformation<?> createTypeInformation(LogicalType 
consumedLogicalType) {
+            return null;
+        }
+
+        @Override
         public DynamicTableSink.DataStructureConverter 
createDataStructureConverter(
                 DataType consumedDataType) {
             return null;
diff --git 
a/flink-connectors/flink-connector-elasticsearch7/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch7DynamicSinkTest.java
 
b/flink-connectors/flink-connector-elasticsearch7/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch7DynamicSinkTest.java
index e8854b2..6ab6828 100644
--- 
a/flink-connectors/flink-connector-elasticsearch7/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch7DynamicSinkTest.java
+++ 
b/flink-connectors/flink-connector-elasticsearch7/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch7DynamicSinkTest.java
@@ -33,6 +33,7 @@ import org.apache.flink.table.connector.format.EncodingFormat;
 import org.apache.flink.table.connector.sink.DynamicTableSink;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.util.TestLogger;
 
 import org.apache.http.HttpHost;
@@ -257,6 +258,11 @@ public class Elasticsearch7DynamicSinkTest extends 
TestLogger {
         }
 
         @Override
+        public TypeInformation<?> createTypeInformation(LogicalType 
consumedLogicalType) {
+            return null;
+        }
+
+        @Override
         public DynamicTableSink.DataStructureConverter 
createDataStructureConverter(
                 DataType consumedDataType) {
             return null;
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/sink/DynamicTableSink.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/sink/DynamicTableSink.java
index d1ff721..dd84b71 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/sink/DynamicTableSink.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/sink/DynamicTableSink.java
@@ -150,6 +150,12 @@ public interface DynamicTableSink {
         <T> TypeInformation<T> createTypeInformation(DataType 
consumedDataType);
 
         /**
+         * Creates type information describing the internal data structures of 
the given {@link
+         * LogicalType}.
+         */
+        <T> TypeInformation<T> createTypeInformation(LogicalType 
consumedLogicalType);
+
+        /**
          * Creates a converter for mapping between Flink's internal data 
structures and objects
          * specified by the given {@link DataType} that can be passed into a 
runtime implementation.
          *
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/DynamicTableSource.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/DynamicTableSource.java
index f90f5dd..9180dba 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/DynamicTableSource.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/DynamicTableSource.java
@@ -20,7 +20,7 @@ package org.apache.flink.table.connector.source;
 
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.catalog.ResolvedSchema;
 import org.apache.flink.table.connector.RuntimeConverter;
 import 
org.apache.flink.table.connector.source.abilities.SupportsFilterPushDown;
 import 
org.apache.flink.table.connector.source.abilities.SupportsProjectionPushDown;
@@ -52,7 +52,7 @@ import java.io.Serializable;
  * <p>Note: Both interfaces can be implemented at the same time. The planner 
decides about their
  * usage depending on the specified query.
  *
- * <p>Instances of the above mentioned interfaces can be seen as factories 
that eventually produce
+ * <p>Instances of the above-mentioned interfaces can be seen as factories 
that eventually produce
  * concrete runtime implementation for reading the actual data.
  *
  * <p>Depending on the optionally declared abilities such as {@link 
SupportsProjectionPushDown} or
@@ -94,11 +94,17 @@ public interface DynamicTableSource {
          * Creates type information describing the internal data structures of 
the given {@link
          * DataType}.
          *
-         * @see TableSchema#toPhysicalRowDataType()
+         * @see ResolvedSchema#toPhysicalRowDataType()
          */
         <T> TypeInformation<T> createTypeInformation(DataType 
producedDataType);
 
         /**
+         * Creates type information describing the internal data structures of 
the given {@link
+         * LogicalType}.
+         */
+        <T> TypeInformation<T> createTypeInformation(LogicalType 
producedLogicalType);
+
+        /**
          * Creates a converter for mapping between objects specified by the 
given {@link DataType}
          * and Flink's internal data structures that can be passed into a 
runtime implementation.
          *
@@ -107,7 +113,7 @@ public interface DynamicTableSource {
          * types.
          *
          * @see LogicalType#supportsInputConversion(Class)
-         * @see TableSchema#toPhysicalRowDataType()
+         * @see ResolvedSchema#toPhysicalRowDataType()
          */
         DataStructureConverter createDataStructureConverter(DataType 
producedDataType);
     }
diff --git 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/filesystem/FileSystemTableSink.java
 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/filesystem/FileSystemTableSink.java
index e5138e7..de688a8 100644
--- 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/filesystem/FileSystemTableSink.java
+++ 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/filesystem/FileSystemTableSink.java
@@ -66,6 +66,7 @@ import 
org.apache.flink.table.filesystem.stream.compact.CompactBulkReader;
 import org.apache.flink.table.filesystem.stream.compact.CompactReader;
 import 
org.apache.flink.table.filesystem.stream.compact.FileInputFormatCompactReader;
 import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.utils.PartitionPathUtils;
 import org.apache.flink.types.RowKind;
 import org.apache.flink.util.Preconditions;
@@ -314,6 +315,11 @@ public class FileSystemTableSink extends 
AbstractFileSystemTable
             }
 
             @Override
+            public <T> TypeInformation<T> createTypeInformation(LogicalType 
producedLogicalType) {
+                return context.createTypeInformation(producedLogicalType);
+            }
+
+            @Override
             public DynamicTableSource.DataStructureConverter 
createDataStructureConverter(
                     DataType producedDataType) {
                 throw new TableException("Compaction reader not support 
DataStructure converter.");
diff --git 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/connector/sink/SinkRuntimeProviderContext.java
 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/connector/sink/SinkRuntimeProviderContext.java
index f6d89dc..f256b14 100644
--- 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/connector/sink/SinkRuntimeProviderContext.java
+++ 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/connector/sink/SinkRuntimeProviderContext.java
@@ -24,6 +24,7 @@ import org.apache.flink.table.connector.sink.DynamicTableSink;
 import org.apache.flink.table.data.conversion.DataStructureConverters;
 import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
 import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
 
 import static 
org.apache.flink.table.types.utils.DataTypeUtils.validateOutputDataType;
 
@@ -49,6 +50,11 @@ public final class SinkRuntimeProviderContext implements 
DynamicTableSink.Contex
     }
 
     @Override
+    public TypeInformation<?> createTypeInformation(LogicalType 
consumedLogicalType) {
+        return InternalTypeInfo.of(consumedLogicalType);
+    }
+
+    @Override
     public DynamicTableSink.DataStructureConverter 
createDataStructureConverter(
             DataType consumedDataType) {
         validateOutputDataType(consumedDataType);
diff --git 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/connector/source/LookupRuntimeProviderContext.java
 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/connector/source/LookupRuntimeProviderContext.java
index 7b34e95..754427c 100644
--- 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/connector/source/LookupRuntimeProviderContext.java
+++ 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/connector/source/LookupRuntimeProviderContext.java
@@ -25,6 +25,7 @@ import 
org.apache.flink.table.connector.source.LookupTableSource;
 import org.apache.flink.table.data.conversion.DataStructureConverters;
 import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
 import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
 
 import static 
org.apache.flink.table.types.utils.DataTypeUtils.validateInputDataType;
 
@@ -50,6 +51,11 @@ public final class LookupRuntimeProviderContext implements 
LookupTableSource.Loo
     }
 
     @Override
+    public TypeInformation<?> createTypeInformation(LogicalType 
producedLogicalType) {
+        return InternalTypeInfo.of(producedLogicalType);
+    }
+
+    @Override
     public DataStructureConverter createDataStructureConverter(DataType 
producedDataType) {
         validateInputDataType(producedDataType);
         return new DataStructureConverterWrapper(
diff --git 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/connector/source/ScanRuntimeProviderContext.java
 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/connector/source/ScanRuntimeProviderContext.java
index dfd2823..0488d4d 100644
--- 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/connector/source/ScanRuntimeProviderContext.java
+++ 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/connector/source/ScanRuntimeProviderContext.java
@@ -25,6 +25,7 @@ import 
org.apache.flink.table.connector.source.ScanTableSource;
 import org.apache.flink.table.data.conversion.DataStructureConverters;
 import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
 import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
 
 import static 
org.apache.flink.table.types.utils.DataTypeUtils.validateInputDataType;
 
@@ -41,6 +42,11 @@ public final class ScanRuntimeProviderContext implements 
ScanTableSource.ScanCon
     }
 
     @Override
+    public TypeInformation<?> createTypeInformation(LogicalType 
producedLogicalType) {
+        return InternalTypeInfo.of(producedLogicalType);
+    }
+
+    @Override
     public DataStructureConverter createDataStructureConverter(DataType 
producedDataType) {
         validateInputDataType(producedDataType);
         return new DataStructureConverterWrapper(

Reply via email to