This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new f3f17eb222 [Bugfix](load) fix be will coredump when parsing malformed 
json file using simdjson (#12062)
f3f17eb222 is described below

commit f3f17eb2226bd7e26ea0c4035de000c6725c45e9
Author: Zhengguo Yang <[email protected]>
AuthorDate: Fri Aug 26 18:01:19 2022 +0800

    [Bugfix](load) fix be will coredump when parsing malformed json file using 
simdjson (#12062)
    
    * [Bugfix](load) fix be will coredump when parsing malformed json file 
using simdjson
---
 be/src/vec/exec/vjson_scanner.cpp                  | 214 ++++++++++++---------
 .../load_p0/stream_load/invalid_json_array.json    |  11 ++
 .../data/load_p0/stream_load/test_json_load.out    |   1 +
 .../load_p0/stream_load/test_json_load.groovy      |  42 +++-
 thirdparty/build-thirdparty.sh                     |   9 +-
 5 files changed, 179 insertions(+), 98 deletions(-)

diff --git a/be/src/vec/exec/vjson_scanner.cpp 
b/be/src/vec/exec/vjson_scanner.cpp
index 5c531e6ab5..dccbdd5241 100644
--- a/be/src/vec/exec/vjson_scanner.cpp
+++ b/be/src/vec/exec/vjson_scanner.cpp
@@ -558,65 +558,6 @@ Status VSIMDJsonReader::read_json_column(Block& block,
     return (this->*_vhandle_json_callback)(block, slot_descs, is_empty_row, 
eof);
 }
 
-Status VSIMDJsonReader::_vhandle_simple_json(Block& block,
-                                             const 
std::vector<SlotDescriptor*>& slot_descs,
-                                             bool* is_empty_row, bool* eof) {
-    simdjson::ondemand::value objectValue;
-    simdjson::ondemand::array array;
-    do {
-        bool valid = false;
-        if (_next_line >= _total_lines) { // parse json and generic document
-            Status st = _parse_json(is_empty_row, eof);
-            if (st.is_data_quality_error()) {
-                continue; // continue to read next
-            }
-            RETURN_IF_ERROR(st);
-            if (*is_empty_row == true) {
-                return Status::OK();
-            }
-            if (_json_value.type() == simdjson::ondemand::json_type::array) {
-                array = _json_value.get_array();
-                _array_iter = array.begin();
-                _total_lines = array.count_elements();
-                if (_total_lines == 0) {
-                    // may be passing an empty json, such as "[]"
-                    RETURN_IF_ERROR(_append_error_msg("Empty json line", "", 
nullptr));
-                    if (*_scanner_eof) {
-                        *is_empty_row = true;
-                        return Status::OK();
-                    }
-                    continue;
-                }
-            } else {
-                _total_lines = 1; // only one row
-                objectValue = _json_value;
-            }
-            _next_line = 0;
-        }
-
-        if (_json_value.type() == simdjson::ondemand::json_type::array) { // 
handle case 1
-            objectValue = *_array_iter;
-            RETURN_IF_ERROR(_set_column_value(objectValue, block, slot_descs, 
&valid));
-            ++_array_iter;
-        } else { // handle case 2
-            RETURN_IF_ERROR(_set_column_value(_json_value, block, slot_descs, 
&valid));
-        }
-        _next_line++;
-        if (!valid) {
-            if (*_scanner_eof) {
-                // When _scanner_eof is true and valid is false, it means that 
we have encountered
-                // unqualified data and decided to stop the scan.
-                *is_empty_row = true;
-                return Status::OK();
-            }
-            continue;
-        }
-        *is_empty_row = false;
-        break; // get a valid row, then break
-    } while (_next_line <= _total_lines);
-    return Status::OK();
-}
-
 #define RETURN_IF_SIMDJSON_ERROR(error, col_name, valid)                       
                    \
     if (UNLIKELY(error)) {                                                     
                    \
         RETURN_IF_ERROR(_append_error_msg("Encounter error while iterate 
json", col_name, valid)); \
@@ -811,7 +752,6 @@ Status VSIMDJsonReader::_parse_json_doc(size_t* size, bool* 
eof) {
             }
             return Status::DataQualityError(fmt::to_string(error_msg));
         }
-        // _json_value = real_doc.value();
         RETURN_IF_SIMDJSON_ERROR(real_doc.get(_json_value), "", nullptr);
     } else {
         RETURN_IF_SIMDJSON_ERROR(_original_json_doc.get(_json_value), "", 
nullptr);
@@ -852,7 +792,6 @@ Status VSIMDJsonReader::_parse_json_doc(size_t* size, bool* 
eof) {
         }
         return Status::DataQualityError(fmt::to_string(error_msg));
     }
-
     return Status::OK();
 }
 
@@ -881,37 +820,122 @@ Status VSIMDJsonReader::_append_error_msg(std::string 
error_msg, std::string col
     return Status::OK();
 }
 
-Status VSIMDJsonReader::_vhandle_flat_array_complex_json(
-        Block& block, const std::vector<SlotDescriptor*>& slot_descs, bool* 
is_empty_row,
-        bool* eof) {
+Status VSIMDJsonReader::_vhandle_simple_json(Block& block,
+                                             const 
std::vector<SlotDescriptor*>& slot_descs,
+                                             bool* is_empty_row, bool* eof) {
+    simdjson::ondemand::value objectValue;
+    simdjson::ondemand::array array;
     do {
-        if (_next_line >= _total_lines) {
-            Status st = _parse_json(is_empty_row, eof);
-            if (st.is_data_quality_error()) {
-                continue; // continue to read next
+        bool valid = false;
+        try {
+            if (_next_line >= _total_lines) { // parse json and generic 
document
+                Status st = _parse_json(is_empty_row, eof);
+                if (st.is_data_quality_error()) {
+                    continue; // continue to read next
+                }
+                RETURN_IF_ERROR(st);
+                if (*is_empty_row == true) {
+                    return Status::OK();
+                }
+                if (_json_value.type() == 
simdjson::ondemand::json_type::array) {
+                    array = _json_value.get_array();
+                    _array_iter = array.begin();
+
+                    _total_lines = array.count_elements();
+                    if (_total_lines == 0) {
+                        // may be passing an empty json, such as "[]"
+                        RETURN_IF_ERROR(_append_error_msg("Empty json line", 
"", nullptr));
+                        if (*_scanner_eof) {
+                            *is_empty_row = true;
+                            return Status::OK();
+                        }
+                        continue;
+                    }
+                } else {
+                    _total_lines = 1; // only one row
+                    objectValue = _json_value;
+                }
+                _next_line = 0;
             }
-            RETURN_IF_ERROR(st);
-            if (*is_empty_row == true) {
-                if (st == Status::OK()) {
+
+            if (_json_value.type() == simdjson::ondemand::json_type::array) { 
// handle case 1
+                objectValue = *_array_iter;
+                RETURN_IF_ERROR(_set_column_value(objectValue, block, 
slot_descs, &valid));
+                ++_array_iter;
+            } else { // handle case 2
+                RETURN_IF_ERROR(_set_column_value(_json_value, block, 
slot_descs, &valid));
+            }
+            _next_line++;
+            if (!valid) {
+                if (*_scanner_eof) {
+                    // When _scanner_eof is true and valid is false, it means 
that we have encountered
+                    // unqualified data and decided to stop the scan.
+                    *is_empty_row = true;
                     return Status::OK();
                 }
-                if (_total_lines == 0) {
-                    continue;
+                continue;
+            }
+            *is_empty_row = false;
+            break; // get a valid row, then break
+        } catch (simdjson::simdjson_error& e) {
+            fmt::memory_buffer error_msg;
+            fmt::format_to(error_msg, "Parse json data for array failed. code: 
{}, error info: {}",
+                           e.error(), e.what());
+            RETURN_IF_ERROR(_state->append_error_msg_to_file(
+                    [&]() -> std::string { return ""; },
+                    [&]() -> std::string { return fmt::to_string(error_msg); 
}, eof));
+            _counter->num_rows_filtered++;
+            RETURN_IF_ERROR(_append_error_msg("Empty json line", "", nullptr));
+            if (!valid) {
+                if (*_scanner_eof) {
+                    // When _scanner_eof is true and valid is false, it means 
that we have encountered
+                    // unqualified data and decided to stop the scan.
+                    *is_empty_row = true;
+                    return Status::OK();
                 }
+                continue;
             }
-            simdjson::ondemand::array array;
-            RETURN_IF_SIMDJSON_ERROR(_json_value.get(array), "", nullptr);
-            _array_iter = array.begin();
+            continue;
         }
-        bool valid = true;
-        RETURN_IF_ERROR(_write_columns_by_jsonpath(*_array_iter, slot_descs, 
block, &valid));
-        ++_array_iter;
-        ++_next_line;
-        if (!valid) {
-            continue; // process next line
+    } while (_next_line <= _total_lines);
+    return Status::OK();
+}
+
+Status VSIMDJsonReader::_vhandle_flat_array_complex_json(
+        Block& block, const std::vector<SlotDescriptor*>& slot_descs, bool* 
is_empty_row,
+        bool* eof) {
+    do {
+        try {
+            if (_next_line >= _total_lines) {
+                Status st = _parse_json(is_empty_row, eof);
+                if (st.is_data_quality_error()) {
+                    continue; // continue to read next
+                }
+                RETURN_IF_ERROR(st);
+                if (*is_empty_row == true) {
+                    if (st == Status::OK()) {
+                        return Status::OK();
+                    }
+                    if (_total_lines == 0) {
+                        continue;
+                    }
+                }
+                simdjson::ondemand::array array;
+                RETURN_IF_SIMDJSON_ERROR(_json_value.get(array), "", nullptr);
+                _array_iter = array.begin();
+            }
+            bool valid = true;
+            RETURN_IF_ERROR(_write_columns_by_jsonpath(*_array_iter, 
slot_descs, block, &valid));
+            ++_array_iter;
+            ++_next_line;
+            if (!valid) {
+                continue; // process next line
+            }
+            *is_empty_row = false;
+            break; // get a valid row, then break
+        } catch (simdjson::simdjson_error& e) {
+            RETURN_IF_SIMDJSON_ERROR(e.error(), "", nullptr);
         }
-        *is_empty_row = false;
-        break; // get a valid row, then break
     } while (_next_line <= _total_lines);
     return Status::OK();
 }
@@ -920,16 +944,20 @@ Status 
VSIMDJsonReader::_vhandle_nested_complex_json(Block& block,
                                                      const 
std::vector<SlotDescriptor*>& slot_descs,
                                                      bool* is_empty_row, bool* 
eof) {
     while (true) {
-        Status st = _parse_json(is_empty_row, eof);
-        if (st.is_data_quality_error()) {
-            continue; // continue to read next
-        }
-        RETURN_IF_ERROR(st);
-        if (*is_empty_row == true) {
-            return Status::OK();
+        try {
+            Status st = _parse_json(is_empty_row, eof);
+            if (st.is_data_quality_error()) {
+                continue; // continue to read next
+            }
+            RETURN_IF_ERROR(st);
+            if (*is_empty_row == true) {
+                return Status::OK();
+            }
+            *is_empty_row = false;
+            break; // read a valid row
+        } catch (simdjson::simdjson_error& e) {
+            RETURN_IF_SIMDJSON_ERROR(e.error(), "", nullptr);
         }
-        *is_empty_row = false;
-        break; // read a valid row
     }
     bool valid = true;
     RETURN_IF_ERROR(_write_columns_by_jsonpath(_json_value, slot_descs, block, 
&valid));
diff --git a/regression-test/data/load_p0/stream_load/invalid_json_array.json 
b/regression-test/data/load_p0/stream_load/invalid_json_array.json
new file mode 100644
index 0000000000..7f1e5f1884
--- /dev/null
+++ b/regression-test/data/load_p0/stream_load/invalid_json_array.json
@@ -0,0 +1,11 @@
+ [   {"v6": "7395.231067", "k4": "1000", "k3": "100", "k2": "10", "k1": "1", 
"v3": "ynqnzeowymt", "v2": "t", "v1": "2010-01-01", "k5": "2011-01-01 
00:00:00", "v5": "180.998031", "v4": "38.638843"},
+    {"v6": "2080.504502", "k4": "2000", "k3": "200", "k2": "20", "k1": "2", 
"v3": "hfkfwlr", "v2": "f", "v1": "2010-01-02", "k5": "2012-01-01 00:00:00", 
"v5": "539.922834", "v4": "506.044046"},
+    {"v6": "4605.253205", "k4": "3000", "k3": "300", "k2": "30", "k1": "3", 
"v3": "uoclasp", "v2": "t", "v1": "2010-01-03", "k5": "2013-01-01 00:00:00", 
"v5": "577.044148", "v4": "377.793209"},
+    {"v6": "7291.703724", "k4": "4000", "k3": "400", "k2": "40", "k1": "4", 
"v3": "iswngzeodfhptjzgswsddt", "v2": "n", "v1": "2010-01-04", "k5": 
"2014-01-01 00:00:00", "v5": "919.067864", "v4": "871.354536"},
+    {"v6": "3903.906901", "k4": "5000", "k3": "500", "k2": "50", "k1": "5", 
"v3": "sqodagzlyrmcelyxgcgcsfuxadcdt", "v2": "a", "v1": "2010-01-05", "k5": 
"2015-01-01 00:00:00", "v5": "929.660783", "v4": "462.067902"},
+    {"v6": "6514.405051", "k4": "6000", "k3": "600", "k2": "60", "k1": "6", 
"v3": "obdrei", "v2": "m", "v1": "2010-01-06", "k5": "2016-01-01 00:00:00", 
"v5": "882.708491", "v4": "921.867848"},
+    {"v6": "8604.198677", "k4": "7000", "k3": "700", "k2": "70", "k1": "7", 
"v3": "cuobdhvrgkugknj", "v2": "a", "v1": "2010-01-07", "k5": "2017-01-01 
00:00:00", "v5": "209.420112", "v4": "141.656421"},
+    {"v6": "7784.859446", "k4": "8000", "k3": "800", "k2": "80", "k1": "8", 
"v3": "phcxztwgjllhmj", "v2": "z", "v1": "2010-01-08", "k5": "2018-01-01 
00:00:00", "v5": "285.664871", "v4": "762.813376"},
+    {"v6": "4846.735593", "k4": "9000", "k3": "900", "k2": "90", "k1": "9", 
"v3": "nbarqjwilbkelk", "v2": "b", "v1": "2010-01-09", "k5": "2019-01-01 
00:00:00", "v5": "535.285510", "v4": "92.702403"},
+    {"v6": "7996.434686", "k4": "10000", "k3": "1000", "k2": "100", "k1": 
"10", "v3": "zucprgdnlgzzfl", "v2": "s", "v1": "2010-01-10", "k5": "2020-01-01 
00:00:00", "v5": "155.861217", "v4": "26.874738"},]
+
diff --git a/regression-test/data/load_p0/stream_load/test_json_load.out 
b/regression-test/data/load_p0/stream_load/test_json_load.out
index 79eed67a38..b6df264df1 100644
--- a/regression-test/data/load_p0/stream_load/test_json_load.out
+++ b/regression-test/data/load_p0/stream_load/test_json_load.out
@@ -118,3 +118,4 @@
 4      shenzhen        2345674
 200    changsha        3456789
 
+-- !select --
diff --git a/regression-test/suites/load_p0/stream_load/test_json_load.groovy 
b/regression-test/suites/load_p0/stream_load/test_json_load.groovy
index e0b190f2f8..1bca4f9687 100644
--- a/regression-test/suites/load_p0/stream_load/test_json_load.groovy
+++ b/regression-test/suites/load_p0/stream_load/test_json_load.groovy
@@ -63,6 +63,32 @@ suite("test_json_load", "p0") {
         assertTrue(result2[0].size() == 1)
         assertTrue(result2[0][0] == 1, "Insert should update 1 rows")
     }
+
+    def test_invalid_json_array_table = { testTablex ->
+        // multi-line sql
+        def result1 = sql """
+                        CREATE TABLE IF NOT EXISTS ${testTable} (
+                            k1 TINYINT NOT NULL,
+                            k2 SMALLINT NOT NULL,
+                            k3 INT NOT NULL,
+                            k4 BIGINT NOT NULL,
+                            k5 DATETIME NOT NULL,
+                            v1 DATE REPLACE NOT NULL,
+                            v2 CHAR REPLACE NOT NULL,
+                            v3 VARCHAR(4096) REPLACE NOT NULL,
+                            v4 FLOAT SUM NOT NULL,
+                            v5 DOUBLE SUM NOT NULL,
+                            v6 DECIMAL(20,7) SUM NOT NULL
+                            ) AGGREGATE KEY(k1,k2,k3,k4,k5)
+                            DISTRIBUTED BY HASH(k1) BUCKETS 15
+                            PROPERTIES("replication_num" = "1");
+                        """
+
+        // DDL/DML return 1 row and 1 column, the only value is update row 
count
+        assertTrue(result1.size() == 1)
+        assertTrue(result1[0].size() == 1)
+        assertTrue(result1[0][0] == 0, "Create table should update 0 rows")
+    }
     
     def load_json_data = {strip_flag, read_flag, format_flag, exprs, 
json_paths, 
                             json_root, where_expr, fuzzy_flag, file_name, 
ignore_failure=false ->
@@ -324,7 +350,7 @@ suite("test_json_load", "p0") {
             sql "DROP TABLE IF EXISTS ${testTable}"
             
             create_test_table1.call(testTable)
-            
+
             def test_load_label = UUID.randomUUID().toString().replaceAll("-", 
"")
             load_from_hdfs2.call(testTable, test_load_label, hdfs_file_path, 
format,
                                 brokerName, hdfsUser, hdfsPasswd)
@@ -333,5 +359,19 @@ suite("test_json_load", "p0") {
         } finally {
             try_sql("DROP TABLE IF EXISTS ${testTable}")
         }
+
+        // case13: invalid json
+        try {
+            sql "DROP TABLE IF EXISTS ${testTable}"
+
+            test_invalid_json_array_table.call(testTable)
+            load_json_data.call('true', '', 'json', '', '',
+                    '', '', '', 'invalid_json_array.json', true)
+
+            qt_select "select * from ${testTable}"
+
+        } finally {
+            try_sql("DROP TABLE IF EXISTS ${testTable}")
+        }
     }
 }
diff --git a/thirdparty/build-thirdparty.sh b/thirdparty/build-thirdparty.sh
index c30f6cfeb7..71092f21f5 100755
--- a/thirdparty/build-thirdparty.sh
+++ b/thirdparty/build-thirdparty.sh
@@ -1287,10 +1287,11 @@ build_simdjson() {
     mkdir -p "${BUILD_DIR}"
     cd "${BUILD_DIR}"
 
-    CXXFLAGS="-O3" \
-        CFLAGS="-O3" \
-        "${CMAKE_CMD}" ..
-    "${CMAKE_CMD}" --build .
+    CXXFLAGS="-O3" CFLAGS="-O3" \
+        "${CMAKE_CMD}" -DSIMDJSON_EXCEPTIONS=OFF \
+        -DSIMDJSON_DEVELOPER_MODE=OFF -DSIMDJSON_BUILD_STATIC=ON \
+        -DSIMDJSON_JUST_LIBRARY=ON -DSIMDJSON_ENABLE_THREADS=ON ..
+    "${CMAKE_CMD}" --build . --config Release
 
     cp "${TP_SOURCE_DIR}/${SIMDJSON_SOURCE}/${BUILD_DIR}/libsimdjson.a" 
"${TP_INSTALL_DIR}/lib64"
     cp -r "${TP_SOURCE_DIR}/${SIMDJSON_SOURCE}/include"/* "${TP_INCLUDE_DIR}/"


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to