This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/doris.git
commit f88093e0bf5c58c0014a7e4ca9859dd3db8dd20e
Author: HHoflittlefish777 <[email protected]>
AuthorDate: Wed Apr 24 10:36:06 2024 +0800

    [fix](routine-load) fix routine load lag is negative (#33846)
---
 .../doris/load/routineload/KafkaRoutineLoadJob.java | 15 +++++++++++++--
 1 file changed, 13 insertions(+), 2 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
index 8540bb43963..c00f16b7d8a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
@@ -285,16 +285,27 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob {
         return false;
     }
 
+    private void updateProgressAndOffsetsCache(RLTaskTxnCommitAttachment attachment) {
+        ((KafkaProgress) attachment.getProgress()).getOffsetByPartition().entrySet().stream()
+                .forEach(entity -> {
+                    if (cachedPartitionWithLatestOffsets.containsKey(entity.getKey())
+                            && cachedPartitionWithLatestOffsets.get(entity.getKey()) < entity.getValue() + 1) {
+                        cachedPartitionWithLatestOffsets.put(entity.getKey(), entity.getValue() + 1);
+                    }
+                });
+        this.progress.update(attachment);
+    }
+
     @Override
     protected void updateProgress(RLTaskTxnCommitAttachment attachment) throws UserException {
         super.updateProgress(attachment);
-        this.progress.update(attachment);
+        updateProgressAndOffsetsCache(attachment);
     }
 
     @Override
     protected void replayUpdateProgress(RLTaskTxnCommitAttachment attachment) {
         super.replayUpdateProgress(attachment);
-        this.progress.update(attachment);
+        updateProgressAndOffsetsCache(attachment);
     }
 
     @Override

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
