This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new e58fb2e2e4d branch-3.0: [fix](load) fix multi table load repeated 
failures and retries when meet data quality error #49938 (#50026)
e58fb2e2e4d is described below

commit e58fb2e2e4d4678a738afc2e054a3f6e56b86921
Author: github-actions[bot] 
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Tue Apr 15 17:47:34 2025 +0800

    branch-3.0: [fix](load) fix multi table load repeated failures and retries 
when meet data quality error #49938 (#50026)
    
    Cherry-picked from #49938
    
    Co-authored-by: hui lai <[email protected]>
---
 be/src/io/fs/multi_table_pipe.cpp                  |  25 +++--
 be/src/io/fs/multi_table_pipe.h                    |   1 +
 .../apache/doris/planner/StreamLoadPlanner.java    |   7 +-
 .../test_multi_table_load_data_quality_error.out   | Bin 0 -> 111 bytes
 .../data/multi_table_load_data_quality.csv         |   2 +
 ...test_multi_table_load_data_quality_error.groovy | 120 +++++++++++++++++++++
 6 files changed, 141 insertions(+), 14 deletions(-)

diff --git a/be/src/io/fs/multi_table_pipe.cpp 
b/be/src/io/fs/multi_table_pipe.cpp
index 463f002596a..5e27ef35ab6 100644
--- a/be/src/io/fs/multi_table_pipe.cpp
+++ b/be/src/io/fs/multi_table_pipe.cpp
@@ -193,6 +193,7 @@ Status MultiTablePipe::request_and_exec_plans() {
         request.__set_memtable_on_sink_node(_ctx->memtable_on_sink_node);
         request.__set_user(_ctx->qualified_user);
         request.__set_cloud_cluster(_ctx->cloud_cluster);
+        request.__set_max_filter_ratio(1.0);
         // no need to register new_load_stream_mgr coz it is already done in 
routineload submit task
 
         // plan this load
@@ -271,21 +272,19 @@ Status MultiTablePipe::exec_plans(ExecEnv* exec_env, 
std::vector<ExecParam> para
                     _number_loaded_rows += state->num_rows_load_success();
                     _number_filtered_rows += state->num_rows_load_filtered();
                     _number_unselected_rows += 
state->num_rows_load_unselected();
-                    _ctx->error_url = 
to_load_error_http_path(state->get_error_log_file_path());
-                    // check filtered ratio for this plan fragment
-                    int64_t num_selected_rows =
-                            state->num_rows_load_total() - 
state->num_rows_load_unselected();
-                    if (num_selected_rows > 0 &&
-                        (double)state->num_rows_load_filtered() / 
num_selected_rows >
-                                _ctx->max_filter_ratio) {
-                        *status = Status::DataQualityError("too many filtered 
rows");
-                    }
 
                     // if any of the plan fragment exec failed, set the status 
to the first failed plan
-                    if (!status->ok()) {
-                        LOG(WARNING)
-                                << "plan fragment exec failed. errmsg=" << 
*status << _ctx->brief();
-                        _status = *status;
+                    {
+                        std::lock_guard<std::mutex> l(_callback_lock);
+                        if (!state->get_error_log_file_path().empty()) {
+                            _ctx->error_url =
+                                    
to_load_error_http_path(state->get_error_log_file_path());
+                        }
+                        if (!status->ok()) {
+                            LOG(WARNING) << "plan fragment exec failed. 
errmsg=" << *status
+                                         << _ctx->brief();
+                            _status = *status;
+                        }
                     }
 
                     auto inflight_cnt = _inflight_cnt.fetch_sub(1);
diff --git a/be/src/io/fs/multi_table_pipe.h b/be/src/io/fs/multi_table_pipe.h
index f1d2e523652..fc1da272f12 100644
--- a/be/src/io/fs/multi_table_pipe.h
+++ b/be/src/io/fs/multi_table_pipe.h
@@ -95,6 +95,7 @@ private:
     std::atomic<int64_t> _number_unselected_rows {0};
 
     std::mutex _pipe_map_lock;
+    std::mutex _callback_lock;
     std::unordered_map<TUniqueId /*instance id*/, 
std::shared_ptr<io::StreamLoadPipe>> _pipe_map;
 
     uint32_t _row_threshold = config::multi_table_batch_plan_threshold;
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java
index 6e830f2fd64..5f6c448b930 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java
@@ -45,6 +45,7 @@ import org.apache.doris.common.UserException;
 import org.apache.doris.common.util.TimeUtils;
 import org.apache.doris.load.BrokerFileGroup;
 import org.apache.doris.load.loadv2.LoadTask;
+import org.apache.doris.load.routineload.RoutineLoadJob;
 import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.service.FrontendOptions;
 import org.apache.doris.task.LoadTaskInfo;
@@ -365,7 +366,11 @@ public class StreamLoadPlanner {
         
queryGlobals.setNowString(TimeUtils.getDatetimeFormatWithTimeZone().format(LocalDateTime.now()));
         queryGlobals.setTimestampMs(System.currentTimeMillis());
         queryGlobals.setTimeZone(taskInfo.getTimezone());
-        queryGlobals.setLoadZeroTolerance(taskInfo.getMaxFilterRatio() <= 0.0);
+        if (taskInfo instanceof RoutineLoadJob) {
+            queryGlobals.setLoadZeroTolerance(false);
+        } else {
+            queryGlobals.setLoadZeroTolerance(taskInfo.getMaxFilterRatio() <= 
0.0);
+        }
         queryGlobals.setNanoSeconds(LocalDateTime.now().getNano());
 
         params.setQueryGlobals(queryGlobals);
diff --git 
a/regression-test/data/load_p0/routine_load/test_multi_table_load_data_quality_error.out
 
b/regression-test/data/load_p0/routine_load/test_multi_table_load_data_quality_error.out
new file mode 100644
index 00000000000..343c45fa998
Binary files /dev/null and 
b/regression-test/data/load_p0/routine_load/test_multi_table_load_data_quality_error.out
 differ
diff --git 
a/regression-test/suites/load_p0/routine_load/data/multi_table_load_data_quality.csv
 
b/regression-test/suites/load_p0/routine_load/data/multi_table_load_data_quality.csv
new file mode 100644
index 00000000000..b74967971d6
--- /dev/null
+++ 
b/regression-test/suites/load_p0/routine_load/data/multi_table_load_data_quality.csv
@@ -0,0 +1,2 @@
+test_multi_table_load_data_quality|1,a
+test_multi_table_load_data_quality_error|a,a
\ No newline at end of file
diff --git 
a/regression-test/suites/load_p0/routine_load/test_multi_table_load_data_quality_error.groovy
 
b/regression-test/suites/load_p0/routine_load/test_multi_table_load_data_quality_error.groovy
new file mode 100644
index 00000000000..549dbdf3f59
--- /dev/null
+++ 
b/regression-test/suites/load_p0/routine_load/test_multi_table_load_data_quality_error.groovy
@@ -0,0 +1,120 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.apache.kafka.clients.admin.AdminClient
+import org.apache.kafka.clients.producer.KafkaProducer
+import org.apache.kafka.clients.producer.ProducerRecord
+import org.apache.kafka.clients.producer.ProducerConfig
+
+suite("test_multi_table_load_data_quality_error","p0") {
+    def kafkaCsvTpoics = [
+                  "multi_table_load_data_quality",
+                ]
+    String enabled = context.config.otherConfigs.get("enableKafkaTest")
+    String kafka_port = context.config.otherConfigs.get("kafka_port")
+    String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+    def kafka_broker = "${externalEnvIp}:${kafka_port}"
+    if (enabled != null && enabled.equalsIgnoreCase("true")) {
+        // define kafka 
+        def props = new Properties()
+        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 
"${kafka_broker}".toString())
+        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
"org.apache.kafka.common.serialization.StringSerializer")
+        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
"org.apache.kafka.common.serialization.StringSerializer")
+        // Create kafka producer
+        def producer = new KafkaProducer<>(props)
+
+        for (String kafkaCsvTopic in kafkaCsvTpoics) {
+            def txt = new 
File("""${context.file.parent}/data/${kafkaCsvTopic}.csv""").text
+            def lines = txt.readLines()
+            lines.each { line ->
+                logger.info("=====${line}========")
+                def record = new ProducerRecord<>(kafkaCsvTopic, null, line)
+                producer.send(record)
+            }
+        }
+    }
+
+    if (enabled != null && enabled.equalsIgnoreCase("true")) {
+        def tableName = "test_multi_table_load_data_quality"
+        def tableName1 = "test_multi_table_load_data_quality_error"
+        def jobName = "test_multi_table_load_data_quality_error"
+        sql """ DROP TABLE IF EXISTS ${tableName} """
+        sql """ DROP TABLE IF EXISTS ${tableName1} """
+        sql """
+            CREATE TABLE IF NOT EXISTS ${tableName} (
+                `k1` int(20) NULL,
+                `k2` string NULL,
+            ) ENGINE=OLAP
+            DUPLICATE KEY(`k1`)
+            COMMENT 'OLAP'
+            DISTRIBUTED BY HASH(`k1`) BUCKETS 3
+            PROPERTIES ("replication_allocation" = "tag.location.default: 1");
+        """
+        sql """
+            CREATE TABLE IF NOT EXISTS ${tableName1} (
+                `k1` int(20) NULL,
+                `k2` string NULL,
+            ) ENGINE=OLAP
+            DUPLICATE KEY(`k1`)
+            COMMENT 'OLAP'
+            DISTRIBUTED BY HASH(`k1`) BUCKETS 3
+            PROPERTIES ("replication_allocation" = "tag.location.default: 1");
+        """
+
+        try {
+            sql """
+                CREATE ROUTINE LOAD ${jobName}
+                COLUMNS TERMINATED BY ","
+                PROPERTIES
+                (
+                    "strict_mode" = "true"
+                )
+                FROM KAFKA
+                (
+                    "kafka_broker_list" = "${externalEnvIp}:${kafka_port}",
+                    "kafka_topic" = "${kafkaCsvTpoics[0]}",
+                    "property.kafka_default_offsets" = "OFFSET_BEGINNING"
+                );
+            """
+            sql "sync"
+
+            def count = 0
+            while (true) {
+                def res = sql "select count(*) from ${tableName}"
+                def state = sql "show routine load for ${jobName}"
+                log.info("routine load state: 
${state[0][8].toString()}".toString())
+                log.info("routine load statistic: 
${state[0][14].toString()}".toString())
+                log.info("reason of state changed: 
${state[0][17].toString()}".toString())
+                log.info("error url: ${state[0][18].toString()}".toString())
+                if (res[0][0] > 0 && state[0][18].toString() != "") {
+                    break
+                }
+                if (count >= 120) {
+                    log.error("routine load can not visible for long time")
+                    assertEquals(20, res[0][0])
+                    break
+                }
+                sleep(1000)
+                count++
+            }
+            qt_sql "select * from ${tableName} order by k1"
+
+        } finally {
+            sql "stop routine load for ${jobName}"
+        }
+    }
+}
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to