This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-4.0 by this push:
     new 182820f0235 branch-4.0: [enhance](job) add zero-row hint for Kafka 
read_committed load and [opt](job) delay Kafka read committed zero-row retries 
(#64584)
182820f0235 is described below

commit 182820f02351db2f5f1d0d8c5b1e7b7a5e012b71
Author: hui lai <[email protected]>
AuthorDate: Thu Jun 18 23:40:39 2026 +0800

    branch-4.0: [enhance](job) add zero-row hint for Kafka read_committed load 
and [opt](job) delay Kafka read committed zero-row retries (#64584)
    
    pick https://github.com/apache/doris/pull/63664; pick
    https://github.com/apache/doris/pull/64046
---
 .../load/routineload/KafkaRoutineLoadJob.java      |  36 +++++-
 .../doris/load/routineload/KafkaTaskInfo.java      |   6 +
 .../load/routineload/RoutineLoadTaskInfo.java      |  11 +-
 .../load/routineload/KafkaRoutineLoadJobTest.java  |  76 +++++++++++
 .../test_routine_load_error_info.groovy            | 141 ++++++++++++++-------
 5 files changed, 220 insertions(+), 50 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
 
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
index c338dbfeda4..67756bc2b43 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
@@ -31,6 +31,7 @@ import org.apache.doris.common.InternalErrorCode;
 import org.apache.doris.common.LoadException;
 import org.apache.doris.common.Pair;
 import org.apache.doris.common.UserException;
+import org.apache.doris.common.util.DebugPointUtil;
 import org.apache.doris.common.util.DebugUtil;
 import org.apache.doris.common.util.LogBuilder;
 import org.apache.doris.common.util.LogKey;
@@ -89,6 +90,12 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob {
 
     public static final String KAFKA_FILE_CATALOG = "kafka";
     public static final String PROP_GROUP_ID = "group.id";
+    private static final String KAFKA_ISOLATION_LEVEL = "isolation.level";
+    private static final String KAFKA_READ_COMMITTED = "read_committed";
+    private static final String HAS_POSITIVE_LAG_DEBUG_POINT = 
"KafkaRoutineLoadJob.hasPositiveLagForTask";
+    private static final String READ_COMMITTED_ZERO_ROWS_WITH_LAG_MESSAGE = 
"Kafka routine load consumed 0 rows "
+            + "while lag is still positive under 
isolation.level=read_committed. If the upstream producer uses "
+            + "Kafka transactions, some records may be in uncommitted 
transactions and are not visible yet.";
 
     @SerializedName("bl")
     private String brokerList;
@@ -357,6 +364,33 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob {
     protected void updateProgress(RLTaskTxnCommitAttachment attachment) throws 
UserException { // Applies the attachment via the two update calls below, then refreshes the read_committed hint.
         updateProgressAndOffsetsCache(attachment);
         super.updateProgress(attachment);
+        updateReadCommittedLagHint(attachment); // runs last, after both update calls above
+    }
+
+    private void updateReadCommittedLagHint(RLTaskTxnCommitAttachment 
attachment) { // Publishes the zero-rows-with-lag hint through setOtherMsg when the condition below holds.
+        if (shouldDelayScheduleForReadCommittedZeroRowsWithLag(attachment)) {
+            setOtherMsg(READ_COMMITTED_ZERO_ROWS_WITH_LAG_MESSAGE); // NOTE(review): nothing in this method clears the message once the condition stops holding
+        }
+    }
+
+    boolean 
shouldDelayScheduleForReadCommittedZeroRowsWithLag(RLTaskTxnCommitAttachment 
attachment) { // Package-private: also called by KafkaTaskInfo.shouldDelaySchedule to decide whether to delay the next task.
+        return DebugPointUtil.isEnable(HAS_POSITIVE_LAG_DEBUG_POINT) // NOTE(review): the debug point forces true on its own, bypassing all three checks below
+                || (attachment.getTotalRows() == 0 && isReadCommitted() && 
hasPositiveLagForTask(attachment)); // otherwise: zero rows AND read_committed AND positive lag
+    }
+
+    private boolean isReadCommitted() { // True when custom property "isolation.level" equals "read_committed", ignoring case.
+        return 
KAFKA_READ_COMMITTED.equalsIgnoreCase(customProperties.get(KAFKA_ISOLATION_LEVEL)); // equalsIgnoreCase(null) is false, so an absent property yields false
+    }
+
+    private boolean hasPositiveLagForTask(RLTaskTxnCommitAttachment 
attachment) { // True if any partition in the attachment's progress is behind its cached latest offset.
+        Map<Integer, Long> partitionIdToOffset = ((KafkaProgress) 
attachment.getProgress()).getOffsetByPartition();
+        for (Map.Entry<Integer, Long> entry : partitionIdToOffset.entrySet()) {
+            Long latestOffset = 
cachedPartitionWithLatestOffsets.get(entry.getKey());
+            if (latestOffset != null && latestOffset > entry.getValue() + 1) { // partitions with no cached latest offset are skipped
+                return true; // NOTE(review): "+ 1" presumably turns a last-consumed offset into the next offset to read — confirm against KafkaProgress
+            }
+        }
+        return false;
     }
 
     @Override
@@ -377,7 +411,7 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob {
         // add new task
         KafkaTaskInfo kafkaTaskInfo = new KafkaTaskInfo(oldKafkaTaskInfo,
                 ((KafkaProgress) 
progress).getPartitionIdToOffset(oldKafkaTaskInfo.getPartitions()), 
isMultiTable());
-        kafkaTaskInfo.setDelaySchedule(delaySchedule);
+        kafkaTaskInfo.setDelaySchedule(delaySchedule || 
oldKafkaTaskInfo.isDelaySchedule());
         // remove old task
         routineLoadTaskInfoList.remove(routineLoadTaskInfo);
         // add new task
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaTaskInfo.java 
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaTaskInfo.java
index 5e5c7158b71..eec36a5af9b 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaTaskInfo.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaTaskInfo.java
@@ -159,6 +159,12 @@ public class KafkaTaskInfo extends RoutineLoadTaskInfo {
         return routineLoadJob.hasMoreDataToConsume(id, partitionIdToOffset);
     }
 
+    @Override
+    protected boolean shouldDelaySchedule(RLTaskTxnCommitAttachment 
rlTaskTxnCommitAttachment) { // Delegates the delay decision to the KafkaRoutineLoadJob looked up by jobId.
+        KafkaRoutineLoadJob routineLoadJob = (KafkaRoutineLoadJob) 
routineLoadManager.getJob(jobId); // NOTE(review): dereferenced below without a null check — confirm getJob cannot return null here
+        return 
routineLoadJob.shouldDelayScheduleForReadCommittedZeroRowsWithLag(rlTaskTxnCommitAttachment);
+    }
+
     private TPipelineFragmentParams rePlan(RoutineLoadJob routineLoadJob) 
throws UserException {
         TUniqueId loadId = new TUniqueId(id.getMostSignificantBits(), 
id.getLeastSignificantBits());
         // plan for each task, in case table has change(rollup or schema 
change)
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java
 
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java
index efdfb4586db..6e03ff01841 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java
@@ -158,6 +158,10 @@ public abstract class RoutineLoadTaskInfo {
         return isEof;
     }
 
+    public boolean isDelaySchedule() { // Returns the raw delaySchedule flag only; needDedalySchedule() additionally considers isEof.
+        return delaySchedule;
+    }
+
     public boolean needDedalySchedule() { // True when either the delaySchedule flag or isEof is set.
         return delaySchedule || isEof;
     }
@@ -178,6 +182,7 @@ public abstract class RoutineLoadTaskInfo {
 
     public void handleTaskByTxnCommitAttachment(RLTaskTxnCommitAttachment 
rlTaskTxnCommitAttachment) { // Runs the EOF check, then recomputes the delay flag from the same attachment.
         judgeEof(rlTaskTxnCommitAttachment);
+        this.delaySchedule = shouldDelaySchedule(rlTaskTxnCommitAttachment); // overwrites any earlier delaySchedule value with the hook's result
     }
 
     private void judgeEof(RLTaskTxnCommitAttachment rlTaskTxnCommitAttachment) 
{
@@ -195,7 +200,11 @@ public abstract class RoutineLoadTaskInfo {
         }
     }
 
-    abstract TRoutineLoadTask createRoutineLoadTask() throws UserException;
+    protected boolean shouldDelaySchedule(RLTaskTxnCommitAttachment 
rlTaskTxnCommitAttachment) { // Overridable hook consulted by handleTaskByTxnCommitAttachment; KafkaTaskInfo overrides it.
+        return false; // default: never request a delay
+    }
+
+    protected abstract TRoutineLoadTask createRoutineLoadTask() throws 
UserException;
 
     public void updateAdaptiveTimeout(RoutineLoadJob routineLoadJob) {
     }
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/load/routineload/KafkaRoutineLoadJobTest.java
 
b/fe/fe-core/src/test/java/org/apache/doris/load/routineload/KafkaRoutineLoadJobTest.java
index 5113c57ec88..b6944df5539 100644
--- 
a/fe/fe-core/src/test/java/org/apache/doris/load/routineload/KafkaRoutineLoadJobTest.java
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/load/routineload/KafkaRoutineLoadJobTest.java
@@ -34,6 +34,7 @@ import org.apache.doris.common.MetaNotFoundException;
 import org.apache.doris.common.Pair;
 import org.apache.doris.common.UserException;
 import org.apache.doris.common.jmockit.Deencapsulation;
+import org.apache.doris.common.jmockit.FieldReflection;
 import org.apache.doris.datasource.kafka.KafkaUtil;
 import org.apache.doris.load.RoutineLoadDesc;
 import org.apache.doris.load.loadv2.LoadTask;
@@ -231,6 +232,81 @@ public class KafkaRoutineLoadJobTest {
         Assert.assertEquals(15L, routineLoadJob.totalLag().longValue());
     }
 
+    @Test
+    public void 
testUpdateProgressWarnsWhenReadCommittedTaskHasZeroRowsAndLag() throws 
UserException { // Checks that updateProgress sets the hint message for a read_committed job with lag.
+        KafkaRoutineLoadJob routineLoadJob = new KafkaRoutineLoadJob(1L, 
"kafka_routine_load_job", 1L,
+                1L, "127.0.0.1:9020", "topic1", UserIdentity.ADMIN);
+        Map<String, String> customProperties = Maps.newHashMap();
+        customProperties.put("isolation.level", "read_committed"); // makes the job's isReadCommitted() check pass
+        Deencapsulation.setField(routineLoadJob, "customProperties", 
customProperties);
+
+        Map<Integer, Long> cachedPartitionWithLatestOffsets = 
Maps.newHashMap();
+        cachedPartitionWithLatestOffsets.put(1, 20L); // cached latest offset for partition 1
+        Deencapsulation.setField(routineLoadJob, 
"cachedPartitionWithLatestOffsets",
+                cachedPartitionWithLatestOffsets);
+
+        Map<Integer, Long> taskProgress = Maps.newHashMap();
+        taskProgress.put(1, 10L); // 20 > 10 + 1, so the lag check is positive
+        RLTaskTxnCommitAttachment attachment = new RLTaskTxnCommitAttachment(); // NOTE(review): relies on totalRows defaulting to 0 — confirm
+        Deencapsulation.setField(attachment, "progress", new 
KafkaProgress(taskProgress));
+
+        Deencapsulation.invoke(routineLoadJob, "updateProgress", attachment);
+
+        String otherMsg = Deencapsulation.getField(routineLoadJob, "otherMsg");
+        Assert.assertTrue(otherMsg.contains("some records may be in 
uncommitted transactions"));
+    }
+
+    @Test
+    public void testReadCommittedZeroRowsWithLagDelaysNextTask(
+            @Injectable RoutineLoadManager routineLoadManager) throws 
UserException { // Checks that the delay flag is set on the task and carried over to the renewed task.
+        KafkaRoutineLoadJob routineLoadJob = new KafkaRoutineLoadJob(1L, 
"kafka_routine_load_job", 1L,
+                1L, "127.0.0.1:9020", "topic1", UserIdentity.ADMIN);
+        new Expectations() {
+            {
+                routineLoadManager.getJob(1L); // the task resolves its job through the manager
+                minTimes = 0;
+                result = routineLoadJob;
+            }
+        };
+
+        Map<String, String> customProperties = Maps.newHashMap();
+        customProperties.put("isolation.level", "read_committed");
+        Deencapsulation.setField(routineLoadJob, "customProperties", 
customProperties);
+
+        Map<Integer, Long> cachedPartitionWithLatestOffsets = 
Maps.newHashMap();
+        cachedPartitionWithLatestOffsets.put(1, 20L); // cached latest offset for partition 1
+        Deencapsulation.setField(routineLoadJob, 
"cachedPartitionWithLatestOffsets",
+                cachedPartitionWithLatestOffsets);
+
+        Map<Integer, Long> taskProgress = Maps.newHashMap();
+        taskProgress.put(1, 10L); // 20 > 10 + 1, so the lag check is positive
+        Deencapsulation.setField(routineLoadJob, "progress", new 
KafkaProgress(taskProgress));
+
+        KafkaTaskInfo kafkaTaskInfo = new KafkaTaskInfo(new UUID(1, 1), 1L, 
20000,
+                taskProgress, false, 1000, false);
+        FieldReflection.setField(RoutineLoadTaskInfo.class, kafkaTaskInfo, 
"routineLoadManager",
+                routineLoadManager);
+        FieldReflection.setField(KafkaTaskInfo.class, kafkaTaskInfo, 
"routineLoadManager",
+                routineLoadManager); // NOTE(review): same field name set again via the subclass — presumably redundant; confirm
+        List<RoutineLoadTaskInfo> routineLoadTaskInfoList = new ArrayList<>();
+        routineLoadTaskInfoList.add(kafkaTaskInfo);
+        Deencapsulation.setField(routineLoadJob, "routineLoadTaskInfoList", 
routineLoadTaskInfoList);
+
+        RLTaskTxnCommitAttachment attachment = new RLTaskTxnCommitAttachment(); // NOTE(review): relies on totalRows defaulting to 0 — confirm
+        Deencapsulation.setField(attachment, "progress", new 
KafkaProgress(taskProgress));
+        Deencapsulation.setField(attachment, "taskExecutionTimeMs",
+                routineLoadJob.getMaxBatchIntervalS() * 1000);
+
+        kafkaTaskInfo.handleTaskByTxnCommitAttachment(attachment);
+
+        Assert.assertFalse(kafkaTaskInfo.getIsEof()); // so the delay asserted next comes from delaySchedule, not from isEof
+        Assert.assertTrue(kafkaTaskInfo.needDedalySchedule());
+
+        RoutineLoadTaskInfo newTask = Deencapsulation.invoke(routineLoadJob,
+                "unprotectRenewTask", kafkaTaskInfo, false); // passes delaySchedule=false: the renewed task must inherit the old task's flag
+        Assert.assertTrue(newTask.needDedalySchedule());
+    }
+
     @Test
     public void testUpdateLagRebuildsConvertedPropertiesAfterReplay(@Mocked 
Env env) throws UserException {
         KafkaRoutineLoadJob routineLoadJob = new KafkaRoutineLoadJob(1L, 
"kafka_routine_load_job", 1L,
diff --git 
a/regression-test/suites/load_p0/routine_load/test_routine_load_error_info.groovy
 
b/regression-test/suites/load_p0/routine_load/test_routine_load_error_info.groovy
index 2715b8a45ca..260824a05eb 100644
--- 
a/regression-test/suites/load_p0/routine_load/test_routine_load_error_info.groovy
+++ 
b/regression-test/suites/load_p0/routine_load/test_routine_load_error_info.groovy
@@ -15,9 +15,9 @@
 // specific language governing permissions and limitations
 // under the License.
 
+import org.apache.doris.regression.util.RoutineLoadTestUtils
 import org.apache.kafka.clients.admin.AdminClient
-import org.apache.kafka.clients.producer.KafkaProducer
-import org.apache.kafka.clients.producer.ProducerRecord
+import org.apache.kafka.clients.admin.NewTopic
 import org.apache.kafka.clients.producer.ProducerConfig
 
 suite("test_routine_load_error_info","nonConcurrent") {
@@ -30,52 +30,19 @@ suite("test_routine_load_error_info","nonConcurrent") {
                   "test_error_info",
                 ]
 
-    String enabled = context.config.otherConfigs.get("enableKafkaTest")
-    String kafka_port = context.config.otherConfigs.get("kafka_port")
-    String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
-    def kafka_broker = "${externalEnvIp}:${kafka_port}"
+    def kafkaTestEnabled = RoutineLoadTestUtils.isKafkaTestEnabled(context)
+    def kafka_broker = RoutineLoadTestUtils.getKafkaBroker(context)
 
     // send data to kafka 
-    if (enabled != null && enabled.equalsIgnoreCase("true")) {
-        def props = new Properties()
-        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 
"${kafka_broker}".toString())
-        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
"org.apache.kafka.common.serialization.StringSerializer")
-        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
"org.apache.kafka.common.serialization.StringSerializer")
-        // add timeout config
-        props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "10000")  
-        props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "10000")
-
-        // check conenction
-        def verifyKafkaConnection = { prod ->
-            try {
-                logger.info("=====try to connect Kafka========")
-                def partitions = 
prod.partitionsFor("__connection_verification_topic")
-                return partitions != null
-            } catch (Exception e) {
-                throw new Exception("Kafka connect fail: 
${e.message}".toString())
-            }
-        }
-        // Create kafka producer
-        def producer = new KafkaProducer<>(props)
+    if (kafkaTestEnabled) {
+        def producer = RoutineLoadTestUtils.createKafkaProducer(kafka_broker)
         try {
-            logger.info("Kafka connecting: ${kafka_broker}")
-            if (!verifyKafkaConnection(producer)) {
-                throw new Exception("can't get any kafka info")
+            for (String kafkaCsvTopic in kafkaCsvTpoics) {
+                def txt = new 
File("""${context.file.parent}/data/${kafkaCsvTopic}.csv""").text
+                RoutineLoadTestUtils.sendTestDataToKafka(producer, 
[kafkaCsvTopic], txt.readLines())
             }
-        } catch (Exception e) {
-            logger.error("FATAL: " + e.getMessage())
+        } finally {
             producer.close()
-            throw e  
-        }
-        logger.info("Kafka connect success")
-        for (String kafkaCsvTopic in kafkaCsvTpoics) {
-            def txt = new 
File("""${context.file.parent}/data/${kafkaCsvTopic}.csv""").text
-            def lines = txt.readLines()
-            lines.each { line ->
-                logger.info("=====${line}========")
-                def record = new ProducerRecord<>(kafkaCsvTopic, null, line)
-                producer.send(record)
-            }
         }
     }
 
@@ -165,15 +132,47 @@ suite("test_routine_load_error_info","nonConcurrent") {
                 )
                 FROM KAFKA
                 (
-                    "kafka_broker_list" = "${externalEnvIp}:${kafka_port}",
+                    "kafka_broker_list" = "${kafka_broker}",
                     "kafka_topic" = "${kafkaTopic}",
                     "property.kafka_default_offsets" = "OFFSET_BEGINNING"
                 );
         """
     }
 
+    def createReadCommittedJob = {jobName, tableName, kafkaTopic -> // Creates a routine load job on kafkaTopic with "property.isolation.level" = "read_committed".
+        sql """
+        CREATE ROUTINE LOAD ${jobName} on ${tableName}
+                
COLUMNS(k00,k01,k02,k03,k04,k05,k06,k07,k08,k09,k10,k11,k12,k13,k14,k15,k16,k17,k18),
+                COLUMNS TERMINATED BY "|"
+                PROPERTIES
+                (
+                    "max_batch_interval" = "5",
+                    "max_batch_rows" = "300000",
+                    "max_batch_size" = "209715200"
+                )
+                FROM KAFKA
+                (
+                    "kafka_broker_list" = "${kafka_broker}",
+                    "kafka_topic" = "${kafkaTopic}",
+                    "property.kafka_default_offsets" = "OFFSET_BEGINNING",
+                    "property.isolation.level" = "read_committed"
+                );
+        """
+    }
+
+    def createKafkaTopic = {kafkaTopic -> // Creates kafkaTopic on kafka_broker through a short-lived AdminClient.
+        def adminProps = new Properties()
+        adminProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 
"${kafka_broker}".toString()) // reuses the producer's bootstrap-servers key for the admin client
+        def adminClient = AdminClient.create(adminProps)
+        try {
+            adminClient.createTopics([new NewTopic(kafkaTopic, 1, (short) 
1)]).all().get() // 1 partition, replication factor 1; get() waits for the result
+        } finally {
+            adminClient.close() // always release the client, even if topic creation fails
+        }
+    }
+
     // case 1: task failed
-    if (enabled != null && enabled.equalsIgnoreCase("true")) {
+    if (kafkaTestEnabled) {
         // create table
         def jobName = "test_error_info"
         def tableName = "test_routine_error_info"
@@ -209,7 +208,7 @@ suite("test_routine_load_error_info","nonConcurrent") {
     }
 
     // case 2: reschedule job
-    if (enabled != null && enabled.equalsIgnoreCase("true")) {
+    if (kafkaTestEnabled) {
         def jobName = "test_error_info"
         def tableName = "test_routine_error_info"
         try {
@@ -242,7 +241,7 @@ suite("test_routine_load_error_info","nonConcurrent") {
     }
 
     // case 3: memory limit
-    if (enabled != null && enabled.equalsIgnoreCase("true")) {
+    if (kafkaTestEnabled) {
         def jobName = "test_memory_limit_error_info"
         def tableName = "test_routine_memory_limit_error_info"
         
@@ -276,4 +275,50 @@ suite("test_routine_load_error_info","nonConcurrent") {
             sql "DROP TABLE IF EXISTS ${tableName}"
         }
     }
-}
\ No newline at end of file
+
+    // case 4: read_committed lag hint — with the FE debug point enabled, the job should report the uncommitted-transactions hint
+    if (kafkaTestEnabled) {
+        def jobName = "test_read_committed_lag_error_info"
+        def tableName = "test_routine_read_committed_lag_error_info"
+        def kafkaTopic = 
"test_read_committed_lag_error_info_${System.currentTimeMillis()}" // timestamp suffix gives each run its own topic name
+        def debugPoint = "KafkaRoutineLoadJob.hasPositiveLagForTask"
+
+        try {
+            createKafkaTopic(kafkaTopic)
+            def producer = 
RoutineLoadTestUtils.createKafkaProducer(kafka_broker)
+            try {
+                def txt = new 
File("""${context.file.parent}/data/${kafkaCsvTpoics[0]}.csv""").text
+                RoutineLoadTestUtils.sendTestDataToKafka(producer, 
[kafkaTopic], txt.readLines())
+            } finally {
+                producer.close()
+            }
+            createTable(tableName)
+            sql "sync"
+            GetDebugPoint().enableDebugPointForAllFEs(debugPoint) // enabled before the job is created
+            createReadCommittedJob(jobName, tableName, kafkaTopic)
+            sql "sync"
+
+            // poll the job once per second until its other-msg field is non-empty
+            def count = 0
+            while (true) {
+                def res = sql "show routine load for ${jobName}"
+                log.info("show routine load: ${res[0].toString()}".toString())
+                log.info("other msg: ${res[0][19].toString()}".toString()) // presumably column 19 is the other-msg field — confirm
+                if (res[0][19].toString() != "") {
+                    assertTrue(res[0][19].toString().contains("some records 
may be in uncommitted transactions"))
+                    break;
+                }
+                count++
+                if (count > 60) {
+                    assertEquals(1, 2) // deliberate failure: no message appeared within the retry budget
+                    break;
+                }
+                sleep(1000)
+            }
+        } finally {
+            GetDebugPoint().disableDebugPointForAllFEs(debugPoint)
+            sql "stop routine load for ${jobName}" // NOTE(review): if this throws (e.g. job never created), the DROP TABLE below is skipped — confirm
+            sql "DROP TABLE IF EXISTS ${tableName}"
+        }
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to