This is an automated email from the ASF dual-hosted git repository.
liaoxin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new f34b8c1a1b7 [enhance](job) support adaptive batch param for routine load job (#56930)
f34b8c1a1b7 is described below
commit f34b8c1a1b7c15111b34288837fade4d0f5faa58
Author: hui lai <[email protected]>
AuthorDate: Mon Oct 20 15:08:33 2025 +0800
[enhance](job) support adaptive batch param for routine load job (#56930)
### What problem does this PR solve?
Users may set the `max batch interval` relatively small for visibility,
which may result in insufficient throughput and data backlog when
traffic is high. We propose an adaptive max batch interval scheme aimed
at **prioritizing throughput over visibility during data backlog**.
---
.../main/java/org/apache/doris/common/Config.java | 6 ++
.../doris/load/routineload/KafkaTaskInfo.java | 21 ++++-
.../load/routineload/RoutineLoadTaskInfo.java | 2 +-
.../test_routine_load_adaptive_param.groovy | 91 ++++++++++++++++++++++
4 files changed, 116 insertions(+), 4 deletions(-)
diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
index d1b95e4af1c..32d96eeb06b 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -1243,6 +1243,12 @@ public class Config extends ConfigBase {
@ConfField(mutable = true, masterOnly = true)
public static int routine_load_blacklist_expire_time_second = 300;
+ /**
+ * Minimum batch interval for adaptive routine load tasks when not at EOF.
+ */
+ @ConfField(mutable = true, masterOnly = true)
+ public static int routine_load_adaptive_min_batch_interval_sec = 360;
+
/**
* The max number of files store in SmallFileMgr
*/
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaTaskInfo.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaTaskInfo.java
index a3d87ccef8b..32ccf96fc2f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaTaskInfo.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaTaskInfo.java
@@ -107,9 +107,7 @@ public class KafkaTaskInfo extends RoutineLoadTaskInfo {
         } else {
             Env.getCurrentEnv().getRoutineLoadManager().addMultiLoadTaskTxnIdToRoutineLoadJobId(txnId, jobId);
         }
-        tRoutineLoadTask.setMaxIntervalS(routineLoadJob.getMaxBatchIntervalS());
-        tRoutineLoadTask.setMaxBatchRows(routineLoadJob.getMaxBatchRows());
-        tRoutineLoadTask.setMaxBatchSize(routineLoadJob.getMaxBatchSizeBytes());
+        adaptiveBatchParam(tRoutineLoadTask, routineLoadJob);
         if (!routineLoadJob.getFormat().isEmpty() && routineLoadJob.getFormat().equalsIgnoreCase("json")) {
             tRoutineLoadTask.setFormat(TFileFormatType.FORMAT_JSON);
         } else {
@@ -121,6 +119,23 @@ public class KafkaTaskInfo extends RoutineLoadTaskInfo {
return tRoutineLoadTask;
}
+    private void adaptiveBatchParam(TRoutineLoadTask tRoutineLoadTask, RoutineLoadJob routineLoadJob) {
+        long maxBatchIntervalS = routineLoadJob.getMaxBatchIntervalS();
+        long maxBatchRows = routineLoadJob.getMaxBatchRows();
+        long maxBatchSize = routineLoadJob.getMaxBatchSizeBytes();
+        if (!isEof) {
+            maxBatchIntervalS = Math.max(maxBatchIntervalS, Config.routine_load_adaptive_min_batch_interval_sec);
+            maxBatchRows = Math.max(maxBatchRows, RoutineLoadJob.DEFAULT_MAX_BATCH_ROWS);
+            maxBatchSize = Math.max(maxBatchSize, RoutineLoadJob.DEFAULT_MAX_BATCH_SIZE);
+            this.timeoutMs = maxBatchIntervalS * Config.routine_load_task_timeout_multiplier * 1000;
+        } else {
+            this.timeoutMs = routineLoadJob.getTimeout() * 1000;
+        }
+        tRoutineLoadTask.setMaxIntervalS(maxBatchIntervalS);
+        tRoutineLoadTask.setMaxBatchRows(maxBatchRows);
+        tRoutineLoadTask.setMaxBatchSize(maxBatchSize);
+    }
+
@Override
protected String getTaskDataSourceProperties() {
Gson gson = new Gson();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java
index 2708bbb6ebe..aa8c2ccc63c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java
@@ -189,7 +189,7 @@ public abstract class RoutineLoadTaskInfo {
         RoutineLoadJob routineLoadJob = routineLoadManager.getJob(jobId);
         if (rlTaskTxnCommitAttachment.getTotalRows() < routineLoadJob.getMaxBatchRows()
                 && rlTaskTxnCommitAttachment.getReceivedBytes() < routineLoadJob.getMaxBatchSizeBytes()
-                && rlTaskTxnCommitAttachment.getTaskExecutionTimeMs() < this.timeoutMs) {
+                && rlTaskTxnCommitAttachment.getTaskExecutionTimeMs() < routineLoadJob.getMaxBatchIntervalS() * 1000) {
this.isEof = true;
} else {
this.isEof = false;
diff --git a/regression-test/suites/load_p0/routine_load/test_routine_load_adaptive_param.groovy b/regression-test/suites/load_p0/routine_load/test_routine_load_adaptive_param.groovy
new file mode 100644
index 00000000000..31ab6278df1
--- /dev/null
+++ b/regression-test/suites/load_p0/routine_load/test_routine_load_adaptive_param.groovy
@@ -0,0 +1,91 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.apache.doris.regression.util.RoutineLoadTestUtils
+import org.apache.kafka.clients.admin.AdminClient
+import org.apache.kafka.clients.producer.KafkaProducer
+import org.apache.kafka.clients.producer.ProducerRecord
+import org.apache.kafka.clients.producer.ProducerConfig
+
+suite("test_routine_load_adaptive_param","nonConcurrent") {
+ def kafkaCsvTpoics = [
+ "test_routine_load_adaptive_param",
+ ]
+
+ if (RoutineLoadTestUtils.isKafkaTestEnabled(context)) {
+ def runSql = { String q -> sql q }
+ def kafka_broker = RoutineLoadTestUtils.getKafkaBroker(context)
+ def producer = RoutineLoadTestUtils.createKafkaProducer(kafka_broker)
+
+ def tableName = "test_routine_load_adaptive_param"
+ def job = "test_adaptive_param"
+ sql """ DROP TABLE IF EXISTS ${tableName} """
+ sql """
+ CREATE TABLE IF NOT EXISTS ${tableName} (
+ `k1` int(20) NULL,
+ `k2` string NULL,
+ `v1` date NULL,
+ `v2` string NULL,
+ `v3` datetime NULL,
+ `v4` string NULL
+ ) ENGINE=OLAP
+ DUPLICATE KEY(`k1`)
+ COMMENT 'OLAP'
+ DISTRIBUTED BY HASH(`k1`) BUCKETS 3
+ PROPERTIES ("replication_allocation" = "tag.location.default: 1");
+ """
+
+ try {
+ sql """
+ CREATE ROUTINE LOAD ${job} ON ${tableName}
+ COLUMNS TERMINATED BY ","
+ PROPERTIES
+ (
+ "max_batch_interval" = "10"
+ )
+ FROM KAFKA
+ (
+ "kafka_broker_list" = "${kafka_broker}",
+ "kafka_topic" = "${kafkaCsvTpoics[0]}",
+ "property.kafka_default_offsets" = "OFFSET_BEGINNING"
+ );
+ """
+
+ def injection = "RoutineLoadTaskInfo.judgeEof"
+ try {
+ GetDebugPoint().enableDebugPointForAllFEs(injection)
+                RoutineLoadTestUtils.sendTestDataToKafka(producer, kafkaCsvTpoics)
+                RoutineLoadTestUtils.waitForTaskFinish(runSql, job, tableName, 0)
+ } finally {
+ GetDebugPoint().disableDebugPointForAllFEs(injection)
+ }
+ // test adaptively increase
+ logger.info("---test adaptively increase---")
+ RoutineLoadTestUtils.sendTestDataToKafka(producer, kafkaCsvTpoics)
+ RoutineLoadTestUtils.checkTaskTimeout(runSql, job, "3600")
+ RoutineLoadTestUtils.waitForTaskFinish(runSql, job, tableName, 2)
+
+ // test restore adaptively
+ logger.info("---test restore adaptively---")
+ RoutineLoadTestUtils.sendTestDataToKafka(producer, kafkaCsvTpoics)
+ RoutineLoadTestUtils.waitForTaskFinish(runSql, job, tableName, 4)
+ RoutineLoadTestUtils.checkTaskTimeout(runSql, job, "100")
+ } finally {
+ sql "stop routine load for ${job}"
+ }
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]