slinkydeveloper commented on a change in pull request #17544:
URL: https://github.com/apache/flink/pull/17544#discussion_r745401218
##########
File path:
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/filesystem/FileSystemTableSource.java
##########
@@ -101,54 +109,123 @@ public FileSystemTableSource(
this.bulkReaderFormat = bulkReaderFormat;
this.deserializationFormat = deserializationFormat;
this.formatFactory = formatFactory;
+
+ this.producedDataType = context.getPhysicalRowDataType();
}
@Override
public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext)
{
+ // When this table has no partition, just return a empty source.
if (!partitionKeys.isEmpty() && getOrFetchPartitions().isEmpty()) {
- // When this table has no partition, just return a empty source.
return InputFormatProvider.of(new CollectionInputFormat<>(new
ArrayList<>(), null));
- } else if (bulkReaderFormat != null) {
- if (bulkReaderFormat instanceof BulkDecodingFormat
- && filters != null
- && filters.size() > 0) {
- ((BulkDecodingFormat<RowData>)
bulkReaderFormat).applyFilters(filters);
+ }
+
+ // Physical type is computed from the full data type, filtering out
partition and
+ // metadata columns. This type is going to be used by formats to parse
the input.
+ List<DataTypes.Field> producedDataTypeFields =
DataType.getFields(producedDataType);
+ if (metadataKeys != null) {
+ // If metadata keys are present, then by SupportsReadingMetadata
contract all the
+ // metadata columns will be at the end of the producedDataType, so
we can just remove
+ // from the list the last metadataKeys.size() fields.
+ producedDataTypeFields =
+ producedDataTypeFields.subList(
+ 0, producedDataTypeFields.size() -
metadataKeys.size());
+ }
+ DataType physicalDataType =
+ producedDataTypeFields.stream()
Review comment:
There you go https://issues.apache.org/jira/browse/FLINK-24843
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]