twalthr commented on a change in pull request #17544:
URL: https://github.com/apache/flink/pull/17544#discussion_r745388320
##########
File path: flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/filesystem/FileSystemTableSource.java
##########
@@ -314,20 +394,86 @@ public String asSummaryString() {
        return map;
    }

-    private int[] readFields() {
-        return projectedFields == null
-                ? IntStream.range(0, DataType.getFieldCount(getPhysicalDataType())).toArray()
-                : Arrays.stream(projectedFields).mapToInt(array -> array[0]).toArray();
+    // --------------------------------------------------------------------------------------------
+    // Methods to apply projections and metadata,
+    // will influence the final output and physical type used by formats
+    // --------------------------------------------------------------------------------------------
+
+    @Override
+    public void applyProjection(int[][] projectedFields, DataType producedDataType) {
+        this.projectFields = projectedFields;
+        this.producedDataType = producedDataType;
+    }
+
+    @Override
+    public void applyReadableMetadata(List<String> metadataKeys, DataType producedDataType) {
+        if (!metadataKeys.isEmpty()) {
Review comment:
This could be problematic: the optimizer might call this method twice, once with the declared metadata columns and once with the actually used ones (which may be empty). It should be safe to remove this `if` branch, shouldn't it?
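
For illustration, removing the branch would leave a plain, unconditional assignment. A minimal sketch, reusing the field names visible in the diff above (not the committed fix):

```java
@Override
public void applyReadableMetadata(List<String> metadataKeys, DataType producedDataType) {
    // Assign unconditionally: if the planner calls this a second time with an
    // empty key list, the source must reset to producing no metadata columns.
    this.metadataKeys = metadataKeys;
    this.producedDataType = producedDataType;
}
```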
##########
File path: flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/filesystem/FileSystemTableSource.java
##########
@@ -101,54 +109,123 @@ public FileSystemTableSource(
        this.bulkReaderFormat = bulkReaderFormat;
        this.deserializationFormat = deserializationFormat;
        this.formatFactory = formatFactory;
+
+        this.producedDataType = context.getPhysicalRowDataType();
    }

    @Override
    public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) {
+        // When this table has no partition, just return an empty source.
        if (!partitionKeys.isEmpty() && getOrFetchPartitions().isEmpty()) {
-            // When this table has no partition, just return an empty source.
            return InputFormatProvider.of(new CollectionInputFormat<>(new ArrayList<>(), null));
-        } else if (bulkReaderFormat != null) {
-            if (bulkReaderFormat instanceof BulkDecodingFormat
-                    && filters != null
-                    && filters.size() > 0) {
-                ((BulkDecodingFormat<RowData>) bulkReaderFormat).applyFilters(filters);
+        }
+
+        // Physical type is computed from the full data type, filtering out partition and
+        // metadata columns. This type is going to be used by formats to parse the input.
+        List<DataTypes.Field> producedDataTypeFields = DataType.getFields(producedDataType);
+        if (metadataKeys != null) {
+            // If metadata keys are present, then by the SupportsReadingMetadata contract all the
+            // metadata columns will be at the end of the producedDataType, so we can just remove
+            // the last metadataKeys.size() fields from the list.
+            producedDataTypeFields =
+                    producedDataTypeFields.subList(
+                            0, producedDataTypeFields.size() - metadataKeys.size());
+        }
+        DataType physicalDataType =
+                producedDataTypeFields.stream()
Review comment:
Do we already have an issue for introducing `Context.getPartitionKeys: int[]` and using `DataType.excludeFields`?
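
Roughly, the suggestion could look like the sketch below. Both `Context.getPartitionKeys()` and `DataType.excludeFields(...)` are proposed additions that do not exist yet, so their shapes here are pure assumptions:

```java
// Sketch only: neither helper exists in the table API yet.
// Hypothetical accessor on the factory context returning partition key positions.
int[] partitionKeyIndexes = context.getPartitionKeys();

// Hypothetical utility dropping the given field positions in one step,
// replacing the manual subList slicing over DataType.getFields(...).
DataType physicalDataType = DataType.excludeFields(producedDataType, partitionKeyIndexes);
```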
##########
File path: flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/filesystem/FileSystemTableSource.java
##########
@@ -314,20 +394,86 @@ public String asSummaryString() {
        return map;
    }

-    private int[] readFields() {
-        return projectedFields == null
-                ? IntStream.range(0, DataType.getFieldCount(getPhysicalDataType())).toArray()
-                : Arrays.stream(projectedFields).mapToInt(array -> array[0]).toArray();
+    // --------------------------------------------------------------------------------------------
+    // Methods to apply projections and metadata,
+    // will influence the final output and physical type used by formats
+    // --------------------------------------------------------------------------------------------
+
+    @Override
+    public void applyProjection(int[][] projectedFields, DataType producedDataType) {
+        this.projectFields = projectedFields;
+        this.producedDataType = producedDataType;
+    }
+
+    @Override
+    public void applyReadableMetadata(List<String> metadataKeys, DataType producedDataType) {
+        if (!metadataKeys.isEmpty()) {
+            this.metadataKeys = metadataKeys;
+            this.producedDataType = producedDataType;
+        }
+    }
+
+    // --------------------------------------------------------------------------------------------
+    // Metadata handling
+    // --------------------------------------------------------------------------------------------
+
+    @Override
+    public Map<String, DataType> listReadableMetadata() {
+        return Arrays.stream(ReadableFileInfo.values())
+                .collect(Collectors.toMap(ReadableFileInfo::getKey, ReadableFileInfo::getDataType));
+    }
+
+    interface FileInfoAccessor extends Serializable {
+        /**
+         * Access the information from the {@link org.apache.flink.core.fs.FileInputSplit}. The
+         * return value type must be an internal type.
+         */
+        Object getValue(FileSourceSplit split);
    }

-    private DataType getProjectedDataType() {
-        final DataType physicalDataType = super.getPhysicalDataType();
+    enum ReadableFileInfo implements Serializable {
+        FILEPATH(
+                "filepath",
+                DataTypes.STRING().notNull(),
+                new FileInfoAccessor() {
+                    private static final long serialVersionUID = 1L;
+
+                    @Override
+                    public Object getValue(FileSourceSplit split) {
+                        return StringData.fromString(split.path().getPath());
+                    }
+                });
+
+        final String key;
+        final DataType dataType;
+        final FileInfoAccessor converter;
+
+        ReadableFileInfo(String key, DataType dataType, FileInfoAccessor converter) {
+            this.key = key;
+            this.dataType = dataType;
+            this.converter = converter;
+        }

-        // If we haven't projected fields, we just return the original physical data type,
-        // otherwise we need to compute the physical data type depending on the projected fields.
-        if (projectedFields == null) {
-            return physicalDataType;
+        public String getKey() {
+            return key;
+        }
+
+        public DataType getDataType() {
+            return dataType;
+        }
+
+        public FileInfoAccessor getAccessor() {
+            return converter;
+        }
+
+        public static ReadableFileInfo resolve(String key) {
+            switch (key) {
+                case "filepath":
Review comment:
Why have the constant twice? We can simply iterate through the `ReadableFileInfo` values.
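
A minimal sketch of such a `resolve`, assuming `java.util.Arrays` is imported, so the `"filepath"` literal lives only on the constant declaration:

```java
public static ReadableFileInfo resolve(String key) {
    // Look the key up by iterating the enum values, so each key string is
    // declared exactly once, on the constant itself.
    return Arrays.stream(ReadableFileInfo.values())
            .filter(info -> info.getKey().equals(key))
            .findFirst()
            .orElseThrow(
                    () -> new IllegalArgumentException("Unknown file info key: " + key));
}
```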