This is an automated email from the ASF dual-hosted git repository.

vbalaji pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 213c9f89e5f [HUDI-7153] Fixing range overflow with kakfa source and 
spark partition management (#10205)
213c9f89e5f is described below

commit 213c9f89e5f1ff872b65b9325353c9e233dc6c22
Author: Sivabalan Narayanan <[email protected]>
AuthorDate: Fri Dec 1 15:08:10 2023 -0800

    [HUDI-7153] Fixing range overflow with kakfa source and spark partition 
management (#10205)
---
 .../org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java | 8 +++++++-
 1 file changed, 7 insertions(+), 1 deletion(-)

diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java
 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java
index 328436dbcd2..d5faec3595e 100644
--- 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java
@@ -156,7 +156,13 @@ public class KafkaOffsetGen {
             continue;
           }
 
-          long toOffset = Math.min(range.untilOffset(), range.fromOffset() + 
eventsPerPartition);
+          long toOffset = -1L;
+          if (range.fromOffset() + eventsPerPartition > range.fromOffset()) {
+            toOffset = Math.min(range.untilOffset(), range.fromOffset() + 
eventsPerPartition);
+          } else {
+            // handling Long overflow
+            toOffset = range.untilOffset();
+          }
           if (toOffset == range.untilOffset()) {
             exhaustedPartitions.add(range.partition());
           }

Reply via email to