yuchuanchen commented on code in PR #21563:
URL: https://github.com/apache/flink/pull/21563#discussion_r1066838659


##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/utils/DataTypeUtils.java:
##########
@@ -94,6 +99,217 @@ public static DataType projectRow(DataType dataType, int[] 
indexPaths) {
         return Projection.of(indexPaths).project(dataType);
     }
 
+    public static List<RowType.RowField> buildRowFields(RowType allType, 
int[][] projectedFields) {
+        final List<RowType.RowField> updatedFields = new ArrayList<>();
+        for (int[] projectedField : projectedFields) {
+            updatedFields.add(
+                    buildRowFieldInProjectFields(
+                            allType.getFields().get(projectedField[0]), 
projectedField, 0));
+        }
+        return updatedFields;
+    }
+
+    public static RowType buildRow(RowType allType, int[][] projectedFields) {
+        return new RowType(
+                allType.isNullable(), mergeRowFields(buildRowFields(allType, 
projectedFields)));
+    }
+
+    public static RowType buildRow(RowType allType, List<RowType.RowField> 
updatedFields) {
+        return new RowType(allType.isNullable(), 
mergeRowFields(updatedFields));
+    }
+
+    public static RowType.RowField buildRowFieldInProjectFields(
+            RowType.RowField rowField, int[] fields, int index) {
+        if (fields.length == 1 || index == fields.length - 1) {
+            LogicalType rowType = rowField.getType();
+            if (rowType.is(ROW)) {
+                rowType = new RowType(rowType.isNullable(), ((RowType) 
rowType).getFields());
+            }
+            return new RowField(rowField.getName(), rowType);
+        }
+        Preconditions.checkArgument(rowField.getType().is(ROW), "Row data type 
expected.");
+        final List<RowType.RowField> updatedFields = new ArrayList<>();
+        RowType rowtype = ((RowType) rowField.getType());
+        updatedFields.add(
+                buildRowFieldInProjectFields(
+                        rowtype.getFields().get(fields[index + 1]), fields, 
index + 1));
+        return new RowType.RowField(
+                rowField.getName(), new RowType(rowtype.isNullable(), 
updatedFields));
+    }
+
+    public static List<RowType.RowField> mergeRowFields(List<RowType.RowField> 
updatedFields) {

Review Comment:
   updated



##########
flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemConnectorOptions.java:
##########
@@ -68,6 +70,22 @@ public class FileSystemConnectorOptions {
                                     + "The statistics reporting is a heavy 
operation in some cases,"
                                     + "this config allows users to choose the 
statistics type according to different situations.");
 
+    public static final ConfigOption<List<String>>
+            SOURCE_NESTED_PROJECTION_PUSHDOWN_SUPPORTED_FORMATS =
+                    
ConfigOptions.key("source.nested.projection.pushdown.supported.formats")

Review Comment:
   updated



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org

Reply via email to