This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new af960f7c70f [branch-2.1](routine-load) handle possible fallback of 
the Kafka high watermark (#37372)
af960f7c70f is described below

commit af960f7c70f8569d856b30be36515417bf8dbc67
Author: hui lai <[email protected]>
AuthorDate: Sun Jul 7 18:15:54 2024 +0800

    [branch-2.1](routine-load) handle possible fallback of the Kafka high 
watermark (#37372)
    
    pick #35901
---
 .../org/apache/doris/load/routineload/KafkaRoutineLoadJob.java   | 9 ++++++++-
 1 file changed, 8 insertions(+), 1 deletion(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
 
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
index 43ae98d8f7a..201412027ab 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
@@ -765,7 +765,14 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob {
             List<Pair<Integer, Long>> tmp = KafkaUtil.getLatestOffsets(id, 
taskId, getBrokerList(),
                     getTopic(), getConvertedCustomProperties(), 
Lists.newArrayList(partitionIdToOffset.keySet()));
             for (Pair<Integer, Long> pair : tmp) {
-                cachedPartitionWithLatestOffsets.put(pair.first, pair.second);
+                if (pair.second >= 
cachedPartitionWithLatestOffsets.getOrDefault(pair.first, Long.MIN_VALUE)) {
+                    cachedPartitionWithLatestOffsets.put(pair.first, 
pair.second);
+                } else {
+                    LOG.warn("Kafka offset fallback. partition: {}, cache 
offset: {}"
+                                + " get latest offset: {}, task {}, job {}",
+                                pair.first, 
cachedPartitionWithLatestOffsets.getOrDefault(pair.first, Long.MIN_VALUE),
+                                pair.second, taskId, id);
+                }
             }
         } catch (Exception e) {
             // It needs to pause job when can not get partition meta.


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to