This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-4.0 by this push:
     new 88feffd32d1 branch-4.0: [enhance](job) refresh routine load lag more timely (#63654) (#64283)
88feffd32d1 is described below

commit 88feffd32d1bfa2d67e2b25c3dbbaad9c8ed8cd6
Author: hui lai <[email protected]>
AuthorDate: Thu Jun 11 11:17:35 2026 +0800

    branch-4.0: [enhance](job) refresh routine load lag more timely (#63654) (#64283)
    
    pick https://github.com/apache/doris/pull/63654
---
 .../load/routineload/KafkaRoutineLoadJob.java      | 87 ++++++++++++++++------
 .../doris/load/routineload/RoutineLoadJob.java     |  3 +
 .../doris/load/routineload/RoutineLoadManager.java | 14 ++++
 .../load/routineload/RoutineLoadScheduler.java     |  2 +
 .../load/routineload/KafkaRoutineLoadJobTest.java  | 68 +++++++++++++++++
 5 files changed, 151 insertions(+), 23 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
 
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
index 36e6faf5582..c338dbfeda4 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
@@ -181,9 +181,14 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob {
 
     @Override
     public void prepare() throws UserException {
-        // should reset converted properties each time the job being prepared.
-        // because the file info can be changed anytime.
-        convertCustomProperties(true);
+        writeLock();
+        try {
+            // should reset converted properties each time the job being 
prepared.
+            // because the file info can be changed anytime.
+            convertCustomProperties(true);
+        } finally {
+            writeUnlock();
+        }
     }
 
     private void convertCustomProperties(boolean rebuild) throws DdlException {
@@ -332,15 +337,22 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob {
 
     private void updateProgressAndOffsetsCache(RLTaskTxnCommitAttachment 
attachment) {
         ((KafkaProgress) 
attachment.getProgress()).getOffsetByPartition().entrySet().stream()
-                .forEach(entity -> {
-                    if 
(cachedPartitionWithLatestOffsets.containsKey(entity.getKey())
-                            && 
cachedPartitionWithLatestOffsets.get(entity.getKey()) < entity.getValue() + 1) {
-                        cachedPartitionWithLatestOffsets.put(entity.getKey(), 
entity.getValue() + 1);
-                    }
-                });
+                .forEach(entity -> 
cachedPartitionWithLatestOffsets.computeIfPresent(entity.getKey(),
+                        (partitionId, cachedOffset) -> Math.max(cachedOffset, 
entity.getValue() + 1)));
         this.progress.update(attachment);
     }
 
+    private void updateLatestOffsetsCache(List<Pair<Integer, Long>> 
latestOffsets, UUID taskId) {
+        for (Pair<Integer, Long> pair : latestOffsets) {
+            Long updatedOffset = 
cachedPartitionWithLatestOffsets.merge(pair.first, pair.second, Math::max);
+            if (updatedOffset > pair.second) {
+                LOG.warn("Kafka offset fallback. partition: {}, cache offset: 
{}"
+                            + " get latest offset: {}, task {}, job {}",
+                            pair.first, updatedOffset, pair.second, taskId, 
id);
+            }
+        }
+    }
+
     @Override
     protected void updateProgress(RLTaskTxnCommitAttachment attachment) throws 
UserException {
         updateProgressAndOffsetsCache(attachment);
@@ -619,14 +631,15 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob {
         super.setOptional(info);
         KafkaDataSourceProperties kafkaDataSourceProperties
                 = (KafkaDataSourceProperties) info.getDataSourceProperties();
-        if 
(CollectionUtils.isNotEmpty(kafkaDataSourceProperties.getKafkaPartitionOffsets()))
 {
-            setCustomKafkaPartitions(kafkaDataSourceProperties);
-        }
         if 
(MapUtils.isNotEmpty(kafkaDataSourceProperties.getCustomKafkaProperties())) {
             
setCustomKafkaProperties(kafkaDataSourceProperties.getCustomKafkaProperties());
         }
         // set group id if not specified
         this.customProperties.putIfAbsent(PROP_GROUP_ID, name + "_" + 
UUID.randomUUID());
+        convertCustomProperties(true);
+        if 
(CollectionUtils.isNotEmpty(kafkaDataSourceProperties.getKafkaPartitionOffsets()))
 {
+            setCustomKafkaPartitions(kafkaDataSourceProperties);
+        }
     }
 
     // this is an unprotected method which is called in the initialization 
function
@@ -866,18 +879,21 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob {
         try {
             // all offsets to be consumed are newer than offsets in 
cachedPartitionWithLatestOffsets,
             // maybe the cached offset is out-of-date, fetch from kafka server 
again
-            List<Pair<Integer, Long>> tmp = KafkaUtil.getLatestOffsets(id, 
taskId, getBrokerList(),
-                    getTopic(), getConvertedCustomProperties(), 
Lists.newArrayList(partitionIdToOffset.keySet()));
-            for (Pair<Integer, Long> pair : tmp) {
-                if (pair.second >= 
cachedPartitionWithLatestOffsets.getOrDefault(pair.first, Long.MIN_VALUE)) {
-                    cachedPartitionWithLatestOffsets.put(pair.first, 
pair.second);
-                } else {
-                    LOG.warn("Kafka offset fallback. partition: {}, cache 
offset: {}"
-                                + " get latest offset: {}, task {}, job {}",
-                                pair.first, 
cachedPartitionWithLatestOffsets.getOrDefault(pair.first, Long.MIN_VALUE),
-                                pair.second, taskId, id);
-                }
+            String brokerListSnapshot;
+            String topicSnapshot;
+            Map<String, String> customPropertiesSnapshot;
+            writeLock();
+            try {
+                convertCustomProperties(false);
+                brokerListSnapshot = brokerList;
+                topicSnapshot = topic;
+                customPropertiesSnapshot = 
Maps.newHashMap(convertedCustomProperties);
+            } finally {
+                writeUnlock();
             }
+            List<Pair<Integer, Long>> tmp = KafkaUtil.getLatestOffsets(id, 
taskId, brokerListSnapshot,
+                    topicSnapshot, customPropertiesSnapshot, 
Lists.newArrayList(partitionIdToOffset.keySet()));
+            updateLatestOffsetsCache(tmp, taskId);
         } catch (Exception e) {
             // It needs to pause job when can not get partition meta.
             // To ensure the stability of the routine load,
@@ -919,6 +935,31 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob {
         return false;
     }
 
+    @Override
+    public void updateLag() throws UserException {
+        List<Integer> partitionIds;
+        String brokerListSnapshot;
+        String topicSnapshot;
+        Map<String, String> customPropertiesSnapshot;
+        writeLock();
+        try {
+            convertCustomProperties(false);
+            partitionIds = Lists.newArrayList(((KafkaProgress) 
progress).getOffsetByPartition().keySet());
+            if (partitionIds.isEmpty()) {
+                return;
+            }
+            brokerListSnapshot = brokerList;
+            topicSnapshot = topic;
+            customPropertiesSnapshot = 
Maps.newHashMap(convertedCustomProperties);
+        } finally {
+            writeUnlock();
+        }
+        UUID taskId = UUID.randomUUID();
+        List<Pair<Integer, Long>> latestOffsets = 
KafkaUtil.getLatestOffsets(id, taskId, brokerListSnapshot,
+                topicSnapshot, customPropertiesSnapshot, partitionIds);
+        updateLatestOffsetsCache(latestOffsets, taskId);
+    }
+
     @Override
     public String getLag() {
         Map<Integer, Long> partitionIdToOffsetLag = ((KafkaProgress) 
progress).getLag(cachedPartitionWithLatestOffsets);
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
 
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
index 3a5d95c9442..37c9a6a1c52 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
@@ -1001,6 +1001,9 @@ public abstract class RoutineLoadJob
         return 0L;
     }
 
+    public void updateLag() throws UserException {
+    }
+
     abstract RoutineLoadTaskInfo unprotectRenewTask(RoutineLoadTaskInfo 
routineLoadTaskInfo, boolean delaySchedule);
 
     // call before first scheduling
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java
 
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java
index 3b1f498bcfc..26f806a76ff 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java
@@ -870,6 +870,20 @@ public class RoutineLoadManager implements Writable {
         }
     }
 
+    public void updateRoutineLoadJobLag() {
+        for (RoutineLoadJob routineLoadJob : idToRoutineLoadJob.values()) {
+            if (!routineLoadJob.state.isFinalState()) {
+                try {
+                    routineLoadJob.updateLag();
+                } catch (UserException e) {
+                    LOG.warn(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, 
routineLoadJob.getId())
+                            .add("msg", "failed to update routine load lag")
+                            .build(), e);
+                }
+            }
+        }
+    }
+
     public void replayCreateRoutineLoadJob(RoutineLoadJob routineLoadJob) {
         unprotectedAddJob(routineLoadJob);
         LOG.info(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, 
routineLoadJob.getId())
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadScheduler.java
 
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadScheduler.java
index 023cd239e09..6fe09e27a0b 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadScheduler.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadScheduler.java
@@ -62,6 +62,8 @@ public class RoutineLoadScheduler extends MasterDaemon {
     private void process() throws UserException {
         // update
         routineLoadManager.updateRoutineLoadJob();
+        // refresh lag cache after job progress and partition metadata are 
updated
+        routineLoadManager.updateRoutineLoadJobLag();
         // get need schedule routine jobs
         List<RoutineLoadJob> routineLoadJobList = null;
         try {
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/load/routineload/KafkaRoutineLoadJobTest.java
 
b/fe/fe-core/src/test/java/org/apache/doris/load/routineload/KafkaRoutineLoadJobTest.java
index 1d8b58257dc..5113c57ec88 100644
--- 
a/fe/fe-core/src/test/java/org/apache/doris/load/routineload/KafkaRoutineLoadJobTest.java
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/load/routineload/KafkaRoutineLoadJobTest.java
@@ -203,6 +203,74 @@ public class KafkaRoutineLoadJobTest {
         }
     }
 
+    @Test
+    public void testUpdateLagRefreshesLatestOffsetCache() throws UserException 
{
+        KafkaRoutineLoadJob routineLoadJob = new KafkaRoutineLoadJob(1L, 
"kafka_routine_load_job", 1L,
+                1L, "127.0.0.1:9020", "topic1", UserIdentity.ADMIN);
+        Map<Integer, Long> partitionIdToOffset = Maps.newHashMap();
+        partitionIdToOffset.put(1, 10L);
+        partitionIdToOffset.put(2, 20L);
+        Deencapsulation.setField(routineLoadJob, "progress", new 
KafkaProgress(partitionIdToOffset));
+
+        new MockUp<KafkaUtil>() {
+            @Mock
+            public List<Pair<Integer, Long>> getLatestOffsets(long jobId, UUID 
taskId, String brokerList, String topic,
+                                                              Map<String, 
String> convertedCustomProperties,
+                                                              List<Integer> 
partitionIds) {
+                Assert.assertEquals(1L, jobId);
+                Assert.assertEquals("127.0.0.1:9020", brokerList);
+                Assert.assertEquals("topic1", topic);
+                Assert.assertTrue(partitionIds.contains(1));
+                Assert.assertTrue(partitionIds.contains(2));
+                return Lists.newArrayList(Pair.of(1, 15L), Pair.of(2, 30L));
+            }
+        };
+
+        routineLoadJob.updateLag();
+
+        Assert.assertEquals(15L, routineLoadJob.totalLag().longValue());
+    }
+
+    @Test
+    public void testUpdateLagRebuildsConvertedPropertiesAfterReplay(@Mocked 
Env env) throws UserException {
+        KafkaRoutineLoadJob routineLoadJob = new KafkaRoutineLoadJob(1L, 
"kafka_routine_load_job", 1L,
+                1L, "127.0.0.1:9020", "topic1", UserIdentity.ADMIN);
+        Deencapsulation.setField(routineLoadJob, "customKafkaPartitions", 
Lists.newArrayList(1));
+
+        Map<String, String> customProperties = Maps.newHashMap();
+        customProperties.put("security.protocol", "SASL_PLAINTEXT");
+        customProperties.put("sasl.mechanism", "PLAIN");
+        Deencapsulation.setField(routineLoadJob, "customProperties", 
customProperties);
+        Deencapsulation.setField(routineLoadJob, "convertedCustomProperties", 
Maps.newHashMap());
+
+        Map<Integer, Long> partitionIdToOffset = Maps.newHashMap();
+        partitionIdToOffset.put(1, 10L);
+        Deencapsulation.setField(routineLoadJob, "progress", new 
KafkaProgress(partitionIdToOffset));
+
+        new MockUp<Env>() {
+            @Mock
+            public Env getCurrentEnv() {
+                return env;
+            }
+        };
+        new MockUp<KafkaUtil>() {
+            @Mock
+            public List<Pair<Integer, Long>> getLatestOffsets(long jobId, UUID 
taskId, String brokerList, String topic,
+                                                              Map<String, 
String> convertedCustomProperties,
+                                                              List<Integer> 
partitionIds) {
+                Assert.assertEquals("SASL_PLAINTEXT", 
convertedCustomProperties.get("security.protocol"));
+                Assert.assertEquals("PLAIN", 
convertedCustomProperties.get("sasl.mechanism"));
+                Assert.assertEquals(1, partitionIds.size());
+                Assert.assertTrue(partitionIds.contains(1));
+                return Lists.newArrayList(Pair.of(1, 15L));
+            }
+        };
+
+        routineLoadJob.updateLag();
+
+        Assert.assertEquals(5L, routineLoadJob.totalLag().longValue());
+    }
+
     @Test
     public void testProcessTimeOutTasks(@Injectable GlobalTransactionMgr 
globalTransactionMgr,
                                         @Injectable RoutineLoadManager 
routineLoadManager,


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to