This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git

commit f88093e0bf5c58c0014a7e4ca9859dd3db8dd20e
Author: HHoflittlefish777 <[email protected]>
AuthorDate: Wed Apr 24 10:36:06 2024 +0800

    [fix](routine-load) fix routine load lag is negative (#33846)
---
 .../doris/load/routineload/KafkaRoutineLoadJob.java       | 15 +++++++++++++--
 1 file changed, 13 insertions(+), 2 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
index 8540bb43963..c00f16b7d8a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
@@ -285,16 +285,27 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob {
         return false;
     }
 
+    private void updateProgressAndOffsetsCache(RLTaskTxnCommitAttachment attachment) {
+        ((KafkaProgress) attachment.getProgress()).getOffsetByPartition().entrySet().stream()
+                .forEach(entity -> {
+                    if (cachedPartitionWithLatestOffsets.containsKey(entity.getKey())
+                            && cachedPartitionWithLatestOffsets.get(entity.getKey()) < entity.getValue() + 1) {
+                        cachedPartitionWithLatestOffsets.put(entity.getKey(), entity.getValue() + 1);
+                    }
+                });
+        this.progress.update(attachment);
+    }
+
     @Override
     protected void updateProgress(RLTaskTxnCommitAttachment attachment) throws UserException {
         super.updateProgress(attachment);
-        this.progress.update(attachment);
+        updateProgressAndOffsetsCache(attachment);
     }
 
     @Override
     protected void replayUpdateProgress(RLTaskTxnCommitAttachment attachment) {
         super.replayUpdateProgress(attachment);
-        this.progress.update(attachment);
+        updateProgressAndOffsetsCache(attachment);
     }
 
     @Override


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to