voon created HUDI-4841:
--------------------------

             Summary: Flink read issue; BlockLocations not sorted properly; Sort implementation is not idempotent
                 Key: HUDI-4841
                 URL: https://issues.apache.org/jira/browse/HUDI-4841
             Project: Apache Hudi
          Issue Type: Bug
            Reporter: voon
            Assignee: voon


h1. Description of Bug

CopyOnWriteInputFormat#getBlockIndexForPosition() requires BlockLocations to be sorted by offsets in ascending order.
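A simplified sketch of why the ordering matters. It assumes (this is not taken from the Hudi source) that the lookup for each split scans forward from the block index found for the previous split and never revisits earlier blocks. Block, blockIndexForPosition and walk are hypothetical names used only for illustration; Block stands in for org.apache.hadoop.fs.BlockLocation.

{code:java}
import java.util.Arrays;

// Hypothetical sketch: why a forward-only block lookup needs blocks sorted by offset.
class ForwardScanSketch {

  // Hypothetical stand-in for org.apache.hadoop.fs.BlockLocation (offset and length only).
  static final class Block {
    final long offset;
    final long length;

    Block(long offset, long length) {
      this.offset = offset;
      this.length = length;
    }
  }

  // Hypothetical helper: scans forward from startIndex for the block containing offset.
  // Assumption: blocks before startIndex are never revisited, so the array must be sorted by offset.
  static int blockIndexForPosition(Block[] blocks, long offset, int startIndex) {
    for (int i = startIndex; i < blocks.length; i++) {
      long start = blocks[i].offset;
      long end = start + blocks[i].length;
      if (offset >= start && offset < end) {
        return i;
      }
    }
    throw new IllegalArgumentException(
        "offset " + offset + " not found at or after index " + startIndex);
  }

  // Hypothetical helper: looks up each split start, resuming at the previously found index.
  static int[] walk(Block[] blocks, long[] splitStarts) {
    int[] indexes = new int[splitStarts.length];
    int startIndex = 0;
    for (int s = 0; s < splitStarts.length; s++) {
      startIndex = blockIndexForPosition(blocks, splitStarts[s], startIndex);
      indexes[s] = startIndex;
    }
    return indexes;
  }

  public static void main(String[] args) {
    Block[] sorted = {new Block(0, 4), new Block(4, 4), new Block(8, 4)};
    Block[] unsorted = {new Block(0, 4), new Block(8, 4), new Block(4, 4)};
    long[] splitStarts = {0, 4, 8};

    System.out.println(Arrays.toString(walk(sorted, splitStarts))); // [0, 1, 2]
    try {
      walk(unsorted, splitStarts);
    } catch (IllegalArgumentException e) {
      // Offset 8 sits at index 1, but the scan has already advanced to index 2.
      System.out.println("unsorted: " + e.getMessage());
    }
  }
}
{code}

With the sorted array the three split starts map to blocks 0, 1 and 2. With the unsorted array the scan jumps to index 2 for offset 4 and can no longer reach the block holding offset 8, which is the same kind of failure as the IllegalArgumentException in the stacktrace.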

 

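However, the current comparator implementation does not guarantee that the BlockLocation array is sorted by offset in ascending order: instead of comparing the offsets of the two blocks, it compares the length of o1 with the offset of o2.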

 
h1. Stacktrace

 
{code:java}
Caused by: java.lang.IllegalArgumentException: The given offset is not contained in the any block.
    at org.apache.hudi.table.format.cow.CopyOnWriteInputFormat.getBlockIndexForPosition(CopyOnWriteInputFormat.java:374)
    at org.apache.hudi.table.format.cow.CopyOnWriteInputFormat.createInputSplits(CopyOnWriteInputFormat.java:242)
    at org.apache.hudi.table.format.cow.CopyOnWriteInputFormat.createInputSplits(CopyOnWriteInputFormat.java:66)
    at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.<init>(ExecutionJobVertex.java:234)
    ... 21 more
{code}
 

 
h1. Reproduction of issue
h2. Current sorting implementation

 
{code:java}
Arrays.sort(blocks, new Comparator<BlockLocation>() {
  @Override
  public int compare(BlockLocation o1, BlockLocation o2) {
    long diff = o1.getLength() - o2.getOffset();
    return Long.compare(diff, 0L);
  }
}); {code}
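The comparator above also breaks the java.util.Comparator contract, which requires sgn(compare(x, y)) == -sgn(compare(y, x)). The sketch below checks this for the first two blocks from the test (offset 0/length 5 and offset 5/length 5) and contrasts it with ordering by offset alone. Block is a hypothetical stand-in for Hadoop's BlockLocation, and BY_OFFSET is simply the ordering implied by the ascending-offset requirement, not necessarily the fix that was merged.

{code:java}
import java.util.Comparator;

class ComparatorContractSketch {

  // Hypothetical stand-in for org.apache.hadoop.fs.BlockLocation.
  static final class Block {
    final long offset;
    final long length;

    Block(long offset, long length) {
      this.offset = offset;
      this.length = length;
    }

    long getOffset() { return offset; }

    long getLength() { return length; }
  }

  // Same logic as the current comparator: o1's length against o2's offset.
  static final Comparator<Block> CURRENT =
      (o1, o2) -> Long.compare(o1.getLength() - o2.getOffset(), 0L);

  // Orders blocks by offset only, in ascending order.
  static final Comparator<Block> BY_OFFSET = Comparator.comparingLong(Block::getOffset);

  // Comparator contract: sgn(compare(x, y)) must equal -sgn(compare(y, x)).
  static boolean isAntisymmetric(Comparator<Block> c, Block x, Block y) {
    return Integer.signum(c.compare(x, y)) == -Integer.signum(c.compare(y, x));
  }

  public static void main(String[] args) {
    Block a = new Block(0, 5);
    Block b = new Block(5, 5);
    // CURRENT: compare(a, b) = sgn(5 - 5) = 0, but compare(b, a) = sgn(5 - 0) = 1
    System.out.println("current:   " + isAntisymmetric(CURRENT, a, b));   // false
    System.out.println("by offset: " + isAntisymmetric(BY_OFFSET, a, b)); // true
  }
}
{code}

Because compare(a, b) reports the blocks as equal while compare(b, a) reports b as greater, the result of Arrays.sort depends on the order in which elements happen to be compared.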
 

 
h2. Test

 
{code:java}
import java.util.Arrays;

import org.apache.hadoop.fs.BlockLocation;
import org.junit.jupiter.api.Test;

public class TestBlockLocationSort {

  // Same logic as the current sorting implementation above:
  // compares o1's length with o2's offset instead of comparing the two offsets.
  static int compare(BlockLocation o1, BlockLocation o2) {
    long diff = o1.getLength() - o2.getOffset();
    return Long.compare(diff, 0L);
  }

  @Test
  void testBlockLocationSort() {
    // BlockLocation(names, hosts, offset, length)
    BlockLocation o1 = new BlockLocation(new String[0], new String[0], 0, 5);
    BlockLocation o2 = new BlockLocation(new String[0], new String[0], 5, 5);
    BlockLocation o3 = new BlockLocation(new String[0], new String[0], 6, 4);

    // The input is already in ascending offset order
    BlockLocation[] blocks1 = {o1, o2, o3};
    System.out.println("BlockLocation[] bef. sort [pass 1]: " + Arrays.toString(blocks1));
    Arrays.sort(blocks1, TestBlockLocationSort::compare);
    System.out.println("BlockLocation[] aft. sort [pass 1]: " + Arrays.toString(blocks1) + "\n");

    // Sorting the result a second time should leave it unchanged
    System.out.println("BlockLocation[] bef. sort [pass 2]: " + Arrays.toString(blocks1));
    Arrays.sort(blocks1, TestBlockLocationSort::compare);
    System.out.println("BlockLocation[] aft. sort [pass 2]: " + Arrays.toString(blocks1) + "\n");
  }

}{code}
 

 
h2. Output

 
{code:java}
BlockLocation[] bef. sort [pass 1]: [0,5, 5,5, 6,4]
BlockLocation[] aft. sort [pass 1]: [0,5, 6,4, 5,5]

BlockLocation[] bef. sort [pass 2]: [0,5, 6,4, 5,5]
BlockLocation[] aft. sort [pass 2]: [0,5, 5,5, 6,4]{code}
 

 

As can be seen, the current BlockLocation sorting is not idempotent (each entry is printed as offset,length): the first pass reorders an input that is already in ascending offset order, and the second pass reorders it again.

Sorting should be idempotent: sorting a collection once puts it in order, so running the same sort again on the already sorted array should leave it unchanged.
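Idempotence can be checked directly: sort a copy once, sort the result again, and compare the two. The sketch below reuses the blocks from the test with a hypothetical Block stand-in for BlockLocation (its toString mimics the offset,length output above) and hypothetical sorted and sortIsIdempotent helpers. With the current comparator the second pass changes the order; sorting by offset leaves it unchanged.

{code:java}
import java.util.Arrays;
import java.util.Comparator;

class IdempotentSortSketch {

  // Hypothetical stand-in for org.apache.hadoop.fs.BlockLocation; prints as "offset,length".
  static final class Block {
    final long offset;
    final long length;

    Block(long offset, long length) {
      this.offset = offset;
      this.length = length;
    }

    @Override
    public String toString() {
      return offset + "," + length;
    }
  }

  // Same logic as the current comparator.
  static final Comparator<Block> CURRENT =
      (o1, o2) -> Long.compare(o1.length - o2.offset, 0L);
  // Ascending by offset only.
  static final Comparator<Block> BY_OFFSET = Comparator.comparingLong((Block b) -> b.offset);

  // Hypothetical helper: returns a sorted copy, leaving the input untouched.
  static Block[] sorted(Block[] blocks, Comparator<Block> c) {
    Block[] copy = blocks.clone();
    Arrays.sort(copy, c);
    return copy;
  }

  // Hypothetical helper: true if re-sorting the sorted result keeps the same element order.
  // Block does not override equals, so Arrays.equals compares element identity here.
  static boolean sortIsIdempotent(Block[] blocks, Comparator<Block> c) {
    Block[] once = sorted(blocks, c);
    Block[] twice = sorted(once, c);
    return Arrays.equals(once, twice);
  }

  public static void main(String[] args) {
    Block[] blocks = {new Block(0, 5), new Block(5, 5), new Block(6, 4)};
    System.out.println(Arrays.toString(sorted(blocks, CURRENT))); // [0,5, 6,4, 5,5]
    System.out.println(sortIsIdempotent(blocks, CURRENT));        // false
    System.out.println(sortIsIdempotent(blocks, BY_OFFSET));      // true
  }
}
{code}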



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
