This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.0 by this push:
     new 186ed0ed80d [fix](routine-load) dealing with the high watermark of Kafka that may fall back (#35901) (#37454)
186ed0ed80d is described below

commit 186ed0ed80da2b5b05ad17589635b1a1a3aa2502
Author: hui lai <[email protected]>
AuthorDate: Mon Jul 8 17:33:58 2024 +0800

    [fix](routine-load) dealing with the high watermark of Kafka that may fall back (#35901) (#37454)
    
    pick (#35901)
---
 .../org/apache/doris/load/routineload/KafkaRoutineLoadJob.java   | 9 ++++++++-
 1 file changed, 8 insertions(+), 1 deletion(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
 
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
index 5df7714ffd3..692d79d445e 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
@@ -743,7 +743,14 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob {
             List<Pair<Integer, Long>> tmp = KafkaUtil.getLatestOffsets(id, 
taskId, getBrokerList(),
                     getTopic(), getConvertedCustomProperties(), 
Lists.newArrayList(partitionIdToOffset.keySet()));
             for (Pair<Integer, Long> pair : tmp) {
-                cachedPartitionWithLatestOffsets.put(pair.first, pair.second);
+                if (pair.second >= 
cachedPartitionWithLatestOffsets.getOrDefault(pair.first, Long.MIN_VALUE)) {
+                    cachedPartitionWithLatestOffsets.put(pair.first, 
pair.second);
+                } else {
+                    LOG.warn("Kafka offset fallback. partition: {}, cache 
offset: {}"
+                                + " get latest offset: {}, task {}, job {}",
+                                pair.first, 
cachedPartitionWithLatestOffsets.getOrDefault(pair.first, Long.MIN_VALUE),
+                                pair.second, taskId, id);
+                }
             }
         } catch (Exception e) {
             // It needs to pause job when can not get partition meta.


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to