This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 946a41c2fe8016b39c9737b1f926bed0d7a23e95
Author: slinkydeveloper <[email protected]>
AuthorDate: Wed Sep 29 17:48:02 2021 +0200

    [FLINK-24399][table-common] Add DynamicTableSource.Context#createTypeInformation(LogicalType) and DynamicTableSink.Context#createTypeInformation(LogicalType)

    Signed-off-by: slinkydeveloper <[email protected]>
---
 .../table/Elasticsearch6DynamicSinkITCase.java              |  6 ++++++
 .../elasticsearch/table/Elasticsearch6DynamicSinkTest.java  |  6 ++++++
 .../table/Elasticsearch7DynamicSinkITCase.java              |  6 ++++++
 .../elasticsearch/table/Elasticsearch7DynamicSinkTest.java  |  6 ++++++
 .../flink/table/connector/sink/DynamicTableSink.java        |  6 ++++++
 .../flink/table/connector/source/DynamicTableSource.java    | 14 ++++++++++----
 .../apache/flink/table/filesystem/FileSystemTableSink.java  |  6 ++++++
 .../runtime/connector/sink/SinkRuntimeProviderContext.java  |  6 ++++++
 .../connector/source/LookupRuntimeProviderContext.java      |  6 ++++++
 .../connector/source/ScanRuntimeProviderContext.java        |  6 ++++++
 10 files changed, 64 insertions(+), 4 deletions(-)

diff --git a/flink-connectors/flink-connector-elasticsearch6/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6DynamicSinkITCase.java b/flink-connectors/flink-connector-elasticsearch6/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6DynamicSinkITCase.java
index 77449d5..0ebc52b 100644
--- a/flink-connectors/flink-connector-elasticsearch6/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6DynamicSinkITCase.java
+++ b/flink-connectors/flink-connector-elasticsearch6/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6DynamicSinkITCase.java
@@ -35,6 +35,7 @@ import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.data.StringData;
 import org.apache.flink.table.data.TimestampData;
 import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.types.RowKind;
 import org.apache.flink.util.DockerImageVersions;
 import org.apache.flink.util.TestLogger;
@@ -385,6 +386,11 @@ public class Elasticsearch6DynamicSinkITCase extends TestLogger {
         }
 
         @Override
+        public TypeInformation<?> createTypeInformation(LogicalType consumedLogicalType) {
+            return null;
+        }
+
+        @Override
         public DynamicTableSink.DataStructureConverter createDataStructureConverter(
                 DataType consumedDataType) {
             return null;
diff --git a/flink-connectors/flink-connector-elasticsearch6/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6DynamicSinkTest.java b/flink-connectors/flink-connector-elasticsearch6/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6DynamicSinkTest.java
index 6ad4f78..d1af3ec 100644
--- a/flink-connectors/flink-connector-elasticsearch6/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6DynamicSinkTest.java
+++ b/flink-connectors/flink-connector-elasticsearch6/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6DynamicSinkTest.java
@@ -33,6 +33,7 @@ import org.apache.flink.table.connector.format.EncodingFormat;
 import org.apache.flink.table.connector.sink.DynamicTableSink;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.util.TestLogger;
 
 import org.apache.http.HttpHost;
@@ -257,6 +258,11 @@ public class Elasticsearch6DynamicSinkTest extends TestLogger {
         }
 
         @Override
+        public TypeInformation<?> createTypeInformation(LogicalType consumedLogicalType) {
+            return null;
+        }
+
+        @Override
         public DynamicTableSink.DataStructureConverter createDataStructureConverter(
                 DataType consumedDataType) {
             return null;
diff --git a/flink-connectors/flink-connector-elasticsearch7/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch7DynamicSinkITCase.java b/flink-connectors/flink-connector-elasticsearch7/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch7DynamicSinkITCase.java
index a74496c..fdf7a3a 100644
--- a/flink-connectors/flink-connector-elasticsearch7/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch7DynamicSinkITCase.java
+++ b/flink-connectors/flink-connector-elasticsearch7/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch7DynamicSinkITCase.java
@@ -35,6 +35,7 @@ import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.data.StringData;
 import org.apache.flink.table.data.TimestampData;
 import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.types.RowKind;
 import org.apache.flink.util.DockerImageVersions;
 import org.apache.flink.util.TestLogger;
@@ -362,6 +363,11 @@ public class Elasticsearch7DynamicSinkITCase extends TestLogger {
         }
 
         @Override
+        public TypeInformation<?> createTypeInformation(LogicalType consumedLogicalType) {
+            return null;
+        }
+
+        @Override
         public DynamicTableSink.DataStructureConverter createDataStructureConverter(
                 DataType consumedDataType) {
             return null;
diff --git a/flink-connectors/flink-connector-elasticsearch7/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch7DynamicSinkTest.java b/flink-connectors/flink-connector-elasticsearch7/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch7DynamicSinkTest.java
index e8854b2..6ab6828 100644
--- a/flink-connectors/flink-connector-elasticsearch7/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch7DynamicSinkTest.java
+++ b/flink-connectors/flink-connector-elasticsearch7/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch7DynamicSinkTest.java
@@ -33,6 +33,7 @@ import org.apache.flink.table.connector.format.EncodingFormat;
 import org.apache.flink.table.connector.sink.DynamicTableSink;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.util.TestLogger;
 
 import org.apache.http.HttpHost;
@@ -257,6 +258,11 @@ public class Elasticsearch7DynamicSinkTest extends TestLogger {
         }
 
         @Override
+        public TypeInformation<?> createTypeInformation(LogicalType consumedLogicalType) {
+            return null;
+        }
+
+        @Override
         public DynamicTableSink.DataStructureConverter createDataStructureConverter(
                 DataType consumedDataType) {
             return null;
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/sink/DynamicTableSink.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/sink/DynamicTableSink.java
index d1ff721..dd84b71 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/sink/DynamicTableSink.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/sink/DynamicTableSink.java
@@ -150,6 +150,12 @@ public interface DynamicTableSink {
         <T> TypeInformation<T> createTypeInformation(DataType consumedDataType);
 
         /**
+         * Creates type information describing the internal data structures of the given {@link
+         * LogicalType}.
+         */
+        <T> TypeInformation<T> createTypeInformation(LogicalType consumedLogicalType);
+
+        /**
          * Creates a converter for mapping between Flink's internal data structures and objects
          * specified by the given {@link DataType} that can be passed into a runtime implementation.
          *
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/DynamicTableSource.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/DynamicTableSource.java
index f90f5dd..9180dba 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/DynamicTableSource.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/DynamicTableSource.java
@@ -20,7 +20,7 @@ package org.apache.flink.table.connector.source;
 
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.catalog.ResolvedSchema;
 import org.apache.flink.table.connector.RuntimeConverter;
 import org.apache.flink.table.connector.source.abilities.SupportsFilterPushDown;
 import org.apache.flink.table.connector.source.abilities.SupportsProjectionPushDown;
@@ -52,7 +52,7 @@ import java.io.Serializable;
  * <p>Note: Both interfaces can be implemented at the same time. The planner decides about their
  * usage depending on the specified query.
  *
- * <p>Instances of the above mentioned interfaces can be seen as factories that eventually produce
+ * <p>Instances of the above-mentioned interfaces can be seen as factories that eventually produce
  * concrete runtime implementation for reading the actual data.
  *
  * <p>Depending on the optionally declared abilities such as {@link SupportsProjectionPushDown} or
@@ -94,11 +94,17 @@ public interface DynamicTableSource {
          * Creates type information describing the internal data structures of the given {@link
          * DataType}.
          *
-         * @see TableSchema#toPhysicalRowDataType()
+         * @see ResolvedSchema#toPhysicalRowDataType()
          */
         <T> TypeInformation<T> createTypeInformation(DataType producedDataType);
 
         /**
+         * Creates type information describing the internal data structures of the given {@link
+         * LogicalType}.
+         */
+        <T> TypeInformation<T> createTypeInformation(LogicalType producedLogicalType);
+
+        /**
          * Creates a converter for mapping between objects specified by the given {@link DataType}
          * and Flink's internal data structures that can be passed into a runtime implementation.
          *
@@ -107,7 +113,7 @@ public interface DynamicTableSource {
          * types.
          *
         * @see LogicalType#supportsInputConversion(Class)
-         * @see TableSchema#toPhysicalRowDataType()
+         * @see ResolvedSchema#toPhysicalRowDataType()
          */
         DataStructureConverter createDataStructureConverter(DataType producedDataType);
     }
diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/filesystem/FileSystemTableSink.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/filesystem/FileSystemTableSink.java
index e5138e7..de688a8 100644
--- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/filesystem/FileSystemTableSink.java
+++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/filesystem/FileSystemTableSink.java
@@ -66,6 +66,7 @@ import org.apache.flink.table.filesystem.stream.compact.CompactBulkReader;
 import org.apache.flink.table.filesystem.stream.compact.CompactReader;
 import org.apache.flink.table.filesystem.stream.compact.FileInputFormatCompactReader;
 import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.utils.PartitionPathUtils;
 import org.apache.flink.types.RowKind;
 import org.apache.flink.util.Preconditions;
@@ -314,6 +315,11 @@ public class FileSystemTableSink extends AbstractFileSystemTable
         }
 
         @Override
+        public <T> TypeInformation<T> createTypeInformation(LogicalType producedLogicalType) {
+            return context.createTypeInformation(producedLogicalType);
+        }
+
+        @Override
         public DynamicTableSource.DataStructureConverter createDataStructureConverter(
                 DataType producedDataType) {
             throw new TableException("Compaction reader not support DataStructure converter.");
diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/connector/sink/SinkRuntimeProviderContext.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/connector/sink/SinkRuntimeProviderContext.java
index f6d89dc..f256b14 100644
--- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/connector/sink/SinkRuntimeProviderContext.java
+++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/connector/sink/SinkRuntimeProviderContext.java
@@ -24,6 +24,7 @@ import org.apache.flink.table.connector.sink.DynamicTableSink;
 import org.apache.flink.table.data.conversion.DataStructureConverters;
 import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
 import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
 
 import static org.apache.flink.table.types.utils.DataTypeUtils.validateOutputDataType;
 
@@ -49,6 +50,11 @@ public final class SinkRuntimeProviderContext implements DynamicTableSink.Contex
     }
 
     @Override
+    public TypeInformation<?> createTypeInformation(LogicalType consumedLogicalType) {
+        return InternalTypeInfo.of(consumedLogicalType);
+    }
+
+    @Override
     public DynamicTableSink.DataStructureConverter createDataStructureConverter(
             DataType consumedDataType) {
         validateOutputDataType(consumedDataType);
diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/connector/source/LookupRuntimeProviderContext.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/connector/source/LookupRuntimeProviderContext.java
index 7b34e95..754427c 100644
--- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/connector/source/LookupRuntimeProviderContext.java
+++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/connector/source/LookupRuntimeProviderContext.java
@@ -25,6 +25,7 @@ import org.apache.flink.table.connector.source.LookupTableSource;
 import org.apache.flink.table.data.conversion.DataStructureConverters;
 import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
 import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
 
 import static org.apache.flink.table.types.utils.DataTypeUtils.validateInputDataType;
 
@@ -50,6 +51,11 @@ public final class LookupRuntimeProviderContext implements LookupTableSource.Loo
     }
 
     @Override
+    public TypeInformation<?> createTypeInformation(LogicalType producedLogicalType) {
+        return InternalTypeInfo.of(producedLogicalType);
+    }
+
+    @Override
     public DataStructureConverter createDataStructureConverter(DataType producedDataType) {
         validateInputDataType(producedDataType);
         return new DataStructureConverterWrapper(
diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/connector/source/ScanRuntimeProviderContext.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/connector/source/ScanRuntimeProviderContext.java
index dfd2823..0488d4d 100644
--- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/connector/source/ScanRuntimeProviderContext.java
+++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/connector/source/ScanRuntimeProviderContext.java
@@ -25,6 +25,7 @@ import org.apache.flink.table.connector.source.ScanTableSource;
 import org.apache.flink.table.data.conversion.DataStructureConverters;
 import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
 import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
 
 import static org.apache.flink.table.types.utils.DataTypeUtils.validateInputDataType;
 
@@ -41,6 +42,11 @@ public final class ScanRuntimeProviderContext implements ScanTableSource.ScanCon
     }
 
    @Override
+    public TypeInformation<?> createTypeInformation(LogicalType producedLogicalType) {
+        return InternalTypeInfo.of(producedLogicalType);
+    }
+
+    @Override
    public DataStructureConverter createDataStructureConverter(DataType producedDataType) {
        validateInputDataType(producedDataType);
        return new DataStructureConverterWrapper(
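For connector authors, a minimal sketch of how the new ScanContext method could be used. This is not part of the commit above; the class name MyScanTableSource, the produced RowType, and the no-op SourceFunction are illustrative assumptions only.

// Illustrative sketch only -- MyScanTableSource is a hypothetical connector class,
// not something introduced by this commit.
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.connector.source.ScanTableSource;
import org.apache.flink.table.connector.source.SourceFunctionProvider;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.BigIntType;
import org.apache.flink.table.types.logical.RowType;

public class MyScanTableSource implements ScanTableSource {

    @Override
    public ChangelogMode getChangelogMode() {
        return ChangelogMode.insertOnly();
    }

    @Override
    public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderContext) {
        // With FLINK-24399 the produced type can be handed to the context as a LogicalType
        // directly, without wrapping it in a DataType first.
        RowType producedType = RowType.of(new BigIntType());
        TypeInformation<RowData> producedTypeInfo =
                runtimeProviderContext.createTypeInformation(producedType);

        // A real connector would pass producedTypeInfo to its runtime implementation,
        // e.g. to configure a deserialization schema; this source emits nothing.
        SourceFunction<RowData> noOpSource =
                new SourceFunction<RowData>() {
                    @Override
                    public void run(SourceContext<RowData> ctx) {}

                    @Override
                    public void cancel() {}
                };
        return SourceFunctionProvider.of(noOpSource, true);
    }

    @Override
    public DynamicTableSource copy() {
        return new MyScanTableSource();
    }

    @Override
    public String asSummaryString() {
        return "MyScanTableSource";
    }
}

DynamicTableSink.Context offers the symmetric createTypeInformation(LogicalType consumedLogicalType) for the sink side, as shown in the DynamicTableSink.java hunk above.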
