[
https://issues.apache.org/jira/browse/HUDI-5967?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Alex Guo updated HUDI-5967:
---------------------------
Description:
I am running a streaming read query on an hourly partitioned COW table with the
following settings and using
[StreamReadMonitoringFunction|https://github.com/apache/hudi/blob/eca57b51866f2dff98437ec60ac935cdc6c18d91/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/StreamReadMonitoringFunction.java#L206]
{code:java}
config.set(FlinkOptions.QUERY_TYPE, FlinkOptions.QUERY_TYPE_SNAPSHOT);
config.setBoolean(FlinkOptions.READ_AS_STREAMING, true);
config.set(FlinkOptions.READ_START_COMMIT,
FlinkOptions.START_COMMIT_EARLIEST);{code}
Since I am reading from the earliest commit, the query starts off with a [full
table
scan|https://github.com/apache/hudi/blob/eca57b51866f2dff98437ec60ac935cdc6c18d91/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java#L284],
which gets all the partitions in the table. Then, it
[maps|https://github.com/apache/hudi/blob/eca57b51866f2dff98437ec60ac935cdc6c18d91/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java#L378]
each readPartition in the readPartitions set to a list of
{{MergeOnReadInputSplits}}, like [[partition1split1, partition1split2],
[partition2split1, partition2split2], …], and then flat-maps the result so
that all splits under a single partition are adjacent in the final list. These
splits are then distributed to the read subtasks. This is why I see, e.g.,
splits numbered 1-55 all corresponding to one partition, 56-115 to another,
116-175 to a third, and so on.
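A minimal, runnable sketch of that map-then-flatMap shape (the partition paths and split names here are hypothetical; the real code builds {{MergeOnReadInputSplits}} from the FileIndex):
{code:java}
import java.util.*;
import java.util.stream.*;

public class SplitFlattening {
    // Map each partition to its list of splits, then flatten. Splits from the
    // same partition end up adjacent in the resulting list.
    static List<String> toSplits(Collection<String> partitions) {
        return partitions.stream()
            .map(p -> Arrays.asList(p + "#split1", p + "#split2"))
            .flatMap(List::stream)
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Hypothetical hourly partition paths.
        Set<String> readPartitions =
            new HashSet<>(Arrays.asList("2023/03/10/08", "2023/03/10/09"));
        System.out.println(toSplits(readPartitions));
    }
}
{code}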
{code:java}
FileIndex fileIndex = getFileIndex();
readPartitions = new HashSet<>(fileIndex.getOrBuildPartitionPaths());
{code}
The problem is that since readPartitions is initialized as a HashSet, its
iteration order is unspecified, so a subtask could read partition 11, advance
the watermark, then read partition 8 afterwards and consider all of its
records late.
So there is no partition-level ordering even within a subtask; i.e., even with
parallelism = 1, partitions are read out of order.
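To illustrate the lateness effect (a toy watermark that just tracks the maximum event-time hour seen, not the Flink API): once reading partition 11 has advanced the watermark, every record from partition 8 arrives behind it:
{code:java}
import java.util.*;

public class LatenessDemo {
    // For each event hour, in read order, report whether it is behind the
    // watermark; the watermark tracks the max hour seen so far.
    static List<Boolean> lateness(long[] eventHours) {
        List<Boolean> result = new ArrayList<>();
        long watermark = Long.MIN_VALUE;
        for (long hour : eventHours) {
            result.add(hour < watermark);
            watermark = Math.max(watermark, hour);
        }
        return result;
    }

    public static void main(String[] args) {
        // HashSet order might yield partition 11's splits before partition 8's.
        System.out.println(lateness(new long[]{11, 11, 8, 8})); // [false, false, true, true]
    }
}
{code}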
The goal is to use a sorted set (e.g. {{TreeSet}}) to impose this
partition-level ordering on the initial full table scan.
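A minimal sketch of the proposed change (assuming zero-padded date/hour partition paths, for which lexicographic order is chronological order):
{code:java}
import java.util.*;

public class OrderedScan {
    public static void main(String[] args) {
        List<String> fromFileIndex =
            Arrays.asList("2023/03/10/11", "2023/03/10/08", "2023/03/10/09");

        // Current behavior: HashSet iteration order is unspecified.
        Set<String> unordered = new HashSet<>(fromFileIndex);

        // Proposed: a TreeSet iterates in natural (lexicographic) order,
        // which for zero-padded date/hour paths matches time order.
        Set<String> ordered = new TreeSet<>(fromFileIndex);
        System.out.println(ordered); // [2023/03/10/08, 2023/03/10/09, 2023/03/10/11]
    }
}
{code}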
> Add partition ordering for full table scans
> -------------------------------------------
>
> Key: HUDI-5967
> URL: https://issues.apache.org/jira/browse/HUDI-5967
> Project: Apache Hudi
> Issue Type: Improvement
> Components: flink
> Reporter: Alex Guo
> Priority: Minor
> Fix For: 0.13.1
--
This message was sent by Atlassian Jira
(v8.20.10#820010)