This is an automated email from the ASF dual-hosted git repository.

liaoxin01 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 5e30738bad9 [enhance](job) refresh routine load lag more timely 
(#63654)
5e30738bad9 is described below

commit 5e30738bad9fea5962bb4b0a71729907be9c8348
Author: hui lai <[email protected]>
AuthorDate: Wed Jun 3 23:39:31 2026 +0800

    [enhance](job) refresh routine load lag more timely (#63654)
    
    ### What problem does this PR solve?
    
    Problem Summary:
    
    Routine load lag was refreshed mainly when task scheduling needed to
    recheck the latest offsets after consuming up to the cached end offset.
    If producers kept appending data while the job was running, the cached
    latest offsets could become stale, so the reported routine load lag
    fell behind the actual lag.
    
    This PR refreshes the routine load lag cache on every
    `RoutineLoadScheduler` round, right after the jobs themselves are
    updated. The metric path still only reads in-memory state and does not
    call Kafka directly.
    
    For Kafka routine load jobs, the latest-offset cache is refreshed for
    the partitions currently tracked in the job's progress. Concurrent
    updates from job scheduling and task scheduling are merged with a
    monotonic atomic max, so cached latest offsets never regress. Kafka
    metadata requests now use snapshots of the broker list, topic, and
    converted custom properties that are taken under the job write lock.
---
 .../doris/load/routineload/RoutineLoadJob.java     |  3 +
 .../doris/load/routineload/RoutineLoadManager.java | 14 ++++
 .../load/routineload/RoutineLoadScheduler.java     |  2 +
 .../routineload/kafka/KafkaRoutineLoadJob.java     | 87 ++++++++++++++++------
 .../load/routineload/KafkaRoutineLoadJobTest.java  | 60 +++++++++++++++
 5 files changed, 143 insertions(+), 23 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
 
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
index 82cd9325dfa..3223ff91359 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
@@ -1001,6 +1001,9 @@ public abstract class RoutineLoadJob
         return 0L;
     }
 
+    public void updateLag() throws UserException {
+    }
+
     protected abstract RoutineLoadTaskInfo unprotectRenewTask(
             RoutineLoadTaskInfo routineLoadTaskInfo, boolean delaySchedule);
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java
 
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java
index f24b9ad2252..904fa207ac3 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java
@@ -875,6 +875,20 @@ public class RoutineLoadManager implements Writable {
         }
     }
 
+    public void updateRoutineLoadJobLag() {
+        for (RoutineLoadJob routineLoadJob : idToRoutineLoadJob.values()) {
+            if (!routineLoadJob.state.isFinalState()) {
+                try {
+                    routineLoadJob.updateLag();
+                } catch (UserException e) {
+                    LOG.warn(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, 
routineLoadJob.getId())
+                            .add("msg", "failed to update routine load lag")
+                            .build(), e);
+                }
+            }
+        }
+    }
+
     public void replayCreateRoutineLoadJob(RoutineLoadJob routineLoadJob) {
         unprotectedAddJob(routineLoadJob);
         LOG.info(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, 
routineLoadJob.getId())
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadScheduler.java
 
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadScheduler.java
index 023cd239e09..6fe09e27a0b 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadScheduler.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadScheduler.java
@@ -62,6 +62,8 @@ public class RoutineLoadScheduler extends MasterDaemon {
     private void process() throws UserException {
         // update
         routineLoadManager.updateRoutineLoadJob();
+        // refresh lag cache after job progress and partition metadata are 
updated
+        routineLoadManager.updateRoutineLoadJobLag();
         // get need schedule routine jobs
         List<RoutineLoadJob> routineLoadJobList = null;
         try {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/kafka/KafkaRoutineLoadJob.java
 
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/kafka/KafkaRoutineLoadJob.java
index e1290b03996..037b468b53a 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/kafka/KafkaRoutineLoadJob.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/kafka/KafkaRoutineLoadJob.java
@@ -194,9 +194,14 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob {
 
     @Override
     public void prepare() throws UserException {
-        // should reset converted properties each time the job being prepared.
-        // because the file info can be changed anytime.
-        convertCustomProperties(true);
+        writeLock();
+        try {
+            // should reset converted properties each time the job being 
prepared.
+            // because the file info can be changed anytime.
+            convertCustomProperties(true);
+        } finally {
+            writeUnlock();
+        }
     }
 
     private void convertCustomProperties(boolean rebuild) throws DdlException {
@@ -345,15 +350,22 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob {
 
     private void updateProgressAndOffsetsCache(RLTaskTxnCommitAttachment 
attachment) {
         ((KafkaProgress) 
attachment.getProgress()).getOffsetByPartition().entrySet().stream()
-                .forEach(entity -> {
-                    if 
(cachedPartitionWithLatestOffsets.containsKey(entity.getKey())
-                            && 
cachedPartitionWithLatestOffsets.get(entity.getKey()) < entity.getValue() + 1) {
-                        cachedPartitionWithLatestOffsets.put(entity.getKey(), 
entity.getValue() + 1);
-                    }
-                });
+                .forEach(entity -> 
cachedPartitionWithLatestOffsets.computeIfPresent(entity.getKey(),
+                        (partitionId, cachedOffset) -> Math.max(cachedOffset, 
entity.getValue() + 1)));
         this.progress.update(attachment);
     }
 
+    private void updateLatestOffsetsCache(List<Pair<Integer, Long>> 
latestOffsets, UUID taskId) {
+        for (Pair<Integer, Long> pair : latestOffsets) {
+            Long updatedOffset = 
cachedPartitionWithLatestOffsets.merge(pair.first, pair.second, Math::max);
+            if (updatedOffset > pair.second) {
+                LOG.warn("Kafka offset fallback. partition: {}, cache offset: 
{}"
+                            + " get latest offset: {}, task {}, job {}",
+                            pair.first, updatedOffset, pair.second, taskId, 
id);
+            }
+        }
+    }
+
     @Override
     protected void updateProgress(RLTaskTxnCommitAttachment attachment) throws 
UserException {
         updateProgressAndOffsetsCache(attachment);
@@ -655,14 +667,15 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob {
         super.setOptional(info);
         KafkaDataSourceProperties kafkaDataSourceProperties
                 = (KafkaDataSourceProperties) info.getDataSourceProperties();
-        if 
(CollectionUtils.isNotEmpty(kafkaDataSourceProperties.getKafkaPartitionOffsets()))
 {
-            setCustomKafkaPartitions(kafkaDataSourceProperties);
-        }
         if 
(MapUtils.isNotEmpty(kafkaDataSourceProperties.getCustomKafkaProperties())) {
             
setCustomKafkaProperties(kafkaDataSourceProperties.getCustomKafkaProperties());
         }
         // set group id if not specified
         this.customProperties.putIfAbsent(PROP_GROUP_ID, name + "_" + 
UUID.randomUUID());
+        convertCustomProperties(true);
+        if 
(CollectionUtils.isNotEmpty(kafkaDataSourceProperties.getKafkaPartitionOffsets()))
 {
+            setCustomKafkaPartitions(kafkaDataSourceProperties);
+        }
     }
 
     // this is an unprotected method which is called in the initialization 
function
@@ -902,18 +915,21 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob {
         try {
             // all offsets to be consumed are newer than offsets in 
cachedPartitionWithLatestOffsets,
             // maybe the cached offset is out-of-date, fetch from kafka server 
again
-            List<Pair<Integer, Long>> tmp = KafkaUtil.getLatestOffsets(id, 
taskId, getBrokerList(),
-                    getTopic(), getConvertedCustomProperties(), 
Lists.newArrayList(partitionIdToOffset.keySet()));
-            for (Pair<Integer, Long> pair : tmp) {
-                if (pair.second >= 
cachedPartitionWithLatestOffsets.getOrDefault(pair.first, Long.MIN_VALUE)) {
-                    cachedPartitionWithLatestOffsets.put(pair.first, 
pair.second);
-                } else {
-                    LOG.warn("Kafka offset fallback. partition: {}, cache 
offset: {}"
-                                + " get latest offset: {}, task {}, job {}",
-                                pair.first, 
cachedPartitionWithLatestOffsets.getOrDefault(pair.first, Long.MIN_VALUE),
-                                pair.second, taskId, id);
-                }
+            String brokerListSnapshot;
+            String topicSnapshot;
+            Map<String, String> customPropertiesSnapshot;
+            writeLock();
+            try {
+                convertCustomProperties(false);
+                brokerListSnapshot = brokerList;
+                topicSnapshot = topic;
+                customPropertiesSnapshot = 
Maps.newHashMap(convertedCustomProperties);
+            } finally {
+                writeUnlock();
             }
+            List<Pair<Integer, Long>> tmp = KafkaUtil.getLatestOffsets(id, 
taskId, brokerListSnapshot,
+                    topicSnapshot, customPropertiesSnapshot, 
Lists.newArrayList(partitionIdToOffset.keySet()));
+            updateLatestOffsetsCache(tmp, taskId);
         } catch (Exception e) {
             // It needs to pause job when can not get partition meta.
             // To ensure the stability of the routine load,
@@ -955,6 +971,31 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob {
         return false;
     }
 
+    @Override
+    public void updateLag() throws UserException {
+        List<Integer> partitionIds;
+        String brokerListSnapshot;
+        String topicSnapshot;
+        Map<String, String> customPropertiesSnapshot;
+        writeLock();
+        try {
+            convertCustomProperties(false);
+            partitionIds = Lists.newArrayList(((KafkaProgress) 
progress).getOffsetByPartition().keySet());
+            if (partitionIds.isEmpty()) {
+                return;
+            }
+            brokerListSnapshot = brokerList;
+            topicSnapshot = topic;
+            customPropertiesSnapshot = 
Maps.newHashMap(convertedCustomProperties);
+        } finally {
+            writeUnlock();
+        }
+        UUID taskId = UUID.randomUUID();
+        List<Pair<Integer, Long>> latestOffsets = 
KafkaUtil.getLatestOffsets(id, taskId, brokerListSnapshot,
+                topicSnapshot, customPropertiesSnapshot, partitionIds);
+        updateLatestOffsetsCache(latestOffsets, taskId);
+    }
+
     @Override
     public String getLag() {
         Map<Integer, Long> partitionIdToOffsetLag = ((KafkaProgress) 
progress).getLag(cachedPartitionWithLatestOffsets);
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/load/routineload/KafkaRoutineLoadJobTest.java
 
b/fe/fe-core/src/test/java/org/apache/doris/load/routineload/KafkaRoutineLoadJobTest.java
index 09d876ab2e0..8ff853e9b42 100644
--- 
a/fe/fe-core/src/test/java/org/apache/doris/load/routineload/KafkaRoutineLoadJobTest.java
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/load/routineload/KafkaRoutineLoadJobTest.java
@@ -177,6 +177,66 @@ public class KafkaRoutineLoadJobTest {
         }
     }
 
+    @Test
+    public void testUpdateLagRefreshesLatestOffsetCache() throws UserException 
{
+        KafkaRoutineLoadJob routineLoadJob = new KafkaRoutineLoadJob(1L, 
"kafka_routine_load_job", 1L,
+                1L, "127.0.0.1:9020", "topic1", UserIdentity.ADMIN);
+        Map<Integer, Long> partitionIdToOffset = Maps.newHashMap();
+        partitionIdToOffset.put(1, 10L);
+        partitionIdToOffset.put(2, 20L);
+        Deencapsulation.setField(routineLoadJob, "progress", new 
KafkaProgress(partitionIdToOffset));
+
+        try (MockedStatic<KafkaUtil> kafkaUtilStatic = 
Mockito.mockStatic(KafkaUtil.class)) {
+            kafkaUtilStatic.when(() -> 
KafkaUtil.getLatestOffsets(Mockito.eq(1L), Mockito.any(UUID.class),
+                    Mockito.eq("127.0.0.1:9020"), Mockito.eq("topic1"), 
Mockito.anyMap(), Mockito.anyList()))
+                    .thenReturn(Lists.newArrayList(Pair.of(1, 15L), Pair.of(2, 
30L)));
+
+            routineLoadJob.updateLag();
+
+            Assert.assertEquals(15L, routineLoadJob.totalLag().longValue());
+        }
+    }
+
+    @Test
+    public void testUpdateLagRebuildsConvertedPropertiesAfterReplay() throws 
UserException {
+        KafkaRoutineLoadJob routineLoadJob = new KafkaRoutineLoadJob(1L, 
"kafka_routine_load_job", 1L,
+                1L, "127.0.0.1:9020", "topic1", UserIdentity.ADMIN);
+        Deencapsulation.setField(routineLoadJob, "customKafkaPartitions", 
Lists.newArrayList(1));
+
+        Map<String, String> customProperties = Maps.newHashMap();
+        customProperties.put("security.protocol", "SASL_PLAINTEXT");
+        customProperties.put("sasl.mechanism", "PLAIN");
+        Deencapsulation.setField(routineLoadJob, "customProperties", 
customProperties);
+        Deencapsulation.setField(routineLoadJob, "convertedCustomProperties", 
Maps.newHashMap());
+
+        Map<Integer, Long> partitionIdToOffset = Maps.newHashMap();
+        partitionIdToOffset.put(1, 10L);
+        Deencapsulation.setField(routineLoadJob, "progress", new 
KafkaProgress(partitionIdToOffset));
+
+        Env env = Mockito.mock(Env.class);
+        try (MockedStatic<Env> envStatic = Mockito.mockStatic(Env.class);
+                MockedStatic<KafkaUtil> kafkaUtilStatic = 
Mockito.mockStatic(KafkaUtil.class)) {
+            envStatic.when(Env::getCurrentEnv).thenReturn(env);
+            kafkaUtilStatic.when(() -> 
KafkaUtil.getLatestOffsets(Mockito.eq(1L), Mockito.any(UUID.class),
+                    Mockito.eq("127.0.0.1:9020"), Mockito.eq("topic1"),
+                    Mockito.<Map<String, String>>argThat(properties ->
+                            
"SASL_PLAINTEXT".equals(properties.get("security.protocol"))
+                                    && 
"PLAIN".equals(properties.get("sasl.mechanism"))),
+                    Mockito.argThat(partitions -> partitions.size() == 1 && 
partitions.contains(1))))
+                    .thenReturn(Lists.newArrayList(Pair.of(1, 15L)));
+
+            routineLoadJob.updateLag();
+
+            Assert.assertEquals(5L, routineLoadJob.totalLag().longValue());
+            kafkaUtilStatic.verify(() -> 
KafkaUtil.getLatestOffsets(Mockito.eq(1L), Mockito.any(UUID.class),
+                    Mockito.eq("127.0.0.1:9020"), Mockito.eq("topic1"),
+                    Mockito.<Map<String, String>>argThat(properties ->
+                            
"SASL_PLAINTEXT".equals(properties.get("security.protocol"))
+                                    && 
"PLAIN".equals(properties.get("sasl.mechanism"))),
+                    Mockito.argThat(partitions -> partitions.size() == 1 && 
partitions.contains(1))));
+        }
+    }
+
     @Test
     public void 
testUpdateProgressWarnsWhenReadCommittedTaskHasZeroRowsAndLag() throws 
UserException {
         KafkaRoutineLoadJob routineLoadJob = new KafkaRoutineLoadJob(1L, 
"kafka_routine_load_job", 1L,


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to