This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new f3f17eb222 [Bugfix](load) fix BE core dump when parsing malformed JSON file using simdjson (#12062)
f3f17eb222 is described below
commit f3f17eb2226bd7e26ea0c4035de000c6725c45e9
Author: Zhengguo Yang <[email protected]>
AuthorDate: Fri Aug 26 18:01:19 2022 +0800
[Bugfix](load) fix BE core dump when parsing malformed JSON file using simdjson (#12062)
* [Bugfix](load) fix BE core dump when parsing malformed JSON file using simdjson
---
be/src/vec/exec/vjson_scanner.cpp | 214 ++++++++++++---------
.../load_p0/stream_load/invalid_json_array.json | 11 ++
.../data/load_p0/stream_load/test_json_load.out | 1 +
.../load_p0/stream_load/test_json_load.groovy | 42 +++-
thirdparty/build-thirdparty.sh | 9 +-
5 files changed, 179 insertions(+), 98 deletions(-)
diff --git a/be/src/vec/exec/vjson_scanner.cpp b/be/src/vec/exec/vjson_scanner.cpp
index 5c531e6ab5..dccbdd5241 100644
--- a/be/src/vec/exec/vjson_scanner.cpp
+++ b/be/src/vec/exec/vjson_scanner.cpp
@@ -558,65 +558,6 @@ Status VSIMDJsonReader::read_json_column(Block& block,
return (this->*_vhandle_json_callback)(block, slot_descs, is_empty_row,
eof);
}
-Status VSIMDJsonReader::_vhandle_simple_json(Block& block,
- const
std::vector<SlotDescriptor*>& slot_descs,
- bool* is_empty_row, bool* eof) {
- simdjson::ondemand::value objectValue;
- simdjson::ondemand::array array;
- do {
- bool valid = false;
- if (_next_line >= _total_lines) { // parse json and generic document
- Status st = _parse_json(is_empty_row, eof);
- if (st.is_data_quality_error()) {
- continue; // continue to read next
- }
- RETURN_IF_ERROR(st);
- if (*is_empty_row == true) {
- return Status::OK();
- }
- if (_json_value.type() == simdjson::ondemand::json_type::array) {
- array = _json_value.get_array();
- _array_iter = array.begin();
- _total_lines = array.count_elements();
- if (_total_lines == 0) {
- // may be passing an empty json, such as "[]"
- RETURN_IF_ERROR(_append_error_msg("Empty json line", "",
nullptr));
- if (*_scanner_eof) {
- *is_empty_row = true;
- return Status::OK();
- }
- continue;
- }
- } else {
- _total_lines = 1; // only one row
- objectValue = _json_value;
- }
- _next_line = 0;
- }
-
- if (_json_value.type() == simdjson::ondemand::json_type::array) { //
handle case 1
- objectValue = *_array_iter;
- RETURN_IF_ERROR(_set_column_value(objectValue, block, slot_descs,
&valid));
- ++_array_iter;
- } else { // handle case 2
- RETURN_IF_ERROR(_set_column_value(_json_value, block, slot_descs,
&valid));
- }
- _next_line++;
- if (!valid) {
- if (*_scanner_eof) {
- // When _scanner_eof is true and valid is false, it means that
we have encountered
- // unqualified data and decided to stop the scan.
- *is_empty_row = true;
- return Status::OK();
- }
- continue;
- }
- *is_empty_row = false;
- break; // get a valid row, then break
- } while (_next_line <= _total_lines);
- return Status::OK();
-}
-
#define RETURN_IF_SIMDJSON_ERROR(error, col_name, valid)
\
if (UNLIKELY(error)) {
\
RETURN_IF_ERROR(_append_error_msg("Encounter error while iterate
json", col_name, valid)); \
@@ -811,7 +752,6 @@ Status VSIMDJsonReader::_parse_json_doc(size_t* size, bool*
eof) {
}
return Status::DataQualityError(fmt::to_string(error_msg));
}
- // _json_value = real_doc.value();
RETURN_IF_SIMDJSON_ERROR(real_doc.get(_json_value), "", nullptr);
} else {
RETURN_IF_SIMDJSON_ERROR(_original_json_doc.get(_json_value), "",
nullptr);
@@ -852,7 +792,6 @@ Status VSIMDJsonReader::_parse_json_doc(size_t* size, bool*
eof) {
}
return Status::DataQualityError(fmt::to_string(error_msg));
}
-
return Status::OK();
}
@@ -881,37 +820,122 @@ Status VSIMDJsonReader::_append_error_msg(std::string
error_msg, std::string col
return Status::OK();
}
-Status VSIMDJsonReader::_vhandle_flat_array_complex_json(
- Block& block, const std::vector<SlotDescriptor*>& slot_descs, bool*
is_empty_row,
- bool* eof) {
+Status VSIMDJsonReader::_vhandle_simple_json(Block& block,
+ const
std::vector<SlotDescriptor*>& slot_descs,
+ bool* is_empty_row, bool* eof) {
+ simdjson::ondemand::value objectValue;
+ simdjson::ondemand::array array;
do {
- if (_next_line >= _total_lines) {
- Status st = _parse_json(is_empty_row, eof);
- if (st.is_data_quality_error()) {
- continue; // continue to read next
+ bool valid = false;
+ try {
+ if (_next_line >= _total_lines) { // parse json and generic
document
+ Status st = _parse_json(is_empty_row, eof);
+ if (st.is_data_quality_error()) {
+ continue; // continue to read next
+ }
+ RETURN_IF_ERROR(st);
+ if (*is_empty_row == true) {
+ return Status::OK();
+ }
+ if (_json_value.type() ==
simdjson::ondemand::json_type::array) {
+ array = _json_value.get_array();
+ _array_iter = array.begin();
+
+ _total_lines = array.count_elements();
+ if (_total_lines == 0) {
+ // may be passing an empty json, such as "[]"
+ RETURN_IF_ERROR(_append_error_msg("Empty json line",
"", nullptr));
+ if (*_scanner_eof) {
+ *is_empty_row = true;
+ return Status::OK();
+ }
+ continue;
+ }
+ } else {
+ _total_lines = 1; // only one row
+ objectValue = _json_value;
+ }
+ _next_line = 0;
}
- RETURN_IF_ERROR(st);
- if (*is_empty_row == true) {
- if (st == Status::OK()) {
+
+ if (_json_value.type() == simdjson::ondemand::json_type::array) {
// handle case 1
+ objectValue = *_array_iter;
+ RETURN_IF_ERROR(_set_column_value(objectValue, block,
slot_descs, &valid));
+ ++_array_iter;
+ } else { // handle case 2
+ RETURN_IF_ERROR(_set_column_value(_json_value, block,
slot_descs, &valid));
+ }
+ _next_line++;
+ if (!valid) {
+ if (*_scanner_eof) {
+ // When _scanner_eof is true and valid is false, it means
that we have encountered
+ // unqualified data and decided to stop the scan.
+ *is_empty_row = true;
return Status::OK();
}
- if (_total_lines == 0) {
- continue;
+ continue;
+ }
+ *is_empty_row = false;
+ break; // get a valid row, then break
+ } catch (simdjson::simdjson_error& e) {
+ fmt::memory_buffer error_msg;
+ fmt::format_to(error_msg, "Parse json data for array failed. code:
{}, error info: {}",
+ e.error(), e.what());
+ RETURN_IF_ERROR(_state->append_error_msg_to_file(
+ [&]() -> std::string { return ""; },
+ [&]() -> std::string { return fmt::to_string(error_msg);
}, eof));
+ _counter->num_rows_filtered++;
+ RETURN_IF_ERROR(_append_error_msg("Empty json line", "", nullptr));
+ if (!valid) {
+ if (*_scanner_eof) {
+ // When _scanner_eof is true and valid is false, it means
that we have encountered
+ // unqualified data and decided to stop the scan.
+ *is_empty_row = true;
+ return Status::OK();
}
+ continue;
}
- simdjson::ondemand::array array;
- RETURN_IF_SIMDJSON_ERROR(_json_value.get(array), "", nullptr);
- _array_iter = array.begin();
+ continue;
}
- bool valid = true;
- RETURN_IF_ERROR(_write_columns_by_jsonpath(*_array_iter, slot_descs,
block, &valid));
- ++_array_iter;
- ++_next_line;
- if (!valid) {
- continue; // process next line
+ } while (_next_line <= _total_lines);
+ return Status::OK();
+}
+
+Status VSIMDJsonReader::_vhandle_flat_array_complex_json(
+ Block& block, const std::vector<SlotDescriptor*>& slot_descs, bool*
is_empty_row,
+ bool* eof) {
+ do {
+ try {
+ if (_next_line >= _total_lines) {
+ Status st = _parse_json(is_empty_row, eof);
+ if (st.is_data_quality_error()) {
+ continue; // continue to read next
+ }
+ RETURN_IF_ERROR(st);
+ if (*is_empty_row == true) {
+ if (st == Status::OK()) {
+ return Status::OK();
+ }
+ if (_total_lines == 0) {
+ continue;
+ }
+ }
+ simdjson::ondemand::array array;
+ RETURN_IF_SIMDJSON_ERROR(_json_value.get(array), "", nullptr);
+ _array_iter = array.begin();
+ }
+ bool valid = true;
+ RETURN_IF_ERROR(_write_columns_by_jsonpath(*_array_iter,
slot_descs, block, &valid));
+ ++_array_iter;
+ ++_next_line;
+ if (!valid) {
+ continue; // process next line
+ }
+ *is_empty_row = false;
+ break; // get a valid row, then break
+ } catch (simdjson::simdjson_error& e) {
+ RETURN_IF_SIMDJSON_ERROR(e.error(), "", nullptr);
}
- *is_empty_row = false;
- break; // get a valid row, then break
} while (_next_line <= _total_lines);
return Status::OK();
}
@@ -920,16 +944,20 @@ Status
VSIMDJsonReader::_vhandle_nested_complex_json(Block& block,
const
std::vector<SlotDescriptor*>& slot_descs,
bool* is_empty_row, bool*
eof) {
while (true) {
- Status st = _parse_json(is_empty_row, eof);
- if (st.is_data_quality_error()) {
- continue; // continue to read next
- }
- RETURN_IF_ERROR(st);
- if (*is_empty_row == true) {
- return Status::OK();
+ try {
+ Status st = _parse_json(is_empty_row, eof);
+ if (st.is_data_quality_error()) {
+ continue; // continue to read next
+ }
+ RETURN_IF_ERROR(st);
+ if (*is_empty_row == true) {
+ return Status::OK();
+ }
+ *is_empty_row = false;
+ break; // read a valid row
+ } catch (simdjson::simdjson_error& e) {
+ RETURN_IF_SIMDJSON_ERROR(e.error(), "", nullptr);
}
- *is_empty_row = false;
- break; // read a valid row
}
bool valid = true;
RETURN_IF_ERROR(_write_columns_by_jsonpath(_json_value, slot_descs, block,
&valid));
diff --git a/regression-test/data/load_p0/stream_load/invalid_json_array.json b/regression-test/data/load_p0/stream_load/invalid_json_array.json
new file mode 100644
index 0000000000..7f1e5f1884
--- /dev/null
+++ b/regression-test/data/load_p0/stream_load/invalid_json_array.json
@@ -0,0 +1,11 @@
+ [ {"v6": "7395.231067", "k4": "1000", "k3": "100", "k2": "10", "k1": "1",
"v3": "ynqnzeowymt", "v2": "t", "v1": "2010-01-01", "k5": "2011-01-01
00:00:00", "v5": "180.998031", "v4": "38.638843"},
+ {"v6": "2080.504502", "k4": "2000", "k3": "200", "k2": "20", "k1": "2",
"v3": "hfkfwlr", "v2": "f", "v1": "2010-01-02", "k5": "2012-01-01 00:00:00",
"v5": "539.922834", "v4": "506.044046"},
+ {"v6": "4605.253205", "k4": "3000", "k3": "300", "k2": "30", "k1": "3",
"v3": "uoclasp", "v2": "t", "v1": "2010-01-03", "k5": "2013-01-01 00:00:00",
"v5": "577.044148", "v4": "377.793209"},
+ {"v6": "7291.703724", "k4": "4000", "k3": "400", "k2": "40", "k1": "4",
"v3": "iswngzeodfhptjzgswsddt", "v2": "n", "v1": "2010-01-04", "k5":
"2014-01-01 00:00:00", "v5": "919.067864", "v4": "871.354536"},
+ {"v6": "3903.906901", "k4": "5000", "k3": "500", "k2": "50", "k1": "5",
"v3": "sqodagzlyrmcelyxgcgcsfuxadcdt", "v2": "a", "v1": "2010-01-05", "k5":
"2015-01-01 00:00:00", "v5": "929.660783", "v4": "462.067902"},
+ {"v6": "6514.405051", "k4": "6000", "k3": "600", "k2": "60", "k1": "6",
"v3": "obdrei", "v2": "m", "v1": "2010-01-06", "k5": "2016-01-01 00:00:00",
"v5": "882.708491", "v4": "921.867848"},
+ {"v6": "8604.198677", "k4": "7000", "k3": "700", "k2": "70", "k1": "7",
"v3": "cuobdhvrgkugknj", "v2": "a", "v1": "2010-01-07", "k5": "2017-01-01
00:00:00", "v5": "209.420112", "v4": "141.656421"},
+ {"v6": "7784.859446", "k4": "8000", "k3": "800", "k2": "80", "k1": "8",
"v3": "phcxztwgjllhmj", "v2": "z", "v1": "2010-01-08", "k5": "2018-01-01
00:00:00", "v5": "285.664871", "v4": "762.813376"},
+ {"v6": "4846.735593", "k4": "9000", "k3": "900", "k2": "90", "k1": "9",
"v3": "nbarqjwilbkelk", "v2": "b", "v1": "2010-01-09", "k5": "2019-01-01
00:00:00", "v5": "535.285510", "v4": "92.702403"},
+ {"v6": "7996.434686", "k4": "10000", "k3": "1000", "k2": "100", "k1":
"10", "v3": "zucprgdnlgzzfl", "v2": "s", "v1": "2010-01-10", "k5": "2020-01-01
00:00:00", "v5": "155.861217", "v4": "26.874738"},]
+
diff --git a/regression-test/data/load_p0/stream_load/test_json_load.out b/regression-test/data/load_p0/stream_load/test_json_load.out
index 79eed67a38..b6df264df1 100644
--- a/regression-test/data/load_p0/stream_load/test_json_load.out
+++ b/regression-test/data/load_p0/stream_load/test_json_load.out
@@ -118,3 +118,4 @@
4 shenzhen 2345674
200 changsha 3456789
+-- !select --
diff --git a/regression-test/suites/load_p0/stream_load/test_json_load.groovy b/regression-test/suites/load_p0/stream_load/test_json_load.groovy
index e0b190f2f8..1bca4f9687 100644
--- a/regression-test/suites/load_p0/stream_load/test_json_load.groovy
+++ b/regression-test/suites/load_p0/stream_load/test_json_load.groovy
@@ -63,6 +63,32 @@ suite("test_json_load", "p0") {
assertTrue(result2[0].size() == 1)
assertTrue(result2[0][0] == 1, "Insert should update 1 rows")
}
+
+ def test_invalid_json_array_table = { testTablex ->
+ // multi-line sql
+ def result1 = sql """
+ CREATE TABLE IF NOT EXISTS ${testTable} (
+ k1 TINYINT NOT NULL,
+ k2 SMALLINT NOT NULL,
+ k3 INT NOT NULL,
+ k4 BIGINT NOT NULL,
+ k5 DATETIME NOT NULL,
+ v1 DATE REPLACE NOT NULL,
+ v2 CHAR REPLACE NOT NULL,
+ v3 VARCHAR(4096) REPLACE NOT NULL,
+ v4 FLOAT SUM NOT NULL,
+ v5 DOUBLE SUM NOT NULL,
+ v6 DECIMAL(20,7) SUM NOT NULL
+ ) AGGREGATE KEY(k1,k2,k3,k4,k5)
+ DISTRIBUTED BY HASH(k1) BUCKETS 15
+ PROPERTIES("replication_num" = "1");
+ """
+
+ // DDL/DML return 1 row and 3 column, the only value is update row
count
+ assertTrue(result1.size() == 1)
+ assertTrue(result1[0].size() == 1)
+ assertTrue(result1[0][0] == 0, "Create table should update 0 rows")
+ }
def load_json_data = {strip_flag, read_flag, format_flag, exprs,
json_paths,
json_root, where_expr, fuzzy_flag, file_name,
ignore_failure=false ->
@@ -324,7 +350,7 @@ suite("test_json_load", "p0") {
sql "DROP TABLE IF EXISTS ${testTable}"
create_test_table1.call(testTable)
-
+
def test_load_label = UUID.randomUUID().toString().replaceAll("-",
"")
load_from_hdfs2.call(testTable, test_load_label, hdfs_file_path,
format,
brokerName, hdfsUser, hdfsPasswd)
@@ -333,5 +359,19 @@ suite("test_json_load", "p0") {
} finally {
try_sql("DROP TABLE IF EXISTS ${testTable}")
}
+
+ // case13: invalid json
+ try {
+ sql "DROP TABLE IF EXISTS ${testTable}"
+
+ test_invalid_json_array_table.call(testTable)
+ load_json_data.call('true', '', 'json', '', '',
+ '', '', '', 'invalid_json_array.json', true)
+
+ qt_select "select * from ${testTable}"
+
+ } finally {
+ try_sql("DROP TABLE IF EXISTS ${testTable}")
+ }
}
}
diff --git a/thirdparty/build-thirdparty.sh b/thirdparty/build-thirdparty.sh
index c30f6cfeb7..71092f21f5 100755
--- a/thirdparty/build-thirdparty.sh
+++ b/thirdparty/build-thirdparty.sh
@@ -1287,10 +1287,11 @@ build_simdjson() {
mkdir -p "${BUILD_DIR}"
cd "${BUILD_DIR}"
- CXXFLAGS="-O3" \
- CFLAGS="-O3" \
- "${CMAKE_CMD}" ..
- "${CMAKE_CMD}" --build .
+ CXXFLAGS="-O3" CFLAGS="-O3" \
+ "${CMAKE_CMD}" -DSIMDJSON_EXCEPTIONS=OFF \
+ -DSIMDJSON_DEVELOPER_MODE=OFF -DSIMDJSON_BUILD_STATIC=ON \
+ -DSIMDJSON_JUST_LIBRARY=ON -DSIMDJSON_ENABLE_THREADS=ON ..
+ "${CMAKE_CMD}" --build . --config Release
cp "${TP_SOURCE_DIR}/${SIMDJSON_SOURCE}/${BUILD_DIR}/libsimdjson.a"
"${TP_INSTALL_DIR}/lib64"
cp -r "${TP_SOURCE_DIR}/${SIMDJSON_SOURCE}/include"/* "${TP_INCLUDE_DIR}/"
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]