godfreyhe commented on a change in pull request #13631:
URL: https://github.com/apache/flink/pull/13631#discussion_r514204560
##########
File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/utils/DataTypeUtils.java
##########
@@ -74,42 +76,46 @@
 *
 * <p>Note: Index paths allow for arbitrary deep nesting. For example, {@code [[0, 2, 1], ...]}
 * specifies to include the 2nd field of the 3rd field of the 1st field in the top-level row.
+ * Sometimes, name conflicts may occur when fields are extracted from a row field. Since the
+ * index path to a field is unique, it makes sense to use the path to the field, joined with
+ * the delimiter `_`, as the new name of the field. For example, the new name of the field `b`
+ * in the row `a` is `a_b` rather than `b`. However, name conflicts may still occur, e.g. when
+ * the top-level schema already contains a field `a_b`. In that case, a postfix in the format
+ * '$%d' is appended to resolve the conflict.
 */
public static DataType projectRow(DataType dataType, int[][] indexPaths) {
	final List<RowField> updatedFields = new ArrayList<>();
	final List<DataType> updatedChildren = new ArrayList<>();
+	Set<String> nameDomain = new HashSet<>();
+	int duplicateCount = 0;
	for (int[] indexPath : indexPaths) {
-		updatedFields.add(selectChild(dataType.getLogicalType(), indexPath, 0));
-		updatedChildren.add(selectChild(dataType, indexPath, 0));
+		DataType fieldType = dataType.getChildren().get(indexPath[0]);
+		LogicalType fieldLogicalType = fieldType.getLogicalType();
+		StringBuilder builder =
+			new StringBuilder(((RowType) dataType.getLogicalType()).getFieldNames().get(indexPath[0]));
+		for (int index = 1; index < indexPath.length; index++) {
+			Preconditions.checkArgument(
+				hasRoot(fieldLogicalType, LogicalTypeRoot.ROW),
+				"Row data type expected.");
+			RowType rowtype = ((RowType) fieldLogicalType);
+			builder.append("_").append(rowtype.getFieldNames().get(indexPath[index]));
+			fieldLogicalType = rowtype.getFields().get(indexPath[index]).getType();
+			fieldType = fieldType.getChildren().get(indexPath[index]);
+		}
+		String path = builder.toString();
+		while (nameDomain.contains(path)) {
+			path = builder.append("$").append(duplicateCount++).toString();
Review comment:
nit: how about adding `_` before `$`?
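For illustration, here is a minimal, self-contained sketch (not the actual `DataTypeUtils` code) of the renaming scheme described in the Javadoc above, using the `_$` conflict marker suggested in this nit; `FieldNameSketch`, `uniqueName`, and the sample inputs are hypothetical names introduced only for this example.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class FieldNameSketch {

	/** Joins the field names along the index path with `_` and appends a counter suffix on conflicts. */
	static String uniqueName(List<String> pathToField, Set<String> usedNames, int[] counter) {
		String name = String.join("_", pathToField);    // e.g. ["a", "b"] -> "a_b"
		while (usedNames.contains(name)) {
			// the nit above: use "_$" rather than "$" as the conflict marker
			name = name + "_$" + counter[0]++;          // e.g. "a_b" -> "a_b_$0"
		}
		usedNames.add(name);
		return name;
	}

	public static void main(String[] args) {
		Set<String> used = new HashSet<>();
		int[] counter = {0};
		System.out.println(uniqueName(Arrays.asList("a", "b"), used, counter)); // a_b
		System.out.println(uniqueName(Arrays.asList("a_b"), used, counter));    // a_b_$0 (conflict with the first name)
	}
}
```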
##########
File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushProjectIntoTableSourceScanRule.java
##########
@@ -109,73 +112,67 @@ public void onMatch(RelOptRuleCall call) {
		usedFields = refFields;
	}
	// if no fields can be projected, we keep the original plan.
-	if (usedFields.length == fieldCount) {
+	if (!supportsNestedProjection && usedFields.length == fieldCount) {
		return;
	}
-	final List<String> projectedFieldNames = IntStream.of(usedFields)
-		.mapToObj(fieldNames::get)
-		.collect(Collectors.toList());
-
	final TableSchema oldSchema = oldTableSourceTable.catalogTable().getSchema();
	final DynamicTableSource oldSource = oldTableSourceTable.tableSource();
	final List<String> metadataKeys = DynamicSourceUtils.createRequiredMetadataKeys(oldSchema, oldSource);
	final int physicalFieldCount = fieldCount - metadataKeys.size();
	final DynamicTableSource newSource = oldSource.copy();
-	// remove metadata columns from the projection push down and store it in a separate list
-	// the projection push down itself happens purely on physical columns
-	final int[] usedPhysicalFields;
-	final List<String> usedMetadataKeys;
-	if (newSource instanceof SupportsReadingMetadata) {
-		usedPhysicalFields = IntStream.of(usedFields)
-			// select only physical columns
-			.filter(i -> i < physicalFieldCount)
-			.toArray();
-		final List<String> usedMetadataKeysUnordered = IntStream.of(usedFields)
-			// select only metadata columns
-			.filter(i -> i >= physicalFieldCount)
-			// map the indices to keys
-			.mapToObj(i -> metadataKeys.get(fieldCount - i - 1))
-			.collect(Collectors.toList());
-		// order the keys according to the source's declaration
-		usedMetadataKeys = metadataKeys
-			.stream()
-			.filter(usedMetadataKeysUnordered::contains)
-			.collect(Collectors.toList());
+	final List<List<Integer>> usedFieldsCoordinates = new ArrayList<>();
+	final Map<Integer, Map<List<String>, Integer>> fieldCoordinatesToOrder = new HashMap<>();
+
+	if (supportsNestedProjection) {
+		getCoordinatesAndMappingOfPhysicalColumnWithNestedProjection(
+			project, oldSchema, usedFields, physicalFieldCount, usedFieldsCoordinates, fieldCoordinatesToOrder);
	} else {
-		usedPhysicalFields = usedFields;
-		usedMetadataKeys = Collections.emptyList();
+		for (int usedField : usedFields) {
+			// filter metadata columns
Review comment:
what if the metadata columns have nested fields?
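For context, here is a small, self-contained sketch of the index arithmetic used in the quoted hunks: fields at positions >= `physicalFieldCount` are metadata columns, and `i - physicalFieldCount` maps a field index back to its metadata key. It treats each metadata column as a single top-level field, which is exactly the assumption this question probes; the sample keys and indices are made up for illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class MetadataSplitSketch {
	public static void main(String[] args) {
		List<String> metadataKeys = Arrays.asList("timestamp", "partition"); // assumed keys
		int physicalFieldCount = 3;              // fields 0..2 are physical columns
		int[] usedFields = {0, 2, 3, 4};         // 3 and 4 point at metadata columns

		// physical part of the projection
		int[] usedPhysicalFields = IntStream.of(usedFields)
			.filter(i -> i < physicalFieldCount)
			.toArray();                          // [0, 2]

		// metadata part of the projection, mapped back to keys
		List<String> usedMetadataKeys = IntStream.of(usedFields)
			.filter(i -> i >= physicalFieldCount)
			.mapToObj(i -> metadataKeys.get(i - physicalFieldCount))
			.collect(Collectors.toList());       // [timestamp, partition]

		System.out.println(Arrays.toString(usedPhysicalFields));
		System.out.println(usedMetadataKeys);
	}
}
```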
##########
File path: flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/TableSourceITCase.scala
##########
@@ -66,6 +66,26 @@ class TableSourceITCase extends BatchTestBase {
| 'bounded' = 'true'
|)
|""".stripMargin)
+    val nestedTableDataId = TestValuesTableFactory.registerData(TestData.deepNestedRow)
Review comment:
remove unused imports
##########
File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushProjectIntoTableSourceScanRule.java
##########
@@ -190,35 +187,82 @@ public void onMatch(RelOptRuleCall call) {
	}
}
-	private void applyUpdatedMetadata(
-			DynamicTableSource oldSource,
-			TableSchema oldSchema,
+	private DataType applyUpdateMetadataAndGetNewDataType(
			DynamicTableSource newSource,
+			DataType producedDataType,
			List<String> metadataKeys,
-			List<String> usedMetadataKeys,
+			int[] usedFields,
			int physicalFieldCount,
-			int[][] projectedPhysicalFields) {
-		if (newSource instanceof SupportsReadingMetadata) {
-			final DataType producedDataType = TypeConversions.fromLogicalToDataType(
-				DynamicSourceUtils.createProducedType(oldSchema, oldSource));
+			List<List<Integer>> usedFieldsCoordinates,
+			Map<Integer, Map<List<String>, Integer>> fieldCoordinatesToOrder) {
+		final List<String> usedMetadataKeysUnordered = IntStream.of(usedFields)
+			// select only metadata columns
+			.filter(i -> i >= physicalFieldCount)
+			// map the indices to keys
+			.mapToObj(i -> metadataKeys.get(i - physicalFieldCount))
+			.collect(Collectors.toList());
+		// order the keys according to the source's declaration
+		final List<String> usedMetadataKeys = metadataKeys
+			.stream()
+			.filter(usedMetadataKeysUnordered::contains)
+			.collect(Collectors.toList());
-		final int[][] projectedMetadataFields = usedMetadataKeys
+		final List<List<Integer>> projectedMetadataFields = usedMetadataKeys
			.stream()
			.map(metadataKeys::indexOf)
-			.map(i -> new int[]{ physicalFieldCount + i })
-			.toArray(int[][]::new);
+			.map(i -> {
+				fieldCoordinatesToOrder.put(physicalFieldCount + i, Collections.singletonMap(Collections.singletonList("*"), fieldCoordinatesToOrder.size()));
+				return Collections.singletonList(physicalFieldCount + i); })
+			.collect(Collectors.toList());
+		usedFieldsCoordinates.addAll(projectedMetadataFields);
Review comment:
use a `for` loop here to make it clearer
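To make the suggestion concrete, here is a hedged sketch of the same logic rewritten with a plain `for` loop, wrapped in a standalone example; only the loop mirrors the quoted hunk, while the sample metadata keys and field counts are made up.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ForLoopSketch {
	public static void main(String[] args) {
		List<String> metadataKeys = Arrays.asList("timestamp", "partition"); // assumed keys
		List<String> usedMetadataKeys = Collections.singletonList("partition");
		int physicalFieldCount = 3;
		List<List<Integer>> usedFieldsCoordinates = new ArrayList<>();
		Map<Integer, Map<List<String>, Integer>> fieldCoordinatesToOrder = new HashMap<>();

		// the stream with the side-effecting map() lambda, rewritten as a plain for loop
		for (String usedMetadataKey : usedMetadataKeys) {
			int i = metadataKeys.indexOf(usedMetadataKey);
			// record the order in which this metadata column appears in the projection
			fieldCoordinatesToOrder.put(
				physicalFieldCount + i,
				Collections.singletonMap(Collections.singletonList("*"), fieldCoordinatesToOrder.size()));
			usedFieldsCoordinates.add(Collections.singletonList(physicalFieldCount + i));
		}

		System.out.println(usedFieldsCoordinates);    // [[4]]
		System.out.println(fieldCoordinatesToOrder);  // {4={[*]=0}}
	}
}
```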
##########
File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushProjectIntoTableSourceScanRule.java
##########
@@ -190,35 +187,82 @@ public void onMatch(RelOptRuleCall call) {
	}
}
-	private void applyUpdatedMetadata(
-			DynamicTableSource oldSource,
-			TableSchema oldSchema,
+	private DataType applyUpdateMetadataAndGetNewDataType(
			DynamicTableSource newSource,
+			DataType producedDataType,
			List<String> metadataKeys,
-			List<String> usedMetadataKeys,
+			int[] usedFields,
			int physicalFieldCount,
-			int[][] projectedPhysicalFields) {
-		if (newSource instanceof SupportsReadingMetadata) {
-			final DataType producedDataType = TypeConversions.fromLogicalToDataType(
-				DynamicSourceUtils.createProducedType(oldSchema, oldSource));
+			List<List<Integer>> usedFieldsCoordinates,
+			Map<Integer, Map<List<String>, Integer>> fieldCoordinatesToOrder) {
+		final List<String> usedMetadataKeysUnordered = IntStream.of(usedFields)
+			// select only metadata columns
+			.filter(i -> i >= physicalFieldCount)
+			// map the indices to keys
+			.mapToObj(i -> metadataKeys.get(i - physicalFieldCount))
+			.collect(Collectors.toList());
+		// order the keys according to the source's declaration
+		final List<String> usedMetadataKeys = metadataKeys
+			.stream()
+			.filter(usedMetadataKeysUnordered::contains)
+			.collect(Collectors.toList());
-		final int[][] projectedMetadataFields = usedMetadataKeys
+		final List<List<Integer>> projectedMetadataFields = usedMetadataKeys
			.stream()
			.map(metadataKeys::indexOf)
-			.map(i -> new int[]{ physicalFieldCount + i })
-			.toArray(int[][]::new);
+			.map(i -> {
+				fieldCoordinatesToOrder.put(physicalFieldCount + i, Collections.singletonMap(Collections.singletonList("*"), fieldCoordinatesToOrder.size()));
+				return Collections.singletonList(physicalFieldCount + i); })
+			.collect(Collectors.toList());
+		usedFieldsCoordinates.addAll(projectedMetadataFields);
-		final int[][] projectedFields = Stream
-			.concat(
-				Stream.of(projectedPhysicalFields),
-				Stream.of(projectedMetadataFields)
-			)
+		int[][] allFields = usedFieldsCoordinates
+			.stream()
+			.map(coordinates -> coordinates.stream().mapToInt(i -> i).toArray())
			.toArray(int[][]::new);
-		// create a new, final data type that includes all projections
-		final DataType newProducedDataType = DataTypeUtils.projectRow(producedDataType, projectedFields);
+		DataType newProducedDataType = DataTypeUtils.projectRow(producedDataType, allFields);
-		((SupportsReadingMetadata) newSource).applyReadableMetadata(usedMetadataKeys, newProducedDataType);
+		((SupportsReadingMetadata) newSource).applyReadableMetadata(usedMetadataKeys, newProducedDataType);
+		return newProducedDataType;
+	}
+
+	private void getCoordinatesAndMappingOfPhysicalColumnWithNestedProjection(
Review comment:
the method name is too long; how about changing it to `getExpandedFieldsAndOrderMapping`? Also, add some comments to explain the arguments.
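As one possible response to this comment, here is a documentation sketch using the shorter name suggested above; the parameter types and descriptions are inferred from the quoted hunks and are not the PR author's wording.

```java
/**
 * Sketch only: types and descriptions below are inferred from the quoted hunks.
 *
 * Expands the physical columns referenced by the projection into nested index paths
 * and records, per top-level field, the order of its referenced nested fields.
 *
 * @param project                 the projection on top of the scan
 * @param oldSchema               the table schema before projection push-down
 * @param usedFields              indices of the top-level fields referenced by the projection
 * @param physicalFieldCount      number of physical (non-metadata) fields in the schema
 * @param usedFieldsCoordinates   output list: the index path of every projected (nested) field
 * @param fieldCoordinatesToOrder output map: top-level field index -> (nested field path -> order)
 */
private void getExpandedFieldsAndOrderMapping(
		LogicalProject project,
		TableSchema oldSchema,
		int[] usedFields,
		int physicalFieldCount,
		List<List<Integer>> usedFieldsCoordinates,
		Map<Integer, Map<List<String>, Integer>> fieldCoordinatesToOrder) {
	// body unchanged from the hunk above
}
```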
##########
File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushProjectIntoTableSourceScanRule.java
##########
@@ -74,28 +79,26 @@ public boolean matches(RelOptRuleCall call) {
	if (tableSourceTable == null || !(tableSourceTable.tableSource() instanceof SupportsProjectionPushDown)) {
		return false;
	}
-	SupportsProjectionPushDown pushDownSource = (SupportsProjectionPushDown) tableSourceTable.tableSource();
-	if (pushDownSource.supportsNestedProjection()) {
-		throw new TableException("Nested projection push down is unsupported now. \n" +
-			"Please disable nested projection (SupportsProjectionPushDown#supportsNestedProjection returns false), " +
-			"planner will push down the top-level columns.");
-	} else {
-		return true;
-	}
+	return Arrays.stream(tableSourceTable.extraDigests()).noneMatch(digest -> digest.startsWith("project=["));
}
@Override
public void onMatch(RelOptRuleCall call) {
	final LogicalProject project = call.rel(0);
	final LogicalTableScan scan = call.rel(1);
+	TableSourceTable oldTableSourceTable = scan.getTable().unwrap(TableSourceTable.class);
+
+	final boolean supportsNestedProjection =
+		((SupportsProjectionPushDown) oldTableSourceTable.tableSource()).supportsNestedProjection();
+	final boolean supportsReadingMetaData = oldTableSourceTable.tableSource() instanceof SupportsReadingMetadata;
Review comment:
nit: it is better to move the declaration of this field closer to the place where it is used.
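For illustration only, here is a small generic before/after example of this nit (not Flink code; the class and method names are made up): declare a local right before the code that uses it instead of far above.

```java
public class DeclareNearUseSketch {

	static int lengthOrZero(String value) {
		// before: `final boolean present = value != null;` was declared here,
		// several unrelated statements away from its only use.
		doUnrelatedWork();
		// after: declare it right where it is needed
		final boolean present = value != null;
		return present ? value.length() : 0;
	}

	static void doUnrelatedWork() {
		// placeholder for the unrelated statements between declaration and use
	}

	public static void main(String[] args) {
		System.out.println(lengthOrZero("nested")); // 6
		System.out.println(lengthOrZero(null));     // 0
	}
}
```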
##########
File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushProjectIntoTableSourceScanRule.java
##########
@@ -190,35 +187,82 @@ public void onMatch(RelOptRuleCall call) {
	}
}
-	private void applyUpdatedMetadata(
-			DynamicTableSource oldSource,
-			TableSchema oldSchema,
+	private DataType applyUpdateMetadataAndGetNewDataType(
			DynamicTableSource newSource,
+			DataType producedDataType,
			List<String> metadataKeys,
-			List<String> usedMetadataKeys,
+			int[] usedFields,
			int physicalFieldCount,
-			int[][] projectedPhysicalFields) {
-		if (newSource instanceof SupportsReadingMetadata) {
-			final DataType producedDataType = TypeConversions.fromLogicalToDataType(
-				DynamicSourceUtils.createProducedType(oldSchema, oldSource));
+			List<List<Integer>> usedFieldsCoordinates,
+			Map<Integer, Map<List<String>, Integer>> fieldCoordinatesToOrder) {
+		final List<String> usedMetadataKeysUnordered = IntStream.of(usedFields)
+			// select only metadata columns
+			.filter(i -> i >= physicalFieldCount)
+			// map the indices to keys
+			.mapToObj(i -> metadataKeys.get(i - physicalFieldCount))
+			.collect(Collectors.toList());
+		// order the keys according to the source's declaration
+		final List<String> usedMetadataKeys = metadataKeys
+			.stream()
+			.filter(usedMetadataKeysUnordered::contains)
+			.collect(Collectors.toList());
-		final int[][] projectedMetadataFields = usedMetadataKeys
+		final List<List<Integer>> projectedMetadataFields = usedMetadataKeys
			.stream()
			.map(metadataKeys::indexOf)
-			.map(i -> new int[]{ physicalFieldCount + i })
-			.toArray(int[][]::new);
+			.map(i -> {
+				fieldCoordinatesToOrder.put(physicalFieldCount + i, Collections.singletonMap(Collections.singletonList("*"), fieldCoordinatesToOrder.size()));
+				return Collections.singletonList(physicalFieldCount + i); })
+			.collect(Collectors.toList());
+		usedFieldsCoordinates.addAll(projectedMetadataFields);
-		final int[][] projectedFields = Stream
-			.concat(
-				Stream.of(projectedPhysicalFields),
-				Stream.of(projectedMetadataFields)
-			)
+		int[][] allFields = usedFieldsCoordinates
Review comment:
allFields -> projectedFields
##########
File path: flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/utils/RexNodeExtractorTest.scala
##########
@@ -79,7 +78,7 @@ class RexNodeExtractorTest extends RexNodeTestBase {
val usedFields = RexNodeExtractor.extractRefInputFields(rexProgram)
    val usedNestedFields = RexNodeExtractor.extractRefNestedInputFields(rexProgram, usedFields)
-   val expected = Array(Array("amount"), Array("*"))
+   val expected = Array(Array(util.Arrays.asList("amount")), Array(util.Arrays.asList("*")))
Review comment:
nit: please delete the unused method `assertPlannerExpressionArrayEquals`
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]