[
https://issues.apache.org/jira/browse/FLINK-30873?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17686359#comment-17686359
]
Mate Czagany commented on FLINK-30873:
--------------------------------------
I've made some further investigation into this:
* Assigning the partition columns to the result rows is done by
`FileInfoExtractorBulkFormat` and if you set PARTITIONED BY clause it will
always try to retrieve the partitioned columns using
`PartitionPathUtils#extractPartitionSpecFromPath`. So extracting the partition
values from the splits should not be affected by changing the paths we
initialize the `FileSource` with. It's only affected by the `BulkFormat` we
create `FileSource` with would not be changed with my proposed solution.
* `FileEnumerator` would only be called with the single root path instead of
all the initial partition paths found, you can see the implementation for
enumerating the paths in `NonSplittingRecursiveEnumerator#enumerateSplits`. It
always recursively enumerates through all the paths given and all its sub
directories, so in an ideal environment this should not be an issue either, but
it will result in more I/O operations.
* `FileSystemTableSource` implements `SupportsPartitionPushDown` which can
restrict the number of partitions that will be read during planning phase based
on the filters of the query (see `PushPartitionIntoTableSourceScanRule` for
more info). This can cause issues as this restricts the partitions the
connector can read and in case of a dynamic filter (e.g. `partitionField > 20`)
we won't be able to respect this optimization in case there are new partition
folders.
One (pretty bad) solution I could think of:
Add a new property, e.g. `source.monitor-partitions` which if true, the
`FileSystemTableSource` will not scan the root folder for partitions, but it
will pass the root folder to `FileSource` which will automatically iterate
through its subfolders and parse the partitions. If this property is true,
throw an exception if the partitions get pruned by the planner (if the user
specifies any filters for the partitions).
But it would be easier for the user to create a new table without partitions
like you did, since it will have the exact same functionality as this solution.
The only advantage of this solution might be that the user does not have to
create two seperate table if they write and read to/from it as well.
Best solution in my opinion would be to explain this in the documentation.
> FileSystem Table API connector doesn't discover new files when partitions are
> specified
> ---------------------------------------------------------------------------------------
>
> Key: FLINK-30873
> URL: https://issues.apache.org/jira/browse/FLINK-30873
> Project: Flink
> Issue Type: Bug
> Components: Connectors / FileSystem
> Affects Versions: 1.16.1
> Reporter: Yaroslav Tkachenko
> Priority: Major
>
> When *source.monitor-interval* is configured I expect the source connector to
> discover new files. However, if the source table was created with partitions
> (using PARTITIONED BY), it only discovers new files in the partition folders
> that existed during the startup; it doesn't discover new partitions.
> I believe the problem is
> [here|https://github.com/apache/flink/blob/5f2d088a2713ced5c6ce072db92f4378f73bc739/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemTableSource.java#L276-L286],
> only these file paths are monitored internally, no partition discovery is
> currently implemented.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)