waitingF commented on code in PR #8376:
URL: https://github.com/apache/hudi/pull/8376#discussion_r1177276655


##########
hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestCheckpointUtils.java:
##########
@@ -57,63 +58,191 @@ public void testStringToOffsets() {
   @Test
   public void testOffsetToString() {
     OffsetRange[] ranges =
-            CheckpointUtils.computeOffsetRanges(makeOffsetMap(new int[]{0, 1}, 
new long[]{200000, 250000}),
-                    makeOffsetMap(new int[]{0, 1}, new long[]{300000, 
350000}), 1000000L);
+        CheckpointUtils.computeOffsetRanges(makeOffsetMap(new int[] {0, 1}, 
new long[] {200000, 250000}),
+            makeOffsetMap(new int[] {0, 1}, new long[] {300000, 350000}), 
1000000L, 0);
     assertEquals(TEST_TOPIC_NAME + ",0:300000,1:350000", 
CheckpointUtils.offsetsToStr(ranges));
+
+    ranges = new OffsetRange[] {
+        OffsetRange.apply(TEST_TOPIC_NAME, 0, 0, 100),
+        OffsetRange.apply(TEST_TOPIC_NAME, 0, 100, 200),
+        OffsetRange.apply(TEST_TOPIC_NAME, 1, 100, 200),
+        OffsetRange.apply(TEST_TOPIC_NAME, 1, 200, 300)};
+    assertEquals(TEST_TOPIC_NAME + ",0:200,1:300", 
CheckpointUtils.offsetsToStr(ranges));
   }
 
   @Test
-  public void testComputeOffsetRanges() {
+  public void testComputeOffsetRangesWithoutMinPartitions() {
     // test totalNewMessages()
-    long totalMsgs = CheckpointUtils.totalNewMessages(new 
OffsetRange[]{OffsetRange.apply(TEST_TOPIC_NAME, 0, 0, 100),
-            OffsetRange.apply(TEST_TOPIC_NAME, 0, 100, 200)});
+    long totalMsgs = CheckpointUtils.totalNewMessages(new OffsetRange[] 
{OffsetRange.apply(TEST_TOPIC_NAME, 0, 0, 100),
+        OffsetRange.apply(TEST_TOPIC_NAME, 0, 100, 200)});
     assertEquals(200, totalMsgs);
 
     // should consume all the full data
     OffsetRange[] ranges =
-            CheckpointUtils.computeOffsetRanges(makeOffsetMap(new int[]{0, 1}, 
new long[]{200000, 250000}),
-                    makeOffsetMap(new int[]{0, 1}, new long[]{300000, 
350000}), 1000000L);
+        CheckpointUtils.computeOffsetRanges(makeOffsetMap(new int[] {0, 1}, 
new long[] {200000, 250000}),
+            makeOffsetMap(new int[] {0, 1}, new long[] {300000, 350000}), 
1000000L, 0);
     assertEquals(200000, CheckpointUtils.totalNewMessages(ranges));
 
     // should only consume upto limit
-    ranges = CheckpointUtils.computeOffsetRanges(makeOffsetMap(new int[]{0, 
1}, new long[]{200000, 250000}),
-            makeOffsetMap(new int[]{0, 1}, new long[]{300000, 350000}), 10000);
+    ranges = CheckpointUtils.computeOffsetRanges(
+        makeOffsetMap(new int[] {0, 1}, new long[] {200000, 250000}),
+        makeOffsetMap(new int[] {0, 1}, new long[] {300000, 350000}), 10000, 
0);
     assertEquals(10000, CheckpointUtils.totalNewMessages(ranges));
     assertEquals(200000, ranges[0].fromOffset());
     assertEquals(205000, ranges[0].untilOffset());
     assertEquals(250000, ranges[1].fromOffset());
     assertEquals(255000, ranges[1].untilOffset());
 
     // should also consume from new partitions.
-    ranges = CheckpointUtils.computeOffsetRanges(makeOffsetMap(new int[]{0, 
1}, new long[]{200000, 250000}),
-            makeOffsetMap(new int[]{0, 1, 2}, new long[]{300000, 350000, 
100000}), 1000000L);
+    ranges = CheckpointUtils.computeOffsetRanges(makeOffsetMap(new int[] {0, 
1}, new long[] {200000, 250000}),
+        makeOffsetMap(new int[] {0, 1, 2}, new long[] {300000, 350000, 
100000}), 1000000L, 0);
     assertEquals(300000, CheckpointUtils.totalNewMessages(ranges));
     assertEquals(3, ranges.length);
 
     // for skewed offsets, does not starve any partition & can catch up
-    ranges = CheckpointUtils.computeOffsetRanges(makeOffsetMap(new int[]{0, 
1}, new long[]{200000, 250000}),
-            makeOffsetMap(new int[]{0, 1, 2}, new long[]{200010, 350000, 
10000}), 100000);
+    ranges = CheckpointUtils.computeOffsetRanges(makeOffsetMap(new int[] {0, 
1}, new long[] {200000, 250000}),
+        makeOffsetMap(new int[] {0, 1, 2}, new long[] {200010, 350000, 
10000}), 100000, 0);
     assertEquals(100000, CheckpointUtils.totalNewMessages(ranges));
     assertEquals(10, ranges[0].count());
     assertEquals(89990, ranges[1].count());
     assertEquals(10000, ranges[2].count());
 
-    ranges = CheckpointUtils.computeOffsetRanges(makeOffsetMap(new int[]{0, 
1}, new long[]{200000, 250000}),
-            makeOffsetMap(new int[]{0, 1, 2}, new long[]{200010, 350000, 
10000}), 1000000);
+    ranges = CheckpointUtils.computeOffsetRanges(makeOffsetMap(new int[] {0, 
1}, new long[] {200000, 250000}),
+        makeOffsetMap(new int[] {0, 1, 2}, new long[] {200010, 350000, 
10000}), 1000000, 0);
     assertEquals(110010, CheckpointUtils.totalNewMessages(ranges));
     assertEquals(10, ranges[0].count());
     assertEquals(100000, ranges[1].count());
     assertEquals(10000, ranges[2].count());
 
     // not all partitions consume same entries.
-    ranges = CheckpointUtils.computeOffsetRanges(makeOffsetMap(new int[]{0, 1, 
2, 3, 4}, new long[]{0, 0, 0, 0, 0}),
-            makeOffsetMap(new int[]{0, 1, 2, 3, 4}, new long[]{100, 1000, 
1000, 1000, 1000}), 1001);
+    ranges = CheckpointUtils.computeOffsetRanges(makeOffsetMap(new int[] {0, 
1, 2, 3, 4}, new long[] {0, 0, 0, 0, 0}),
+        makeOffsetMap(new int[] {0, 1, 2, 3, 4}, new long[] {100, 1000, 1000, 
1000, 1000}), 1001, 0);
     assertEquals(1001, CheckpointUtils.totalNewMessages(ranges));
     assertEquals(100, ranges[0].count());
     assertEquals(226, ranges[1].count());
-    assertEquals(226, ranges[2].count());
-    assertEquals(226, ranges[3].count());
-    assertEquals(223, ranges[4].count());
+    assertEquals(225, ranges[2].count());

Review Comment:
   @bvaradar sure.
   In the new range-computation algorithm, `eventsPerPartition` is recalculated before each partition's allocation, based on the current `remainingEvents` and `remainingPartitions`; see the code below:
   ```java
             long remainingEvents = actualNumEvents - allocedEvents;
             long remainingPartitions = toOffsetMap.size() - 
allocatedPartitionsThisLoop.size();
             // if need tp split into minPartitions, recalculate the 
remainingPartitions
             if (needSplitToMinPartitions) {
               remainingPartitions = minPartitions - finalRanges.size();
             }
             long eventsPerPartition = (long) Math.ceil((1.0 * remainingEvents) 
/ remainingPartitions);
   ```
   For this test case, with `numEvents`=1001 and the following 5 partition offset ranges:
   partition0 from 0 to 100,
   partition1 from 0 to 1000,
   partition2 from 0 to 1000,
   partition3 from 0 to 1000,
   partition4 from 0 to 1000
   
   After allocating 100 events to partition0, remainingEvents=901 and remainingPartitions=4.
   Then it allocates `(long) Math.ceil(1.0 * 901 / 4)` = 226 events to partition1, leaving remainingEvents=675 and remainingPartitions=3.
   Then it allocates 675/3 = 225 events to partition2, leaving remainingEvents=450 and remainingPartitions=2.
   Then it allocates 450/2 = 225 events to partition3, leaving remainingEvents=225 and remainingPartitions=1.
   Finally, it allocates the remaining 225 events to partition4.
   So the new algorithm yields counts of 100, 226, 225, 225, 225, which is what the updated assertions expect.
   
   The old algorithm, by contrast, calculates `eventsPerPartition` only once per iteration of the `while` loop. Here is a simulation of the old algorithm for this test case.
   In the first iteration, `eventsPerPartition` = `(long) Math.ceil(1.0 * 1001 / 5)` = 201, so this iteration allocates:
   partition0: 100 events (partition0 only has 100 events, so it is now exhausted),
   partition1: 201 events,
   partition2: 201 events,
   partition3: 201 events,
   partition4: 201 events,
   leaving `remainingEvents` = 1001 - 100 - 201*4 = 97.
   In the second iteration, `eventsPerPartition` = `(long) Math.ceil((1.0 * remainingEvents) / (toOffsetMap.size() - exhaustedPartitions.size()))` = `(long) Math.ceil(1.0 * 97 / 4)` = 25, so this iteration allocates:
   partition0: 0 events, for a total of 100,
   partition1: 25 events, for a total of 226,
   partition2: 25 events, for a total of 226,
   partition3: 25 events, for a total of 226,
   partition4: only 22 events, because allocating 25 would exceed `numEvents`=1001, for a total of 223.
   This is why the old assertions expected counts of 100, 226, 226, 226, 223.
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to