This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
new af960f7c70f [branch-2.1](routine-load) handle the case where Kafka's high
watermark may fall back (#37372)
af960f7c70f is described below
commit af960f7c70f8569d856b30be36515417bf8dbc67
Author: hui lai <[email protected]>
AuthorDate: Sun Jul 7 18:15:54 2024 +0800
[branch-2.1](routine-load) handle the case where Kafka's high watermark may
fall back (#37372)
Cherry-pick of #35901.
---
.../org/apache/doris/load/routineload/KafkaRoutineLoadJob.java | 9 ++++++++-
1 file changed, 8 insertions(+), 1 deletion(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
index 43ae98d8f7a..201412027ab 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
@@ -765,7 +765,14 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob {
List<Pair<Integer, Long>> tmp = KafkaUtil.getLatestOffsets(id,
taskId, getBrokerList(),
getTopic(), getConvertedCustomProperties(),
Lists.newArrayList(partitionIdToOffset.keySet()));
for (Pair<Integer, Long> pair : tmp) {
- cachedPartitionWithLatestOffsets.put(pair.first, pair.second);
+ if (pair.second >=
cachedPartitionWithLatestOffsets.getOrDefault(pair.first, Long.MIN_VALUE)) {
+ cachedPartitionWithLatestOffsets.put(pair.first,
pair.second);
+ } else {
+ LOG.warn("Kafka offset fallback. partition: {}, cache
offset: {}"
+ + " get latest offset: {}, task {}, job {}",
+ pair.first,
cachedPartitionWithLatestOffsets.getOrDefault(pair.first, Long.MIN_VALUE),
+ pair.second, taskId, id);
+ }
}
} catch (Exception e) {
// It needs to pause job when can not get partition meta.
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]