dawidwys commented on a change in pull request #12887:
URL: https://github.com/apache/flink/pull/12887#discussion_r453759498
##########
File path:
flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/utils/DataTypeUtils.java
##########
@@ -217,29 +221,52 @@ public DataType visit(CollectionDataType collectionDataType) {
 @Override
 public DataType visit(FieldsDataType fieldsDataType) {
-    final List<DataType> newFields = fieldsDataType.getChildren().stream()
+    final List<DataType> newDataTypes = fieldsDataType.getChildren().stream()
         .map(dt -> dt.accept(this))
         .collect(Collectors.toList());
     final LogicalType logicalType = fieldsDataType.getLogicalType();
     final LogicalType newLogicalType;
     if (logicalType instanceof RowType) {
-        final List<RowType.RowField> oldFields = ((RowType) logicalType).getFields();
-        final List<RowType.RowField> newRowFields = IntStream.range(0, oldFields.size())
+        final List<RowField> oldFields = ((RowType) logicalType).getFields();
+        final List<RowField> newFields = IntStream.range(0, oldFields.size())
             .mapToObj(i ->
-                new RowType.RowField(
+                new RowField(
                     oldFields.get(i).getName(),
-                    newFields.get(i).getLogicalType(),
+                    newDataTypes.get(i).getLogicalType(),
                     oldFields.get(i).getDescription().orElse(null)))
             .collect(Collectors.toList());
         newLogicalType = new RowType(
             logicalType.isNullable(),
-            newRowFields);
+            newFields);
+    } else if (logicalType instanceof StructuredType) {
Review comment:
Nit, and unrelated to this change: I find the logic of `DataTypeTransformer`
less than straightforward. I am not sure why it applies the transformation
transitively, i.e. to every nested child type and not only to the top-level
type. That is probably needed in the places where we use it, but it is not
obvious from the method or the class itself.
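
To make concrete what I mean by "transitively", here is a minimal sketch that does not depend on Flink. `Type`, `transform`, and the type names are hypothetical stand-ins, not the real `DataType` or `DataTypeTransformer` API, and applying the transformation to the rebuilt parent after its children is my assumption about the intended order:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.function.UnaryOperator;

public class TransitiveTransformSketch {

    // Hypothetical, simplified stand-in for a nested data type (not Flink's DataType).
    static final class Type {
        final String name;
        final List<Type> children;

        Type(String name, List<Type> children) {
            this.name = name;
            this.children = children;
        }

        @Override
        public String toString() {
            return children.isEmpty() ? name : name + children;
        }
    }

    // Transitive application: recurse into every child first, rebuild the parent
    // from the transformed children, then apply the transformation to the parent.
    static Type transform(Type type, UnaryOperator<Type> transformation) {
        List<Type> newChildren = new ArrayList<>();
        for (Type child : type.children) {
            newChildren.add(transform(child, transformation));
        }
        return transformation.apply(new Type(type.name, newChildren));
    }

    public static void main(String[] args) {
        Type row = new Type("ROW", Arrays.asList(
            new Type("TIMESTAMP", Collections.emptyList()),
            new Type("ARRAY", Arrays.asList(
                new Type("TIMESTAMP", Collections.emptyList())))));

        // A transformation that only knows how to rewrite one kind of leaf type.
        UnaryOperator<Type> legacy = t ->
            t.name.equals("TIMESTAMP") ? new Type("LEGACY_TIMESTAMP", t.children) : t;

        // prints ROW[LEGACY_TIMESTAMP, ARRAY[LEGACY_TIMESTAMP]]
        System.out.println(transform(row, legacy));
    }
}
```

The caller passes a transformation for a single type and still gets the nested `TIMESTAMP` inside the array rewritten. That implicit recursion is the part I would like the class or method Javadoc to state explicitly.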