This is an automated email from the ASF dual-hosted git repository.

sollhui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 31b6fbb3e50 [opt](job) delay Kafka read committed zero-row retries (#64046)
31b6fbb3e50 is described below

commit 31b6fbb3e50e25227901a74e3fd872adaa6e3d98
Author: hui lai <[email protected]>
AuthorDate: Fri Jun 5 14:16:11 2026 +0800

    [opt](job) delay Kafka read committed zero-row retries (#64046)
    
    ### What problem does this PR solve?
    
    Kafka routine load with `isolation.level=read_committed` can finish a
    task with 0 consumed rows while the task still has positive lag, for
    example when upstream transactional records are not committed and
    therefore invisible. PR #63664 added an `OtherMsg` hint for this case,
    but the renewed task could still be scheduled immediately when the
    normal EOF heuristic did not apply, causing repeated retries. This
    change reuses the same read_committed zero-row lag detection to mark the
    next Kafka routine load task for delayed scheduling, so it follows the
    existing `max_batch_interval` delay path used by EOF tasks.
---
 .../load/routineload/RoutineLoadTaskInfo.java      |  9 ++++
 .../routineload/kafka/KafkaRoutineLoadJob.java     | 10 +++--
 .../load/routineload/kafka/KafkaTaskInfo.java      |  7 ++++
 .../load/routineload/KafkaRoutineLoadJobTest.java  | 48 ++++++++++++++++++++++
 4 files changed, 71 insertions(+), 3 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java
 
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java
index 872f29e72ed..e812bf3740d 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java
@@ -158,6 +158,10 @@ public abstract class RoutineLoadTaskInfo {
         return isEof;
     }
 
+    public boolean isDelaySchedule() {
+        return delaySchedule;
+    }
+
     public boolean needDedalySchedule() {
         return delaySchedule || isEof;
     }
@@ -178,6 +182,7 @@ public abstract class RoutineLoadTaskInfo {
 
     public void handleTaskByTxnCommitAttachment(RLTaskTxnCommitAttachment 
rlTaskTxnCommitAttachment) {
         judgeEof(rlTaskTxnCommitAttachment);
+        this.delaySchedule = shouldDelaySchedule(rlTaskTxnCommitAttachment);
     }
 
     private void judgeEof(RLTaskTxnCommitAttachment rlTaskTxnCommitAttachment) 
{
@@ -195,6 +200,10 @@ public abstract class RoutineLoadTaskInfo {
         }
     }
 
+    protected boolean shouldDelaySchedule(RLTaskTxnCommitAttachment 
rlTaskTxnCommitAttachment) {
+        return false;
+    }
+
     protected abstract TRoutineLoadTask createRoutineLoadTask() throws 
UserException;
 
     public void updateAdaptiveTimeout(RoutineLoadJob routineLoadJob) {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/kafka/KafkaRoutineLoadJob.java
 
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/kafka/KafkaRoutineLoadJob.java
index 037b468b53a..dcb683e9e22 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/kafka/KafkaRoutineLoadJob.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/kafka/KafkaRoutineLoadJob.java
@@ -374,12 +374,16 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob {
     }
 
     private void updateReadCommittedLagHint(RLTaskTxnCommitAttachment 
attachment) {
-        if (DebugPointUtil.isEnable(HAS_POSITIVE_LAG_DEBUG_POINT)
-                || (attachment.getTotalRows() == 0 && isReadCommitted() && 
hasPositiveLagForTask(attachment))) {
+        if (shouldDelayScheduleForReadCommittedZeroRowsWithLag(attachment)) {
             setOtherMsg(READ_COMMITTED_ZERO_ROWS_WITH_LAG_MESSAGE);
         }
     }
 
+    boolean 
shouldDelayScheduleForReadCommittedZeroRowsWithLag(RLTaskTxnCommitAttachment 
attachment) {
+        return DebugPointUtil.isEnable(HAS_POSITIVE_LAG_DEBUG_POINT)
+                || (attachment.getTotalRows() == 0 && isReadCommitted() && 
hasPositiveLagForTask(attachment));
+    }
+
     private boolean isReadCommitted() {
         return 
KAFKA_READ_COMMITTED.equalsIgnoreCase(customProperties.get(KAFKA_ISOLATION_LEVEL));
     }
@@ -413,7 +417,7 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob {
         // add new task
         KafkaTaskInfo kafkaTaskInfo = new KafkaTaskInfo(oldKafkaTaskInfo,
                 ((KafkaProgress) 
progress).getPartitionIdToOffset(oldKafkaTaskInfo.getPartitions()), 
isMultiTable());
-        kafkaTaskInfo.setDelaySchedule(delaySchedule);
+        kafkaTaskInfo.setDelaySchedule(delaySchedule || 
oldKafkaTaskInfo.isDelaySchedule());
         // remove old task
         routineLoadTaskInfoList.remove(routineLoadTaskInfo);
         // add new task
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/kafka/KafkaTaskInfo.java
 
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/kafka/KafkaTaskInfo.java
index 4364c106529..21aec901571 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/kafka/KafkaTaskInfo.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/kafka/KafkaTaskInfo.java
@@ -24,6 +24,7 @@ import org.apache.doris.catalog.Table;
 import org.apache.doris.common.Config;
 import org.apache.doris.common.UserException;
 import org.apache.doris.common.util.DebugUtil;
+import org.apache.doris.load.routineload.RLTaskTxnCommitAttachment;
 import org.apache.doris.load.routineload.RoutineLoadJob;
 import org.apache.doris.load.routineload.RoutineLoadManager;
 import org.apache.doris.load.routineload.RoutineLoadTaskInfo;
@@ -162,6 +163,12 @@ public class KafkaTaskInfo extends RoutineLoadTaskInfo {
         return routineLoadJob.hasMoreDataToConsume(id, partitionIdToOffset);
     }
 
+    @Override
+    protected boolean shouldDelaySchedule(RLTaskTxnCommitAttachment 
rlTaskTxnCommitAttachment) {
+        KafkaRoutineLoadJob routineLoadJob = (KafkaRoutineLoadJob) 
routineLoadManager.getJob(jobId);
+        return 
routineLoadJob.shouldDelayScheduleForReadCommittedZeroRowsWithLag(rlTaskTxnCommitAttachment);
+    }
+
     private TPipelineFragmentParams rePlan(RoutineLoadJob routineLoadJob) 
throws UserException {
         TUniqueId loadId = new TUniqueId(id.getMostSignificantBits(), 
id.getLeastSignificantBits());
         // plan for each task, in case table has change(rollup or schema 
change)
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/load/routineload/KafkaRoutineLoadJobTest.java
 
b/fe/fe-core/src/test/java/org/apache/doris/load/routineload/KafkaRoutineLoadJobTest.java
index 8ff853e9b42..8b442012ff2 100644
--- 
a/fe/fe-core/src/test/java/org/apache/doris/load/routineload/KafkaRoutineLoadJobTest.java
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/load/routineload/KafkaRoutineLoadJobTest.java
@@ -261,6 +261,54 @@ public class KafkaRoutineLoadJobTest {
         Assert.assertTrue(otherMsg.contains("some records may be in 
uncommitted transactions"));
     }
 
+    @Test
+    public void testReadCommittedZeroRowsWithLagDelaysNextTask() throws 
UserException {
+        RoutineLoadManager routineLoadManager = 
Mockito.mock(RoutineLoadManager.class);
+        Env env = Mockito.mock(Env.class);
+
+        try (MockedStatic<Env> envStatic = Mockito.mockStatic(Env.class)) {
+            envStatic.when(Env::getCurrentEnv).thenReturn(env);
+            
Mockito.when(env.getRoutineLoadManager()).thenReturn(routineLoadManager);
+
+            KafkaRoutineLoadJob routineLoadJob = new KafkaRoutineLoadJob(1L, 
"kafka_routine_load_job", 1L,
+                    1L, "127.0.0.1:9020", "topic1", UserIdentity.ADMIN);
+            
Mockito.when(routineLoadManager.getJob(1L)).thenReturn(routineLoadJob);
+
+            Map<String, String> customProperties = Maps.newHashMap();
+            customProperties.put("isolation.level", "read_committed");
+            Deencapsulation.setField(routineLoadJob, "customProperties", 
customProperties);
+
+            Map<Integer, Long> cachedPartitionWithLatestOffsets = 
Maps.newHashMap();
+            cachedPartitionWithLatestOffsets.put(1, 20L);
+            Deencapsulation.setField(routineLoadJob, 
"cachedPartitionWithLatestOffsets",
+                    cachedPartitionWithLatestOffsets);
+
+            Map<Integer, Long> taskProgress = Maps.newHashMap();
+            taskProgress.put(1, 10L);
+            Deencapsulation.setField(routineLoadJob, "progress", new 
KafkaProgress(taskProgress));
+
+            KafkaTaskInfo kafkaTaskInfo = new KafkaTaskInfo(new UUID(1, 1), 
1L, 20000,
+                    taskProgress, false, 1000, false);
+            List<RoutineLoadTaskInfo> routineLoadTaskInfoList = new 
ArrayList<>();
+            routineLoadTaskInfoList.add(kafkaTaskInfo);
+            Deencapsulation.setField(routineLoadJob, 
"routineLoadTaskInfoList", routineLoadTaskInfoList);
+
+            RLTaskTxnCommitAttachment attachment = new 
RLTaskTxnCommitAttachment();
+            Deencapsulation.setField(attachment, "progress", new 
KafkaProgress(taskProgress));
+            Deencapsulation.setField(attachment, "taskExecutionTimeMs",
+                    routineLoadJob.getMaxBatchIntervalS() * 1000);
+
+            kafkaTaskInfo.handleTaskByTxnCommitAttachment(attachment);
+
+            Assert.assertFalse(kafkaTaskInfo.getIsEof());
+            Assert.assertTrue(kafkaTaskInfo.needDedalySchedule());
+
+            RoutineLoadTaskInfo newTask = 
Deencapsulation.invoke(routineLoadJob,
+                    "unprotectRenewTask", kafkaTaskInfo, false);
+            Assert.assertTrue(newTask.needDedalySchedule());
+        }
+    }
+
     @Test
     public void testProcessTimeOutTasks() throws Exception {
         RoutineLoadManager routineLoadManager = 
Mockito.mock(RoutineLoadManager.class);


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to