This is an automated email from the ASF dual-hosted git repository.

liaoxin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new f34b8c1a1b7 [enhance](job) support adaptive batch param for routine 
load job (#56930)
f34b8c1a1b7 is described below

commit f34b8c1a1b7c15111b34288837fade4d0f5faa58
Author: hui lai <[email protected]>
AuthorDate: Mon Oct 20 15:08:33 2025 +0800

    [enhance](job) support adaptive batch param for routine load job (#56930)
    
    ### What problem does this PR solve?
    
    Users may set `max_batch_interval` relatively small to improve data
    visibility (lower ingest latency), which may result in insufficient
    throughput and data backlog when traffic is high. We propose an adaptive
    max batch interval scheme aimed at **prioritizing throughput over
    visibility during data backlog**.
---
 .../main/java/org/apache/doris/common/Config.java  |  6 ++
 .../doris/load/routineload/KafkaTaskInfo.java      | 21 ++++-
 .../load/routineload/RoutineLoadTaskInfo.java      |  2 +-
 .../test_routine_load_adaptive_param.groovy        | 91 ++++++++++++++++++++++
 4 files changed, 116 insertions(+), 4 deletions(-)

diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java 
b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
index d1b95e4af1c..32d96eeb06b 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -1243,6 +1243,12 @@ public class Config extends ConfigBase {
     @ConfField(mutable = true, masterOnly = true)
     public static int routine_load_blacklist_expire_time_second = 300;
 
+    /**
+     * Minimum batch interval for adaptive routine load tasks when not at EOF.
+     */
+    @ConfField(mutable = true, masterOnly = true)
+    public static int routine_load_adaptive_min_batch_interval_sec = 360;
+
     /**
      * The max number of files store in SmallFileMgr
      */
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaTaskInfo.java 
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaTaskInfo.java
index a3d87ccef8b..32ccf96fc2f 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaTaskInfo.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaTaskInfo.java
@@ -107,9 +107,7 @@ public class KafkaTaskInfo extends RoutineLoadTaskInfo {
         } else {
             
Env.getCurrentEnv().getRoutineLoadManager().addMultiLoadTaskTxnIdToRoutineLoadJobId(txnId,
 jobId);
         }
-        
tRoutineLoadTask.setMaxIntervalS(routineLoadJob.getMaxBatchIntervalS());
-        tRoutineLoadTask.setMaxBatchRows(routineLoadJob.getMaxBatchRows());
-        
tRoutineLoadTask.setMaxBatchSize(routineLoadJob.getMaxBatchSizeBytes());
+        adaptiveBatchParam(tRoutineLoadTask, routineLoadJob);
         if (!routineLoadJob.getFormat().isEmpty() && 
routineLoadJob.getFormat().equalsIgnoreCase("json")) {
             tRoutineLoadTask.setFormat(TFileFormatType.FORMAT_JSON);
         } else {
@@ -121,6 +119,23 @@ public class KafkaTaskInfo extends RoutineLoadTaskInfo {
         return tRoutineLoadTask;
     }
 
+    private void adaptiveBatchParam(TRoutineLoadTask tRoutineLoadTask, 
RoutineLoadJob routineLoadJob) {
+        long maxBatchIntervalS = routineLoadJob.getMaxBatchIntervalS();
+        long maxBatchRows = routineLoadJob.getMaxBatchRows();
+        long maxBatchSize = routineLoadJob.getMaxBatchSizeBytes();
+        if (!isEof) {
+            maxBatchIntervalS = Math.max(maxBatchIntervalS, 
Config.routine_load_adaptive_min_batch_interval_sec);
+            maxBatchRows = Math.max(maxBatchRows, 
RoutineLoadJob.DEFAULT_MAX_BATCH_ROWS);
+            maxBatchSize = Math.max(maxBatchSize, 
RoutineLoadJob.DEFAULT_MAX_BATCH_SIZE);
+            this.timeoutMs = maxBatchIntervalS * 
Config.routine_load_task_timeout_multiplier * 1000;
+        } else {
+            this.timeoutMs = routineLoadJob.getTimeout() * 1000;
+        }
+        tRoutineLoadTask.setMaxIntervalS(maxBatchIntervalS);
+        tRoutineLoadTask.setMaxBatchRows(maxBatchRows);
+        tRoutineLoadTask.setMaxBatchSize(maxBatchSize);
+    }
+
     @Override
     protected String getTaskDataSourceProperties() {
         Gson gson = new Gson();
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java
 
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java
index 2708bbb6ebe..aa8c2ccc63c 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java
@@ -189,7 +189,7 @@ public abstract class RoutineLoadTaskInfo {
         RoutineLoadJob routineLoadJob = routineLoadManager.getJob(jobId);
         if (rlTaskTxnCommitAttachment.getTotalRows() < 
routineLoadJob.getMaxBatchRows()
                 && rlTaskTxnCommitAttachment.getReceivedBytes() < 
routineLoadJob.getMaxBatchSizeBytes()
-                && rlTaskTxnCommitAttachment.getTaskExecutionTimeMs() < 
this.timeoutMs) {
+                && rlTaskTxnCommitAttachment.getTaskExecutionTimeMs() < 
routineLoadJob.getMaxBatchIntervalS() * 1000) {
             this.isEof = true;
         } else {
             this.isEof = false;
diff --git 
a/regression-test/suites/load_p0/routine_load/test_routine_load_adaptive_param.groovy
 
b/regression-test/suites/load_p0/routine_load/test_routine_load_adaptive_param.groovy
new file mode 100644
index 00000000000..31ab6278df1
--- /dev/null
+++ 
b/regression-test/suites/load_p0/routine_load/test_routine_load_adaptive_param.groovy
@@ -0,0 +1,91 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.apache.doris.regression.util.RoutineLoadTestUtils
+import org.apache.kafka.clients.admin.AdminClient
+import org.apache.kafka.clients.producer.KafkaProducer
+import org.apache.kafka.clients.producer.ProducerRecord
+import org.apache.kafka.clients.producer.ProducerConfig
+
+suite("test_routine_load_adaptive_param","nonConcurrent") {
+    def kafkaCsvTpoics = [
+                  "test_routine_load_adaptive_param",
+                ]
+
+    if (RoutineLoadTestUtils.isKafkaTestEnabled(context)) {
+        def runSql = { String q -> sql q }
+        def kafka_broker = RoutineLoadTestUtils.getKafkaBroker(context)
+        def producer = RoutineLoadTestUtils.createKafkaProducer(kafka_broker)
+
+        def tableName = "test_routine_load_adaptive_param"
+        def job = "test_adaptive_param"
+        sql """ DROP TABLE IF EXISTS ${tableName} """
+        sql """
+            CREATE TABLE IF NOT EXISTS ${tableName} (
+                `k1` int(20) NULL,
+                `k2` string NULL,
+                `v1` date  NULL,
+                `v2` string  NULL,
+                `v3` datetime  NULL,
+                `v4` string  NULL
+            ) ENGINE=OLAP
+            DUPLICATE KEY(`k1`)
+            COMMENT 'OLAP'
+            DISTRIBUTED BY HASH(`k1`) BUCKETS 3
+            PROPERTIES ("replication_allocation" = "tag.location.default: 1");
+        """
+
+        try {
+            sql """
+                CREATE ROUTINE LOAD ${job} ON ${tableName}
+                COLUMNS TERMINATED BY ","
+                PROPERTIES
+                (
+                    "max_batch_interval" = "10"
+                )
+                FROM KAFKA
+                (
+                    "kafka_broker_list" = "${kafka_broker}",
+                    "kafka_topic" = "${kafkaCsvTpoics[0]}",
+                    "property.kafka_default_offsets" = "OFFSET_BEGINNING"
+                );
+            """
+
+            def injection = "RoutineLoadTaskInfo.judgeEof"
+            try {
+                GetDebugPoint().enableDebugPointForAllFEs(injection)
+                RoutineLoadTestUtils.sendTestDataToKafka(producer, 
kafkaCsvTpoics)
+                RoutineLoadTestUtils.waitForTaskFinish(runSql, job, tableName, 
0)
+            } finally {
+                GetDebugPoint().disableDebugPointForAllFEs(injection)
+            }
+            // test adaptively increase
+            logger.info("---test adaptively increase---")
+            RoutineLoadTestUtils.sendTestDataToKafka(producer, kafkaCsvTpoics)
+            RoutineLoadTestUtils.checkTaskTimeout(runSql, job, "3600")
+            RoutineLoadTestUtils.waitForTaskFinish(runSql, job, tableName, 2)
+
+            // test restore adaptively
+            logger.info("---test restore adaptively---")
+            RoutineLoadTestUtils.sendTestDataToKafka(producer, kafkaCsvTpoics)
+            RoutineLoadTestUtils.waitForTaskFinish(runSql, job, tableName, 4)
+            RoutineLoadTestUtils.checkTaskTimeout(runSql, job, "100")
+        } finally {
+            sql "stop routine load for ${job}"
+        }
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to