voon created HUDI-4841:
--------------------------
Summary: Flink read issue; BlockLocations not sorted properly;
Sort implementation is not idempotent
Key: HUDI-4841
URL: https://issues.apache.org/jira/browse/HUDI-4841
Project: Apache Hudi
Issue Type: Bug
Reporter: voon
Assignee: voon
h1. Description of Bug
CopyOnWriteInputFormat#getBlockIndexForPosition() requires the BlockLocation
array to be sorted by offset in ascending order.
However, the current comparator compares the length of one block against the
offset of the other, so it does not guarantee that the BlockLocation array
ends up in ascending offset order.
h1. Stacktrace
{code:java}
Caused by: java.lang.IllegalArgumentException: The given offset is not contained in the any block.
    at org.apache.hudi.table.format.cow.CopyOnWriteInputFormat.getBlockIndexForPosition(CopyOnWriteInputFormat.java:374)
    at org.apache.hudi.table.format.cow.CopyOnWriteInputFormat.createInputSplits(CopyOnWriteInputFormat.java:242)
    at org.apache.hudi.table.format.cow.CopyOnWriteInputFormat.createInputSplits(CopyOnWriteInputFormat.java:66)
    at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.<init>(ExecutionJobVertex.java:234)
    ... 21 more
{code}
h1. Reproduction of issue
h2. Current sorting implementation
{code:java}
Arrays.sort(blocks, new Comparator<BlockLocation>() {
  @Override
  public int compare(BlockLocation o1, BlockLocation o2) {
    long diff = o1.getLength() - o2.getOffset();
    return Long.compare(diff, 0L);
  }
});
{code}
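The comparator above can be checked against the java.util.Comparator contract, which requires sgn(compare(x, y)) == -sgn(compare(y, x)). The following is a minimal, self-contained sketch of that check. The Block class is a hypothetical stand-in for org.apache.hadoop.fs.BlockLocation (offset and length only) so that the example runs without Hadoop on the classpath; ComparatorContractCheck and isAntisymmetric are likewise illustrative names, not part of Hudi.

```java
import java.util.Comparator;

final class ComparatorContractCheck {

  /** Hypothetical stand-in for BlockLocation: carries only offset and length. */
  static final class Block {
    private final long offset;
    private final long length;

    Block(long offset, long length) {
      this.offset = offset;
      this.length = length;
    }

    long getOffset() { return offset; }

    long getLength() { return length; }
  }

  /** Same logic as the comparator quoted in this issue: length of o1 vs. offset of o2. */
  static final Comparator<Block> CURRENT =
      (o1, o2) -> Long.compare(o1.getLength() - o2.getOffset(), 0L);

  /** True if the comparator satisfies sgn(compare(a, b)) == -sgn(compare(b, a)) for this pair. */
  static boolean isAntisymmetric(Comparator<Block> c, Block a, Block b) {
    return Integer.signum(c.compare(a, b)) == -Integer.signum(c.compare(b, a));
  }

  public static void main(String[] args) {
    Block o1 = new Block(0, 5);
    Block o2 = new Block(5, 5);
    Block o3 = new Block(6, 4);

    // 5 - 6 < 0 and 4 - 5 < 0: each block claims to come before the other.
    System.out.println(CURRENT.compare(o2, o3) < 0);      // true
    System.out.println(CURRENT.compare(o3, o2) < 0);      // true
    System.out.println(isAntisymmetric(CURRENT, o2, o3)); // false

    // 5 - 0 > 0: a block does not even compare equal to itself.
    System.out.println(CURRENT.compare(o1, o1) == 0);     // false
  }
}
```

Because the comparator does not define a consistent ordering, the result of Arrays.sort depends on the order in which the algorithm happens to compare elements.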
h2. Test
{code:java}
import java.util.Arrays;

import org.apache.hadoop.fs.BlockLocation;
import org.junit.jupiter.api.Test;

public class TestBlockLocationSort {

  // Copy of the current comparator logic: compares o1's length with o2's offset.
  static int compare(BlockLocation o1, BlockLocation o2) {
    long diff = o1.getLength() - o2.getOffset();
    return Long.compare(diff, 0L);
  }

  @Test
  void testBlockLocationSort() {
    // Constructor arguments: names, hosts, offset, length.
    BlockLocation o1 = new BlockLocation(new String[0], new String[0], 0, 5);
    BlockLocation o2 = new BlockLocation(new String[0], new String[0], 5, 5);
    BlockLocation o3 = new BlockLocation(new String[0], new String[0], 6, 4);
    BlockLocation[] blocks1 = {o1, o2, o3};

    System.out.println("BlockLocation[] bef. sort [pass 1]: " + Arrays.toString(blocks1));
    Arrays.sort(blocks1, TestBlockLocationSort::compare);
    System.out.println("BlockLocation[] aft. sort [pass 1]: " + Arrays.toString(blocks1) + "\n");

    // Sort the already-sorted array a second time.
    System.out.println("BlockLocation[] bef. sort [pass 2]: " + Arrays.toString(blocks1));
    Arrays.sort(blocks1, TestBlockLocationSort::compare);
    System.out.println("BlockLocation[] aft. sort [pass 2]: " + Arrays.toString(blocks1) + "\n");
  }
}{code}
h2. Output
{code:java}
BlockLocation[] bef. sort [pass 1]: [0,5, 5,5, 6,4]
BlockLocation[] aft. sort [pass 1]: [0,5, 6,4, 5,5]
BlockLocation[] bef. sort [pass 2]: [0,5, 6,4, 5,5]
BlockLocation[] aft. sort [pass 2]: [0,5, 5,5, 6,4]{code}
As the output shows, sorting with the current comparator is not idempotent:
the second pass reorders the array produced by the first. The first pass also
leaves the array out of offset order (offset 6 before offset 5).
Sorting should be idempotent. The first sort puts the array in order, and
sorting the already-sorted array again should leave it unchanged.
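One possible fix, sketched below, is to compare offsets only. This is an illustration under stated assumptions, not necessarily the change that will be applied to CopyOnWriteInputFormat: the Block class is again a hypothetical stand-in for org.apache.hadoop.fs.BlockLocation, and OffsetSortSketch, BY_OFFSET and sortedCopy are illustrative names. With the real class, the equivalent comparator would be Comparator.comparingLong(BlockLocation::getOffset).

```java
import java.util.Arrays;
import java.util.Comparator;

final class OffsetSortSketch {

  /** Hypothetical stand-in for BlockLocation: carries only offset and length. */
  static final class Block {
    private final long offset;
    private final long length;

    Block(long offset, long length) {
      this.offset = offset;
      this.length = length;
    }

    long getOffset() { return offset; }

    long getLength() { return length; }

    @Override
    public String toString() {
      return offset + "," + length;
    }
  }

  /** Orders blocks by ascending offset; length plays no part in the comparison. */
  static final Comparator<Block> BY_OFFSET = Comparator.comparingLong(Block::getOffset);

  /** Returns a copy of the input sorted by offset, leaving the input untouched. */
  static Block[] sortedCopy(Block[] blocks) {
    Block[] copy = blocks.clone();
    Arrays.sort(copy, BY_OFFSET);
    return copy;
  }

  public static void main(String[] args) {
    Block[] blocks = {new Block(6, 4), new Block(0, 5), new Block(5, 5)};

    Block[] pass1 = sortedCopy(blocks);
    Block[] pass2 = sortedCopy(pass1);

    System.out.println(Arrays.toString(pass1)); // [0,5, 5,5, 6,4]
    System.out.println(Arrays.toString(pass2)); // [0,5, 5,5, 6,4]
  }
}
```

Since the comparator now defines a consistent ordering on offsets, a second sort leaves the already-sorted array unchanged, which is the idempotent behaviour described above.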