twalthr commented on a change in pull request #17544:
URL: https://github.com/apache/flink/pull/17544#discussion_r745388320



##########
File path: flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/filesystem/FileSystemTableSource.java
##########
@@ -314,20 +394,86 @@ public String asSummaryString() {
         return map;
     }
 
-    private int[] readFields() {
-        return projectedFields == null
-                ? IntStream.range(0, DataType.getFieldCount(getPhysicalDataType())).toArray()
-                : Arrays.stream(projectedFields).mapToInt(array -> array[0]).toArray();
+    // --------------------------------------------------------------------------------------------
+    // Methods to apply projections and metadata,
+    // will influence the final output and physical type used by formats
+    // --------------------------------------------------------------------------------------------
+
+    @Override
+    public void applyProjection(int[][] projectedFields, DataType producedDataType) {
+        this.projectFields = projectedFields;
+        this.producedDataType = producedDataType;
+    }
+
+    @Override
+    public void applyReadableMetadata(List<String> metadataKeys, DataType producedDataType) {
+        if (!metadataKeys.isEmpty()) {

Review comment:
       This could be problematic: the optimizer might call this method twice, once with the declared metadata columns and once with the actually used ones (which may be empty). It should be safe to remove this `if` branch.
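
       A minimal sketch of the suggested simplification (assuming the surrounding fields stay as declared in this PR), so that the optimizer's last call always wins:

```java
@Override
public void applyReadableMetadata(List<String> metadataKeys, DataType producedDataType) {
    // Always overwrite: an empty list simply means "no metadata columns are used".
    this.metadataKeys = metadataKeys;
    this.producedDataType = producedDataType;
}
```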

##########
File path: flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/filesystem/FileSystemTableSource.java
##########
@@ -101,54 +109,123 @@ public FileSystemTableSource(
         this.bulkReaderFormat = bulkReaderFormat;
         this.deserializationFormat = deserializationFormat;
         this.formatFactory = formatFactory;
+
+        this.producedDataType = context.getPhysicalRowDataType();
     }
 
     @Override
     public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) {
+        // When this table has no partition, just return an empty source.
         if (!partitionKeys.isEmpty() && getOrFetchPartitions().isEmpty()) {
-            // When this table has no partition, just return a empty source.
             return InputFormatProvider.of(new CollectionInputFormat<>(new ArrayList<>(), null));
-        } else if (bulkReaderFormat != null) {
-            if (bulkReaderFormat instanceof BulkDecodingFormat
-                    && filters != null
-                    && filters.size() > 0) {
-                ((BulkDecodingFormat<RowData>) bulkReaderFormat).applyFilters(filters);
+        }
+
+        // Physical type is computed from the full data type, filtering out partition and
+        // metadata columns. This type is going to be used by formats to parse the input.
+        List<DataTypes.Field> producedDataTypeFields = DataType.getFields(producedDataType);
+        if (metadataKeys != null) {
+            // If metadata keys are present, then by SupportsReadingMetadata contract all the
+            // metadata columns will be at the end of the producedDataType, so we can just remove
+            // from the list the last metadataKeys.size() fields.
+            producedDataTypeFields =
+                    producedDataTypeFields.subList(
+                            0, producedDataTypeFields.size() - metadataKeys.size());
+        }
+        DataType physicalDataType =
+                producedDataTypeFields.stream()

Review comment:
       Do we already have an issue for introducing `Context.getPartitionKeys: int[]` and using `DataType.excludeFields`?
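
       A purely hypothetical sketch of what that could look like at this call site; neither `Context.getPartitionKeys()` nor `DataType.excludeFields` exists yet, both being the proposed additions:

```java
// Hypothetical API (not in Flink yet): getPartitionKeys() would expose the indexes
// of the partition columns, and excludeFields() would drop them from the row type.
int[] partitionKeyIndexes = context.getPartitionKeys();
DataType physicalDataType = DataType.excludeFields(producedDataType, partitionKeyIndexes);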

##########
File path: flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/filesystem/FileSystemTableSource.java
##########
@@ -314,20 +394,86 @@ public String asSummaryString() {
         return map;
     }
 
-    private int[] readFields() {
-        return projectedFields == null
-                ? IntStream.range(0, DataType.getFieldCount(getPhysicalDataType())).toArray()
-                : Arrays.stream(projectedFields).mapToInt(array -> array[0]).toArray();
+    // --------------------------------------------------------------------------------------------
+    // Methods to apply projections and metadata,
+    // will influence the final output and physical type used by formats
+    // --------------------------------------------------------------------------------------------
+
+    @Override
+    public void applyProjection(int[][] projectedFields, DataType producedDataType) {
+        this.projectFields = projectedFields;
+        this.producedDataType = producedDataType;
+    }
+
+    @Override
+    public void applyReadableMetadata(List<String> metadataKeys, DataType producedDataType) {
+        if (!metadataKeys.isEmpty()) {
+            this.metadataKeys = metadataKeys;
+            this.producedDataType = producedDataType;
+        }
+    }
+
+    // --------------------------------------------------------------------------------------------
+    // Metadata handling
+    // --------------------------------------------------------------------------------------------
+
+    @Override
+    public Map<String, DataType> listReadableMetadata() {
+        return Arrays.stream(ReadableFileInfo.values())
+                .collect(Collectors.toMap(ReadableFileInfo::getKey, ReadableFileInfo::getDataType));
+    }
+
+    interface FileInfoAccessor extends Serializable {
+        /**
+         * Access the information from the {@link org.apache.flink.core.fs.FileInputSplit}. The
+         * return value type must be an internal type.
+         */
+        Object getValue(FileSourceSplit split);
     }
 
-    private DataType getProjectedDataType() {
-        final DataType physicalDataType = super.getPhysicalDataType();
+    enum ReadableFileInfo implements Serializable {
+        FILEPATH(
+                "filepath",
+                DataTypes.STRING().notNull(),
+                new FileInfoAccessor() {
+                    private static final long serialVersionUID = 1L;
+
+                    @Override
+                    public Object getValue(FileSourceSplit split) {
+                        return StringData.fromString(split.path().getPath());
+                    }
+                });
+
+        final String key;
+        final DataType dataType;
+        final FileInfoAccessor converter;
+
+        ReadableFileInfo(String key, DataType dataType, FileInfoAccessor converter) {
+            this.key = key;
+            this.dataType = dataType;
+            this.converter = converter;
+        }
 
-        // If we haven't projected fields, we just return the original physical data type,
-        // otherwise we need to compute the physical data type depending on the projected fields.
-        if (projectedFields == null) {
-            return physicalDataType;
+        public String getKey() {
+            return key;
+        }
+
+        public DataType getDataType() {
+            return dataType;
+        }
+
+        public FileInfoAccessor getAccessor() {
+            return converter;
+        }
+
+        public static ReadableFileInfo resolve(String key) {
+            switch (key) {
+                case "filepath":

Review comment:
       Why have the constant twice? We can simply iterate through the `ReadableFileInfo` values.
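
       A minimal sketch of the suggested lookup, iterating over the enum values instead of duplicating the `"filepath"` literal (the exception message is illustrative):

```java
public static ReadableFileInfo resolve(String key) {
    // Match the key against each enum value's own key, so the constant lives in one place.
    return Arrays.stream(ReadableFileInfo.values())
            .filter(info -> info.getKey().equals(key))
            .findFirst()
            .orElseThrow(
                    () -> new IllegalArgumentException("Unknown readable metadata key: " + key));
}
```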



