slinkydeveloper commented on a change in pull request #17544:
URL: https://github.com/apache/flink/pull/17544#discussion_r743812037
##########
File path:
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/filesystem/FileSystemTableSource.java
##########
@@ -340,64 +406,27 @@ public String asSummaryString() {
return map;
}
- private int[] readFields() {
- if (this.producedDataType != null) {
- return IntStream.range(
- 0,
- (int)
-
DataType.getFields(this.producedDataType).stream()
- .filter(
- field ->
-
!usedMetadataKeys.contains(
-
field.getName()))
- .count())
- .toArray();
- }
- return projectedFields == null
- ? IntStream.range(0,
DataType.getFieldCount(getPhysicalDataType())).toArray()
- : Arrays.stream(projectedFields).mapToInt(array ->
array[0]).toArray();
- }
+ //
--------------------------------------------------------------------------------------------
+ // Methods to apply projections and metadata,
+ // will influence the final output and physical type used by formats
+ //
--------------------------------------------------------------------------------------------
@Override
- DataType getPhysicalDataType() {
- if (this.usedMetadataKeys != null) {
- return DataTypes.ROW(
- DataType.getFields(super.getPhysicalDataType()).stream()
- .filter(field ->
!usedMetadataKeys.contains(field.getName()))
- .toArray(DataTypes.Field[]::new));
- }
- return super.getPhysicalDataType();
- }
-
- private DataType getProjectedDataType() {
- final DataType physicalDataType =
- this.producedDataType != null
- ? DataTypes.ROW(
-
DataType.getFields(this.producedDataType).stream()
- .filter(
- field ->
-
!usedMetadataKeys.contains(field.getName()))
- .toArray(DataTypes.Field[]::new))
- : super.getPhysicalDataType();
-
- // If we haven't projected fields, we just return the original
physical data type,
- // otherwise we need to compute the physical data type depending on
the projected fields.
- if (projectedFields == null) {
- return physicalDataType;
- }
- return DataType.projectFields(physicalDataType, projectedFields);
+ public void applyProjection(int[][] projectedFields) {
+ this.projectFields = projectedFields;
}
@Override
- DataType getPhysicalDataTypeWithoutPartitionColumns() {
- if (this.producedDataType != null) {
- return DataTypes.ROW(
- DataType.getFields(this.producedDataType).stream()
- .filter(field ->
!usedMetadataKeys.contains(field.getName()))
- .filter(field ->
!partitionKeys.contains(field.getName()))
- .toArray(DataTypes.Field[]::new));
+ public void applyReadableMetadata(List<String> metadataKeys, DataType
producedDataType) {
+ if (!metadataKeys.isEmpty()) {
+ this.metadataKeys = metadataKeys;
+ this.fullOutputDataType = producedDataType;
+ // If a projection was pushed down, we need to remove it here
because producedDataType
+ // is already projected
+ if (projectFields != null) {
Review comment:
I've updated this code, now it's even better with the new
`applyProjections` interface
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]