This is an automated email from the ASF dual-hosted git repository.
liaoxin01 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 5e30738bad9 [enhance](job) refresh routine load lag more timely
(#63654)
5e30738bad9 is described below
commit 5e30738bad9fea5962bb4b0a71729907be9c8348
Author: hui lai <[email protected]>
AuthorDate: Wed Jun 3 23:39:31 2026 +0800
[enhance](job) refresh routine load lag more timely (#63654)
### What problem does this PR solve?
Problem Summary:
The routine load lag was refreshed mainly when task scheduling had to
re-fetch the latest offsets, i.e. after a task had consumed up to the
cached end offset. If producers kept appending data while the job was
running, the cached latest offsets could become stale, so the reported
routine load lag was not up to date.
This PR refreshes the routine load lag cache on every
`RoutineLoadScheduler` round. The metric path still reads only in-memory
state and does not call Kafka directly.
For Kafka routine load jobs, the latest-offset cache is refreshed for
the partitions in the job's current progress. Concurrent updates from
job scheduling and task scheduling are handled with monotonic atomic max
updates, so cached latest offsets do not regress. Kafka metadata
requests now also use snapshots of the broker list, topic, and converted
custom properties, taken while holding the job's write lock.
---
.../doris/load/routineload/RoutineLoadJob.java | 3 +
.../doris/load/routineload/RoutineLoadManager.java | 14 ++++
.../load/routineload/RoutineLoadScheduler.java | 2 +
.../routineload/kafka/KafkaRoutineLoadJob.java | 87 ++++++++++++++++------
.../load/routineload/KafkaRoutineLoadJobTest.java | 60 +++++++++++++++
5 files changed, 143 insertions(+), 23 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
index 82cd9325dfa..3223ff91359 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
@@ -1001,6 +1001,9 @@ public abstract class RoutineLoadJob
return 0L;
}
+ public void updateLag() throws UserException {
+ }
+
protected abstract RoutineLoadTaskInfo unprotectRenewTask(
RoutineLoadTaskInfo routineLoadTaskInfo, boolean delaySchedule);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java
index f24b9ad2252..904fa207ac3 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java
@@ -875,6 +875,20 @@ public class RoutineLoadManager implements Writable {
}
}
+ public void updateRoutineLoadJobLag() {
+ for (RoutineLoadJob routineLoadJob : idToRoutineLoadJob.values()) {
+ if (!routineLoadJob.state.isFinalState()) {
+ try {
+ routineLoadJob.updateLag();
+ } catch (UserException e) {
+ LOG.warn(new LogBuilder(LogKey.ROUTINE_LOAD_JOB,
routineLoadJob.getId())
+ .add("msg", "failed to update routine load lag")
+ .build(), e);
+ }
+ }
+ }
+ }
+
public void replayCreateRoutineLoadJob(RoutineLoadJob routineLoadJob) {
unprotectedAddJob(routineLoadJob);
LOG.info(new LogBuilder(LogKey.ROUTINE_LOAD_JOB,
routineLoadJob.getId())
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadScheduler.java
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadScheduler.java
index 023cd239e09..6fe09e27a0b 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadScheduler.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadScheduler.java
@@ -62,6 +62,8 @@ public class RoutineLoadScheduler extends MasterDaemon {
private void process() throws UserException {
// update
routineLoadManager.updateRoutineLoadJob();
+ // refresh lag cache after job progress and partition metadata are
updated
+ routineLoadManager.updateRoutineLoadJobLag();
// get need schedule routine jobs
List<RoutineLoadJob> routineLoadJobList = null;
try {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/kafka/KafkaRoutineLoadJob.java
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/kafka/KafkaRoutineLoadJob.java
index e1290b03996..037b468b53a 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/kafka/KafkaRoutineLoadJob.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/kafka/KafkaRoutineLoadJob.java
@@ -194,9 +194,14 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob {
@Override
public void prepare() throws UserException {
- // should reset converted properties each time the job being prepared.
- // because the file info can be changed anytime.
- convertCustomProperties(true);
+ writeLock();
+ try {
+ // should reset converted properties each time the job being
prepared.
+ // because the file info can be changed anytime.
+ convertCustomProperties(true);
+ } finally {
+ writeUnlock();
+ }
}
private void convertCustomProperties(boolean rebuild) throws DdlException {
@@ -345,15 +350,22 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob {
private void updateProgressAndOffsetsCache(RLTaskTxnCommitAttachment
attachment) {
((KafkaProgress)
attachment.getProgress()).getOffsetByPartition().entrySet().stream()
- .forEach(entity -> {
- if
(cachedPartitionWithLatestOffsets.containsKey(entity.getKey())
- &&
cachedPartitionWithLatestOffsets.get(entity.getKey()) < entity.getValue() + 1) {
- cachedPartitionWithLatestOffsets.put(entity.getKey(),
entity.getValue() + 1);
- }
- });
+ .forEach(entity ->
cachedPartitionWithLatestOffsets.computeIfPresent(entity.getKey(),
+ (partitionId, cachedOffset) -> Math.max(cachedOffset,
entity.getValue() + 1)));
this.progress.update(attachment);
}
+ private void updateLatestOffsetsCache(List<Pair<Integer, Long>>
latestOffsets, UUID taskId) {
+ for (Pair<Integer, Long> pair : latestOffsets) {
+ Long updatedOffset =
cachedPartitionWithLatestOffsets.merge(pair.first, pair.second, Math::max);
+ if (updatedOffset > pair.second) {
+ LOG.warn("Kafka offset fallback. partition: {}, cache offset:
{}"
+ + " get latest offset: {}, task {}, job {}",
+ pair.first, updatedOffset, pair.second, taskId,
id);
+ }
+ }
+ }
+
@Override
protected void updateProgress(RLTaskTxnCommitAttachment attachment) throws
UserException {
updateProgressAndOffsetsCache(attachment);
@@ -655,14 +667,15 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob {
super.setOptional(info);
KafkaDataSourceProperties kafkaDataSourceProperties
= (KafkaDataSourceProperties) info.getDataSourceProperties();
- if
(CollectionUtils.isNotEmpty(kafkaDataSourceProperties.getKafkaPartitionOffsets()))
{
- setCustomKafkaPartitions(kafkaDataSourceProperties);
- }
if
(MapUtils.isNotEmpty(kafkaDataSourceProperties.getCustomKafkaProperties())) {
setCustomKafkaProperties(kafkaDataSourceProperties.getCustomKafkaProperties());
}
// set group id if not specified
this.customProperties.putIfAbsent(PROP_GROUP_ID, name + "_" +
UUID.randomUUID());
+ convertCustomProperties(true);
+ if
(CollectionUtils.isNotEmpty(kafkaDataSourceProperties.getKafkaPartitionOffsets()))
{
+ setCustomKafkaPartitions(kafkaDataSourceProperties);
+ }
}
// this is an unprotected method which is called in the initialization
function
@@ -902,18 +915,21 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob {
try {
// all offsets to be consumed are newer than offsets in
cachedPartitionWithLatestOffsets,
// maybe the cached offset is out-of-date, fetch from kafka server
again
- List<Pair<Integer, Long>> tmp = KafkaUtil.getLatestOffsets(id,
taskId, getBrokerList(),
- getTopic(), getConvertedCustomProperties(),
Lists.newArrayList(partitionIdToOffset.keySet()));
- for (Pair<Integer, Long> pair : tmp) {
- if (pair.second >=
cachedPartitionWithLatestOffsets.getOrDefault(pair.first, Long.MIN_VALUE)) {
- cachedPartitionWithLatestOffsets.put(pair.first,
pair.second);
- } else {
- LOG.warn("Kafka offset fallback. partition: {}, cache
offset: {}"
- + " get latest offset: {}, task {}, job {}",
- pair.first,
cachedPartitionWithLatestOffsets.getOrDefault(pair.first, Long.MIN_VALUE),
- pair.second, taskId, id);
- }
+ String brokerListSnapshot;
+ String topicSnapshot;
+ Map<String, String> customPropertiesSnapshot;
+ writeLock();
+ try {
+ convertCustomProperties(false);
+ brokerListSnapshot = brokerList;
+ topicSnapshot = topic;
+ customPropertiesSnapshot =
Maps.newHashMap(convertedCustomProperties);
+ } finally {
+ writeUnlock();
}
+ List<Pair<Integer, Long>> tmp = KafkaUtil.getLatestOffsets(id,
taskId, brokerListSnapshot,
+ topicSnapshot, customPropertiesSnapshot,
Lists.newArrayList(partitionIdToOffset.keySet()));
+ updateLatestOffsetsCache(tmp, taskId);
} catch (Exception e) {
// It needs to pause job when can not get partition meta.
// To ensure the stability of the routine load,
@@ -955,6 +971,31 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob {
return false;
}
+ @Override
+ public void updateLag() throws UserException {
+ List<Integer> partitionIds;
+ String brokerListSnapshot;
+ String topicSnapshot;
+ Map<String, String> customPropertiesSnapshot;
+ writeLock();
+ try {
+ convertCustomProperties(false);
+ partitionIds = Lists.newArrayList(((KafkaProgress)
progress).getOffsetByPartition().keySet());
+ if (partitionIds.isEmpty()) {
+ return;
+ }
+ brokerListSnapshot = brokerList;
+ topicSnapshot = topic;
+ customPropertiesSnapshot =
Maps.newHashMap(convertedCustomProperties);
+ } finally {
+ writeUnlock();
+ }
+ UUID taskId = UUID.randomUUID();
+ List<Pair<Integer, Long>> latestOffsets =
KafkaUtil.getLatestOffsets(id, taskId, brokerListSnapshot,
+ topicSnapshot, customPropertiesSnapshot, partitionIds);
+ updateLatestOffsetsCache(latestOffsets, taskId);
+ }
+
@Override
public String getLag() {
Map<Integer, Long> partitionIdToOffsetLag = ((KafkaProgress)
progress).getLag(cachedPartitionWithLatestOffsets);
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/load/routineload/KafkaRoutineLoadJobTest.java
b/fe/fe-core/src/test/java/org/apache/doris/load/routineload/KafkaRoutineLoadJobTest.java
index 09d876ab2e0..8ff853e9b42 100644
---
a/fe/fe-core/src/test/java/org/apache/doris/load/routineload/KafkaRoutineLoadJobTest.java
+++
b/fe/fe-core/src/test/java/org/apache/doris/load/routineload/KafkaRoutineLoadJobTest.java
@@ -177,6 +177,66 @@ public class KafkaRoutineLoadJobTest {
}
}
+ @Test
+ public void testUpdateLagRefreshesLatestOffsetCache() throws UserException
{
+ KafkaRoutineLoadJob routineLoadJob = new KafkaRoutineLoadJob(1L,
"kafka_routine_load_job", 1L,
+ 1L, "127.0.0.1:9020", "topic1", UserIdentity.ADMIN);
+ Map<Integer, Long> partitionIdToOffset = Maps.newHashMap();
+ partitionIdToOffset.put(1, 10L);
+ partitionIdToOffset.put(2, 20L);
+ Deencapsulation.setField(routineLoadJob, "progress", new
KafkaProgress(partitionIdToOffset));
+
+ try (MockedStatic<KafkaUtil> kafkaUtilStatic =
Mockito.mockStatic(KafkaUtil.class)) {
+ kafkaUtilStatic.when(() ->
KafkaUtil.getLatestOffsets(Mockito.eq(1L), Mockito.any(UUID.class),
+ Mockito.eq("127.0.0.1:9020"), Mockito.eq("topic1"),
Mockito.anyMap(), Mockito.anyList()))
+ .thenReturn(Lists.newArrayList(Pair.of(1, 15L), Pair.of(2,
30L)));
+
+ routineLoadJob.updateLag();
+
+ Assert.assertEquals(15L, routineLoadJob.totalLag().longValue());
+ }
+ }
+
+ @Test
+ public void testUpdateLagRebuildsConvertedPropertiesAfterReplay() throws
UserException {
+ KafkaRoutineLoadJob routineLoadJob = new KafkaRoutineLoadJob(1L,
"kafka_routine_load_job", 1L,
+ 1L, "127.0.0.1:9020", "topic1", UserIdentity.ADMIN);
+ Deencapsulation.setField(routineLoadJob, "customKafkaPartitions",
Lists.newArrayList(1));
+
+ Map<String, String> customProperties = Maps.newHashMap();
+ customProperties.put("security.protocol", "SASL_PLAINTEXT");
+ customProperties.put("sasl.mechanism", "PLAIN");
+ Deencapsulation.setField(routineLoadJob, "customProperties",
customProperties);
+ Deencapsulation.setField(routineLoadJob, "convertedCustomProperties",
Maps.newHashMap());
+
+ Map<Integer, Long> partitionIdToOffset = Maps.newHashMap();
+ partitionIdToOffset.put(1, 10L);
+ Deencapsulation.setField(routineLoadJob, "progress", new
KafkaProgress(partitionIdToOffset));
+
+ Env env = Mockito.mock(Env.class);
+ try (MockedStatic<Env> envStatic = Mockito.mockStatic(Env.class);
+ MockedStatic<KafkaUtil> kafkaUtilStatic =
Mockito.mockStatic(KafkaUtil.class)) {
+ envStatic.when(Env::getCurrentEnv).thenReturn(env);
+ kafkaUtilStatic.when(() ->
KafkaUtil.getLatestOffsets(Mockito.eq(1L), Mockito.any(UUID.class),
+ Mockito.eq("127.0.0.1:9020"), Mockito.eq("topic1"),
+ Mockito.<Map<String, String>>argThat(properties ->
+
"SASL_PLAINTEXT".equals(properties.get("security.protocol"))
+ &&
"PLAIN".equals(properties.get("sasl.mechanism"))),
+ Mockito.argThat(partitions -> partitions.size() == 1 &&
partitions.contains(1))))
+ .thenReturn(Lists.newArrayList(Pair.of(1, 15L)));
+
+ routineLoadJob.updateLag();
+
+ Assert.assertEquals(5L, routineLoadJob.totalLag().longValue());
+ kafkaUtilStatic.verify(() ->
KafkaUtil.getLatestOffsets(Mockito.eq(1L), Mockito.any(UUID.class),
+ Mockito.eq("127.0.0.1:9020"), Mockito.eq("topic1"),
+ Mockito.<Map<String, String>>argThat(properties ->
+
"SASL_PLAINTEXT".equals(properties.get("security.protocol"))
+ &&
"PLAIN".equals(properties.get("sasl.mechanism"))),
+ Mockito.argThat(partitions -> partitions.size() == 1 &&
partitions.contains(1))));
+ }
+ }
+
@Test
public void
testUpdateProgressWarnsWhenReadCommittedTaskHasZeroRowsAndLag() throws
UserException {
KafkaRoutineLoadJob routineLoadJob = new KafkaRoutineLoadJob(1L,
"kafka_routine_load_job", 1L,
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]