twalthr commented on a change in pull request #17544:
URL: https://github.com/apache/flink/pull/17544#discussion_r738492595
##########
File path:
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkBatchRuleSets.scala
##########
@@ -174,13 +174,13 @@ object FlinkBatchRuleSets {
* RuleSet to do push predicate/partition into table scan
*/
val FILTER_TABLESCAN_PUSHDOWN_RULES: RuleSet = RuleSets.ofList(
- // push a filter down into the table scan
- PushFilterIntoTableSourceScanRule.INSTANCE,
- PushFilterIntoLegacyTableSourceScanRule.INSTANCE,
// push partition into the table scan
PushPartitionIntoLegacyTableSourceScanRule.INSTANCE,
// push partition into the dynamic table scan
- PushPartitionIntoTableSourceScanRule.INSTANCE
+ PushPartitionIntoTableSourceScanRule.INSTANCE,
Review comment:
please convert this commit in a hotfix commit before the actual commits
for FLINK-24165
##########
File path:
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/filesystem/FileSystemTableSink.java
##########
@@ -281,32 +282,46 @@ private RowDataPartitionComputer partitionComputer() {
}
private Optional<CompactReader.Factory<RowData>>
createCompactReaderFactory(Context context) {
- if (bulkReaderFormat != null) {
- final BulkFormat<RowData, FileSourceSplit> format =
- bulkReaderFormat.createRuntimeDecoder(
- createSourceContext(context),
getPhysicalDataType());
- return Optional.of(CompactBulkReader.factory(format));
- } else if (formatFactory != null) {
+ // TODO FLINK-19845 old format factory, to be removed soon.
+ if (formatFactory != null) {
final InputFormat<RowData, ?> format =
formatFactory.createReader(createReaderContext());
if (format instanceof FileInputFormat) {
//noinspection unchecked
return Optional.of(
FileInputFormatCompactReader.factory((FileInputFormat<RowData>) format));
}
+ return Optional.empty();
+ }
+
+ // Compute fullOutputDataType (including partition fields) and
physicalDataType (excluding
+ // partition fields)
+ final DataType fullOutputDataType = getPhysicalDataType();
Review comment:
reading this is very confusing `fullOutputDataType =
getPhysicalDataType()`? `fullOutputDataType` should be `consumedDataType` or
even `physicalRowWithPartitions` everywhere
##########
File path:
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/filesystem/FileInfoExtractorBulkFormat.java
##########
@@ -93,10 +129,36 @@ public boolean isSplittable() {
}
private Reader<RowData> wrapReader(Reader<RowData> superReader,
FileSourceSplit split) {
- // Fill the metadata row
- final GenericRowData metadataRowData = new
GenericRowData(metadataColumnsFunctions.size());
- for (int i = 0; i < metadataColumnsFunctions.size(); i++) {
- metadataRowData.setField(i,
metadataColumnsFunctions.get(i).getValue(split));
+ // Fill the metadata + partition columns row
+ final GenericRowData metadataRowData =
Review comment:
call this `rowWithMetadataAndPartition`, the name might be long but only
`metadataRowData` is confusing
##########
File path:
flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/TableSourceJsonPlanTest_jsonplan/testFilterPushDown.out
##########
@@ -113,4 +113,4 @@
},
"shuffleMode" : "PIPELINED"
} ]
-}
+}
Review comment:
unrelated change?
##########
File path:
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/filesystem/FileSystemTableSource.java
##########
@@ -340,64 +406,27 @@ public String asSummaryString() {
return map;
}
- private int[] readFields() {
- if (this.producedDataType != null) {
- return IntStream.range(
- 0,
- (int)
-
DataType.getFields(this.producedDataType).stream()
- .filter(
- field ->
-
!usedMetadataKeys.contains(
-
field.getName()))
- .count())
- .toArray();
- }
- return projectedFields == null
- ? IntStream.range(0,
DataType.getFieldCount(getPhysicalDataType())).toArray()
- : Arrays.stream(projectedFields).mapToInt(array ->
array[0]).toArray();
- }
+ //
--------------------------------------------------------------------------------------------
+ // Methods to apply projections and metadata,
+ // will influence the final output and physical type used by formats
+ //
--------------------------------------------------------------------------------------------
@Override
- DataType getPhysicalDataType() {
- if (this.usedMetadataKeys != null) {
- return DataTypes.ROW(
- DataType.getFields(super.getPhysicalDataType()).stream()
- .filter(field ->
!usedMetadataKeys.contains(field.getName()))
- .toArray(DataTypes.Field[]::new));
- }
- return super.getPhysicalDataType();
- }
-
- private DataType getProjectedDataType() {
- final DataType physicalDataType =
- this.producedDataType != null
- ? DataTypes.ROW(
-
DataType.getFields(this.producedDataType).stream()
- .filter(
- field ->
-
!usedMetadataKeys.contains(field.getName()))
- .toArray(DataTypes.Field[]::new))
- : super.getPhysicalDataType();
-
- // If we haven't projected fields, we just return the original
physical data type,
- // otherwise we need to compute the physical data type depending on
the projected fields.
- if (projectedFields == null) {
- return physicalDataType;
- }
- return DataType.projectFields(physicalDataType, projectedFields);
+ public void applyProjection(int[][] projectedFields) {
+ this.projectFields = projectedFields;
}
@Override
- DataType getPhysicalDataTypeWithoutPartitionColumns() {
- if (this.producedDataType != null) {
- return DataTypes.ROW(
- DataType.getFields(this.producedDataType).stream()
- .filter(field ->
!usedMetadataKeys.contains(field.getName()))
- .filter(field ->
!partitionKeys.contains(field.getName()))
- .toArray(DataTypes.Field[]::new));
+ public void applyReadableMetadata(List<String> metadataKeys, DataType
producedDataType) {
+ if (!metadataKeys.isEmpty()) {
+ this.metadataKeys = metadataKeys;
+ this.fullOutputDataType = producedDataType;
+ // If a projection was pushed down, we need to remove it here
because producedDataType
+ // is already projected
+ if (projectFields != null) {
Review comment:
this looks incorrect to me. how can a metadata pushdown suddenly make a
pushed projection invalid. we are not recomputing `projectFields` from
`fullOutputDataType`, right?
##########
File path:
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/filesystem/FileSystemTableSource.java
##########
@@ -292,24 +363,19 @@ public boolean supportsNestedProjection() {
return false;
}
- @Override
- public void applyProjection(int[][] projectedFields) {
- this.projectedFields = projectedFields;
- }
-
@Override
public FileSystemTableSource copy() {
FileSystemTableSource source =
new FileSystemTableSource(
context, bulkReaderFormat, deserializationFormat,
formatFactory);
- source.projectedFields = projectedFields;
source.remainingPartitions = remainingPartitions;
source.filters = filters;
source.limit = limit;
Review comment:
remove empty line?
##########
File path:
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/filesystem/FileSystemTableSource.java
##########
@@ -217,7 +283,12 @@ public String getDefaultPartName() {
@Override
public int[] getProjectFields() {
- return readFields();
+ return projectFields == null
+ ? IntStream.range(0,
DataType.getFieldCount(getPhysicalDataType()))
Review comment:
this will include partitions, is that correct? btw should we rename
`getPhysicalDataType()`?
##########
File path:
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/filesystem/FileSystemTableSource.java
##########
@@ -83,15 +83,13 @@
@Nullable private final DecodingFormat<DeserializationSchema<RowData>>
deserializationFormat;
@Nullable private final FileSystemFormatFactory formatFactory;
- private int[][] projectedFields;
private List<Map<String, String>> remainingPartitions;
private List<ResolvedExpression> filters;
private Long limit;
Review comment:
nit: remove empty line? or is there a logical separation between the
upper and lower member field groups
##########
File path:
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/filesystem/FileSystemTableSource.java
##########
@@ -292,24 +363,19 @@ public boolean supportsNestedProjection() {
return false;
}
- @Override
- public void applyProjection(int[][] projectedFields) {
- this.projectedFields = projectedFields;
- }
-
@Override
public FileSystemTableSource copy() {
FileSystemTableSource source =
new FileSystemTableSource(
context, bulkReaderFormat, deserializationFormat,
formatFactory);
- source.projectedFields = projectedFields;
source.remainingPartitions = remainingPartitions;
source.filters = filters;
source.limit = limit;
- source.producedDataType = producedDataType;
- source.usedMetadata = usedMetadata;
- source.usedMetadataKeys = usedMetadataKeys;
+ source.projectFields = projectFields;
+ source.metadataKeys = metadataKeys;
+ source.partitionKeys = partitionKeys;
Review comment:
nit: order the variables and variable assignment according to the
application order? first partitions, then project, then metadata?
##########
File path:
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/filesystem/AbstractFileSystemTable.java
##########
@@ -40,7 +40,7 @@
final ObjectIdentifier tableIdentifier;
final Configuration tableOptions;
final ResolvedSchema schema;
- final List<String> partitionKeys;
+ List<String> partitionKeys;
Review comment:
nit: separate mutable members from constant members, maybe be a empty
line
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]