[ 
https://issues.apache.org/jira/browse/FLINK-30873?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17686359#comment-17686359
 ] 

Mate Czagany commented on FLINK-30873:
--------------------------------------

I've investigated this further:
 * Assigning the partition columns to the result rows is done by 
`FileInfoExtractorBulkFormat`, and if you set a PARTITIONED BY clause it will 
always try to retrieve the partition columns using 
`PartitionPathUtils#extractPartitionSpecFromPath`. So extracting the partition 
values from the splits should not be affected by changing the paths we 
initialize the `FileSource` with. It depends only on the `BulkFormat` the 
`FileSource` is created with, which my proposed solution would not change.
 * `FileEnumerator` would only be called with the single root path instead of 
all the initial partition paths found (see 
`NonSplittingRecursiveEnumerator#enumerateSplits` for how the paths are 
enumerated). It always recursively enumerates all the given paths and all their 
subdirectories, so in an ideal environment this should not be an issue either, 
but it will result in more I/O operations.
 * `FileSystemTableSource` implements `SupportsPartitionPushDown`, which can 
restrict the partitions that will be read during the planning phase based on 
the filters of the query (see `PushPartitionIntoTableSourceScanRule` for more 
info). This is a problem because it fixes the set of partitions the connector 
can read at planning time: with a dynamic filter (e.g. `partitionField > 20`), 
partition folders created later that match the filter would not be in the 
pruned list, so we wouldn't be able to respect this optimization.
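To make the first two points concrete, here is a rough, stdlib-only Java sketch (not Flink code, and it doesn't use any Flink API). `extractPartitionSpec` is a hypothetical, simplified stand-in for `PartitionPathUtils#extractPartitionSpecFromPath` that assumes Hive-style `key=value` folder names, and `enumerate` only approximates what a recursive enumerator does with the paths the source is created with. It shows that enumerating the partition folders captured at startup misses a partition created afterwards, while enumerating the root picks it up, and the partition values are still recovered from each file's path.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class PartitionDiscoverySketch {

    // Hypothetical, simplified stand-in for PartitionPathUtils#extractPartitionSpecFromPath:
    // every "key=value" directory between the table root and the file becomes one partition entry.
    static LinkedHashMap<String, String> extractPartitionSpec(Path root, Path file) {
        LinkedHashMap<String, String> spec = new LinkedHashMap<>();
        for (Path dir : root.relativize(file.getParent())) {
            String name = dir.toString();
            int eq = name.indexOf('=');
            if (eq > 0) {
                spec.put(name.substring(0, eq), name.substring(eq + 1));
            }
        }
        return spec;
    }

    // Recursively lists regular files under each given path (roughly what a
    // recursive enumerator does with the paths the source was initialized with).
    static List<Path> enumerate(List<Path> paths) throws IOException {
        List<Path> files = new ArrayList<>();
        for (Path p : paths) {
            try (Stream<Path> walk = Files.walk(p)) {
                walk.filter(Files::isRegularFile).sorted().forEach(files::add);
            }
        }
        return files;
    }

    public static void main(String[] args) throws IOException {
        Path root = Files.createTempDirectory("table");
        Files.createDirectories(root.resolve("dt=1"));
        Files.createFile(root.resolve("dt=1/a.csv"));

        // Partition folders found at startup: the paths the source is initialized with today.
        List<Path> startupPartitions;
        try (Stream<Path> dirs = Files.list(root)) {
            startupPartitions = dirs.filter(Files::isDirectory).collect(Collectors.toList());
        }

        // A new partition folder appears after the job has started.
        Files.createDirectories(root.resolve("dt=2"));
        Files.createFile(root.resolve("dt=2/b.csv"));

        // Only the file in dt=1 is found via the startup partition paths.
        System.out.println("via startup partitions: " + enumerate(startupPartitions).size() + " file(s)");

        // Enumerating the root finds both files; partition values come from each path.
        for (Path f : enumerate(List.of(root))) {
            System.out.println(root.relativize(f) + " -> " + extractPartitionSpec(root, f));
        }
    }
}
```

Running `main` should report one file via the startup partition paths, then list both files with their parsed partition specs.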


One (pretty bad) solution I could think of:
Add a new property, e.g. `source.monitor-partitions`. If it is true, 
`FileSystemTableSource` will not scan the root folder for partitions; instead it 
will pass the root folder to `FileSource`, which will automatically iterate 
through its subfolders and parse the partitions. If this property is true and 
the planner prunes partitions (i.e. the user specifies filters on the partition 
columns), throw an exception.
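A rough sketch of the proposed behavior, in plain Java with no Flink dependencies. `source.monitor-partitions` is only the property proposed above (it does not exist in Flink), and `choosePaths` is a hypothetical helper; a real change would live in `FileSystemTableSource` and its `SupportsPartitionPushDown` handling. Here a non-null `remainingPartitions` is an assumed stand-in for "the planner pruned partitions".

```java
import java.util.List;
import java.util.Map;

public class MonitorPartitionsSketch {

    // Hypothetical option from the proposal; not an existing Flink option.
    static final String MONITOR_PARTITIONS = "source.monitor-partitions";

    // Hypothetical helper: picks the paths a file source would be initialized with.
    // remainingPartitions is null when the planner did not prune any partitions.
    static List<String> choosePaths(
            Map<String, String> options,
            String rootPath,
            List<String> startupPartitionPaths,
            List<Map<String, String>> remainingPartitions) {
        boolean monitor =
                Boolean.parseBoolean(options.getOrDefault(MONITOR_PARTITIONS, "false"));
        if (!monitor) {
            // Current behavior: only partition folders that existed at startup are read.
            return startupPartitionPaths;
        }
        if (remainingPartitions != null) {
            // A pruned partition list is fixed at planning time and cannot cover
            // folders created later, so reject the combination instead of ignoring it.
            throw new IllegalArgumentException(
                    MONITOR_PARTITIONS + " cannot be combined with filters on partition columns");
        }
        // Proposed behavior: hand the root to the source and let recursive
        // enumeration discover partition folders as they appear.
        return List.of(rootPath);
    }

    public static void main(String[] args) {
        List<String> startup = List.of("/data/t/dt=1", "/data/t/dt=2");
        System.out.println(choosePaths(Map.of(), "/data/t", startup, null));
        System.out.println(choosePaths(Map.of(MONITOR_PARTITIONS, "true"), "/data/t", startup, null));
    }
}
```

Failing fast keeps the semantics explicit: the user gets an error at planning time instead of a job that silently cannot honor its partition filters for new folders.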


But it would be easier for the user to create a new table without partitions, 
like you did, since that has exactly the same functionality as this solution. 
The only advantage of this solution might be that the user doesn't have to 
create two separate tables if they both write to and read from it.

In my opinion, the best solution would be to explain this in the documentation.

> FileSystem Table API connector doesn't discover new files when partitions are 
> specified
> ---------------------------------------------------------------------------------------
>
>                 Key: FLINK-30873
>                 URL: https://issues.apache.org/jira/browse/FLINK-30873
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / FileSystem
>    Affects Versions: 1.16.1
>            Reporter: Yaroslav Tkachenko
>            Priority: Major
>
> When *source.monitor-interval* is configured I expect the source connector to 
> discover new files. However, if the source table was created with partitions 
> (using PARTITIONED BY), it only discovers new files in the partition folders 
> that existed during the startup; it doesn't discover new partitions. 
> I believe the problem is 
> [here|https://github.com/apache/flink/blob/5f2d088a2713ced5c6ce072db92f4378f73bc739/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemTableSource.java#L276-L286]:
>  only these file paths are monitored internally, and no partition discovery is 
> currently implemented. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
