waitingF commented on code in PR #8376:
URL: https://github.com/apache/hudi/pull/8376#discussion_r1177276655
##########
hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestCheckpointUtils.java:
##########
@@ -57,63 +58,191 @@ public void testStringToOffsets() {
@Test
public void testOffsetToString() {
OffsetRange[] ranges =
- CheckpointUtils.computeOffsetRanges(makeOffsetMap(new int[]{0, 1},
new long[]{200000, 250000}),
- makeOffsetMap(new int[]{0, 1}, new long[]{300000,
350000}), 1000000L);
+ CheckpointUtils.computeOffsetRanges(makeOffsetMap(new int[] {0, 1},
new long[] {200000, 250000}),
+ makeOffsetMap(new int[] {0, 1}, new long[] {300000, 350000}),
1000000L, 0);
assertEquals(TEST_TOPIC_NAME + ",0:300000,1:350000",
CheckpointUtils.offsetsToStr(ranges));
+
+ ranges = new OffsetRange[] {
+ OffsetRange.apply(TEST_TOPIC_NAME, 0, 0, 100),
+ OffsetRange.apply(TEST_TOPIC_NAME, 0, 100, 200),
+ OffsetRange.apply(TEST_TOPIC_NAME, 1, 100, 200),
+ OffsetRange.apply(TEST_TOPIC_NAME, 1, 200, 300)};
+ assertEquals(TEST_TOPIC_NAME + ",0:200,1:300",
CheckpointUtils.offsetsToStr(ranges));
}
@Test
- public void testComputeOffsetRanges() {
+ public void testComputeOffsetRangesWithoutMinPartitions() {
// test totalNewMessages()
- long totalMsgs = CheckpointUtils.totalNewMessages(new
OffsetRange[]{OffsetRange.apply(TEST_TOPIC_NAME, 0, 0, 100),
- OffsetRange.apply(TEST_TOPIC_NAME, 0, 100, 200)});
+ long totalMsgs = CheckpointUtils.totalNewMessages(new OffsetRange[]
{OffsetRange.apply(TEST_TOPIC_NAME, 0, 0, 100),
+ OffsetRange.apply(TEST_TOPIC_NAME, 0, 100, 200)});
assertEquals(200, totalMsgs);
// should consume all the full data
OffsetRange[] ranges =
- CheckpointUtils.computeOffsetRanges(makeOffsetMap(new int[]{0, 1},
new long[]{200000, 250000}),
- makeOffsetMap(new int[]{0, 1}, new long[]{300000,
350000}), 1000000L);
+ CheckpointUtils.computeOffsetRanges(makeOffsetMap(new int[] {0, 1},
new long[] {200000, 250000}),
+ makeOffsetMap(new int[] {0, 1}, new long[] {300000, 350000}),
1000000L, 0);
assertEquals(200000, CheckpointUtils.totalNewMessages(ranges));
// should only consume upto limit
- ranges = CheckpointUtils.computeOffsetRanges(makeOffsetMap(new int[]{0,
1}, new long[]{200000, 250000}),
- makeOffsetMap(new int[]{0, 1}, new long[]{300000, 350000}), 10000);
+ ranges = CheckpointUtils.computeOffsetRanges(
+ makeOffsetMap(new int[] {0, 1}, new long[] {200000, 250000}),
+ makeOffsetMap(new int[] {0, 1}, new long[] {300000, 350000}), 10000,
0);
assertEquals(10000, CheckpointUtils.totalNewMessages(ranges));
assertEquals(200000, ranges[0].fromOffset());
assertEquals(205000, ranges[0].untilOffset());
assertEquals(250000, ranges[1].fromOffset());
assertEquals(255000, ranges[1].untilOffset());
// should also consume from new partitions.
- ranges = CheckpointUtils.computeOffsetRanges(makeOffsetMap(new int[]{0,
1}, new long[]{200000, 250000}),
- makeOffsetMap(new int[]{0, 1, 2}, new long[]{300000, 350000,
100000}), 1000000L);
+ ranges = CheckpointUtils.computeOffsetRanges(makeOffsetMap(new int[] {0,
1}, new long[] {200000, 250000}),
+ makeOffsetMap(new int[] {0, 1, 2}, new long[] {300000, 350000,
100000}), 1000000L, 0);
assertEquals(300000, CheckpointUtils.totalNewMessages(ranges));
assertEquals(3, ranges.length);
// for skewed offsets, does not starve any partition & can catch up
- ranges = CheckpointUtils.computeOffsetRanges(makeOffsetMap(new int[]{0,
1}, new long[]{200000, 250000}),
- makeOffsetMap(new int[]{0, 1, 2}, new long[]{200010, 350000,
10000}), 100000);
+ ranges = CheckpointUtils.computeOffsetRanges(makeOffsetMap(new int[] {0,
1}, new long[] {200000, 250000}),
+ makeOffsetMap(new int[] {0, 1, 2}, new long[] {200010, 350000,
10000}), 100000, 0);
assertEquals(100000, CheckpointUtils.totalNewMessages(ranges));
assertEquals(10, ranges[0].count());
assertEquals(89990, ranges[1].count());
assertEquals(10000, ranges[2].count());
- ranges = CheckpointUtils.computeOffsetRanges(makeOffsetMap(new int[]{0,
1}, new long[]{200000, 250000}),
- makeOffsetMap(new int[]{0, 1, 2}, new long[]{200010, 350000,
10000}), 1000000);
+ ranges = CheckpointUtils.computeOffsetRanges(makeOffsetMap(new int[] {0,
1}, new long[] {200000, 250000}),
+ makeOffsetMap(new int[] {0, 1, 2}, new long[] {200010, 350000,
10000}), 1000000, 0);
assertEquals(110010, CheckpointUtils.totalNewMessages(ranges));
assertEquals(10, ranges[0].count());
assertEquals(100000, ranges[1].count());
assertEquals(10000, ranges[2].count());
// not all partitions consume same entries.
- ranges = CheckpointUtils.computeOffsetRanges(makeOffsetMap(new int[]{0, 1,
2, 3, 4}, new long[]{0, 0, 0, 0, 0}),
- makeOffsetMap(new int[]{0, 1, 2, 3, 4}, new long[]{100, 1000,
1000, 1000, 1000}), 1001);
+ ranges = CheckpointUtils.computeOffsetRanges(makeOffsetMap(new int[] {0,
1, 2, 3, 4}, new long[] {0, 0, 0, 0, 0}),
+ makeOffsetMap(new int[] {0, 1, 2, 3, 4}, new long[] {100, 1000, 1000,
1000, 1000}), 1001, 0);
assertEquals(1001, CheckpointUtils.totalNewMessages(ranges));
assertEquals(100, ranges[0].count());
assertEquals(226, ranges[1].count());
- assertEquals(226, ranges[2].count());
- assertEquals(226, ranges[3].count());
- assertEquals(223, ranges[4].count());
+ assertEquals(225, ranges[2].count());
Review Comment:
@bvaradar sure.
The new range-computation algorithm recalculates `eventsPerPartition`
before each partition's allocation, based on the current
`remainingEvents` and `remainingPartitions`; see the code below:
```java
long remainingEvents = actualNumEvents - allocedEvents;
long remainingPartitions = toOffsetMap.size() -
allocatedPartitionsThisLoop.size();
// if need tp split into minPartitions, recalculate the
remainingPartitions
if (needSplitToMinPartitions) {
remainingPartitions = minPartitions - finalRanges.size();
}
long eventsPerPartition = (long) Math.ceil((1.0 * remainingEvents)
/ remainingPartitions);
```
For this test case, with `numEvents`=1001 and the following offsets for 5 partitions:
partition0 from 0 to 100,
partition1 from 0 to 1000,
partition2 from 0 to 1000,
partition3 from 0 to 1000,
partition4 from 0 to 1000
After it allocates partition0 with 100 events, the result is:
remainingEvents=901, remainingPartitions=4
Then it allocates partition1 with `(long) Math.ceil(1.0 * 901 / 4)` = 226
events, resulting in: remainingEvents=675, remainingPartitions=3
Then it allocates partition2 with 675/3 = 225 events, resulting in:
remainingEvents=450, remainingPartitions=2
Then it allocates partition3 with 450/2 = 225 events, resulting in:
remainingEvents=225, remainingPartitions=1
Finally, it allocates partition4 with the remaining 225 events.
In the old algorithm, by contrast, `eventsPerPartition` is calculated only once
per iteration of the `while` loop. Let me simulate the old algorithm for this
test case. In the first iteration, it calculates `eventsPerPartition` as `(long)
Math.ceil(1.0*1001 / 5)` = 201, so in the first iteration it allocates
partition0 with 100 events (partition0 only has 100 events, so it is now
exhausted),
partition1 with 201 events
partition2 with 201 events
partition3 with 201 events
partition4 with 201 events
resulting in `remainingEvents` = 1001 - 100 - 201*4 = 97.
In the second iteration, it calculates `eventsPerPartition` = `(long)
Math.ceil((1.0 * remainingEvents) / (toOffsetMap.size() -
exhaustedPartitions.size()))` = `(long) Math.ceil(1.0*97 / 4)` = 25. So in this
iteration, it allocates
partition0 with 0 events, giving partition0 a total of 100 events,
partition1 with 25 events, giving partition1 a total of 226 events,
partition2 with 25 events, giving partition2 a total of 226 events,
partition3 with 25 events, giving partition3 a total of 226 events,
partition4 with only 22 events, because allocating 25 events would exceed
numEvents=1001, giving partition4 a total of 223 events.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]