This is an automated email from the ASF dual-hosted git repository.
xuyang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 172f68480bb [Enhancement](load) Limit the number of incorrect data
drops and add documents (#27727)
172f68480bb is described below
commit 172f68480bb1e9b0c291160c662abd5c9c09f0d4
Author: lw112 <[email protected]>
AuthorDate: Fri Dec 22 10:43:18 2023 +0800
[Enhancement](load) Limit the number of incorrect data drops and add
documents (#27727)
In the load process, if there are problems with the original data, we will
store the error data in an error_log file on disk for subsequent debugging.
However, if there is a lot of error data, it will occupy a lot of disk space.
We now want to limit the amount of error data that is saved to disk.
Become familiar with the usage of Doris' import (load) function and its
internal implementation process
Add a new BE configuration item load_error_log_limit_bytes with a default
value of 200MB (209715200 bytes)
Use the newly added threshold to limit the amount of data that
RuntimeState::append_error_msg_to_file writes to disk
Write regression cases for testing and verification
Co-authored-by: xy720 <[email protected]>
---
be/src/common/config.cpp | 2 +
be/src/common/config.h | 2 +
be/src/runtime/runtime_state.cpp | 12 +++-
docs/en/docs/admin-manual/config/be-config.md | 5 ++
docs/zh-CN/docs/admin-manual/config/be-config.md | 5 ++
.../stream_load/test_stream_load_err_log_limit.csv | 50 ++++++++++++++
.../stream_load/test_stream_load_err_log_limit.out | 3 +
.../test_stream_load_err_log_limit.groovy | 76 ++++++++++++++++++++++
8 files changed, 153 insertions(+), 2 deletions(-)
diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index 0ce0f63ba4e..1241ae39e67 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -475,6 +475,8 @@ DEFINE_Int32(single_replica_load_download_num_workers,
"64");
DEFINE_Int64(load_data_reserve_hours, "4");
// log error log will be removed after this time
DEFINE_mInt64(load_error_log_reserve_hours, "48");
+// error log size limit, default 200MB
+DEFINE_mInt64(load_error_log_limit_bytes, "209715200");
DEFINE_Int32(brpc_heavy_work_pool_threads, "-1");
DEFINE_Int32(brpc_light_work_pool_threads, "-1");
diff --git a/be/src/common/config.h b/be/src/common/config.h
index 49cdaba0351..856e2482745 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -526,6 +526,8 @@ DECLARE_Int32(single_replica_load_download_num_workers);
DECLARE_Int64(load_data_reserve_hours);
// log error log will be removed after this time
DECLARE_mInt64(load_error_log_reserve_hours);
+// error log size limit, default 200MB
+DECLARE_mInt64(load_error_log_limit_bytes);
// be brpc interface is classified into two categories: light and heavy
// each category has diffrent thread number
diff --git a/be/src/runtime/runtime_state.cpp b/be/src/runtime/runtime_state.cpp
index d2e6839be07..eec4cddb321 100644
--- a/be/src/runtime/runtime_state.cpp
+++ b/be/src/runtime/runtime_state.cpp
@@ -446,8 +446,16 @@ Status
RuntimeState::append_error_msg_to_file(std::function<std::string()> line,
}
}
- if (out.size() > 0) {
- (*_error_log_file) << fmt::to_string(out) << std::endl;
+ size_t error_row_size = out.size();
+ if (error_row_size > 0) {
+ if (error_row_size > config::load_error_log_limit_bytes) {
+ fmt::memory_buffer limit_byte_out;
+ limit_byte_out.append(out.data(), out.data() +
config::load_error_log_limit_bytes);
+ (*_error_log_file) << fmt::to_string(limit_byte_out) + "error log
is too long"
+ << std::endl;
+ } else {
+ (*_error_log_file) << fmt::to_string(out) << std::endl;
+ }
}
return Status::OK();
}
diff --git a/docs/en/docs/admin-manual/config/be-config.md
b/docs/en/docs/admin-manual/config/be-config.md
index 98595e45e92..1dcf7dbb241 100644
--- a/docs/en/docs/admin-manual/config/be-config.md
+++ b/docs/en/docs/admin-manual/config/be-config.md
@@ -728,6 +728,11 @@ BaseCompaction:546859:
* Description: The load error log will be deleted after this time
* Default value: 48 (h)
+#### `load_error_log_limit_bytes`
+
+* Description: Load error logs larger than this value will be truncated
+* Default value: 209715200 (byte)
+
#### `load_process_max_memory_limit_percent`
* Description: The percentage of the upper memory limit occupied by all
imported threads on a single node, the default is 50%
diff --git a/docs/zh-CN/docs/admin-manual/config/be-config.md
b/docs/zh-CN/docs/admin-manual/config/be-config.md
index 727d5b3f333..802279c4cd0 100644
--- a/docs/zh-CN/docs/admin-manual/config/be-config.md
+++ b/docs/zh-CN/docs/admin-manual/config/be-config.md
@@ -754,6 +754,11 @@ BaseCompaction:546859:
* 描述: load错误日志将在此时间后删除
* 默认值: 48(h)
+#### `load_error_log_limit_bytes`
+
+* 描述: load错误日志大小超过此值将被截断
+* 默认值: 209715200 (byte)
+
#### `load_process_max_memory_limit_percent`
* 描述: 单节点上所有的导入线程占据的内存上限比例
diff --git
a/regression-test/data/load_p0/stream_load/test_stream_load_err_log_limit.csv
b/regression-test/data/load_p0/stream_load/test_stream_load_err_log_limit.csv
new file mode 100644
index 00000000000..446be6e8b31
--- /dev/null
+++
b/regression-test/data/load_p0/stream_load/test_stream_load_err_log_limit.csv
@@ -0,0 +1,50 @@
+1,abc
+1,abc
+1,abc
+1,abc
+1,abc
+1,abc
+1,abc
+1,abc
+1,abc
+1,abc
+1,abc
+1,abc
+1,abc
+1,abc
+1,abc
+1,abc
+1,abc
+1,abc
+1,abc
+1,abc
+1,abc
+1,abc
+1,abc
+1,abc
+1,abc
+1,abc
+1,abc
+1,abc
+1,abc
+1,abc
+1,abc
+1,abc
+1,abc
+1,abc
+1,abc
+1,abc
+1,abc
+1,abc
+1,abc
+1,abc
+1,abc
+1,abc
+1,abc
+1,abc
+1,abc
+1,abc
+1,abc
+1,abc
+1,abc
+1,abc
\ No newline at end of file
diff --git
a/regression-test/data/load_p0/stream_load/test_stream_load_err_log_limit.out
b/regression-test/data/load_p0/stream_load/test_stream_load_err_log_limit.out
new file mode 100644
index 00000000000..20a96212da7
--- /dev/null
+++
b/regression-test/data/load_p0/stream_load/test_stream_load_err_log_limit.out
@@ -0,0 +1,3 @@
+-- This file is automatically generated. You should know what you did if you
want to edit this
+-- !sql --
+0
\ No newline at end of file
diff --git
a/regression-test/suites/load_p0/stream_load/test_stream_load_err_log_limit.groovy
b/regression-test/suites/load_p0/stream_load/test_stream_load_err_log_limit.groovy
new file mode 100644
index 00000000000..261e0e1d49d
--- /dev/null
+++
b/regression-test/suites/load_p0/stream_load/test_stream_load_err_log_limit.groovy
@@ -0,0 +1,76 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_stream_load_err_log_limit", "p0") {
+ sql "show tables"
+
+ def tableName = "test_stream_load_err_log_limit_table"
+
+ sql """ DROP TABLE IF EXISTS ${tableName} """
+ sql """
+ CREATE TABLE IF NOT EXISTS ${tableName} (
+ `k1` int NOT NULL,
+ `k2` varchar(20) NOT NULL
+ ) ENGINE=OLAP
+ DUPLICATE KEY(`k1`)
+ DISTRIBUTED BY HASH(`k1`) BUCKETS 3
+ PROPERTIES ("replication_allocation" = "tag.location.default: 1");
+ """
+
+ def backendId_to_backendIP = [:]
+ def backendId_to_backendHttpPort = [:]
+ getBackendIpHttpPort(backendId_to_backendIP, backendId_to_backendHttpPort);
+
+ def set_be_param = { paramName, paramValue ->
+ for (String id in backendId_to_backendIP.keySet()) {
+ def beIp = backendId_to_backendIP.get(id)
+ def bePort = backendId_to_backendHttpPort.get(id)
+ def (code, out, err) = curl("POST",
String.format("http://%s:%s/api/update_config?%s=%s", beIp, bePort, paramName,
paramValue))
+ assertTrue(out.contains("OK"))
+ }
+ }
+
+ try {
+ set_be_param.call("load_error_log_limit_bytes", "100")
+
+ streamLoad {
+ table "${tableName}"
+ set 'column_separator', ','
+ set 'columns', 'k1, k2, k3'
+ file 'test_stream_load_err_log_limit.csv'
+
+ check { result, exception, startTime, endTime ->
+ if (exception != null) {
+ throw exception
+ }
+ log.info("Stream load result: ${result}".toString())
+ def json = parseJson(result)
+ def (code, out, err) = curl("GET", json.ErrorURL)
+ log.info("error result: " + out)
+ def checkError = out.contains("error log is too long")
+ assertTrue(checkError)
+ log.info("url: " + json.ErrorURL)
+ }
+ }
+ } finally {
+ set_be_param.call("load_error_log_limit_bytes", "209715200")
+ }
+
+ sql "sync"
+ qt_sql "select count(*) from ${tableName}"
+ sql """ DROP TABLE IF EXISTS ${tableName} """
+}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]