This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new ca83531fbe8 Revert "[fix](simd-json-reader) fix SIMD json reader lose data and support stream parser (#35781) (#36043)
ca83531fbe8 is described below

commit ca83531fbe8ec9fb37b4c8a7b316520699fe378e
Author: hui lai <[email protected]>
AuthorDate: Fri Jun 7 23:06:58 2024 +0800

    Revert "[fix](simd-json-reader) fix SIMD json reader lose data and support stream parser (#35781) (#36043)
    
    #35781
    
    This reverts commit cb7e8bf1aa619e2c3b8b10d84e0721e543bc092a.
---
 be/src/vec/exec/format/json/new_json_reader.cpp    | 187 +++++++++------------
 be/src/vec/exec/format/json/new_json_reader.h      |   6 +-
 .../load_p0/stream_load/iterate_read_json.json     |   3 -
 .../data/load_p0/stream_load/test_json_load.out    |   5 -
 .../load_p0/stream_load/test_json_load.groovy      |  14 +-
 5 files changed, 86 insertions(+), 129 deletions(-)

diff --git a/be/src/vec/exec/format/json/new_json_reader.cpp 
b/be/src/vec/exec/format/json/new_json_reader.cpp
index a765149384d..b658e15e467 100644
--- a/be/src/vec/exec/format/json/new_json_reader.cpp
+++ b/be/src/vec/exec/format/json/new_json_reader.cpp
@@ -1058,7 +1058,6 @@ Status 
NewJsonReader::_simdjson_handle_simple_json(RuntimeState* /*state*/, Bloc
     size_t size = 0;
     simdjson::error_code error;
     size_t num_rows = block.rows();
-
     try {
         // step1: get and parse buf to get json doc
         RETURN_IF_ERROR(_simdjson_parse_json(&size, is_empty_row, eof, 
&error));
@@ -1067,25 +1066,19 @@ Status 
NewJsonReader::_simdjson_handle_simple_json(RuntimeState* /*state*/, Bloc
             return Status::OK();
         }
 
-        for (_json_stream_iterator = _json_stream.begin();
-             _json_stream_iterator != _json_stream.end(); 
++_json_stream_iterator) {
-            if (_json_stream_iterator.current_index() >= _original_doc_size) {
-                break;
-            }
-            // step2: get json value by json doc
-            Status st = _get_json_value(&size, eof, &error, is_empty_row);
-            if (st.is<DATA_QUALITY_ERROR>()) {
-                return Status::OK();
-            }
-            RETURN_IF_ERROR(st);
-            if (*is_empty_row || *eof) {
-                return Status::OK();
-            }
-
-            // step 3: write columns by json value
-            RETURN_IF_ERROR(_simdjson_handle_simple_json_write_columns(block, 
slot_descs,
-                                                                       
is_empty_row, eof));
+        // step2: get json value by json doc
+        Status st = _get_json_value(&size, eof, &error, is_empty_row);
+        if (st.is<DATA_QUALITY_ERROR>()) {
+            return Status::OK();
+        }
+        RETURN_IF_ERROR(st);
+        if (*is_empty_row || *eof) {
+            return Status::OK();
         }
+
+        // step 3: write columns by json value
+        RETURN_IF_ERROR(
+                _simdjson_handle_simple_json_write_columns(block, slot_descs, 
is_empty_row, eof));
     } catch (simdjson::simdjson_error& e) {
         RETURN_IF_ERROR(_handle_simdjson_error(e, block, num_rows, eof));
         if (*_scanner_eof) {
@@ -1175,25 +1168,19 @@ Status 
NewJsonReader::_simdjson_handle_flat_array_complex_json(
             return Status::OK();
         }
 
-        for (_json_stream_iterator = _json_stream.begin();
-             _json_stream_iterator != _json_stream.end(); 
++_json_stream_iterator) {
-            if (_json_stream_iterator.current_index() >= _original_doc_size) {
-                break;
-            }
-            // step2: get json value by json doc
-            Status st = _get_json_value(&size, eof, &error, is_empty_row);
-            if (st.is<DATA_QUALITY_ERROR>()) {
-                return Status::OK();
-            }
-            RETURN_IF_ERROR(st);
-            if (*is_empty_row) {
-                return Status::OK();
-            }
-
-            // step 3: write columns by json value
-            
RETURN_IF_ERROR(_simdjson_handle_flat_array_complex_json_write_columns(
-                    block, slot_descs, is_empty_row, eof));
+        // step2: get json value by json doc
+        Status st = _get_json_value(&size, eof, &error, is_empty_row);
+        if (st.is<DATA_QUALITY_ERROR>()) {
+            return Status::OK();
+        }
+        RETURN_IF_ERROR(st);
+        if (*is_empty_row) {
+            return Status::OK();
         }
+
+        // step 3: write columns by json value
+        
RETURN_IF_ERROR(_simdjson_handle_flat_array_complex_json_write_columns(block, 
slot_descs,
+                                                                               
is_empty_row, eof));
     } catch (simdjson::simdjson_error& e) {
         RETURN_IF_ERROR(_handle_simdjson_error(e, block, num_rows, eof));
         if (*_scanner_eof) {
@@ -1269,26 +1256,20 @@ Status 
NewJsonReader::_simdjson_handle_nested_complex_json(
         RuntimeState* /*state*/, Block& block, const 
std::vector<SlotDescriptor*>& slot_descs,
         bool* is_empty_row, bool* eof) {
     // nested complex json
-    size_t num_rows = block.rows();
-    simdjson::ondemand::object cur;
-    size_t size = 0;
-    simdjson::error_code error;
-
-    try {
-        RETURN_IF_ERROR(_simdjson_parse_json(&size, is_empty_row, eof, 
&error));
-        if (size == 0 || *eof) {
-            *is_empty_row = true;
-            return Status::OK();
-        }
-
-        for (_json_stream_iterator = _json_stream.begin();
-             _json_stream_iterator != _json_stream.end(); 
++_json_stream_iterator) {
-            if (_json_stream_iterator.current_index() >= _original_doc_size) {
-                break;
+    while (true) {
+        size_t num_rows = block.rows();
+        simdjson::ondemand::object cur;
+        size_t size = 0;
+        simdjson::error_code error;
+        try {
+            RETURN_IF_ERROR(_simdjson_parse_json(&size, is_empty_row, eof, 
&error));
+            if (size == 0 || *eof) {
+                *is_empty_row = true;
+                return Status::OK();
             }
             Status st = _get_json_value(&size, eof, &error, is_empty_row);
             if (st.is<DATA_QUALITY_ERROR>()) {
-                return Status::OK();
+                continue; // continue to read next
             }
             RETURN_IF_ERROR(st);
             if (*is_empty_row) {
@@ -1318,14 +1299,16 @@ Status 
NewJsonReader::_simdjson_handle_nested_complex_json(
                 // so that the caller will continue reading next line.
                 *is_empty_row = true;
             }
-        }
-    } catch (simdjson::simdjson_error& e) {
-        RETURN_IF_ERROR(_handle_simdjson_error(e, block, num_rows, eof));
-        if (*_scanner_eof) {
-            // When _scanner_eof is true and valid is false, it means that we 
have encountered
-            // unqualified data and decided to stop the scan.
-            *is_empty_row = true;
-            return Status::OK();
+            break; // read a valid row
+        } catch (simdjson::simdjson_error& e) {
+            RETURN_IF_ERROR(_handle_simdjson_error(e, block, num_rows, eof));
+            if (*_scanner_eof) {
+                // When _scanner_eof is true and valid is false, it means that 
we have encountered
+                // unqualified data and decided to stop the scan.
+                *is_empty_row = true;
+                return Status::OK();
+            }
+            continue;
         }
     }
     return Status::OK();
@@ -1532,13 +1515,14 @@ Status NewJsonReader::_simdjson_parse_json(size_t* 
size, bool* is_empty_row, boo
         return Status::OK();
     }
 
-    // step2: init json stream iterate.
-    // For efficiency reasons, simdjson requires a string with a few bytes 
(simdjson::SIMDJSON_PADDING) at the end.
-    _simdjson_ondemand_padding_buffer.clear();
-    _padded_size = *size + simdjson::SIMDJSON_PADDING;
-    _simdjson_ondemand_padding_buffer.resize(_padded_size);
-    _simdjson_ondemand_unscape_padding_buffer.resize(_padded_size);
-
+    // step2: init json parser iterate.
+    if (*size + simdjson::SIMDJSON_PADDING > _padded_size) {
+        // For efficiency reasons, simdjson requires a string with a few bytes 
(simdjson::SIMDJSON_PADDING) at the end.
+        // Hence, a re-allocation is needed if the space is not enough.
+        _simdjson_ondemand_padding_buffer.resize(*size + 
simdjson::SIMDJSON_PADDING);
+        _simdjson_ondemand_unscape_padding_buffer.resize(*size + 
simdjson::SIMDJSON_PADDING);
+        _padded_size = *size + simdjson::SIMDJSON_PADDING;
+    }
     // trim BOM since simdjson does not handle UTF-8 Unicode (with BOM)
     if (*size >= 3 && static_cast<char>(_json_str[0]) == '\xEF' &&
         static_cast<char>(_json_str[1]) == '\xBB' && 
static_cast<char>(_json_str[2]) == '\xBF') {
@@ -1548,16 +1532,10 @@ Status NewJsonReader::_simdjson_parse_json(size_t* 
size, bool* is_empty_row, boo
     }
     memcpy(&_simdjson_ondemand_padding_buffer.front(), _json_str, *size);
     _original_doc_size = *size;
-
-    *error = 
_ondemand_json_parser->iterate_many(_simdjson_ondemand_padding_buffer)
-                     .get(_json_stream);
-    if (*error != simdjson::error_code::SUCCESS) {
-        fmt::memory_buffer error_msg;
-        fmt::format_to(error_msg, "Parse json data for JsonDoc failed. code: 
{}, error info: {}",
-                       *error, simdjson::error_message(*error));
-        return _return_quality_error(error_msg, std::string((char*)_json_str, 
*size), eof);
-    }
-
+    *error = _ondemand_json_parser
+                     
->iterate(std::string_view(_simdjson_ondemand_padding_buffer.data(), *size),
+                               _padded_size)
+                     .get(_original_json_doc);
     return Status::OK();
 }
 
@@ -1579,40 +1557,43 @@ Status NewJsonReader::_judge_empty_row(size_t size, 
bool eof, bool* is_empty_row
     return Status::OK();
 }
 
-Status NewJsonReader::_return_quality_error(fmt::memory_buffer& error_msg,
-                                            const std::string& doc_info, bool* 
eof) {
-    RETURN_IF_ERROR(_state->append_error_msg_to_file(
-            [&]() -> std::string { return doc_info; },
-            [&]() -> std::string { return fmt::to_string(error_msg); }, 
_scanner_eof));
-    _counter->num_rows_filtered++;
-    if (*_scanner_eof) {
-        // Case A: if _scanner_eof is set to true in 
"append_error_msg_to_file", which means
-        // we meet enough invalid rows and the scanner should be stopped.
-        // So we set eof to true and return OK, the caller will stop the 
process as we meet the end of file.
-        *eof = true;
-        return Status::OK();
-    }
-    return Status::DataQualityError(fmt::to_string(error_msg));
-}
-
 Status NewJsonReader::_get_json_value(size_t* size, bool* eof, 
simdjson::error_code* error,
                                       bool* is_empty_row) {
     SCOPED_TIMER(_file_read_timer);
-    _original_json_doc = (*_json_stream_iterator).value();
-
+    auto return_quality_error = [&](fmt::memory_buffer& error_msg,
+                                    const std::string& doc_info) -> Status {
+        RETURN_IF_ERROR(_state->append_error_msg_to_file(
+                [&]() -> std::string { return doc_info; },
+                [&]() -> std::string { return fmt::to_string(error_msg); }, 
_scanner_eof));
+        _counter->num_rows_filtered++;
+        if (*_scanner_eof) {
+            // Case A: if _scanner_eof is set to true in 
"append_error_msg_to_file", which means
+            // we meet enough invalid rows and the scanner should be stopped.
+            // So we set eof to true and return OK, the caller will stop the 
process as we meet the end of file.
+            *eof = true;
+            return Status::OK();
+        }
+        return Status::DataQualityError(fmt::to_string(error_msg));
+    };
+    if (*error != simdjson::error_code::SUCCESS) {
+        fmt::memory_buffer error_msg;
+        fmt::format_to(error_msg, "Parse json data for JsonDoc failed. code: 
{}, error info: {}",
+                       *error, simdjson::error_message(*error));
+        return return_quality_error(error_msg, std::string((char*)_json_str, 
*size));
+    }
     auto type_res = _original_json_doc.type();
     if (type_res.error() != simdjson::error_code::SUCCESS) {
         fmt::memory_buffer error_msg;
         fmt::format_to(error_msg, "Parse json data for JsonDoc failed. code: 
{}, error info: {}",
                        type_res.error(), 
simdjson::error_message(type_res.error()));
-        return _return_quality_error(error_msg, std::string((char*)_json_str, 
*size), eof);
+        return return_quality_error(error_msg, std::string((char*)_json_str, 
*size));
     }
     simdjson::ondemand::json_type type = type_res.value();
     if (type != simdjson::ondemand::json_type::object &&
         type != simdjson::ondemand::json_type::array) {
         fmt::memory_buffer error_msg;
         fmt::format_to(error_msg, "Not an json object or json array");
-        return _return_quality_error(error_msg, std::string((char*)_json_str, 
*size), eof);
+        return return_quality_error(error_msg, std::string((char*)_json_str, 
*size));
     }
     if (!_parsed_json_root.empty() && type == 
simdjson::ondemand::json_type::object) {
         try {
@@ -1624,13 +1605,13 @@ Status NewJsonReader::_get_json_value(size_t* size, 
bool* eof, simdjson::error_c
             if (!st.ok()) {
                 fmt::memory_buffer error_msg;
                 fmt::format_to(error_msg, "{}", st.to_string());
-                return _return_quality_error(error_msg, 
std::string((char*)_json_str, *size), eof);
+                return return_quality_error(error_msg, 
std::string((char*)_json_str, *size));
             }
         } catch (simdjson::simdjson_error& e) {
             fmt::memory_buffer error_msg;
             fmt::format_to(error_msg, "Encounter error while 
extract_from_object, error: {}",
                            e.what());
-            return _return_quality_error(error_msg, 
std::string((char*)_json_str, *size), eof);
+            return return_quality_error(error_msg, 
std::string((char*)_json_str, *size));
         }
     } else {
         _json_value = _original_json_doc;
@@ -1640,14 +1621,14 @@ Status NewJsonReader::_get_json_value(size_t* size, 
bool* eof, simdjson::error_c
         fmt::memory_buffer error_msg;
         fmt::format_to(error_msg, "{}",
                        "JSON data is array-object, `strip_outer_array` must be 
TRUE.");
-        return _return_quality_error(error_msg, std::string((char*)_json_str, 
*size), eof);
+        return return_quality_error(error_msg, std::string((char*)_json_str, 
*size));
     }
 
     if (_json_value.type() != simdjson::ondemand::json_type::array && 
_strip_outer_array) {
         fmt::memory_buffer error_msg;
         fmt::format_to(error_msg, "{}",
                        "JSON data is not an array-object, `strip_outer_array` 
must be FALSE.");
-        return _return_quality_error(error_msg, std::string((char*)_json_str, 
*size), eof);
+        return return_quality_error(error_msg, std::string((char*)_json_str, 
*size));
     }
     RETURN_IF_ERROR(_judge_empty_row(*size, *eof, is_empty_row));
     return Status::OK();
diff --git a/be/src/vec/exec/format/json/new_json_reader.h 
b/be/src/vec/exec/format/json/new_json_reader.h
index 2a8b428db9f..f044e06e62e 100644
--- a/be/src/vec/exec/format/json/new_json_reader.h
+++ b/be/src/vec/exec/format/json/new_json_reader.h
@@ -147,8 +147,6 @@ private:
     Status _simdjson_init_reader();
     Status _simdjson_parse_json(size_t* size, bool* is_empty_row, bool* eof,
                                 simdjson::error_code* error);
-    Status _return_quality_error(fmt::memory_buffer& error_msg, const 
std::string& doc_info,
-                                 bool* eof);
     Status _get_json_value(size_t* size, bool* eof, simdjson::error_code* 
error,
                            bool* is_empty_row);
     Status _judge_empty_row(size_t size, bool eof, bool* is_empty_row);
@@ -275,10 +273,8 @@ private:
     std::string _simdjson_ondemand_padding_buffer;
     std::string _simdjson_ondemand_unscape_padding_buffer;
     // char _simdjson_ondemand_padding_buffer[_padded_size];
-    simdjson::ondemand::document_reference _original_json_doc;
+    simdjson::ondemand::document _original_json_doc;
     simdjson::ondemand::value _json_value;
-    simdjson::ondemand::document_stream _json_stream;
-    simdjson::ondemand::document_stream::iterator _json_stream_iterator;
     // for strip outer array
     // array_iter pointed to _array
     simdjson::ondemand::array_iterator _array_iter;
diff --git a/regression-test/data/load_p0/stream_load/iterate_read_json.json 
b/regression-test/data/load_p0/stream_load/iterate_read_json.json
deleted file mode 100644
index f6c0d634efe..00000000000
--- a/regression-test/data/load_p0/stream_load/iterate_read_json.json
+++ /dev/null
@@ -1,3 +0,0 @@
-{"name":"Name1","age":21,"agent_id":"5fbfefd2-ea1c-44fd-bc54-6eb2582e1525"}
-{"name":"Name2","age":21,"agent_id":"5fbfefd2-ea1c-44fd-bc54-6eb2582e1525"}
-{"name":"Name3","age":21,"agent_id":"5fbfefd2-ea1c-44fd-bc54-6eb2582e1525"}
\ No newline at end of file
diff --git a/regression-test/data/load_p0/stream_load/test_json_load.out 
b/regression-test/data/load_p0/stream_load/test_json_load.out
index 4c5dfc81f87..9f064572f13 100644
--- a/regression-test/data/load_p0/stream_load/test_json_load.out
+++ b/regression-test/data/load_p0/stream_load/test_json_load.out
@@ -245,11 +245,6 @@ John       30      New York        
{"email":"[email protected]","phone":"+1-123-456-7890"}
 android        \N      \N      \N      \N      \N
 android        \N      \N      \N      \N      \N
 
--- !iterate_read_json --
-Name1  21      5fbfefd2-ea1c-44fd-bc54-6eb2582e1525
-Name2  21      5fbfefd2-ea1c-44fd-bc54-6eb2582e1525
-Name3  21      5fbfefd2-ea1c-44fd-bc54-6eb2582e1525
-
 -- !select28 --
 test   k2_value
 
diff --git a/regression-test/suites/load_p0/stream_load/test_json_load.groovy 
b/regression-test/suites/load_p0/stream_load/test_json_load.groovy
index 8971b27f662..da6bea1afcd 100644
--- a/regression-test/suites/load_p0/stream_load/test_json_load.groovy
+++ b/regression-test/suites/load_p0/stream_load/test_json_load.groovy
@@ -787,6 +787,7 @@ suite("test_json_load", "p0") {
                 assertEquals("${reason}", "${out}")
             }
         }
+
     } finally {
         try_sql("DROP TABLE IF EXISTS ${testTable}")
     }
@@ -822,20 +823,7 @@ suite("test_json_load", "p0") {
                 assertEquals("${reason}", "${out}")
             }
         }
-    } finally {
-        try_sql("DROP TABLE IF EXISTS ${testTable}")
-    }
-      
-    // iterate read json when read_json_by_line = false
-    try {
-        sql "DROP TABLE IF EXISTS ${testTable}"
 
-        create_json_test_table.call(testTable)
-        def test_load_label = UUID.randomUUID().toString().replaceAll("-", "")
-        load_json_data.call("${testTable}", test_load_label, 'false', 'false', 
'json', '', '', '', '', '', 'iterate_read_json.json')
-        sql "sync" 
-        
-        qt_iterate_read_json "select * from ${testTable} order by name"
     } finally {
         try_sql("DROP TABLE IF EXISTS ${testTable}")
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to