[ 
https://issues.apache.org/jira/browse/HUDI-4841?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Danny Chen updated HUDI-4841:
-----------------------------
    Fix Version/s: 0.12.1

> Flink read issue; BlockLocations not sorted properly; Sort implementation is 
> not idempotent
> -------------------------------------------------------------------------------------------
>
>                 Key: HUDI-4841
>                 URL: https://issues.apache.org/jira/browse/HUDI-4841
>             Project: Apache Hudi
>          Issue Type: Bug
>            Reporter: voon
>            Assignee: voon
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 0.12.1
>
>
> h1. Description of Bug
> CopyOnWriteInputFormat#getBlockIndexForPosition() requires BlockLocations to 
> be sorted by offsets in ascending order. 
>  
> However, the current comparator implementation does not guarantee that the 
> BlockLocation array is sorted in an ascending order.
>  
> h1. Stacktrace
>  
> {code:java}
> Caused by: java.lang.IllegalArgumentException: The given offset is not 
> contained in the any block.    at 
> org.apache.hudi.table.format.cow.CopyOnWriteInputFormat.getBlockIndexForPosition(CopyOnWriteInputFormat.java:374)
>     at 
> org.apache.hudi.table.format.cow.CopyOnWriteInputFormat.createInputSplits(CopyOnWriteInputFormat.java:242)
>     at 
> org.apache.hudi.table.format.cow.CopyOnWriteInputFormat.createInputSplits(CopyOnWriteInputFormat.java:66)
>     at 
> org.apache.flink.runtime.executiongraph.ExecutionJobVertex.<init>(ExecutionJobVertex.java:234)
>     ... 21 more {code}
>  
>  
> h1. Reproduction of issue
> h2. Current sorting implementation
>  
> {code:java}
> Arrays.sort(blocks, new Comparator<BlockLocation>() {
>   @Override
>   public int compare(BlockLocation o1, BlockLocation o2) {
>     long diff = o1.getLength() - o2.getOffset();
>     return Long.compare(diff, 0L);
>   }
> }); {code}
>  
>  
> h2. Test
>  
> {code:java}
> public class TestBlockLocationSort {
>   static int compare(org.apache.hadoop.fs.BlockLocation o1, 
> org.apache.hadoop.fs.BlockLocation o2) {
>     long diff = o1.getLength() - o2.getOffset();
>     return Long.compare(diff, 0L);
>   }
>   @Test
>   void testBlockLocationSort() {
>     BlockLocation o1 = new BlockLocation(new String[0], new String[0], 0, 5);
>     BlockLocation o2 = new BlockLocation(new String[0], new String[0], 5, 5);
>     BlockLocation o3 = new BlockLocation(new String[0], new String[0], 6, 4);
>     BlockLocation[] blocks1 = {o1, o2, o3};
>     System.out.println("BlockLocation[] bef. sort [pass 1]: " + 
> Arrays.toString(blocks1));
>     Arrays.sort(blocks1, TestBlockLocationSort::compare);
>     System.out.println("BlockLocation[] aft. sort [pass 1]: " + 
> Arrays.toString(blocks1) + "\n");
>     System.out.println("BlockLocation[] bef. sort [pass 2]: " + 
> Arrays.toString(blocks1));
>     Arrays.sort(blocks1, TestBlockLocationSort::compare);
>     System.out.println("BlockLocation[] aft. sort [pass 2]: " + 
> Arrays.toString(blocks1) + "\n");
>   }
> }{code}
>  
>  
> h2. Output
>  
> {code:java}
> BlockLocation[] bef. sort [pass 1]: [0,5, 5,5, 6,4]
> BlockLocation[] aft. sort [pass 1]: [0,5, 6,4, 5,5]
> BlockLocation[] bef. sort [pass 2]: [0,5, 6,4, 5,5]
> BlockLocation[] aft. sort [pass 2]: [0,5, 5,5, 6,4]{code}
>  
>  
> As can be seen, the current BlockLocation sorting is not idempotent. 
> Sorting should be idempotent - Sorting a collection the first time will put 
> it in order, running a sort operation on the same array again should have no 
> impact on the array that is already sorted.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to