[ 
https://issues.apache.org/jira/browse/HUDI-5967?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Alex Guo updated HUDI-5967:
---------------------------
    Description: 
I am running a streaming read query on an hourly partitioned COW table with the 
following settings and using 
[StreamReadMonitoringFunction|https://github.com/apache/hudi/blob/eca57b51866f2dff98437ec60ac935cdc6c18d91/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/StreamReadMonitoringFunction.java#L206]


{{config.set(FlinkOptions.QUERY_TYPE, FlinkOptions.QUERY_TYPE_SNAPSHOT);}}
{{config.setBoolean(FlinkOptions.READ_AS_STREAMING, true);}}
{{config.set(FlinkOptions.READ_START_COMMIT, FlinkOptions.START_COMMIT_EARLIEST);}}
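For context, the three options above would typically be applied to a single Flink {{Configuration}} when wiring up the source; this is a minimal sketch only, and the table path is a hypothetical placeholder:

```java
import org.apache.flink.configuration.Configuration;
import org.apache.hudi.configuration.FlinkOptions;

// Sketch of the reader configuration; surrounding job wiring is omitted.
Configuration config = new Configuration();
config.set(FlinkOptions.PATH, "hdfs://.../my_cow_table"); // hypothetical path
config.set(FlinkOptions.QUERY_TYPE, FlinkOptions.QUERY_TYPE_SNAPSHOT);
config.setBoolean(FlinkOptions.READ_AS_STREAMING, true);
config.set(FlinkOptions.READ_START_COMMIT, FlinkOptions.START_COMMIT_EARLIEST);
```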

Since I am reading from the earliest commit, the query starts off with a [full 
table 
scan|https://github.com/apache/hudi/blob/eca57b51866f2dff98437ec60ac935cdc6c18d91/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java#L284],
 which lists all the partitions in the table. Then, it 
[maps|https://github.com/apache/hudi/blob/eca57b51866f2dff98437ec60ac935cdc6c18d91/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java#L378]
 each readPartition in the readPartitions set to a list of 
{{MergeOnReadInputSplits}}, like [[partition1split1, partition1split2], 
[partition2split1, partition2split2], …], and then flat-maps the result, so 
that all splits under a single partition are adjacent in the resulting list. 
Finally, these splits are distributed to the read subtasks. This is why I see 
splits with, e.g., num 1-55 all corresponding to one partition, then 56-115 
corresponding to another partition, then 116-175 to another, etc.
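The map-then-flat-map step above can be sketched in plain Java as follows; the partition and split names are hypothetical stand-ins for the real partition paths and {{MergeOnReadInputSplit}} objects:

```java
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

public class SplitOrderSketch {

  // Maps each partition to its list of splits, then flat-maps, so all
  // splits of one partition stay adjacent in the flattened list.
  static List<String> flattenSplits(Set<String> readPartitions) {
    return readPartitions.stream()
        .map(p -> Arrays.asList(p + "#split1", p + "#split2"))
        .flatMap(List::stream)
        .collect(Collectors.toList());
  }

  public static void main(String[] args) {
    // Hypothetical hourly partitions standing in for readPartitions.
    Set<String> partitions = new LinkedHashSet<>(
        Arrays.asList("2023/03/28/11", "2023/03/28/08"));
    System.out.println(flattenSplits(partitions));
  }
}
```

Note that the adjacency of splits within a partition is preserved regardless of set type; only the order of the partitions themselves depends on the set's iteration order.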


{{FileIndex fileIndex = getFileIndex();}}
{{readPartitions = new HashSet<>(fileIndex.getOrBuildPartitionPaths());}}


The problem is that readPartitions is initialized as a HashSet, whose 
iteration order is unspecified, so the order of partitions in the flattened 
split list is effectively random. A subtask could read partition 11, advance 
its watermark, then read partition 8 and consider everything in it late.
 
So, within a subtask, there is no partition-level order; i.e., even with 
parallelism = 1, partitions are read out of order.
 
The goal is to use a sorted set (e.g. {{TreeSet}}) to enforce partition-level 
ordering on the initial full table scan.
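A minimal sketch of the proposed change, using hypothetical partition paths in place of {{fileIndex.getOrBuildPartitionPaths()}}:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

public class PartitionOrdering {

  // Returns the partitions in natural (lexicographic) order, which for
  // hourly yyyy/MM/dd/HH partition paths is also chronological order.
  static List<String> orderedPartitions(List<String> partitionPaths) {
    return new ArrayList<>(new TreeSet<>(partitionPaths));
  }

  public static void main(String[] args) {
    // Hypothetical hourly partition paths.
    List<String> paths = Arrays.asList(
        "2023/03/28/11", "2023/03/28/08", "2023/03/28/09");

    // Current behavior: HashSet iteration order is unspecified, so
    // partitions may be handed out in any order relative to event time.
    Set<String> unordered = new HashSet<>(paths);
    System.out.println("HashSet : " + unordered);

    // Proposed behavior: a sorted set restores partition-level ordering.
    System.out.println("TreeSet : " + orderedPartitions(paths));
  }
}
```

Since hourly partition paths sort lexicographically in time order, simply swapping the {{HashSet}} for a {{TreeSet}} would let watermarks advance monotonically across partitions during the initial scan.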


> Add partition ordering for full table scans
> -------------------------------------------
>
>                 Key: HUDI-5967
>                 URL: https://issues.apache.org/jira/browse/HUDI-5967
>             Project: Apache Hudi
>          Issue Type: Improvement
>          Components: flink
>            Reporter: Alex Guo
>            Priority: Minor
>             Fix For: 0.13.1
>



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
