This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new ca83531fbe8 Revert "[fix](simd-json-reader) fix SIMD json reader lose
data and support stream parser (#35781)" (#36043)
ca83531fbe8 is described below
commit ca83531fbe8ec9fb37b4c8a7b316520699fe378e
Author: hui lai <[email protected]>
AuthorDate: Fri Jun 7 23:06:58 2024 +0800
Revert "[fix](simd-json-reader) fix SIMD json reader lose data and support
stream parser (#35781)" (#36043)
#35781
This reverts commit cb7e8bf1aa619e2c3b8b10d84e0721e543bc092a.
---
be/src/vec/exec/format/json/new_json_reader.cpp | 187 +++++++++------------
be/src/vec/exec/format/json/new_json_reader.h | 6 +-
.../load_p0/stream_load/iterate_read_json.json | 3 -
.../data/load_p0/stream_load/test_json_load.out | 5 -
.../load_p0/stream_load/test_json_load.groovy | 14 +-
5 files changed, 86 insertions(+), 129 deletions(-)
diff --git a/be/src/vec/exec/format/json/new_json_reader.cpp
b/be/src/vec/exec/format/json/new_json_reader.cpp
index a765149384d..b658e15e467 100644
--- a/be/src/vec/exec/format/json/new_json_reader.cpp
+++ b/be/src/vec/exec/format/json/new_json_reader.cpp
@@ -1058,7 +1058,6 @@ Status
NewJsonReader::_simdjson_handle_simple_json(RuntimeState* /*state*/, Bloc
size_t size = 0;
simdjson::error_code error;
size_t num_rows = block.rows();
-
try {
// step1: get and parse buf to get json doc
RETURN_IF_ERROR(_simdjson_parse_json(&size, is_empty_row, eof,
&error));
@@ -1067,25 +1066,19 @@ Status
NewJsonReader::_simdjson_handle_simple_json(RuntimeState* /*state*/, Bloc
return Status::OK();
}
- for (_json_stream_iterator = _json_stream.begin();
- _json_stream_iterator != _json_stream.end();
++_json_stream_iterator) {
- if (_json_stream_iterator.current_index() >= _original_doc_size) {
- break;
- }
- // step2: get json value by json doc
- Status st = _get_json_value(&size, eof, &error, is_empty_row);
- if (st.is<DATA_QUALITY_ERROR>()) {
- return Status::OK();
- }
- RETURN_IF_ERROR(st);
- if (*is_empty_row || *eof) {
- return Status::OK();
- }
-
- // step 3: write columns by json value
- RETURN_IF_ERROR(_simdjson_handle_simple_json_write_columns(block,
slot_descs,
-
is_empty_row, eof));
+ // step2: get json value by json doc
+ Status st = _get_json_value(&size, eof, &error, is_empty_row);
+ if (st.is<DATA_QUALITY_ERROR>()) {
+ return Status::OK();
+ }
+ RETURN_IF_ERROR(st);
+ if (*is_empty_row || *eof) {
+ return Status::OK();
}
+
+ // step 3: write columns by json value
+ RETURN_IF_ERROR(
+ _simdjson_handle_simple_json_write_columns(block, slot_descs,
is_empty_row, eof));
} catch (simdjson::simdjson_error& e) {
RETURN_IF_ERROR(_handle_simdjson_error(e, block, num_rows, eof));
if (*_scanner_eof) {
@@ -1175,25 +1168,19 @@ Status
NewJsonReader::_simdjson_handle_flat_array_complex_json(
return Status::OK();
}
- for (_json_stream_iterator = _json_stream.begin();
- _json_stream_iterator != _json_stream.end();
++_json_stream_iterator) {
- if (_json_stream_iterator.current_index() >= _original_doc_size) {
- break;
- }
- // step2: get json value by json doc
- Status st = _get_json_value(&size, eof, &error, is_empty_row);
- if (st.is<DATA_QUALITY_ERROR>()) {
- return Status::OK();
- }
- RETURN_IF_ERROR(st);
- if (*is_empty_row) {
- return Status::OK();
- }
-
- // step 3: write columns by json value
-
RETURN_IF_ERROR(_simdjson_handle_flat_array_complex_json_write_columns(
- block, slot_descs, is_empty_row, eof));
+ // step2: get json value by json doc
+ Status st = _get_json_value(&size, eof, &error, is_empty_row);
+ if (st.is<DATA_QUALITY_ERROR>()) {
+ return Status::OK();
+ }
+ RETURN_IF_ERROR(st);
+ if (*is_empty_row) {
+ return Status::OK();
}
+
+ // step 3: write columns by json value
+
RETURN_IF_ERROR(_simdjson_handle_flat_array_complex_json_write_columns(block,
slot_descs,
+
is_empty_row, eof));
} catch (simdjson::simdjson_error& e) {
RETURN_IF_ERROR(_handle_simdjson_error(e, block, num_rows, eof));
if (*_scanner_eof) {
@@ -1269,26 +1256,20 @@ Status
NewJsonReader::_simdjson_handle_nested_complex_json(
RuntimeState* /*state*/, Block& block, const
std::vector<SlotDescriptor*>& slot_descs,
bool* is_empty_row, bool* eof) {
// nested complex json
- size_t num_rows = block.rows();
- simdjson::ondemand::object cur;
- size_t size = 0;
- simdjson::error_code error;
-
- try {
- RETURN_IF_ERROR(_simdjson_parse_json(&size, is_empty_row, eof,
&error));
- if (size == 0 || *eof) {
- *is_empty_row = true;
- return Status::OK();
- }
-
- for (_json_stream_iterator = _json_stream.begin();
- _json_stream_iterator != _json_stream.end();
++_json_stream_iterator) {
- if (_json_stream_iterator.current_index() >= _original_doc_size) {
- break;
+ while (true) {
+ size_t num_rows = block.rows();
+ simdjson::ondemand::object cur;
+ size_t size = 0;
+ simdjson::error_code error;
+ try {
+ RETURN_IF_ERROR(_simdjson_parse_json(&size, is_empty_row, eof,
&error));
+ if (size == 0 || *eof) {
+ *is_empty_row = true;
+ return Status::OK();
}
Status st = _get_json_value(&size, eof, &error, is_empty_row);
if (st.is<DATA_QUALITY_ERROR>()) {
- return Status::OK();
+ continue; // continue to read next
}
RETURN_IF_ERROR(st);
if (*is_empty_row) {
@@ -1318,14 +1299,16 @@ Status
NewJsonReader::_simdjson_handle_nested_complex_json(
// so that the caller will continue reading next line.
*is_empty_row = true;
}
- }
- } catch (simdjson::simdjson_error& e) {
- RETURN_IF_ERROR(_handle_simdjson_error(e, block, num_rows, eof));
- if (*_scanner_eof) {
- // When _scanner_eof is true and valid is false, it means that we
have encountered
- // unqualified data and decided to stop the scan.
- *is_empty_row = true;
- return Status::OK();
+ break; // read a valid row
+ } catch (simdjson::simdjson_error& e) {
+ RETURN_IF_ERROR(_handle_simdjson_error(e, block, num_rows, eof));
+ if (*_scanner_eof) {
+ // When _scanner_eof is true and valid is false, it means that
we have encountered
+ // unqualified data and decided to stop the scan.
+ *is_empty_row = true;
+ return Status::OK();
+ }
+ continue;
}
}
return Status::OK();
@@ -1532,13 +1515,14 @@ Status NewJsonReader::_simdjson_parse_json(size_t*
size, bool* is_empty_row, boo
return Status::OK();
}
- // step2: init json stream iterate.
- // For efficiency reasons, simdjson requires a string with a few bytes
(simdjson::SIMDJSON_PADDING) at the end.
- _simdjson_ondemand_padding_buffer.clear();
- _padded_size = *size + simdjson::SIMDJSON_PADDING;
- _simdjson_ondemand_padding_buffer.resize(_padded_size);
- _simdjson_ondemand_unscape_padding_buffer.resize(_padded_size);
-
+ // step2: init json parser iterate.
+ if (*size + simdjson::SIMDJSON_PADDING > _padded_size) {
+ // For efficiency reasons, simdjson requires a string with a few bytes
(simdjson::SIMDJSON_PADDING) at the end.
+ // Hence, a re-allocation is needed if the space is not enough.
+ _simdjson_ondemand_padding_buffer.resize(*size +
simdjson::SIMDJSON_PADDING);
+ _simdjson_ondemand_unscape_padding_buffer.resize(*size +
simdjson::SIMDJSON_PADDING);
+ _padded_size = *size + simdjson::SIMDJSON_PADDING;
+ }
// trim BOM since simdjson does not handle UTF-8 Unicode (with BOM)
if (*size >= 3 && static_cast<char>(_json_str[0]) == '\xEF' &&
static_cast<char>(_json_str[1]) == '\xBB' &&
static_cast<char>(_json_str[2]) == '\xBF') {
@@ -1548,16 +1532,10 @@ Status NewJsonReader::_simdjson_parse_json(size_t*
size, bool* is_empty_row, boo
}
memcpy(&_simdjson_ondemand_padding_buffer.front(), _json_str, *size);
_original_doc_size = *size;
-
- *error =
_ondemand_json_parser->iterate_many(_simdjson_ondemand_padding_buffer)
- .get(_json_stream);
- if (*error != simdjson::error_code::SUCCESS) {
- fmt::memory_buffer error_msg;
- fmt::format_to(error_msg, "Parse json data for JsonDoc failed. code:
{}, error info: {}",
- *error, simdjson::error_message(*error));
- return _return_quality_error(error_msg, std::string((char*)_json_str,
*size), eof);
- }
-
+ *error = _ondemand_json_parser
+
->iterate(std::string_view(_simdjson_ondemand_padding_buffer.data(), *size),
+ _padded_size)
+ .get(_original_json_doc);
return Status::OK();
}
@@ -1579,40 +1557,43 @@ Status NewJsonReader::_judge_empty_row(size_t size,
bool eof, bool* is_empty_row
return Status::OK();
}
-Status NewJsonReader::_return_quality_error(fmt::memory_buffer& error_msg,
- const std::string& doc_info, bool*
eof) {
- RETURN_IF_ERROR(_state->append_error_msg_to_file(
- [&]() -> std::string { return doc_info; },
- [&]() -> std::string { return fmt::to_string(error_msg); },
_scanner_eof));
- _counter->num_rows_filtered++;
- if (*_scanner_eof) {
- // Case A: if _scanner_eof is set to true in
"append_error_msg_to_file", which means
- // we meet enough invalid rows and the scanner should be stopped.
- // So we set eof to true and return OK, the caller will stop the
process as we meet the end of file.
- *eof = true;
- return Status::OK();
- }
- return Status::DataQualityError(fmt::to_string(error_msg));
-}
-
Status NewJsonReader::_get_json_value(size_t* size, bool* eof,
simdjson::error_code* error,
bool* is_empty_row) {
SCOPED_TIMER(_file_read_timer);
- _original_json_doc = (*_json_stream_iterator).value();
-
+ auto return_quality_error = [&](fmt::memory_buffer& error_msg,
+ const std::string& doc_info) -> Status {
+ RETURN_IF_ERROR(_state->append_error_msg_to_file(
+ [&]() -> std::string { return doc_info; },
+ [&]() -> std::string { return fmt::to_string(error_msg); },
_scanner_eof));
+ _counter->num_rows_filtered++;
+ if (*_scanner_eof) {
+ // Case A: if _scanner_eof is set to true in
"append_error_msg_to_file", which means
+ // we meet enough invalid rows and the scanner should be stopped.
+ // So we set eof to true and return OK, the caller will stop the
process as we meet the end of file.
+ *eof = true;
+ return Status::OK();
+ }
+ return Status::DataQualityError(fmt::to_string(error_msg));
+ };
+ if (*error != simdjson::error_code::SUCCESS) {
+ fmt::memory_buffer error_msg;
+ fmt::format_to(error_msg, "Parse json data for JsonDoc failed. code:
{}, error info: {}",
+ *error, simdjson::error_message(*error));
+ return return_quality_error(error_msg, std::string((char*)_json_str,
*size));
+ }
auto type_res = _original_json_doc.type();
if (type_res.error() != simdjson::error_code::SUCCESS) {
fmt::memory_buffer error_msg;
fmt::format_to(error_msg, "Parse json data for JsonDoc failed. code:
{}, error info: {}",
type_res.error(),
simdjson::error_message(type_res.error()));
- return _return_quality_error(error_msg, std::string((char*)_json_str,
*size), eof);
+ return return_quality_error(error_msg, std::string((char*)_json_str,
*size));
}
simdjson::ondemand::json_type type = type_res.value();
if (type != simdjson::ondemand::json_type::object &&
type != simdjson::ondemand::json_type::array) {
fmt::memory_buffer error_msg;
fmt::format_to(error_msg, "Not an json object or json array");
- return _return_quality_error(error_msg, std::string((char*)_json_str,
*size), eof);
+ return return_quality_error(error_msg, std::string((char*)_json_str,
*size));
}
if (!_parsed_json_root.empty() && type ==
simdjson::ondemand::json_type::object) {
try {
@@ -1624,13 +1605,13 @@ Status NewJsonReader::_get_json_value(size_t* size,
bool* eof, simdjson::error_c
if (!st.ok()) {
fmt::memory_buffer error_msg;
fmt::format_to(error_msg, "{}", st.to_string());
- return _return_quality_error(error_msg,
std::string((char*)_json_str, *size), eof);
+ return return_quality_error(error_msg,
std::string((char*)_json_str, *size));
}
} catch (simdjson::simdjson_error& e) {
fmt::memory_buffer error_msg;
fmt::format_to(error_msg, "Encounter error while
extract_from_object, error: {}",
e.what());
- return _return_quality_error(error_msg,
std::string((char*)_json_str, *size), eof);
+ return return_quality_error(error_msg,
std::string((char*)_json_str, *size));
}
} else {
_json_value = _original_json_doc;
@@ -1640,14 +1621,14 @@ Status NewJsonReader::_get_json_value(size_t* size,
bool* eof, simdjson::error_c
fmt::memory_buffer error_msg;
fmt::format_to(error_msg, "{}",
"JSON data is array-object, `strip_outer_array` must be
TRUE.");
- return _return_quality_error(error_msg, std::string((char*)_json_str,
*size), eof);
+ return return_quality_error(error_msg, std::string((char*)_json_str,
*size));
}
if (_json_value.type() != simdjson::ondemand::json_type::array &&
_strip_outer_array) {
fmt::memory_buffer error_msg;
fmt::format_to(error_msg, "{}",
"JSON data is not an array-object, `strip_outer_array`
must be FALSE.");
- return _return_quality_error(error_msg, std::string((char*)_json_str,
*size), eof);
+ return return_quality_error(error_msg, std::string((char*)_json_str,
*size));
}
RETURN_IF_ERROR(_judge_empty_row(*size, *eof, is_empty_row));
return Status::OK();
diff --git a/be/src/vec/exec/format/json/new_json_reader.h
b/be/src/vec/exec/format/json/new_json_reader.h
index 2a8b428db9f..f044e06e62e 100644
--- a/be/src/vec/exec/format/json/new_json_reader.h
+++ b/be/src/vec/exec/format/json/new_json_reader.h
@@ -147,8 +147,6 @@ private:
Status _simdjson_init_reader();
Status _simdjson_parse_json(size_t* size, bool* is_empty_row, bool* eof,
simdjson::error_code* error);
- Status _return_quality_error(fmt::memory_buffer& error_msg, const
std::string& doc_info,
- bool* eof);
Status _get_json_value(size_t* size, bool* eof, simdjson::error_code*
error,
bool* is_empty_row);
Status _judge_empty_row(size_t size, bool eof, bool* is_empty_row);
@@ -275,10 +273,8 @@ private:
std::string _simdjson_ondemand_padding_buffer;
std::string _simdjson_ondemand_unscape_padding_buffer;
// char _simdjson_ondemand_padding_buffer[_padded_size];
- simdjson::ondemand::document_reference _original_json_doc;
+ simdjson::ondemand::document _original_json_doc;
simdjson::ondemand::value _json_value;
- simdjson::ondemand::document_stream _json_stream;
- simdjson::ondemand::document_stream::iterator _json_stream_iterator;
// for strip outer array
// array_iter pointed to _array
simdjson::ondemand::array_iterator _array_iter;
diff --git a/regression-test/data/load_p0/stream_load/iterate_read_json.json
b/regression-test/data/load_p0/stream_load/iterate_read_json.json
deleted file mode 100644
index f6c0d634efe..00000000000
--- a/regression-test/data/load_p0/stream_load/iterate_read_json.json
+++ /dev/null
@@ -1,3 +0,0 @@
-{"name":"Name1","age":21,"agent_id":"5fbfefd2-ea1c-44fd-bc54-6eb2582e1525"}
-{"name":"Name2","age":21,"agent_id":"5fbfefd2-ea1c-44fd-bc54-6eb2582e1525"}
-{"name":"Name3","age":21,"agent_id":"5fbfefd2-ea1c-44fd-bc54-6eb2582e1525"}
\ No newline at end of file
diff --git a/regression-test/data/load_p0/stream_load/test_json_load.out
b/regression-test/data/load_p0/stream_load/test_json_load.out
index 4c5dfc81f87..9f064572f13 100644
--- a/regression-test/data/load_p0/stream_load/test_json_load.out
+++ b/regression-test/data/load_p0/stream_load/test_json_load.out
@@ -245,11 +245,6 @@ John 30 New York
{"email":"[email protected]","phone":"+1-123-456-7890"}
android \N \N \N \N \N
android \N \N \N \N \N
--- !iterate_read_json --
-Name1 21 5fbfefd2-ea1c-44fd-bc54-6eb2582e1525
-Name2 21 5fbfefd2-ea1c-44fd-bc54-6eb2582e1525
-Name3 21 5fbfefd2-ea1c-44fd-bc54-6eb2582e1525
-
-- !select28 --
test k2_value
diff --git a/regression-test/suites/load_p0/stream_load/test_json_load.groovy
b/regression-test/suites/load_p0/stream_load/test_json_load.groovy
index 8971b27f662..da6bea1afcd 100644
--- a/regression-test/suites/load_p0/stream_load/test_json_load.groovy
+++ b/regression-test/suites/load_p0/stream_load/test_json_load.groovy
@@ -787,6 +787,7 @@ suite("test_json_load", "p0") {
assertEquals("${reason}", "${out}")
}
}
+
} finally {
try_sql("DROP TABLE IF EXISTS ${testTable}")
}
@@ -822,20 +823,7 @@ suite("test_json_load", "p0") {
assertEquals("${reason}", "${out}")
}
}
- } finally {
- try_sql("DROP TABLE IF EXISTS ${testTable}")
- }
-
- // iterate read json when read_json_by_line = false
- try {
- sql "DROP TABLE IF EXISTS ${testTable}"
- create_json_test_table.call(testTable)
- def test_load_label = UUID.randomUUID().toString().replaceAll("-", "")
- load_json_data.call("${testTable}", test_load_label, 'false', 'false',
'json', '', '', '', '', '', 'iterate_read_json.json')
- sql "sync"
-
- qt_iterate_read_json "select * from ${testTable} order by name"
} finally {
try_sql("DROP TABLE IF EXISTS ${testTable}")
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]