This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new 0fdf8724d23 branch-3.0: [chore] Fix unhandled exceptions thrown by stoi on streamload #49714 (#50410)
0fdf8724d23 is described below
commit 0fdf8724d23001b37a926b3509aadb728aea67a5
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Sun Apr 27 10:29:22 2025 +0800
branch-3.0: [chore] Fix unhandled exceptions thrown by stoi on streamload #49714 (#50410)
Cherry-picked from #49714
Co-authored-by: 神技圈子 <[email protected]>
Co-authored-by: 宋光璠 <[email protected]>
Co-authored-by: morningman <[email protected]>
Co-authored-by: morningman <[email protected]>
---
be/src/http/action/adjust_log_level.cpp | 33 ++++++++----
be/src/http/action/stream_load.cpp | 29 +++++-----
be/src/util/string_util.cpp | 11 ++++
be/src/util/string_util.h | 2 +-
.../data/load_p0/stream_load/large_test_file.csv | 5 ++
.../load_p0/stream_load/test_stream_load.groovy | 6 +--
.../test_stream_load_illegal_skip_lines.groovy | 62 ++++++++++++++++++++++
.../test_stream_load_illegal_timeout.groovy | 61 +++++++++++++++++++++
8 files changed, 178 insertions(+), 31 deletions(-)
diff --git a/be/src/http/action/adjust_log_level.cpp b/be/src/http/action/adjust_log_level.cpp
index a8644a0fb5f..2aca94571a6 100644
--- a/be/src/http/action/adjust_log_level.cpp
+++ b/be/src/http/action/adjust_log_level.cpp
@@ -22,12 +22,13 @@
#include "common/logging.h"
#include "http/http_channel.h"
#include "http/http_request.h"
+#include "util/string_util.h"
namespace doris {
// **Note**: If the module_name does not exist in the vlog modules, vlog
// would create corresponding module for it.
-std::tuple<std::string, int, int> handle_request(HttpRequest* req) {
+Result<std::tuple<std::string, int, int>> handle_request(HttpRequest* req) {
auto parse_param = [&req](std::string param) {
const auto& value = req->param(param);
if (value.empty()) {
@@ -38,22 +39,34 @@ std::tuple<std::string, int, int> handle_request(HttpRequest* req) {
};
const auto& module = parse_param("module");
const auto& level = parse_param("level");
- int new_level = std::stoi(level);
- return std::make_tuple(module, google::SetVLOGLevel(module.c_str(),
new_level), new_level);
+ auto result = safe_stoi(level, "level");
+ if (result.has_value()) {
+ return std::make_tuple(module, google::SetVLOGLevel(module.c_str(),
result.value()),
+ result.value());
+ } else {
+ return unexpected(std::move(result).error());
+ }
}
void AdjustLogLevelAction::handle(HttpRequest* req) {
try {
auto handle_result = handle_request(req);
- auto msg =
- fmt::format("adjust vlog of {} from {} to {} succeed",
std::get<0>(handle_result),
- std::get<1>(handle_result),
std::get<2>(handle_result));
- LOG(INFO) << msg;
- HttpChannel::send_reply(req, msg);
+ if (handle_result.has_value()) {
+ auto msg = fmt::format(
+ "adjust vlog of {} from {} to {} succeed",
std::get<0>(handle_result.value()),
+ std::get<1>(handle_result.value()),
std::get<2>(handle_result.value()));
+ LOG(INFO) << msg;
+ HttpChannel::send_reply(req, msg);
+ } else {
+ LOG(WARNING) << "adjust log level failed, error: " <<
handle_result.error();
+ HttpChannel::send_reply(req, HttpStatus::INTERNAL_SERVER_ERROR,
+
handle_result.error().to_string_no_stack());
+ return;
+ }
} catch (const std::exception& e) {
- HttpChannel::send_reply(req, HttpStatus::INTERNAL_SERVER_ERROR,
e.what());
LOG(WARNING) << "adjust log level failed, error: " << e.what();
+ HttpChannel::send_reply(req, HttpStatus::INTERNAL_SERVER_ERROR,
e.what());
return;
}
}
-} // namespace doris
\ No newline at end of file
+} // namespace doris
diff --git a/be/src/http/action/stream_load.cpp b/be/src/http/action/stream_load.cpp
index 55e20a764ba..e3abd5a8a5d 100644
--- a/be/src/http/action/stream_load.cpp
+++ b/be/src/http/action/stream_load.cpp
@@ -329,11 +329,7 @@ Status StreamLoadAction::_on_header(HttpRequest* http_req, std::shared_ptr<Strea
}
if (!http_req->header(HTTP_TIMEOUT).empty()) {
- try {
- ctx->timeout_second = std::stoi(http_req->header(HTTP_TIMEOUT));
- } catch (const std::invalid_argument& e) {
- return Status::InvalidArgument("Invalid timeout format, {}",
e.what());
- }
+ ctx->timeout_second =
DORIS_TRY(safe_stoi(http_req->header(HTTP_TIMEOUT), HTTP_TIMEOUT));
}
if (!http_req->header(HTTP_COMMENT).empty()) {
ctx->load_comment = http_req->header(HTTP_COMMENT);
@@ -565,15 +561,9 @@ Status StreamLoadAction::_process_put(HttpRequest* http_req,
}
if (!http_req->header(HTTP_SEND_BATCH_PARALLELISM).empty()) {
- try {
- request.__set_send_batch_parallelism(
- std::stoi(http_req->header(HTTP_SEND_BATCH_PARALLELISM)));
- } catch (const std::invalid_argument& e) {
- return Status::InvalidArgument("send_batch_parallelism must be an
integer, {}",
- e.what());
- } catch (const std::out_of_range& e) {
- return Status::InvalidArgument("send_batch_parallelism out of
range, {}", e.what());
- }
+ int parallelism =
DORIS_TRY(safe_stoi(http_req->header(HTTP_SEND_BATCH_PARALLELISM),
+ HTTP_SEND_BATCH_PARALLELISM));
+ request.__set_send_batch_parallelism(parallelism);
}
if (!http_req->header(HTTP_LOAD_TO_SINGLE_TABLET).empty()) {
@@ -629,7 +619,11 @@ Status StreamLoadAction::_process_put(HttpRequest* http_req,
}
}
if (!http_req->header(HTTP_SKIP_LINES).empty()) {
- request.__set_skip_lines(std::stoi(http_req->header(HTTP_SKIP_LINES)));
+ int skip_lines =
DORIS_TRY(safe_stoi(http_req->header(HTTP_SKIP_LINES), HTTP_SKIP_LINES));
+ if (skip_lines < 0) {
+ return Status::InvalidArgument("Invalid 'skip_lines': {}",
skip_lines);
+ }
+ request.__set_skip_lines(skip_lines);
}
if (!http_req->header(HTTP_ENABLE_PROFILE).empty()) {
if (iequal(http_req->header(HTTP_ENABLE_PROFILE), "true")) {
@@ -650,8 +644,9 @@ Status StreamLoadAction::_process_put(HttpRequest* http_req,
request.__set_memtable_on_sink_node(value);
}
if (!http_req->header(HTTP_LOAD_STREAM_PER_NODE).empty()) {
- int value = std::stoi(http_req->header(HTTP_LOAD_STREAM_PER_NODE));
- request.__set_stream_per_node(value);
+ int stream_per_node = DORIS_TRY(
+ safe_stoi(http_req->header(HTTP_LOAD_STREAM_PER_NODE),
HTTP_LOAD_STREAM_PER_NODE));
+ request.__set_stream_per_node(stream_per_node);
}
if (ctx->group_commit) {
if (!http_req->header(HTTP_GROUP_COMMIT).empty()) {
diff --git a/be/src/util/string_util.cpp b/be/src/util/string_util.cpp
index bbd30771678..0d23feb93e7 100644
--- a/be/src/util/string_util.cpp
+++ b/be/src/util/string_util.cpp
@@ -31,4 +31,15 @@ size_t hash_of_path(const std::string& identifier, const std::string& path) {
return hash;
}
+Result<int> safe_stoi(const std::string& input, const std::string& name) {
+ try {
+ return std::stoi(input);
+ } catch (const std::invalid_argument& e) {
+ return ResultError(Status::Error<ErrorCode::INVALID_ARGUMENT>(
+ std::string("Invalid format of '{}': '{}', {}"), name, input,
e.what()));
+ } catch (const std::out_of_range& e) {
+ return ResultError(Status::Error<ErrorCode::INVALID_ARGUMENT>(
+ std::string("'{}' value out of range: '{}', {}"), name, input,
e.what()));
+ }
+}
} // namespace doris
diff --git a/be/src/util/string_util.h b/be/src/util/string_util.h
index a5837a538cc..f5edd08e64c 100644
--- a/be/src/util/string_util.h
+++ b/be/src/util/string_util.h
@@ -127,7 +127,7 @@ public:
};
size_t hash_of_path(const std::string& identifier, const std::string& path);
-
+Result<int> safe_stoi(const std::string& input, const std::string& name);
using StringCaseSet = std::set<std::string, StringCaseLess>;
using StringCaseUnorderedSet = std::unordered_set<std::string,
StringCaseHasher, StringCaseEqual>;
template <class T>
diff --git a/regression-test/data/load_p0/stream_load/large_test_file.csv b/regression-test/data/load_p0/stream_load/large_test_file.csv
new file mode 100644
index 00000000000..2f33dfed7fa
--- /dev/null
+++ b/regression-test/data/load_p0/stream_load/large_test_file.csv
@@ -0,0 +1,5 @@
+1 10 5 testA abc
+2 20 7 testB def
+3 30 9 testC ghi
+4 40 2 testD jkl
+5 50 4 testE mno
diff --git a/regression-test/suites/load_p0/stream_load/test_stream_load.groovy b/regression-test/suites/load_p0/stream_load/test_stream_load.groovy
index a9cd807fe44..ca3cc83e4d0 100644
--- a/regression-test/suites/load_p0/stream_load/test_stream_load.groovy
+++ b/regression-test/suites/load_p0/stream_load/test_stream_load.groovy
@@ -1457,7 +1457,7 @@ suite("test_stream_load", "p0") {
}
log.info("Stream load result: ${result}".toString())
def json = parseJson(result)
- assertEquals("[INVALID_ARGUMENT]send_batch_parallelism must be an
integer, stoi", json.Message)
+ assertEquals("[INVALID_ARGUMENT]Invalid format of
'send_batch_parallelism': 'a', stoi", json.Message)
}
}
@@ -1474,7 +1474,7 @@ suite("test_stream_load", "p0") {
}
log.info("Stream load result: ${result}".toString())
def json = parseJson(result)
- assertEquals("[INVALID_ARGUMENT]send_batch_parallelism out of
range, stoi", json.Message)
+ assertEquals("[INVALID_ARGUMENT]'send_batch_parallelism' value out
of range: '21474836471', stoi", json.Message)
}
}
@@ -1626,7 +1626,7 @@ suite("test_stream_load", "p0") {
log.info(sql_result[0][0].toString())
log.info(sql_result[0][1].toString())
- log.info(sql_result[0].size.toString())
+ log.info(sql_result.toString())
def beHost=sql_result[0][0]
def beHttpPort=sql_result[0][1]
diff --git a/regression-test/suites/load_p0/stream_load/test_stream_load_illegal_skip_lines.groovy b/regression-test/suites/load_p0/stream_load/test_stream_load_illegal_skip_lines.groovy
new file mode 100644
index 00000000000..dcd97e783d6
--- /dev/null
+++ b/regression-test/suites/load_p0/stream_load/test_stream_load_illegal_skip_lines.groovy
@@ -0,0 +1,62 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_stream_load_illegal_skip_lines", "p0") {
+ def tableName = "test_stream_load_illegal_skip_lines"
+
+ def be_num = sql "show backends;"
+ if (be_num.size() > 1) {
+ // not suitable for multiple be cluster.
+ return
+ }
+
+ sql "DROP TABLE IF EXISTS ${tableName}"
+ sql """
+ CREATE TABLE IF NOT EXISTS ${tableName} (
+ `k1` bigint(20) NULL,
+ `k2` bigint(20) NULL,
+ `v1` tinyint(4) SUM NULL,
+ `v2` tinyint(4) REPLACE NULL,
+ `v10` char(10) REPLACE_IF_NOT_NULL NULL,
+ `v11` varchar(6) REPLACE_IF_NOT_NULL NULL
+ ) ENGINE=OLAP
+ DISTRIBUTED BY HASH(`k1`, `k2`) BUCKETS 3
+ PROPERTIES ("replication_allocation" = "tag.location.default: 1");
+ """
+
+ streamLoad {
+ table "${tableName}"
+ set 'column_separator', '\t'
+ set 'columns', 'k1, k2, v2, v10, v11'
+ set 'strict_mode','true'
+
+ file 'large_test_file.csv'
+ set 'skip_lines', '-3'
+
+ check { result, exception, startTime, endTime ->
+ if (exception != null) {
+ throw exception
+ }
+
+ log.info("Stream load result: ${result}".toString())
+ def json = parseJson(result)
+
+ assertEquals("fail", json.Status.toLowerCase())
+ assertEquals("[INVALID_ARGUMENT]Invalid 'skip_lines': -3",
json.Message)
+ }
+ }
+}
diff --git a/regression-test/suites/load_p0/stream_load/test_stream_load_illegal_timeout.groovy b/regression-test/suites/load_p0/stream_load/test_stream_load_illegal_timeout.groovy
new file mode 100644
index 00000000000..e49b0cc8391
--- /dev/null
+++ b/regression-test/suites/load_p0/stream_load/test_stream_load_illegal_timeout.groovy
@@ -0,0 +1,61 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_stream_load_illegal_timeout", "p0") {
+ def tableName = "test_stream_load_illegal_timeout";
+
+ def be_num = sql "show backends;"
+ if (be_num.size() > 1) {
+ // not suitable for multiple be cluster.
+ return
+ }
+
+ sql "DROP TABLE IF EXISTS ${tableName}"
+ sql """
+ CREATE TABLE IF NOT EXISTS ${tableName} (
+ `k1` bigint(20) NULL,
+ `k2` bigint(20) NULL,
+ `v1` tinyint(4) SUM NULL,
+ `v2` tinyint(4) REPLACE NULL,
+ `v10` char(10) REPLACE_IF_NOT_NULL NULL,
+ `v11` varchar(6) REPLACE_IF_NOT_NULL NULL
+ ) ENGINE=OLAP
+ DISTRIBUTED BY HASH(`k1`, `k2`) BUCKETS 3
+ PROPERTIES ("replication_allocation" = "tag.location.default: 1");
+ """
+
+ streamLoad {
+ table "${tableName}"
+ set 'column_separator', '\t'
+ set 'columns', 'k1, k2, v2, v10, v11'
+ set 'strict_mode','true'
+
+ file 'large_test_file.csv'
+ set 'timeout', 'abc'
+ check { result, exception, startTime, endTime ->
+ if (exception != null) {
+ throw exception
+ }
+ log.info("Stream load result: ${result}".toString())
+ def json = parseJson(result)
+
+ assertEquals("fail", json.Status.toLowerCase())
+ assertEquals("[INVALID_ARGUMENT]Invalid format of 'timeout':
'abc', stoi", json.Message)
+ }
+ }
+
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]