This is an automated email from the ASF dual-hosted git repository.
vbalaji pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 213c9f89e5f [HUDI-7153] Fixing range overflow with kakfa source and
spark partition management (#10205)
213c9f89e5f is described below
commit 213c9f89e5f1ff872b65b9325353c9e233dc6c22
Author: Sivabalan Narayanan <[email protected]>
AuthorDate: Fri Dec 1 15:08:10 2023 -0800
[HUDI-7153] Fixing range overflow with kakfa source and spark partition
management (#10205)
---
.../org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java | 8 +++++++-
1 file changed, 7 insertions(+), 1 deletion(-)
diff --git
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java
index 328436dbcd2..d5faec3595e 100644
---
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java
+++
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java
@@ -156,7 +156,13 @@ public class KafkaOffsetGen {
continue;
}
- long toOffset = Math.min(range.untilOffset(), range.fromOffset() +
eventsPerPartition);
+ long toOffset = -1L;
+ if (range.fromOffset() + eventsPerPartition > range.fromOffset()) {
+ toOffset = Math.min(range.untilOffset(), range.fromOffset() +
eventsPerPartition);
+ } else {
+ // handling Long overflow
+ toOffset = range.untilOffset();
+ }
if (toOffset == range.untilOffset()) {
exhaustedPartitions.add(range.partition());
}