[
https://issues.apache.org/jira/browse/HUDI-5967?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Danny Chen resolved HUDI-5967.
------------------------------
> Add partition ordering for full table scans
> -------------------------------------------
>
> Key: HUDI-5967
> URL: https://issues.apache.org/jira/browse/HUDI-5967
> Project: Apache Hudi
> Issue Type: Improvement
> Components: flink
> Reporter: Alex Guo
> Priority: Minor
> Labels: pull-request-available
> Fix For: 0.13.1
>
>
> I am running a streaming read query on an hourly partitioned COW table with
> the following settings and using
> [StreamReadMonitoringFunction|https://github.com/apache/hudi/blob/eca57b51866f2dff98437ec60ac935cdc6c18d91/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/StreamReadMonitoringFunction.java#L206]
> {code:java}
> config.set(FlinkOptions.QUERY_TYPE, FlinkOptions.QUERY_TYPE_SNAPSHOT);
> config.setBoolean(FlinkOptions.READ_AS_STREAMING, true);
> config.set(FlinkOptions.READ_START_COMMIT,
> FlinkOptions.START_COMMIT_EARLIEST);{code}
> Since I am reading from the earliest commit, the query starts off with a
> [full table
> scan|https://github.com/apache/hudi/blob/eca57b51866f2dff98437ec60ac935cdc6c18d91/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java#L284],
> which gets all the partitions in the table. Then, it
> [maps|https://github.com/apache/hudi/blob/eca57b51866f2dff98437ec60ac935cdc6c18d91/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java#L378]
> each readPartition in the readPartitions set into a list of
> {{MergeOnReadInputSplits}} like [[partition1split1, partition1split2],
> [partition2split1, partition2split2], …] and then flatmapping so that all
> splits under a single partition are adjacent in the resulting list. Then,
> these splits are distributed to the read subtasks. This is why I see splits
> with e.g. num 1-55 all corresponding to one partition, then 56-115
> corresponding to another partition, then 116-175 to another, etc.
> {code:java}
> FileIndex fileIndex = getFileIndex();
> readPartitions = new HashSet<>(fileIndex.getOrBuildPartitionPaths());{code}
> The problem is that since the readPartitions is initialized as a HashSet, the
> order of partitions in this list is random, so subtasks could read partition
> 11, advance the watermarks, then read partition 8 after and consider
> everything late.
>
> So, within a subtask, there is no partition level order. i.e. even with
> parallelism = 1, partitions are read out of order.
>
> The goal is to use a sorted set (e.g. TreeSet) to have this partition level
> ordering on the initial full table scan.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)