Alex Guo created HUDI-5967:
------------------------------

             Summary: Add partition ordering for full table scans
                 Key: HUDI-5967
                 URL: https://issues.apache.org/jira/browse/HUDI-5967
             Project: Apache Hudi
          Issue Type: Improvement
          Components: flink
            Reporter: Alex Guo
             Fix For: 0.13.1


I am running a streaming read query on an hourly partitioned COW table using 
[StreamReadMonitoringFunction|https://github.com/apache/hudi/blob/eca57b51866f2dff98437ec60ac935cdc6c18d91/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/StreamReadMonitoringFunction.java#L206]
 with the following settings:
{code:java}
config.set(FlinkOptions.QUERY_TYPE, FlinkOptions.QUERY_TYPE_SNAPSHOT);
config.setBoolean(FlinkOptions.READ_AS_STREAMING, true);
config.set(FlinkOptions.READ_START_COMMIT, FlinkOptions.START_COMMIT_EARLIEST);
{code}

Since I am reading from the earliest commit, the query starts off with a [full 
table 
scan|https://github.com/apache/hudi/blob/eca57b51866f2dff98437ec60ac935cdc6c18d91/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java#L284],
 which gets all the partitions in the table. Then, it 
[maps|https://github.com/apache/hudi/blob/eca57b51866f2dff98437ec60ac935cdc6c18d91/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java#L378]
 each readPartition in the readPartitions set to a list of 
{{MergeOnReadInputSplit}}s, e.g. [[partition1split1, partition1split2], 
[partition2split1, partition2split2], …], and then flat-maps the result, so 
that all splits under a single partition are adjacent in the resulting list. 
These splits are then distributed to the read subtasks. This is why I see 
splits with e.g. num 1-55 all corresponding to one partition, then 56-115 
corresponding to another partition, then 116-175 to a third, etc.
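As a sketch of the map-then-flat-map step described above (using a simplified stand-in {{Split}} record rather than Hudi's actual {{MergeOnReadInputSplit}}; the names {{buildSplits}} and {{SplitOrdering}} are hypothetical), each partition expands to its own list of splits before flattening, which is why splits of the same partition come out adjacent:

```java
import java.util.*;
import java.util.stream.*;

public class SplitOrdering {
    // Simplified stand-in for MergeOnReadInputSplit: partition path + split number.
    record Split(String partition, int num) {}

    // Each partition is mapped to its own list of splits, then the lists are
    // flat-mapped into one list, keeping each partition's splits adjacent.
    static List<Split> buildSplits(Collection<String> partitions, int splitsPerPartition) {
        return partitions.stream()
                .map(p -> IntStream.range(0, splitsPerPartition)
                        .mapToObj(i -> new Split(p, i))
                        .collect(Collectors.toList()))
                .flatMap(List::stream)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Two partitions, two splits each: p1's splits precede p2's in the output.
        buildSplits(List.of("p1", "p2"), 2).forEach(System.out::println);
    }
}
```

The adjacency itself is fine; the problem below is only the *order in which the partitions are iterated* before this expansion.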
{code:java}
FileIndex fileIndex = getFileIndex();
readPartitions = new HashSet<>(fileIndex.getOrBuildPartitionPaths());
{code}
The problem is that readPartitions is initialized as a {{HashSet}}, so the 
iteration order of the partitions is effectively random: a subtask could read 
partition 11, advance the watermark, then read partition 8 and consider all of 
its records late.
 
So, within a subtask, there is no partition-level ordering; even with 
parallelism = 1, partitions are read out of order.
 
The goal is to use a sorted set (e.g. {{TreeSet}}) to impose this 
partition-level ordering on the initial full table scan.
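A minimal sketch of the proposed change, assuming lexicographic order of the date-style hourly partition paths matches chronological order (which holds for zero-padded {{yyyy/MM/dd/HH}} paths):

```java
import java.util.*;

public class PartitionOrdering {
    public static void main(String[] args) {
        // Example hourly partition paths in the order a HashSet might yield them.
        List<String> paths = List.of("2023/03/01/11", "2023/03/01/08", "2023/03/01/09");

        // Current behavior: HashSet guarantees no iteration order, so
        // partition 11 may be visited before partition 8.
        Set<String> unordered = new HashSet<>(paths);

        // Proposed: TreeSet iterates in natural (lexicographic) order, which
        // for zero-padded date paths is chronological order.
        Set<String> ordered = new TreeSet<>(paths);
        System.out.println(ordered);
        // prints [2023/03/01/08, 2023/03/01/09, 2023/03/01/11]
    }
}
```

The one-line change in IncrementalInputSplits would be swapping the {{HashSet}} constructor for a {{TreeSet}} when building readPartitions.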



--
This message was sent by Atlassian Jira
(v8.20.10#820010)