This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new c84ad2d87a0 [enhance](job) timely rescheduling tasks to avoid write
jitter (#53853)
c84ad2d87a0 is described below
commit c84ad2d87a0c939b62aa792334ca921ac3eb4d7b
Author: hui lai <[email protected]>
AuthorDate: Tue Jul 29 22:57:25 2025 +0800
[enhance](job) timely rescheduling tasks to avoid write jitter (#53853)
### What problem does this PR solve?
If submitting a task fails (for example, because of network jitter or a
BE node restart), the task is only rescheduled after it times out, which
causes write jitter. This PR reschedules such tasks promptly after a
submit failure to avoid write jitter.
---
.../load/routineload/RoutineLoadTaskScheduler.java | 37 +++++--
.../test_routine_load_job_schedule.groovy | 118 +++++++++++++++++++++
2 files changed, 144 insertions(+), 11 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskScheduler.java
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskScheduler.java
index 040ca103004..28e03567765 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskScheduler.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskScheduler.java
@@ -24,6 +24,7 @@ import org.apache.doris.common.InternalErrorCode;
import org.apache.doris.common.LoadException;
import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.common.UserException;
+import org.apache.doris.common.util.DebugPointUtil;
import org.apache.doris.common.util.DebugUtil;
import org.apache.doris.common.util.LogBuilder;
import org.apache.doris.common.util.LogKey;
@@ -228,23 +229,32 @@ public class RoutineLoadTaskScheduler extends
MasterDaemon {
}
}
} catch (LoadException e) {
- // submit task failed (such as TOO_MANY_TASKS error), but txn has
already begun.
- // Here we will still set the ExecuteStartTime of this task, which
means
- // we "assume" that this task has been successfully submitted.
- // And this task will then be aborted because of a timeout.
- // In this way, we can prevent the entire job from being paused
due to submit errors,
- // and we can also relieve the pressure on BE by waiting for the
timeout period.
- LOG.warn("failed to submit routine load task {} to BE: {}, error:
{}",
- DebugUtil.printId(routineLoadTaskInfo.getId()),
- routineLoadTaskInfo.getBeId(), e.getMessage());
-
routineLoadManager.getJob(routineLoadTaskInfo.getJobId()).setOtherMsg(e.getMessage());
- // fall through to set ExecuteStartTime
+ handleSubmitTaskFailure(routineLoadTaskInfo, e.getMessage());
+ return;
}
// set the executeStartTimeMs of task
routineLoadTaskInfo.setExecuteStartTimeMs(System.currentTimeMillis());
}
+    /**
+     * Handles a failed attempt to submit a routine load task to a BE.
+     *
+     * <p>Logs the failure, resets the task's BE id to -1 and records the error message on the
+     * owning job. Resource pressure errors (TOO_MANY_TASKS / MEM_LIMIT_EXCEEDED) are not
+     * rescheduled right away; every other error renews the task and re-queues it immediately.
+     *
+     * @param routineLoadTaskInfo the task whose submission failed
+     * @param errorMsg the failure message; a null message is treated as a non-resource error
+     */
+    private void handleSubmitTaskFailure(RoutineLoadTaskInfo routineLoadTaskInfo, String errorMsg) {
+        LOG.warn("failed to submit routine load task {} to BE: {}, error: {}",
+                DebugUtil.printId(routineLoadTaskInfo.getId()),
+                routineLoadTaskInfo.getBeId(), errorMsg);
+        routineLoadTaskInfo.setBeId(-1);
+        RoutineLoadJob routineLoadJob = routineLoadManager.getJob(routineLoadTaskInfo.getJobId());
+        routineLoadJob.setOtherMsg(errorMsg);
+
+        // Check if this is a resource pressure error that should not be immediately rescheduled.
+        // Guard against a null message so a message-less exception cannot cause an NPE here.
+        boolean resourcePressure = errorMsg != null
+                && (errorMsg.contains("TOO_MANY_TASKS") || errorMsg.contains("MEM_LIMIT_EXCEEDED"));
+        if (resourcePressure) {
+            // The caller returns right after this method, so the execute start time must be set
+            // here. As before this change, we "assume" the task has been submitted so that it is
+            // later aborted because of a timeout, which relieves the pressure on the BE.
+            routineLoadTaskInfo.setExecuteStartTimeMs(System.currentTimeMillis());
+            return;
+        }
+
+        // for other errors (network issues, BE restart, etc.), reschedule immediately
+        RoutineLoadTaskInfo newTask = routineLoadJob.unprotectRenewTask(routineLoadTaskInfo);
+        addTaskInQueue(newTask);
+    }
+
private void updateBackendSlotIfNecessary() {
long currentTime = System.currentTimeMillis();
if (lastBackendSlotUpdateTime == -1
@@ -287,6 +297,11 @@ public class RoutineLoadTaskScheduler extends MasterDaemon
{
TStatus tStatus =
client.submitRoutineLoadTask(Lists.newArrayList(tTask));
ok = true;
+ if (DebugPointUtil.isEnable("FE.ROUTINE_LOAD_TASK_SUBMIT_FAILED"))
{
+ LOG.warn("debug point FE.ROUTINE_LOAD_TASK_SUBMIT_FAILED,
routine load task submit failed");
+ throw new LoadException("debug point
FE.ROUTINE_LOAD_TASK_SUBMIT_FAILED");
+ }
+
if (tStatus.getStatusCode() != TStatusCode.OK) {
throw new LoadException("failed to submit task. error code: "
+ tStatus.getStatusCode()
+ ", msg: " + (tStatus.getErrorMsgsSize() > 0 ?
tStatus.getErrorMsgs().get(0) : "NaN"));
diff --git
a/regression-test/suites/load_p0/routine_load/test_routine_load_job_schedule.groovy
b/regression-test/suites/load_p0/routine_load/test_routine_load_job_schedule.groovy
new file mode 100644
index 00000000000..c892d9dbde0
--- /dev/null
+++
b/regression-test/suites/load_p0/routine_load/test_routine_load_job_schedule.groovy
@@ -0,0 +1,118 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.apache.kafka.clients.admin.AdminClient
+import org.apache.kafka.clients.admin.NewTopic
+import org.apache.kafka.clients.producer.KafkaProducer
+import org.apache.kafka.clients.producer.ProducerRecord
+import org.apache.kafka.clients.producer.ProducerConfig
+import java.util.Collections
+
+// Verifies that a routine load job recovers and loads all rows after task submission
+// failures are injected through the FE debug point FE.ROUTINE_LOAD_TASK_SUBMIT_FAILED.
+suite("test_routine_load_job_schedule","nonConcurrent") {
+    def kafkaCsvTopics = [
+        "test_routine_load_job_schedule",
+    ]
+    String enabled = context.config.otherConfigs.get("enableKafkaTest")
+    String kafka_port = context.config.otherConfigs.get("kafka_port")
+    String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+    def kafka_broker = "${externalEnvIp}:${kafka_port}"
+
+    if (enabled != null && enabled.equalsIgnoreCase("true")) {
+        def props = new Properties()
+        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "${kafka_broker}".toString())
+        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
+                "org.apache.kafka.common.serialization.StringSerializer")
+        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
+                "org.apache.kafka.common.serialization.StringSerializer")
+        def producer = new KafkaProducer<>(props)
+        def adminClient = AdminClient.create(props)
+        def newTopic = new NewTopic(kafkaCsvTopics[0], 5, (short)1)
+        def testData = [
+            "1,test_data_1,2023-01-01,value1,2023-01-01 10:00:00,extra1",
+            "2,test_data_2,2023-01-02,value2,2023-01-02 11:00:00,extra2",
+            "3,test_data_3,2023-01-03,value3,2023-01-03 12:00:00,extra3",
+            "4,test_data_4,2023-01-04,value4,2023-01-04 13:00:00,extra4",
+            "5,test_data_5,2023-01-05,value5,2023-01-05 14:00:00,extra5"
+        ]
+        adminClient.createTopics(Collections.singletonList(newTopic))
+        // One record per partition: the loop index is used as the partition number.
+        testData.eachWithIndex { line, index ->
+            logger.info("Sending data to kafka: ${line}")
+            def record = new ProducerRecord<>(newTopic.name(), index, null, line)
+            producer.send(record)
+        }
+        producer.close()
+        // Release the admin client's resources; it is not used after topic creation.
+        adminClient.close()
+
+        def tableName = "test_routine_load_job_schedule"
+        def job = "test_routine_load_job_schedule"
+        sql """ DROP TABLE IF EXISTS ${tableName} """
+        sql """
+            CREATE TABLE IF NOT EXISTS ${tableName} (
+                `k1` int(20) NULL,
+                `k2` string NULL,
+                `v1` date NULL,
+                `v2` string NULL,
+                `v3` datetime NULL,
+                `v4` string NULL
+            ) ENGINE=OLAP
+            DUPLICATE KEY(`k1`)
+            COMMENT 'OLAP'
+            DISTRIBUTED BY HASH(`k1`) BUCKETS 3
+            PROPERTIES ("replication_allocation" = "tag.location.default: 1");
+        """
+        try {
+            // The debug point is evaluated in FE code (RoutineLoadTaskScheduler), so it has to
+            // be enabled on the FEs; enabling it on the BEs would never trigger the failure.
+            GetDebugPoint().enableDebugPointForAllFEs("FE.ROUTINE_LOAD_TASK_SUBMIT_FAILED")
+            sql """
+                CREATE ROUTINE LOAD ${job} ON ${tableName}
+                COLUMNS TERMINATED BY ","
+                FROM KAFKA
+                (
+                    "kafka_broker_list" = "${kafka_broker}",
+                    "kafka_topic" = "${newTopic.name()}",
+                    "property.kafka_default_offsets" = "OFFSET_BEGINNING"
+                );
+            """
+
+            // Let submissions fail for a while, then lift the failure injection.
+            sleep(5000)
+            GetDebugPoint().disableDebugPointForAllFEs("FE.ROUTINE_LOAD_TASK_SUBMIT_FAILED")
+
+            // Poll once per second until the job is RUNNING and all 5 rows are visible.
+            def count = 0
+            def maxWaitCount = 60
+            while (true) {
+                def state = sql "show routine load for ${job}"
+                def routineLoadState = state[0][8].toString()
+                def statistic = state[0][14].toString()
+                logger.info("Routine load state: ${routineLoadState}")
+                logger.info("Routine load statistic: ${statistic}")
+                def rowCount = sql "select count(*) from ${tableName}"
+                if (routineLoadState == "RUNNING" && rowCount[0][0] == 5) {
+                    break
+                }
+                if (count > maxWaitCount) {
+                    // Fail the suite: the job did not recover within the wait budget.
+                    assertEquals(1, 2)
+                }
+                sleep(1000)
+                count++
+            }
+        } catch (Exception e) {
+            logger.error("Test failed with exception: ${e.message}")
+            // Rethrow so that an unexpected error fails the suite instead of being swallowed.
+            throw e
+        } finally {
+            GetDebugPoint().disableDebugPointForAllFEs("FE.ROUTINE_LOAD_TASK_SUBMIT_FAILED")
+            try {
+                sql "stop routine load for ${job}"
+            } catch (Exception e) {
+                // Best-effort cleanup: a failure to stop the job must not mask the test result.
+                logger.warn("Failed to stop routine load job: ${e.message}")
+            }
+        }
+    }
+}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]