fsk119 commented on a change in pull request #13631:
URL: https://github.com/apache/flink/pull/13631#discussion_r514887936



##########
File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushProjectIntoTableSourceScanRule.java
##########
@@ -109,73 +112,67 @@ public void onMatch(RelOptRuleCall call) {
                        usedFields = refFields;
                }
                // if no fields can be projected, we keep the original plan.
-               if (usedFields.length == fieldCount) {
+               if (!supportsNestedProjection && usedFields.length == fieldCount) {
                        return;
                }
 
-               final List<String> projectedFieldNames = IntStream.of(usedFields)
-                       .mapToObj(fieldNames::get)
-                       .collect(Collectors.toList());
-
                final TableSchema oldSchema = oldTableSourceTable.catalogTable().getSchema();
                final DynamicTableSource oldSource = oldTableSourceTable.tableSource();
                final List<String> metadataKeys = DynamicSourceUtils.createRequiredMetadataKeys(oldSchema, oldSource);
                final int physicalFieldCount = fieldCount - metadataKeys.size();
                final DynamicTableSource newSource = oldSource.copy();
 
-               // remove metadata columns from the projection push down and store it in a separate list
-               // the projection push down itself happens purely on physical columns
-               final int[] usedPhysicalFields;
-               final List<String> usedMetadataKeys;
-               if (newSource instanceof SupportsReadingMetadata) {
-                       usedPhysicalFields = IntStream.of(usedFields)
-                               // select only physical columns
-                               .filter(i -> i < physicalFieldCount)
-                               .toArray();
-                       final List<String> usedMetadataKeysUnordered = IntStream.of(usedFields)
-                               // select only metadata columns
-                               .filter(i -> i >= physicalFieldCount)
-                               // map the indices to keys
-                               .mapToObj(i -> metadataKeys.get(fieldCount - i - 1))
-                               .collect(Collectors.toList());
-                       // order the keys according to the source's declaration
-                       usedMetadataKeys = metadataKeys
-                               .stream()
-                               .filter(usedMetadataKeysUnordered::contains)
-                               .collect(Collectors.toList());
+               final List<List<Integer>> usedFieldsCoordinates = new ArrayList<>();
+               final Map<Integer, Map<List<String>, Integer>> fieldCoordinatesToOrder = new HashMap<>();
+
+               if (supportsNestedProjection) {
+                       getCoordinatesAndMappingOfPhysicalColumnWithNestedProjection(
+                                       project, oldSchema, usedFields, physicalFieldCount, usedFieldsCoordinates, fieldCoordinatesToOrder);
                } else {
-                       usedPhysicalFields = usedFields;
-                       usedMetadataKeys = Collections.emptyList();
+                       for (int usedField : usedFields) {
+                               // filter metadata columns

Review comment:
Please add a TODO here: this branch only projects the top-level fields.
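
To make the point concrete, here is a minimal, self-contained sketch of what "top level only" means for the coordinates built in the non-nested branch. It is not the code from this PR: the class name `TopLevelProjectionSketch` and the helper `topLevelCoordinates` are hypothetical, and the loop body is an assumption, since the diff above is cut off right after the `// filter metadata columns` comment. The sketch assumes metadata columns are indexed after the physical columns, as the removed `i < physicalFieldCount` filter suggests.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class TopLevelProjectionSketch {

    // TODO: only top-level fields are projected here; each coordinate is a
    // single index, so a reference into a nested field keeps its whole parent column.
    public static List<List<Integer>> topLevelCoordinates(int[] usedFields, int physicalFieldCount) {
        List<List<Integer>> coordinates = new ArrayList<>();
        for (int usedField : usedFields) {
            // filter metadata columns (assumed to be indexed after the physical ones)
            if (usedField < physicalFieldCount) {
                // a top-level coordinate is a path of length one
                coordinates.add(Collections.singletonList(usedField));
            }
        }
        return coordinates;
    }

    public static void main(String[] args) {
        // three physical columns (0..2) and one metadata column (3)
        System.out.println(topLevelCoordinates(new int[] {0, 2, 3}, 3)); // prints [[0], [2]]
    }
}
```

A nested projection would instead produce paths longer than one element, which is why the two branches deserve a comment that distinguishes them.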





