This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 845e74bbe87 branch-3.0: [improve](routine load) improve routine load observability #46238 (#46567)
845e74bbe87 is described below

commit 845e74bbe87aff067277a8e9e2a418c8884c9b80
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Mon Jan 13 21:52:28 2025 +0800

    branch-3.0: [improve](routine load) improve routine load observability #46238 (#46567)
    
    Cherry-picked from #46238
    
    Co-authored-by: hui lai <[email protected]>
---
 .../doris/load/routineload/RoutineLoadJob.java     |  15 +-
 .../load_p0/routine_load/data/test_error_info.csv  |   1 +
 .../test_routine_load_error_info.groovy            | 290 +++++++++++++++++++++
 3 files changed, 303 insertions(+), 3 deletions(-)
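
For context, the substance of the RoutineLoadJob change below is (a) taking the job's write lock before mutating the shared otherMsg field, so concurrent readers such as SHOW ROUTINE LOAD never observe a torn update, (b) clearing otherMsg whenever the error counters are reset, and (c) recording the transaction-abort or pause reason in otherMsg so it surfaces in SHOW ROUTINE LOAD output. A minimal, self-contained Java sketch of that guarded-setter pattern follows; the class, lock, and field names are illustrative stand-ins, not the actual RoutineLoadJob members.

    import java.util.concurrent.locks.ReentrantReadWriteLock;

    // Hypothetical holder class illustrating the locking pattern the patch applies.
    class OtherMsgHolder {
        private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
        private String otherMsg = "";

        // Writers take the write lock, as the patched setOtherMsg() now does.
        void setOtherMsg(String msg) {
            lock.writeLock().lock();
            try {
                // Equivalent of Strings.nullToEmpty: never store null.
                otherMsg = (msg == null) ? "" : msg;
            } finally {
                lock.writeLock().unlock();
            }
        }

        // Readers take the read lock and always see a fully written value.
        String getOtherMsg() {
            lock.readLock().lock();
            try {
                return otherMsg;
            } finally {
                lock.readLock().unlock();
            }
        }
    }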

diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
index eb4f6b99143..8de1e555140 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
@@ -503,7 +503,12 @@ public abstract class RoutineLoadJob
     }
 
     public void setOtherMsg(String otherMsg) {
-        this.otherMsg = TimeUtils.getCurrentFormatTime() + ":" + Strings.nullToEmpty(otherMsg);
+        writeLock();
+        try {
+            this.otherMsg = TimeUtils.getCurrentFormatTime() + ":" + Strings.nullToEmpty(otherMsg);
+        } finally {
+            writeUnlock();
+        }
     }
 
     public String getDbFullName() throws MetaNotFoundException {
@@ -920,9 +925,10 @@ public abstract class RoutineLoadJob
                                 + "when current total rows is more than base 
or the filter ratio is more than the max")
                         .build());
             }
-            // reset currentTotalNum and currentErrorNum
+            // reset currentTotalNum, currentErrorNum and otherMsg
             this.jobStatistic.currentErrorRows = 0;
             this.jobStatistic.currentTotalRows = 0;
+            this.otherMsg = "";
         } else if (this.jobStatistic.currentErrorRows > maxErrorNum
                 || (this.jobStatistic.currentTotalRows > 0
                     && ((double) this.jobStatistic.currentErrorRows
@@ -941,9 +947,10 @@ public abstract class RoutineLoadJob
                         "current error rows is more than max_error_number "
                             + "or the max_filter_ratio is more than the value 
set"), isReplay);
             }
-            // reset currentTotalNum and currentErrorNum
+            // reset currentTotalNum, currentErrorNum and otherMsg
             this.jobStatistic.currentErrorRows = 0;
             this.jobStatistic.currentTotalRows = 0;
+            this.otherMsg = "";
         }
     }
 
@@ -1202,6 +1209,7 @@ public abstract class RoutineLoadJob
         long taskBeId = -1L;
         try {
             this.jobStatistic.runningTxnIds.remove(txnState.getTransactionId());
+            setOtherMsg(txnStatusChangeReasonString);
             if (txnOperated) {
                 // step0: find task in job
                 Optional<RoutineLoadTaskInfo> routineLoadTaskInfoOptional = routineLoadTaskInfoList.stream().filter(
@@ -1521,6 +1529,7 @@ public abstract class RoutineLoadJob
                 LOG.info(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, id)
                         .add("msg", "Job need to be rescheduled")
                         .build());
+                this.otherMsg = pauseReason == null ? "" : pauseReason.getMsg();
                 unprotectUpdateProgress();
                 unprotectUpdateState(JobState.NEED_SCHEDULE, null, false);
             }
diff --git a/regression-test/suites/load_p0/routine_load/data/test_error_info.csv b/regression-test/suites/load_p0/routine_load/data/test_error_info.csv
new file mode 100644
index 00000000000..bc857cabcfd
--- /dev/null
+++ b/regression-test/suites/load_p0/routine_load/data/test_error_info.csv
@@ -0,0 +1 @@
+57|2023-08-19|TRUE|2|-25462|-74112029|6458082754318544493|-7910671781690629051|-15205.859375|-306870797.484914|759730669.0|-628556336.0|2023-07-10 18:39:10|2023-02-12|2023-01-27 07:26:06|y||Xi9nDVrLv8m6AwEpUxmtzFAuK48sQ|{"name": "John", "age": 25, "city": "New York"}
\ No newline at end of file
diff --git a/regression-test/suites/load_p0/routine_load/test_routine_load_error_info.groovy b/regression-test/suites/load_p0/routine_load/test_routine_load_error_info.groovy
new file mode 100644
index 00000000000..f05b8af79ee
--- /dev/null
+++ b/regression-test/suites/load_p0/routine_load/test_routine_load_error_info.groovy
@@ -0,0 +1,290 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.apache.kafka.clients.admin.AdminClient
+import org.apache.kafka.clients.producer.KafkaProducer
+import org.apache.kafka.clients.producer.ProducerRecord
+import org.apache.kafka.clients.producer.ProducerConfig
+
+suite("test_routine_load_error_info","nonConcurrent") {
+    def kafkaCsvTpoics = [
+                  "test_error_info",
+                ]
+
+    String enabled = context.config.otherConfigs.get("enableKafkaTest")
+    String kafka_port = context.config.otherConfigs.get("kafka_port")
+    String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+    def kafka_broker = "${externalEnvIp}:${kafka_port}"
+
+    // send data to kafka 
+    if (enabled != null && enabled.equalsIgnoreCase("true")) {
+        def props = new Properties()
+        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "${kafka_broker}".toString())
+        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
+        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
+        def producer = new KafkaProducer<>(props)
+        for (String kafkaCsvTopic in kafkaCsvTpoics) {
+            def txt = new File("""${context.file.parent}/data/${kafkaCsvTopic}.csv""").text
+            def lines = txt.readLines()
+            lines.each { line ->
+                logger.info("=====${line}========")
+                def record = new ProducerRecord<>(kafkaCsvTopic, null, line)
+                producer.send(record)
+            }
+        }
+    }
+
+    // case 1: task failed
+    if (enabled != null && enabled.equalsIgnoreCase("true")) {
+        // create table
+        def jobName = "test_error_info"
+        def tableName = "test_routine_error_info"
+        try {
+            sql """
+            CREATE TABLE IF NOT EXISTS ${tableName}
+            (
+                k00 INT             NOT NULL,
+                k01 DATE            NOT NULL,
+                k02 BOOLEAN         NULL,
+                k03 TINYINT         NULL,
+                k04 SMALLINT        NULL,
+                k05 INT             NULL,
+                k06 BIGINT          NULL,
+                k07 LARGEINT        NULL,
+                k08 FLOAT           NULL,
+                k09 DOUBLE          NULL,
+                k10 DECIMAL(9,1)    NULL,
+                k11 DECIMALV3(9,1)  NULL,
+                k12 DATETIME        NULL,
+                k13 DATEV2          NULL,
+                k14 DATETIMEV2      NULL,
+                k15 CHAR            NULL,
+                k16 VARCHAR         NULL,
+                k17 STRING          NULL,
+                k18 JSON            NULL,
+                kd01 BOOLEAN         NOT NULL DEFAULT "TRUE",
+                kd02 TINYINT         NOT NULL DEFAULT "1",
+                kd03 SMALLINT        NOT NULL DEFAULT "2",
+                kd04 INT             NOT NULL DEFAULT "3",
+                kd05 BIGINT          NOT NULL DEFAULT "4",
+                kd06 LARGEINT        NOT NULL DEFAULT "5",
+                kd07 FLOAT           NOT NULL DEFAULT "6.0",
+                kd08 DOUBLE          NOT NULL DEFAULT "7.0",
+                kd09 DECIMAL         NOT NULL DEFAULT "888888888",
+                kd10 DECIMALV3       NOT NULL DEFAULT "999999999",
+                kd11 DATE            NOT NULL DEFAULT "2023-08-24",
+                kd12 DATETIME        NOT NULL DEFAULT "2023-08-24 12:00:00",
+                kd13 DATEV2          NOT NULL DEFAULT "2023-08-24",
+                kd14 DATETIMEV2      NOT NULL DEFAULT "2023-08-24 12:00:00",
+                kd15 CHAR(255)       NOT NULL DEFAULT "我能吞下玻璃而不伤身体",
+                kd16 VARCHAR(300)    NOT NULL DEFAULT "我能吞下玻璃而不伤身体",
+                kd17 STRING          NOT NULL DEFAULT "我能吞下玻璃而不伤身体",
+                kd18 JSON            NULL,
+                
+                INDEX idx_inverted_k104 (`k05`) USING INVERTED,
+                INDEX idx_inverted_k110 (`k11`) USING INVERTED,
+                INDEX idx_inverted_k113 (`k13`) USING INVERTED,
+                INDEX idx_inverted_k114 (`k14`) USING INVERTED,
+                INDEX idx_inverted_k117 (`k17`) USING INVERTED PROPERTIES("parser" = "english"),
+                INDEX idx_ngrambf_k115 (`k15`) USING NGRAM_BF PROPERTIES("gram_size"="3", "bf_size"="256"),
+                INDEX idx_ngrambf_k116 (`k16`) USING NGRAM_BF PROPERTIES("gram_size"="3", "bf_size"="256"),
+                INDEX idx_ngrambf_k117 (`k17`) USING NGRAM_BF PROPERTIES("gram_size"="3", "bf_size"="256"),
+
+                INDEX idx_bitmap_k104 (`k02`) USING BITMAP,
+                INDEX idx_bitmap_k110 (`kd01`) USING BITMAP
+                
+            )
+            DUPLICATE KEY(k00)
+            PARTITION BY RANGE(k01)
+            (
+                PARTITION p1 VALUES [('2023-08-01'), ('2023-08-11')),
+                PARTITION p2 VALUES [('2023-08-11'), ('2023-08-21')),
+                PARTITION p3 VALUES [('2023-08-21'), ('2023-09-01'))
+            )
+            DISTRIBUTED BY HASH(k00) BUCKETS 32
+            PROPERTIES (
+                "bloom_filter_columns"="k05",
+                "replication_num" = "1"
+            );
+            """
+            sql "sync"
+
+            // create job
+            GetDebugPoint().enableDebugPointForAllBEs("BetaRowsetWriter._check_segment_number_limit_too_many_segments")
+            sql """
+                CREATE ROUTINE LOAD ${jobName} on ${tableName}
+                COLUMNS(k00,k01,k02,k03,k04,k05,k06,k07,k08,k09,k10,k11,k12,k13,k14,k15,k16,k17,k18),
+                COLUMNS TERMINATED BY "|"
+                PROPERTIES
+                (
+                    "max_batch_interval" = "5",
+                    "max_batch_rows" = "300000",
+                    "max_batch_size" = "209715200"
+                )
+                FROM KAFKA
+                (
+                    "kafka_broker_list" = "${externalEnvIp}:${kafka_port}",
+                    "kafka_topic" = "${kafkaCsvTpoics[0]}",
+                    "property.kafka_default_offsets" = "OFFSET_BEGINNING"
+                );
+            """
+            sql "sync"
+
+            // check error info
+            def count = 0
+            while (true) {
+                def res = sql "show routine load for ${jobName}"
+                log.info("show routine load: ${res[0].toString()}".toString())
+                log.info("other msg: ${res[0][19].toString()}".toString())
+                if (res[0][19].toString() != "") {
+                    assertTrue(res[0][19].toString().contains("too many segments in rowset"))
+                    break;
+                }
+                count++
+                if (count > 60) {
+                    assertEquals(1, 2)
+                    break;
+                } else {
+                    sleep(1000)
+                    continue;
+                }
+            }
+        } finally {
+            GetDebugPoint().disableDebugPointForAllBEs("BetaRowsetWriter._check_segment_number_limit_too_many_segments")
+            sql "stop routine load for ${jobName}"
+            sql "DROP TABLE IF EXISTS ${tableName}"
+        }
+    }
+
+    // case 2: reschedule job
+    if (enabled != null && enabled.equalsIgnoreCase("true")) {
+        def jobName = "test_error_info"
+        def tableName = "test_routine_error_info"
+        try {
+            sql """
+            CREATE TABLE IF NOT EXISTS ${tableName}
+            (
+                k00 INT             NOT NULL,
+                k01 DATE            NOT NULL,
+                k02 BOOLEAN         NULL,
+                k03 TINYINT         NULL,
+                k04 SMALLINT        NULL,
+                k05 INT             NULL,
+                k06 BIGINT          NULL,
+                k07 LARGEINT        NULL,
+                k08 FLOAT           NULL,
+                k09 DOUBLE          NULL,
+                k10 DECIMAL(9,1)    NULL,
+                k11 DECIMALV3(9,1)  NULL,
+                k12 DATETIME        NULL,
+                k13 DATEV2          NULL,
+                k14 DATETIMEV2      NULL,
+                k15 CHAR            NULL,
+                k16 VARCHAR         NULL,
+                k17 STRING          NULL,
+                k18 JSON            NULL,
+                kd01 BOOLEAN         NOT NULL DEFAULT "TRUE",
+                kd02 TINYINT         NOT NULL DEFAULT "1",
+                kd03 SMALLINT        NOT NULL DEFAULT "2",
+                kd04 INT             NOT NULL DEFAULT "3",
+                kd05 BIGINT          NOT NULL DEFAULT "4",
+                kd06 LARGEINT        NOT NULL DEFAULT "5",
+                kd07 FLOAT           NOT NULL DEFAULT "6.0",
+                kd08 DOUBLE          NOT NULL DEFAULT "7.0",
+                kd09 DECIMAL         NOT NULL DEFAULT "888888888",
+                kd10 DECIMALV3       NOT NULL DEFAULT "999999999",
+                kd11 DATE            NOT NULL DEFAULT "2023-08-24",
+                kd12 DATETIME        NOT NULL DEFAULT "2023-08-24 12:00:00",
+                kd13 DATEV2          NOT NULL DEFAULT "2023-08-24",
+                kd14 DATETIMEV2      NOT NULL DEFAULT "2023-08-24 12:00:00",
+                kd15 CHAR(255)       NOT NULL DEFAULT "我能吞下玻璃而不伤身体",
+                kd16 VARCHAR(300)    NOT NULL DEFAULT "我能吞下玻璃而不伤身体",
+                kd17 STRING          NOT NULL DEFAULT "我能吞下玻璃而不伤身体",
+                kd18 JSON            NULL,
+                
+                INDEX idx_inverted_k104 (`k05`) USING INVERTED,
+                INDEX idx_inverted_k110 (`k11`) USING INVERTED,
+                INDEX idx_inverted_k113 (`k13`) USING INVERTED,
+                INDEX idx_inverted_k114 (`k14`) USING INVERTED,
+                INDEX idx_inverted_k117 (`k17`) USING INVERTED PROPERTIES("parser" = "english"),
+                INDEX idx_ngrambf_k115 (`k15`) USING NGRAM_BF PROPERTIES("gram_size"="3", "bf_size"="256"),
+                INDEX idx_ngrambf_k116 (`k16`) USING NGRAM_BF PROPERTIES("gram_size"="3", "bf_size"="256"),
+                INDEX idx_ngrambf_k117 (`k17`) USING NGRAM_BF PROPERTIES("gram_size"="3", "bf_size"="256"),
+
+                INDEX idx_bitmap_k104 (`k02`) USING BITMAP,
+                INDEX idx_bitmap_k110 (`kd01`) USING BITMAP
+                
+            )
+            DUPLICATE KEY(k00)
+            PARTITION BY RANGE(k01)
+            (
+                PARTITION p1 VALUES [('2023-08-01'), ('2023-08-11')),
+                PARTITION p2 VALUES [('2023-08-11'), ('2023-08-21')),
+                PARTITION p3 VALUES [('2023-08-21'), ('2023-09-01'))
+            )
+            DISTRIBUTED BY HASH(k00) BUCKETS 32
+            PROPERTIES (
+                "bloom_filter_columns"="k05",
+                "replication_num" = "1"
+            );
+            """
+            sql "sync"
+
+            // create job
+            sql """
+                CREATE ROUTINE LOAD ${jobName} on ${tableName}
+                COLUMNS(k00,k01,k02,k03,k04,k05,k06,k07,k08,k09,k10,k11,k12,k13,k14,k15,k16,k17,k18),
+                COLUMNS TERMINATED BY "|"
+                PROPERTIES
+                (
+                    "max_batch_interval" = "5",
+                    "max_batch_rows" = "300000",
+                    "max_batch_size" = "209715200"
+                )
+                FROM KAFKA
+                (
+                    "kafka_broker_list" = "${externalEnvIp}:${kafka_port}",
+                    "kafka_topic" = "invalid_job",
+                    "property.kafka_default_offsets" = "OFFSET_BEGINNING"
+                );
+            """
+
+            // check error info
+            def count = 0
+            while (true) {
+                def res = sql "show routine load for ${jobName}"
+                log.info("show routine load: ${res[0].toString()}".toString())
+                log.info("other msg: ${res[0][19].toString()}".toString())
+                if (res[0][19].toString() != "" && res[0][8].toString() == "NEED_SCHEDULE") {
+                    assertTrue(res[0][19].toString().contains("may be Kafka properties set in job is error or no partition in this topic that should check Kafka"))
+                    break;
+                }
+                count++
+                if (count > 60) {
+                    assertEquals(1, 2)
+                    break;
+                } else {
+                    sleep(1000)
+                    continue;
+                }
+            }
+        } finally {
+            sql "stop routine load for ${jobName}"
+            sql "DROP TABLE IF EXISTS ${tableName}"
+        }
+    }
+}
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
