This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new 845e74bbe87 branch-3.0: [improve](routine load) improve routine load observability #46238 (#46567)
845e74bbe87 is described below
commit 845e74bbe87aff067277a8e9e2a418c8884c9b80
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Mon Jan 13 21:52:28 2025 +0800
branch-3.0: [improve](routine load) improve routine load observability #46238 (#46567)
Cherry-picked from #46238
Co-authored-by: hui lai <[email protected]>
---
.../doris/load/routineload/RoutineLoadJob.java | 15 +-
.../load_p0/routine_load/data/test_error_info.csv | 1 +
.../test_routine_load_error_info.groovy | 290 +++++++++++++++++++++
3 files changed, 303 insertions(+), 3 deletions(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
index eb4f6b99143..8de1e555140 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
@@ -503,7 +503,12 @@ public abstract class RoutineLoadJob
}
public void setOtherMsg(String otherMsg) {
- this.otherMsg = TimeUtils.getCurrentFormatTime() + ":" + Strings.nullToEmpty(otherMsg);
+ writeLock();
+ try {
+ this.otherMsg = TimeUtils.getCurrentFormatTime() + ":" + Strings.nullToEmpty(otherMsg);
+ } finally {
+ writeUnlock();
+ }
}
public String getDbFullName() throws MetaNotFoundException {
@@ -920,9 +925,10 @@ public abstract class RoutineLoadJob
+ "when current total rows is more than base
or the filter ratio is more than the max")
.build());
}
- // reset currentTotalNum and currentErrorNum
+ // reset currentTotalNum, currentErrorNum and otherMsg
this.jobStatistic.currentErrorRows = 0;
this.jobStatistic.currentTotalRows = 0;
+ this.otherMsg = "";
} else if (this.jobStatistic.currentErrorRows > maxErrorNum
|| (this.jobStatistic.currentTotalRows > 0
&& ((double) this.jobStatistic.currentErrorRows
@@ -941,9 +947,10 @@ public abstract class RoutineLoadJob
"current error rows is more than max_error_number "
+ "or the max_filter_ratio is more than the value
set"), isReplay);
}
- // reset currentTotalNum and currentErrorNum
+ // reset currentTotalNum, currentErrorNum and otherMsg
this.jobStatistic.currentErrorRows = 0;
this.jobStatistic.currentTotalRows = 0;
+ this.otherMsg = "";
}
}
@@ -1202,6 +1209,7 @@ public abstract class RoutineLoadJob
long taskBeId = -1L;
try {
this.jobStatistic.runningTxnIds.remove(txnState.getTransactionId());
+ setOtherMsg(txnStatusChangeReasonString);
if (txnOperated) {
// step0: find task in job
Optional<RoutineLoadTaskInfo> routineLoadTaskInfoOptional =
routineLoadTaskInfoList.stream().filter(
@@ -1521,6 +1529,7 @@ public abstract class RoutineLoadJob
LOG.info(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, id)
.add("msg", "Job need to be rescheduled")
.build());
+ this.otherMsg = pauseReason == null ? "" : pauseReason.getMsg();
unprotectUpdateProgress();
unprotectUpdateState(JobState.NEED_SCHEDULE, null, false);
}
diff --git a/regression-test/suites/load_p0/routine_load/data/test_error_info.csv b/regression-test/suites/load_p0/routine_load/data/test_error_info.csv
new file mode 100644
index 00000000000..bc857cabcfd
--- /dev/null
+++ b/regression-test/suites/load_p0/routine_load/data/test_error_info.csv
@@ -0,0 +1 @@
+57|2023-08-19|TRUE|2|-25462|-74112029|6458082754318544493|-7910671781690629051|-15205.859375|-306870797.484914|759730669.0|-628556336.0|2023-07-10 18:39:10|2023-02-12|2023-01-27 07:26:06|y||Xi9nDVrLv8m6AwEpUxmtzFAuK48sQ|{"name": "John", "age": 25, "city": "New York"}
\ No newline at end of file
diff --git a/regression-test/suites/load_p0/routine_load/test_routine_load_error_info.groovy b/regression-test/suites/load_p0/routine_load/test_routine_load_error_info.groovy
new file mode 100644
index 00000000000..f05b8af79ee
--- /dev/null
+++ b/regression-test/suites/load_p0/routine_load/test_routine_load_error_info.groovy
@@ -0,0 +1,290 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.apache.kafka.clients.admin.AdminClient
+import org.apache.kafka.clients.producer.KafkaProducer
+import org.apache.kafka.clients.producer.ProducerRecord
+import org.apache.kafka.clients.producer.ProducerConfig
+
+suite("test_routine_load_error_info","nonConcurrent") {
+ def kafkaCsvTpoics = [
+ "test_error_info",
+ ]
+
+ String enabled = context.config.otherConfigs.get("enableKafkaTest")
+ String kafka_port = context.config.otherConfigs.get("kafka_port")
+ String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+ def kafka_broker = "${externalEnvIp}:${kafka_port}"
+
+ // send data to kafka
+ if (enabled != null && enabled.equalsIgnoreCase("true")) {
+ def props = new Properties()
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "${kafka_broker}".toString())
+ props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
+ props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
+ def producer = new KafkaProducer<>(props)
+ for (String kafkaCsvTopic in kafkaCsvTpoics) {
def txt = new File("""${context.file.parent}/data/${kafkaCsvTopic}.csv""").text
+ def lines = txt.readLines()
+ lines.each { line ->
+ logger.info("=====${line}========")
+ def record = new ProducerRecord<>(kafkaCsvTopic, null, line)
+ producer.send(record)
+ }
+ }
+ }
+
+ // case 1: task failed
+ if (enabled != null && enabled.equalsIgnoreCase("true")) {
+ // create table
+ def jobName = "test_error_info"
+ def tableName = "test_routine_error_info"
+ try {
+ sql """
+ CREATE TABLE IF NOT EXISTS ${tableName}
+ (
+ k00 INT NOT NULL,
+ k01 DATE NOT NULL,
+ k02 BOOLEAN NULL,
+ k03 TINYINT NULL,
+ k04 SMALLINT NULL,
+ k05 INT NULL,
+ k06 BIGINT NULL,
+ k07 LARGEINT NULL,
+ k08 FLOAT NULL,
+ k09 DOUBLE NULL,
+ k10 DECIMAL(9,1) NULL,
+ k11 DECIMALV3(9,1) NULL,
+ k12 DATETIME NULL,
+ k13 DATEV2 NULL,
+ k14 DATETIMEV2 NULL,
+ k15 CHAR NULL,
+ k16 VARCHAR NULL,
+ k17 STRING NULL,
+ k18 JSON NULL,
+ kd01 BOOLEAN NOT NULL DEFAULT "TRUE",
+ kd02 TINYINT NOT NULL DEFAULT "1",
+ kd03 SMALLINT NOT NULL DEFAULT "2",
+ kd04 INT NOT NULL DEFAULT "3",
+ kd05 BIGINT NOT NULL DEFAULT "4",
+ kd06 LARGEINT NOT NULL DEFAULT "5",
+ kd07 FLOAT NOT NULL DEFAULT "6.0",
+ kd08 DOUBLE NOT NULL DEFAULT "7.0",
+ kd09 DECIMAL NOT NULL DEFAULT "888888888",
+ kd10 DECIMALV3 NOT NULL DEFAULT "999999999",
+ kd11 DATE NOT NULL DEFAULT "2023-08-24",
+ kd12 DATETIME NOT NULL DEFAULT "2023-08-24 12:00:00",
+ kd13 DATEV2 NOT NULL DEFAULT "2023-08-24",
+ kd14 DATETIMEV2 NOT NULL DEFAULT "2023-08-24 12:00:00",
+ kd15 CHAR(255) NOT NULL DEFAULT "我能吞下玻璃而不伤身体",
+ kd16 VARCHAR(300) NOT NULL DEFAULT "我能吞下玻璃而不伤身体",
+ kd17 STRING NOT NULL DEFAULT "我能吞下玻璃而不伤身体",
+ kd18 JSON NULL,
+
+ INDEX idx_inverted_k104 (`k05`) USING INVERTED,
+ INDEX idx_inverted_k110 (`k11`) USING INVERTED,
+ INDEX idx_inverted_k113 (`k13`) USING INVERTED,
+ INDEX idx_inverted_k114 (`k14`) USING INVERTED,
+ INDEX idx_inverted_k117 (`k17`) USING INVERTED PROPERTIES("parser" = "english"),
+ INDEX idx_ngrambf_k115 (`k15`) USING NGRAM_BF PROPERTIES("gram_size"="3", "bf_size"="256"),
+ INDEX idx_ngrambf_k116 (`k16`) USING NGRAM_BF PROPERTIES("gram_size"="3", "bf_size"="256"),
+ INDEX idx_ngrambf_k117 (`k17`) USING NGRAM_BF PROPERTIES("gram_size"="3", "bf_size"="256"),
+
+ INDEX idx_bitmap_k104 (`k02`) USING BITMAP,
+ INDEX idx_bitmap_k110 (`kd01`) USING BITMAP
+
+ )
+ DUPLICATE KEY(k00)
+ PARTITION BY RANGE(k01)
+ (
+ PARTITION p1 VALUES [('2023-08-01'), ('2023-08-11')),
+ PARTITION p2 VALUES [('2023-08-11'), ('2023-08-21')),
+ PARTITION p3 VALUES [('2023-08-21'), ('2023-09-01'))
+ )
+ DISTRIBUTED BY HASH(k00) BUCKETS 32
+ PROPERTIES (
+ "bloom_filter_columns"="k05",
+ "replication_num" = "1"
+ );
+ """
+ sql "sync"
+
+ // create job
+ GetDebugPoint().enableDebugPointForAllBEs("BetaRowsetWriter._check_segment_number_limit_too_many_segments")
+ sql """
+ CREATE ROUTINE LOAD ${jobName} on ${tableName}
+ COLUMNS(k00,k01,k02,k03,k04,k05,k06,k07,k08,k09,k10,k11,k12,k13,k14,k15,k16,k17,k18),
+ COLUMNS TERMINATED BY "|"
+ PROPERTIES
+ (
+ "max_batch_interval" = "5",
+ "max_batch_rows" = "300000",
+ "max_batch_size" = "209715200"
+ )
+ FROM KAFKA
+ (
+ "kafka_broker_list" = "${externalEnvIp}:${kafka_port}",
+ "kafka_topic" = "${kafkaCsvTpoics[0]}",
+ "property.kafka_default_offsets" = "OFFSET_BEGINNING"
+ );
+ """
+ sql "sync"
+
+ // check error info
+ def count = 0
+ while (true) {
+ def res = sql "show routine load for ${jobName}"
+ log.info("show routine load: ${res[0].toString()}".toString())
+ log.info("other msg: ${res[0][19].toString()}".toString())
+ if (res[0][19].toString() != "") {
assertTrue(res[0][19].toString().contains("too many segments in rowset"))
+ break;
+ }
+ count++
+ if (count > 60) {
+ assertEquals(1, 2)
+ break;
+ } else {
+ sleep(1000)
+ continue;
+ }
+ }
+ } finally {
+ GetDebugPoint().disableDebugPointForAllBEs("BetaRowsetWriter._check_segment_number_limit_too_many_segments")
+ sql "stop routine load for ${jobName}"
+ sql "DROP TABLE IF EXISTS ${tableName}"
+ }
+ }
+
+ // case 2: reschedule job
+ if (enabled != null && enabled.equalsIgnoreCase("true")) {
+ def jobName = "test_error_info"
+ def tableName = "test_routine_error_info"
+ try {
+ sql """
+ CREATE TABLE IF NOT EXISTS ${tableName}
+ (
+ k00 INT NOT NULL,
+ k01 DATE NOT NULL,
+ k02 BOOLEAN NULL,
+ k03 TINYINT NULL,
+ k04 SMALLINT NULL,
+ k05 INT NULL,
+ k06 BIGINT NULL,
+ k07 LARGEINT NULL,
+ k08 FLOAT NULL,
+ k09 DOUBLE NULL,
+ k10 DECIMAL(9,1) NULL,
+ k11 DECIMALV3(9,1) NULL,
+ k12 DATETIME NULL,
+ k13 DATEV2 NULL,
+ k14 DATETIMEV2 NULL,
+ k15 CHAR NULL,
+ k16 VARCHAR NULL,
+ k17 STRING NULL,
+ k18 JSON NULL,
+ kd01 BOOLEAN NOT NULL DEFAULT "TRUE",
+ kd02 TINYINT NOT NULL DEFAULT "1",
+ kd03 SMALLINT NOT NULL DEFAULT "2",
+ kd04 INT NOT NULL DEFAULT "3",
+ kd05 BIGINT NOT NULL DEFAULT "4",
+ kd06 LARGEINT NOT NULL DEFAULT "5",
+ kd07 FLOAT NOT NULL DEFAULT "6.0",
+ kd08 DOUBLE NOT NULL DEFAULT "7.0",
+ kd09 DECIMAL NOT NULL DEFAULT "888888888",
+ kd10 DECIMALV3 NOT NULL DEFAULT "999999999",
+ kd11 DATE NOT NULL DEFAULT "2023-08-24",
+ kd12 DATETIME NOT NULL DEFAULT "2023-08-24 12:00:00",
+ kd13 DATEV2 NOT NULL DEFAULT "2023-08-24",
+ kd14 DATETIMEV2 NOT NULL DEFAULT "2023-08-24 12:00:00",
+ kd15 CHAR(255) NOT NULL DEFAULT "我能吞下玻璃而不伤身体",
+ kd16 VARCHAR(300) NOT NULL DEFAULT "我能吞下玻璃而不伤身体",
+ kd17 STRING NOT NULL DEFAULT "我能吞下玻璃而不伤身体",
+ kd18 JSON NULL,
+
+ INDEX idx_inverted_k104 (`k05`) USING INVERTED,
+ INDEX idx_inverted_k110 (`k11`) USING INVERTED,
+ INDEX idx_inverted_k113 (`k13`) USING INVERTED,
+ INDEX idx_inverted_k114 (`k14`) USING INVERTED,
+ INDEX idx_inverted_k117 (`k17`) USING INVERTED PROPERTIES("parser" = "english"),
+ INDEX idx_ngrambf_k115 (`k15`) USING NGRAM_BF PROPERTIES("gram_size"="3", "bf_size"="256"),
+ INDEX idx_ngrambf_k116 (`k16`) USING NGRAM_BF PROPERTIES("gram_size"="3", "bf_size"="256"),
+ INDEX idx_ngrambf_k117 (`k17`) USING NGRAM_BF PROPERTIES("gram_size"="3", "bf_size"="256"),
+
+ INDEX idx_bitmap_k104 (`k02`) USING BITMAP,
+ INDEX idx_bitmap_k110 (`kd01`) USING BITMAP
+
+ )
+ DUPLICATE KEY(k00)
+ PARTITION BY RANGE(k01)
+ (
+ PARTITION p1 VALUES [('2023-08-01'), ('2023-08-11')),
+ PARTITION p2 VALUES [('2023-08-11'), ('2023-08-21')),
+ PARTITION p3 VALUES [('2023-08-21'), ('2023-09-01'))
+ )
+ DISTRIBUTED BY HASH(k00) BUCKETS 32
+ PROPERTIES (
+ "bloom_filter_columns"="k05",
+ "replication_num" = "1"
+ );
+ """
+ sql "sync"
+
+ // create job
+ sql """
+ CREATE ROUTINE LOAD ${jobName} on ${tableName}
+ COLUMNS(k00,k01,k02,k03,k04,k05,k06,k07,k08,k09,k10,k11,k12,k13,k14,k15,k16,k17,k18),
+ COLUMNS TERMINATED BY "|"
+ PROPERTIES
+ (
+ "max_batch_interval" = "5",
+ "max_batch_rows" = "300000",
+ "max_batch_size" = "209715200"
+ )
+ FROM KAFKA
+ (
+ "kafka_broker_list" = "${externalEnvIp}:${kafka_port}",
+ "kafka_topic" = "invalid_job",
+ "property.kafka_default_offsets" = "OFFSET_BEGINNING"
+ );
+ """
+
+ // check error info
+ def count = 0
+ while (true) {
+ def res = sql "show routine load for ${jobName}"
+ log.info("show routine load: ${res[0].toString()}".toString())
+ log.info("other msg: ${res[0][19].toString()}".toString())
+ if (res[0][19].toString() != "" && res[0][8].toString() == "NEED_SCHEDULE") {
+ assertTrue(res[0][19].toString().contains("may be Kafka properties set in job is error or no partition in this topic that should check Kafka"))
+ break;
+ }
+ count++
+ if (count > 60) {
+ assertEquals(1, 2)
+ break;
+ } else {
+ sleep(1000)
+ continue;
+ }
+ }
+ } finally {
+ sql "stop routine load for ${jobName}"
+ sql "DROP TABLE IF EXISTS ${tableName}"
+ }
+ }
+}
\ No newline at end of file