twalthr commented on a change in pull request #15588:
URL: https://github.com/apache/flink/pull/15588#discussion_r614864282
##########
File path:
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/connectors/DynamicSinkUtils.java
##########
@@ -75,6 +80,49 @@
@Internal
public final class DynamicSinkUtils {
+ /** Converts a {@link TableResult#collect()} sink to a {@link RelNode}. */
+ public static RelNode convertCollectToRel(
+ FlinkRelBuilder relBuilder,
+ RelNode input,
+ CollectModifyOperation collectModifyOperation) {
+ final DataTypeFactory dataTypeFactory =
+
unwrapContext(relBuilder).getCatalogManager().getDataTypeFactory();
+ final ResolvedSchema childSchema =
collectModifyOperation.getChild().getResolvedSchema();
+ final ResolvedSchema schema =
+ ResolvedSchema.physical(
+ childSchema.getColumnNames(),
childSchema.getColumnDataTypes());
+ final CatalogTable unresolvedTable = new InlineCatalogTable(schema);
+ final ResolvedCatalogTable catalogTable = new
ResolvedCatalogTable(unresolvedTable, schema);
+
+ final DataType consumedDataType = fixCollectDataType(dataTypeFactory,
schema);
+
+ final CollectDynamicSink tableSink =
+ new CollectDynamicSink(
+ collectModifyOperation.getTableIdentifier(),
consumedDataType);
+
collectModifyOperation.setSelectResultProvider(tableSink.getSelectResultProvider());
+ return convertSinkToRel(
+ relBuilder,
+ input,
+ collectModifyOperation.getTableIdentifier(),
+ Collections.emptyMap(),
+ false,
+ tableSink,
+ catalogTable);
+ }
+
+ /** Temporary solution until we drop legacy types. */
+ private static DataType fixCollectDataType(
+ DataTypeFactory dataTypeFactory, ResolvedSchema schema) {
+ final DataType fixedDataType =
+ DataTypeUtils.transform(
+ dataTypeFactory,
+ schema.toSourceRowDataType(),
+ TypeTransformations.legacyRawToTypeInfoRaw(),
+ TypeTransformations.legacyToNonLegacy());
+ // TODO erase the conversion class earlier when dropping legacy code
Review comment:
I created FLINK-22321
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]