godfreyhe commented on a change in pull request #13631:
URL: https://github.com/apache/flink/pull/13631#discussion_r514204560



##########
File path: 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/utils/DataTypeUtils.java
##########
@@ -74,42 +76,46 @@
         *
         * <p>Note: Index paths allow for arbitrary deep nesting. For example, 
{@code [[0, 2, 1], ...]}
         * specifies to include the 2nd field of the 3rd field of the 1st field 
in the top-level row.
+        * Sometimes, it may get name conflicts when extract fields from the 
row field. Considering the
+        * the path is unique to extract fields, it makes sense to use the path 
to the fields with
+        * delimiter `_` as the new name of the field. For example, the new 
name of the field `b` in
+        * the row `a` is `a_b` rather than `b`. But it may still gets name 
conflicts in some situation,
+        * such as the field `a_b` in the top level schema. In such situation, 
it will use the postfix
+        * in the format '$%d' to resolve the name conflicts.
         */
        public static DataType projectRow(DataType dataType, int[][] 
indexPaths) {
                final List<RowField> updatedFields = new ArrayList<>();
                final List<DataType> updatedChildren = new ArrayList<>();
+               Set<String> nameDomain = new HashSet<>();
+               int duplicateCount = 0;
                for (int[] indexPath : indexPaths) {
-                       
updatedFields.add(selectChild(dataType.getLogicalType(), indexPath, 0));
-                       updatedChildren.add(selectChild(dataType, indexPath, 
0));
+                       DataType fieldType = 
dataType.getChildren().get(indexPath[0]);
+                       LogicalType fieldLogicalType = 
fieldType.getLogicalType();
+                       StringBuilder builder =
+                                       new StringBuilder(((RowType) 
dataType.getLogicalType()).getFieldNames().get(indexPath[0]));
+                       for (int index = 1; index < indexPath.length; index++) {
+                               Preconditions.checkArgument(
+                                               hasRoot(fieldLogicalType, 
LogicalTypeRoot.ROW),
+                                               "Row data type expected.");
+                               RowType rowtype = ((RowType) fieldLogicalType);
+                               
builder.append("_").append(rowtype.getFieldNames().get(indexPath[index]));
+                               fieldLogicalType = 
rowtype.getFields().get(indexPath[index]).getType();
+                               fieldType = 
fieldType.getChildren().get(indexPath[index]);
+                       }
+                       String path = builder.toString();
+                       while (nameDomain.contains(path)) {
+                               path = 
builder.append("$").append(duplicateCount++).toString();

Review comment:
       nit: how about adding `_` before "$"?

##########
File path: 
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushProjectIntoTableSourceScanRule.java
##########
@@ -109,73 +112,67 @@ public void onMatch(RelOptRuleCall call) {
                        usedFields = refFields;
                }
                // if no fields can be projected, we keep the original plan.
-               if (usedFields.length == fieldCount) {
+               if (!supportsNestedProjection && usedFields.length == 
fieldCount) {
                        return;
                }
 
-               final List<String> projectedFieldNames = 
IntStream.of(usedFields)
-                       .mapToObj(fieldNames::get)
-                       .collect(Collectors.toList());
-
                final TableSchema oldSchema = 
oldTableSourceTable.catalogTable().getSchema();
                final DynamicTableSource oldSource = 
oldTableSourceTable.tableSource();
                final List<String> metadataKeys = 
DynamicSourceUtils.createRequiredMetadataKeys(oldSchema, oldSource);
                final int physicalFieldCount = fieldCount - metadataKeys.size();
                final DynamicTableSource newSource = oldSource.copy();
 
-               // remove metadata columns from the projection push down and 
store it in a separate list
-               // the projection push down itself happens purely on physical 
columns
-               final int[] usedPhysicalFields;
-               final List<String> usedMetadataKeys;
-               if (newSource instanceof SupportsReadingMetadata) {
-                       usedPhysicalFields = IntStream.of(usedFields)
-                               // select only physical columns
-                               .filter(i -> i < physicalFieldCount)
-                               .toArray();
-                       final List<String> usedMetadataKeysUnordered = 
IntStream.of(usedFields)
-                               // select only metadata columns
-                               .filter(i -> i >= physicalFieldCount)
-                               // map the indices to keys
-                               .mapToObj(i -> metadataKeys.get(fieldCount - i 
- 1))
-                               .collect(Collectors.toList());
-                       // order the keys according to the source's declaration
-                       usedMetadataKeys = metadataKeys
-                               .stream()
-                               .filter(usedMetadataKeysUnordered::contains)
-                               .collect(Collectors.toList());
+               final List<List<Integer>> usedFieldsCoordinates = new 
ArrayList<>();
+               final Map<Integer, Map<List<String>, Integer>> 
fieldCoordinatesToOrder = new HashMap<>();
+
+               if (supportsNestedProjection) {
+                       
getCoordinatesAndMappingOfPhysicalColumnWithNestedProjection(
+                                       project, oldSchema, usedFields, 
physicalFieldCount, usedFieldsCoordinates, fieldCoordinatesToOrder);
                } else {
-                       usedPhysicalFields = usedFields;
-                       usedMetadataKeys = Collections.emptyList();
+                       for (int usedField : usedFields) {
+                               // filter metadata columns

Review comment:
       What if the metadata columns have nested fields?

##########
File path: 
flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/TableSourceITCase.scala
##########
@@ -66,6 +66,26 @@ class TableSourceITCase extends BatchTestBase {
          |  'bounded' = 'true'
          |)
          |""".stripMargin)
+    val nestedTableDataId = 
TestValuesTableFactory.registerData(TestData.deepNestedRow)

Review comment:
       remove unused imports

##########
File path: 
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushProjectIntoTableSourceScanRule.java
##########
@@ -190,35 +187,82 @@ public void onMatch(RelOptRuleCall call) {
                }
        }
 
-       private void applyUpdatedMetadata(
-                       DynamicTableSource oldSource,
-                       TableSchema oldSchema,
+       private DataType applyUpdateMetadataAndGetNewDataType(
                        DynamicTableSource newSource,
+                       DataType producedDataType,
                        List<String> metadataKeys,
-                       List<String> usedMetadataKeys,
+                       int[] usedFields,
                        int physicalFieldCount,
-                       int[][] projectedPhysicalFields) {
-               if (newSource instanceof SupportsReadingMetadata) {
-                       final DataType producedDataType = 
TypeConversions.fromLogicalToDataType(
-                               
DynamicSourceUtils.createProducedType(oldSchema, oldSource));
+                       List<List<Integer>> usedFieldsCoordinates,
+                       Map<Integer, Map<List<String>, Integer>> 
fieldCoordinatesToOrder) {
+               final List<String> usedMetadataKeysUnordered = 
IntStream.of(usedFields)
+                               // select only metadata columns
+                               .filter(i -> i >= physicalFieldCount)
+                               // map the indices to keys
+                               .mapToObj(i -> metadataKeys.get(i - 
physicalFieldCount))
+                               .collect(Collectors.toList());
+               // order the keys according to the source's declaration
+               final List<String> usedMetadataKeys = metadataKeys
+                               .stream()
+                               .filter(usedMetadataKeysUnordered::contains)
+                               .collect(Collectors.toList());
 
-                       final int[][] projectedMetadataFields = usedMetadataKeys
+               final List<List<Integer>> projectedMetadataFields = 
usedMetadataKeys
                                .stream()
                                .map(metadataKeys::indexOf)
-                               .map(i -> new int[]{ physicalFieldCount + i })
-                               .toArray(int[][]::new);
+                               .map(i -> {
+                                       
fieldCoordinatesToOrder.put(physicalFieldCount + i, 
Collections.singletonMap(Collections.singletonList("*"), 
fieldCoordinatesToOrder.size()));
+                                       return 
Collections.singletonList(physicalFieldCount + i); })
+                               .collect(Collectors.toList());
+               usedFieldsCoordinates.addAll(projectedMetadataFields);

Review comment:
       Use a `for` loop to make it clearer.

##########
File path: 
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushProjectIntoTableSourceScanRule.java
##########
@@ -190,35 +187,82 @@ public void onMatch(RelOptRuleCall call) {
                }
        }
 
-       private void applyUpdatedMetadata(
-                       DynamicTableSource oldSource,
-                       TableSchema oldSchema,
+       private DataType applyUpdateMetadataAndGetNewDataType(
                        DynamicTableSource newSource,
+                       DataType producedDataType,
                        List<String> metadataKeys,
-                       List<String> usedMetadataKeys,
+                       int[] usedFields,
                        int physicalFieldCount,
-                       int[][] projectedPhysicalFields) {
-               if (newSource instanceof SupportsReadingMetadata) {
-                       final DataType producedDataType = 
TypeConversions.fromLogicalToDataType(
-                               
DynamicSourceUtils.createProducedType(oldSchema, oldSource));
+                       List<List<Integer>> usedFieldsCoordinates,
+                       Map<Integer, Map<List<String>, Integer>> 
fieldCoordinatesToOrder) {
+               final List<String> usedMetadataKeysUnordered = 
IntStream.of(usedFields)
+                               // select only metadata columns
+                               .filter(i -> i >= physicalFieldCount)
+                               // map the indices to keys
+                               .mapToObj(i -> metadataKeys.get(i - 
physicalFieldCount))
+                               .collect(Collectors.toList());
+               // order the keys according to the source's declaration
+               final List<String> usedMetadataKeys = metadataKeys
+                               .stream()
+                               .filter(usedMetadataKeysUnordered::contains)
+                               .collect(Collectors.toList());
 
-                       final int[][] projectedMetadataFields = usedMetadataKeys
+               final List<List<Integer>> projectedMetadataFields = 
usedMetadataKeys
                                .stream()
                                .map(metadataKeys::indexOf)
-                               .map(i -> new int[]{ physicalFieldCount + i })
-                               .toArray(int[][]::new);
+                               .map(i -> {
+                                       
fieldCoordinatesToOrder.put(physicalFieldCount + i, 
Collections.singletonMap(Collections.singletonList("*"), 
fieldCoordinatesToOrder.size()));
+                                       return 
Collections.singletonList(physicalFieldCount + i); })
+                               .collect(Collectors.toList());
+               usedFieldsCoordinates.addAll(projectedMetadataFields);
 
-                       final int[][] projectedFields = Stream
-                               .concat(
-                                       Stream.of(projectedPhysicalFields),
-                                       Stream.of(projectedMetadataFields)
-                               )
+               int[][] allFields = usedFieldsCoordinates
+                               .stream()
+                               .map(coordinates -> 
coordinates.stream().mapToInt(i -> i).toArray())
                                .toArray(int[][]::new);
 
-                       // create a new, final data type that includes all 
projections
-                       final DataType newProducedDataType = 
DataTypeUtils.projectRow(producedDataType, projectedFields);
+               DataType newProducedDataType = 
DataTypeUtils.projectRow(producedDataType, allFields);
 
-                       ((SupportsReadingMetadata) 
newSource).applyReadableMetadata(usedMetadataKeys, newProducedDataType);
+               ((SupportsReadingMetadata) 
newSource).applyReadableMetadata(usedMetadataKeys, newProducedDataType);
+               return newProducedDataType;
+       }
+
+       private void 
getCoordinatesAndMappingOfPhysicalColumnWithNestedProjection(

Review comment:
       The method name is too long; how about changing it to 
`getExpandedFieldsAndOrderMapping`? Please also add some comments to explain the arguments.

##########
File path: 
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushProjectIntoTableSourceScanRule.java
##########
@@ -74,28 +79,26 @@ public boolean matches(RelOptRuleCall call) {
                if (tableSourceTable == null || 
!(tableSourceTable.tableSource() instanceof SupportsProjectionPushDown)) {
                        return false;
                }
-               SupportsProjectionPushDown pushDownSource = 
(SupportsProjectionPushDown) tableSourceTable.tableSource();
-               if (pushDownSource.supportsNestedProjection()) {
-                       throw new TableException("Nested projection push down 
is unsupported now. \n" +
-                                       "Please disable nested projection 
(SupportsProjectionPushDown#supportsNestedProjection returns false), " +
-                                       "planner will push down the top-level 
columns.");
-               } else {
-                       return true;
-               }
+               return 
Arrays.stream(tableSourceTable.extraDigests()).noneMatch(digest -> 
digest.startsWith("project=["));
        }
 
        @Override
        public void onMatch(RelOptRuleCall call) {
                final LogicalProject project = call.rel(0);
                final LogicalTableScan scan = call.rel(1);
 
+               TableSourceTable oldTableSourceTable = 
scan.getTable().unwrap(TableSourceTable.class);
+
+               final boolean supportsNestedProjection =
+                               ((SupportsProjectionPushDown) 
oldTableSourceTable.tableSource()).supportsNestedProjection();
+               final boolean supportsReadingMetaData = 
oldTableSourceTable.tableSource() instanceof SupportsReadingMetadata;

Review comment:
       nit: It is better to define the field close to the 
position where it is used.

##########
File path: 
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushProjectIntoTableSourceScanRule.java
##########
@@ -190,35 +187,82 @@ public void onMatch(RelOptRuleCall call) {
                }
        }
 
-       private void applyUpdatedMetadata(
-                       DynamicTableSource oldSource,
-                       TableSchema oldSchema,
+       private DataType applyUpdateMetadataAndGetNewDataType(
                        DynamicTableSource newSource,
+                       DataType producedDataType,
                        List<String> metadataKeys,
-                       List<String> usedMetadataKeys,
+                       int[] usedFields,
                        int physicalFieldCount,
-                       int[][] projectedPhysicalFields) {
-               if (newSource instanceof SupportsReadingMetadata) {
-                       final DataType producedDataType = 
TypeConversions.fromLogicalToDataType(
-                               
DynamicSourceUtils.createProducedType(oldSchema, oldSource));
+                       List<List<Integer>> usedFieldsCoordinates,
+                       Map<Integer, Map<List<String>, Integer>> 
fieldCoordinatesToOrder) {
+               final List<String> usedMetadataKeysUnordered = 
IntStream.of(usedFields)
+                               // select only metadata columns
+                               .filter(i -> i >= physicalFieldCount)
+                               // map the indices to keys
+                               .mapToObj(i -> metadataKeys.get(i - 
physicalFieldCount))
+                               .collect(Collectors.toList());
+               // order the keys according to the source's declaration
+               final List<String> usedMetadataKeys = metadataKeys
+                               .stream()
+                               .filter(usedMetadataKeysUnordered::contains)
+                               .collect(Collectors.toList());
 
-                       final int[][] projectedMetadataFields = usedMetadataKeys
+               final List<List<Integer>> projectedMetadataFields = 
usedMetadataKeys
                                .stream()
                                .map(metadataKeys::indexOf)
-                               .map(i -> new int[]{ physicalFieldCount + i })
-                               .toArray(int[][]::new);
+                               .map(i -> {
+                                       
fieldCoordinatesToOrder.put(physicalFieldCount + i, 
Collections.singletonMap(Collections.singletonList("*"), 
fieldCoordinatesToOrder.size()));
+                                       return 
Collections.singletonList(physicalFieldCount + i); })
+                               .collect(Collectors.toList());
+               usedFieldsCoordinates.addAll(projectedMetadataFields);
 
-                       final int[][] projectedFields = Stream
-                               .concat(
-                                       Stream.of(projectedPhysicalFields),
-                                       Stream.of(projectedMetadataFields)
-                               )
+               int[][] allFields = usedFieldsCoordinates

Review comment:
       allFields -> projectedFields

##########
File path: 
flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/utils/RexNodeExtractorTest.scala
##########
@@ -79,7 +78,7 @@ class RexNodeExtractorTest extends RexNodeTestBase {
     val usedFields = RexNodeExtractor.extractRefInputFields(rexProgram)
     val usedNestedFields = 
RexNodeExtractor.extractRefNestedInputFields(rexProgram, usedFields)
 
-    val expected = Array(Array("amount"), Array("*"))
+    val expected = Array(Array(util.Arrays.asList("amount")), 
Array(util.Arrays.asList("*")))

Review comment:
       nit: please delete the unused method `assertPlannerExpressionArrayEquals`




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to