This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new cb7e8bf1aa6 [fix](simd-json-reader) fix SIMD json reader lose data and
support stream parser (#35781)
cb7e8bf1aa6 is described below
commit cb7e8bf1aa619e2c3b8b10d84e0721e543bc092a
Author: HHoflittlefish777 <[email protected]>
AuthorDate: Tue Jun 4 10:06:31 2024 +0800
[fix](simd-json-reader) fix SIMD json reader lose data and support stream
parser (#35781)
When loading JSON without setting read_json_by_line, only one JSON row is loaded.

But the input contains three JSON rows:
{"name":"Name1","age":21,"agent_id":"5fbfefd2-ea1c-44fd-bc54-6eb2582e1525"}
{"name":"Name2","age":21,"agent_id":"5fbfefd2-ea1c-44fd-bc54-6eb2582e1525"}
{"name":"Name3","age":21,"agent_id":"5fbfefd2-ea1c-44fd-bc54-6eb2582e1525"}
Two of the three rows are lost.
---
be/src/vec/exec/format/json/new_json_reader.cpp | 187 ++++++++++++---------
be/src/vec/exec/format/json/new_json_reader.h | 6 +-
.../load_p0/stream_load/iterate_read_json.json | 3 +
.../data/load_p0/stream_load/test_json_load.out | 5 +
.../load_p0/stream_load/test_json_load.groovy | 14 +-
5 files changed, 129 insertions(+), 86 deletions(-)
diff --git a/be/src/vec/exec/format/json/new_json_reader.cpp
b/be/src/vec/exec/format/json/new_json_reader.cpp
index b658e15e467..a765149384d 100644
--- a/be/src/vec/exec/format/json/new_json_reader.cpp
+++ b/be/src/vec/exec/format/json/new_json_reader.cpp
@@ -1058,6 +1058,7 @@ Status
NewJsonReader::_simdjson_handle_simple_json(RuntimeState* /*state*/, Bloc
size_t size = 0;
simdjson::error_code error;
size_t num_rows = block.rows();
+
try {
// step1: get and parse buf to get json doc
RETURN_IF_ERROR(_simdjson_parse_json(&size, is_empty_row, eof,
&error));
@@ -1066,19 +1067,25 @@ Status
NewJsonReader::_simdjson_handle_simple_json(RuntimeState* /*state*/, Bloc
return Status::OK();
}
- // step2: get json value by json doc
- Status st = _get_json_value(&size, eof, &error, is_empty_row);
- if (st.is<DATA_QUALITY_ERROR>()) {
- return Status::OK();
- }
- RETURN_IF_ERROR(st);
- if (*is_empty_row || *eof) {
- return Status::OK();
- }
+ for (_json_stream_iterator = _json_stream.begin();
+ _json_stream_iterator != _json_stream.end();
++_json_stream_iterator) {
+ if (_json_stream_iterator.current_index() >= _original_doc_size) {
+ break;
+ }
+ // step2: get json value by json doc
+ Status st = _get_json_value(&size, eof, &error, is_empty_row);
+ if (st.is<DATA_QUALITY_ERROR>()) {
+ return Status::OK();
+ }
+ RETURN_IF_ERROR(st);
+ if (*is_empty_row || *eof) {
+ return Status::OK();
+ }
- // step 3: write columns by json value
- RETURN_IF_ERROR(
- _simdjson_handle_simple_json_write_columns(block, slot_descs,
is_empty_row, eof));
+ // step 3: write columns by json value
+ RETURN_IF_ERROR(_simdjson_handle_simple_json_write_columns(block,
slot_descs,
+
is_empty_row, eof));
+ }
} catch (simdjson::simdjson_error& e) {
RETURN_IF_ERROR(_handle_simdjson_error(e, block, num_rows, eof));
if (*_scanner_eof) {
@@ -1168,19 +1175,25 @@ Status
NewJsonReader::_simdjson_handle_flat_array_complex_json(
return Status::OK();
}
- // step2: get json value by json doc
- Status st = _get_json_value(&size, eof, &error, is_empty_row);
- if (st.is<DATA_QUALITY_ERROR>()) {
- return Status::OK();
- }
- RETURN_IF_ERROR(st);
- if (*is_empty_row) {
- return Status::OK();
- }
+ for (_json_stream_iterator = _json_stream.begin();
+ _json_stream_iterator != _json_stream.end();
++_json_stream_iterator) {
+ if (_json_stream_iterator.current_index() >= _original_doc_size) {
+ break;
+ }
+ // step2: get json value by json doc
+ Status st = _get_json_value(&size, eof, &error, is_empty_row);
+ if (st.is<DATA_QUALITY_ERROR>()) {
+ return Status::OK();
+ }
+ RETURN_IF_ERROR(st);
+ if (*is_empty_row) {
+ return Status::OK();
+ }
- // step 3: write columns by json value
-
RETURN_IF_ERROR(_simdjson_handle_flat_array_complex_json_write_columns(block,
slot_descs,
-
is_empty_row, eof));
+ // step 3: write columns by json value
+
RETURN_IF_ERROR(_simdjson_handle_flat_array_complex_json_write_columns(
+ block, slot_descs, is_empty_row, eof));
+ }
} catch (simdjson::simdjson_error& e) {
RETURN_IF_ERROR(_handle_simdjson_error(e, block, num_rows, eof));
if (*_scanner_eof) {
@@ -1256,20 +1269,26 @@ Status
NewJsonReader::_simdjson_handle_nested_complex_json(
RuntimeState* /*state*/, Block& block, const
std::vector<SlotDescriptor*>& slot_descs,
bool* is_empty_row, bool* eof) {
// nested complex json
- while (true) {
- size_t num_rows = block.rows();
- simdjson::ondemand::object cur;
- size_t size = 0;
- simdjson::error_code error;
- try {
- RETURN_IF_ERROR(_simdjson_parse_json(&size, is_empty_row, eof,
&error));
- if (size == 0 || *eof) {
- *is_empty_row = true;
- return Status::OK();
+ size_t num_rows = block.rows();
+ simdjson::ondemand::object cur;
+ size_t size = 0;
+ simdjson::error_code error;
+
+ try {
+ RETURN_IF_ERROR(_simdjson_parse_json(&size, is_empty_row, eof,
&error));
+ if (size == 0 || *eof) {
+ *is_empty_row = true;
+ return Status::OK();
+ }
+
+ for (_json_stream_iterator = _json_stream.begin();
+ _json_stream_iterator != _json_stream.end();
++_json_stream_iterator) {
+ if (_json_stream_iterator.current_index() >= _original_doc_size) {
+ break;
}
Status st = _get_json_value(&size, eof, &error, is_empty_row);
if (st.is<DATA_QUALITY_ERROR>()) {
- continue; // continue to read next
+ return Status::OK();
}
RETURN_IF_ERROR(st);
if (*is_empty_row) {
@@ -1299,16 +1318,14 @@ Status
NewJsonReader::_simdjson_handle_nested_complex_json(
// so that the caller will continue reading next line.
*is_empty_row = true;
}
- break; // read a valid row
- } catch (simdjson::simdjson_error& e) {
- RETURN_IF_ERROR(_handle_simdjson_error(e, block, num_rows, eof));
- if (*_scanner_eof) {
- // When _scanner_eof is true and valid is false, it means that
we have encountered
- // unqualified data and decided to stop the scan.
- *is_empty_row = true;
- return Status::OK();
- }
- continue;
+ }
+ } catch (simdjson::simdjson_error& e) {
+ RETURN_IF_ERROR(_handle_simdjson_error(e, block, num_rows, eof));
+ if (*_scanner_eof) {
+ // When _scanner_eof is true and valid is false, it means that we
have encountered
+ // unqualified data and decided to stop the scan.
+ *is_empty_row = true;
+ return Status::OK();
}
}
return Status::OK();
@@ -1515,14 +1532,13 @@ Status NewJsonReader::_simdjson_parse_json(size_t*
size, bool* is_empty_row, boo
return Status::OK();
}
- // step2: init json parser iterate.
- if (*size + simdjson::SIMDJSON_PADDING > _padded_size) {
- // For efficiency reasons, simdjson requires a string with a few bytes
(simdjson::SIMDJSON_PADDING) at the end.
- // Hence, a re-allocation is needed if the space is not enough.
- _simdjson_ondemand_padding_buffer.resize(*size +
simdjson::SIMDJSON_PADDING);
- _simdjson_ondemand_unscape_padding_buffer.resize(*size +
simdjson::SIMDJSON_PADDING);
- _padded_size = *size + simdjson::SIMDJSON_PADDING;
- }
+ // step2: init json stream iterate.
+ // For efficiency reasons, simdjson requires a string with a few bytes
(simdjson::SIMDJSON_PADDING) at the end.
+ _simdjson_ondemand_padding_buffer.clear();
+ _padded_size = *size + simdjson::SIMDJSON_PADDING;
+ _simdjson_ondemand_padding_buffer.resize(_padded_size);
+ _simdjson_ondemand_unscape_padding_buffer.resize(_padded_size);
+
// trim BOM since simdjson does not handle UTF-8 Unicode (with BOM)
if (*size >= 3 && static_cast<char>(_json_str[0]) == '\xEF' &&
static_cast<char>(_json_str[1]) == '\xBB' &&
static_cast<char>(_json_str[2]) == '\xBF') {
@@ -1532,10 +1548,16 @@ Status NewJsonReader::_simdjson_parse_json(size_t*
size, bool* is_empty_row, boo
}
memcpy(&_simdjson_ondemand_padding_buffer.front(), _json_str, *size);
_original_doc_size = *size;
- *error = _ondemand_json_parser
-
->iterate(std::string_view(_simdjson_ondemand_padding_buffer.data(), *size),
- _padded_size)
- .get(_original_json_doc);
+
+ *error =
_ondemand_json_parser->iterate_many(_simdjson_ondemand_padding_buffer)
+ .get(_json_stream);
+ if (*error != simdjson::error_code::SUCCESS) {
+ fmt::memory_buffer error_msg;
+ fmt::format_to(error_msg, "Parse json data for JsonDoc failed. code:
{}, error info: {}",
+ *error, simdjson::error_message(*error));
+ return _return_quality_error(error_msg, std::string((char*)_json_str,
*size), eof);
+ }
+
return Status::OK();
}
@@ -1557,43 +1579,40 @@ Status NewJsonReader::_judge_empty_row(size_t size,
bool eof, bool* is_empty_row
return Status::OK();
}
+Status NewJsonReader::_return_quality_error(fmt::memory_buffer& error_msg,
+ const std::string& doc_info, bool*
eof) {
+ RETURN_IF_ERROR(_state->append_error_msg_to_file(
+ [&]() -> std::string { return doc_info; },
+ [&]() -> std::string { return fmt::to_string(error_msg); },
_scanner_eof));
+ _counter->num_rows_filtered++;
+ if (*_scanner_eof) {
+ // Case A: if _scanner_eof is set to true in
"append_error_msg_to_file", which means
+ // we meet enough invalid rows and the scanner should be stopped.
+ // So we set eof to true and return OK, the caller will stop the
process as we meet the end of file.
+ *eof = true;
+ return Status::OK();
+ }
+ return Status::DataQualityError(fmt::to_string(error_msg));
+}
+
Status NewJsonReader::_get_json_value(size_t* size, bool* eof,
simdjson::error_code* error,
bool* is_empty_row) {
SCOPED_TIMER(_file_read_timer);
- auto return_quality_error = [&](fmt::memory_buffer& error_msg,
- const std::string& doc_info) -> Status {
- RETURN_IF_ERROR(_state->append_error_msg_to_file(
- [&]() -> std::string { return doc_info; },
- [&]() -> std::string { return fmt::to_string(error_msg); },
_scanner_eof));
- _counter->num_rows_filtered++;
- if (*_scanner_eof) {
- // Case A: if _scanner_eof is set to true in
"append_error_msg_to_file", which means
- // we meet enough invalid rows and the scanner should be stopped.
- // So we set eof to true and return OK, the caller will stop the
process as we meet the end of file.
- *eof = true;
- return Status::OK();
- }
- return Status::DataQualityError(fmt::to_string(error_msg));
- };
- if (*error != simdjson::error_code::SUCCESS) {
- fmt::memory_buffer error_msg;
- fmt::format_to(error_msg, "Parse json data for JsonDoc failed. code:
{}, error info: {}",
- *error, simdjson::error_message(*error));
- return return_quality_error(error_msg, std::string((char*)_json_str,
*size));
- }
+ _original_json_doc = (*_json_stream_iterator).value();
+
auto type_res = _original_json_doc.type();
if (type_res.error() != simdjson::error_code::SUCCESS) {
fmt::memory_buffer error_msg;
fmt::format_to(error_msg, "Parse json data for JsonDoc failed. code:
{}, error info: {}",
type_res.error(),
simdjson::error_message(type_res.error()));
- return return_quality_error(error_msg, std::string((char*)_json_str,
*size));
+ return _return_quality_error(error_msg, std::string((char*)_json_str,
*size), eof);
}
simdjson::ondemand::json_type type = type_res.value();
if (type != simdjson::ondemand::json_type::object &&
type != simdjson::ondemand::json_type::array) {
fmt::memory_buffer error_msg;
fmt::format_to(error_msg, "Not an json object or json array");
- return return_quality_error(error_msg, std::string((char*)_json_str,
*size));
+ return _return_quality_error(error_msg, std::string((char*)_json_str,
*size), eof);
}
if (!_parsed_json_root.empty() && type ==
simdjson::ondemand::json_type::object) {
try {
@@ -1605,13 +1624,13 @@ Status NewJsonReader::_get_json_value(size_t* size,
bool* eof, simdjson::error_c
if (!st.ok()) {
fmt::memory_buffer error_msg;
fmt::format_to(error_msg, "{}", st.to_string());
- return return_quality_error(error_msg,
std::string((char*)_json_str, *size));
+ return _return_quality_error(error_msg,
std::string((char*)_json_str, *size), eof);
}
} catch (simdjson::simdjson_error& e) {
fmt::memory_buffer error_msg;
fmt::format_to(error_msg, "Encounter error while
extract_from_object, error: {}",
e.what());
- return return_quality_error(error_msg,
std::string((char*)_json_str, *size));
+ return _return_quality_error(error_msg,
std::string((char*)_json_str, *size), eof);
}
} else {
_json_value = _original_json_doc;
@@ -1621,14 +1640,14 @@ Status NewJsonReader::_get_json_value(size_t* size,
bool* eof, simdjson::error_c
fmt::memory_buffer error_msg;
fmt::format_to(error_msg, "{}",
"JSON data is array-object, `strip_outer_array` must be
TRUE.");
- return return_quality_error(error_msg, std::string((char*)_json_str,
*size));
+ return _return_quality_error(error_msg, std::string((char*)_json_str,
*size), eof);
}
if (_json_value.type() != simdjson::ondemand::json_type::array &&
_strip_outer_array) {
fmt::memory_buffer error_msg;
fmt::format_to(error_msg, "{}",
"JSON data is not an array-object, `strip_outer_array`
must be FALSE.");
- return return_quality_error(error_msg, std::string((char*)_json_str,
*size));
+ return _return_quality_error(error_msg, std::string((char*)_json_str,
*size), eof);
}
RETURN_IF_ERROR(_judge_empty_row(*size, *eof, is_empty_row));
return Status::OK();
diff --git a/be/src/vec/exec/format/json/new_json_reader.h
b/be/src/vec/exec/format/json/new_json_reader.h
index f044e06e62e..2a8b428db9f 100644
--- a/be/src/vec/exec/format/json/new_json_reader.h
+++ b/be/src/vec/exec/format/json/new_json_reader.h
@@ -147,6 +147,8 @@ private:
Status _simdjson_init_reader();
Status _simdjson_parse_json(size_t* size, bool* is_empty_row, bool* eof,
simdjson::error_code* error);
+ Status _return_quality_error(fmt::memory_buffer& error_msg, const
std::string& doc_info,
+ bool* eof);
Status _get_json_value(size_t* size, bool* eof, simdjson::error_code*
error,
bool* is_empty_row);
Status _judge_empty_row(size_t size, bool eof, bool* is_empty_row);
@@ -273,8 +275,10 @@ private:
std::string _simdjson_ondemand_padding_buffer;
std::string _simdjson_ondemand_unscape_padding_buffer;
// char _simdjson_ondemand_padding_buffer[_padded_size];
- simdjson::ondemand::document _original_json_doc;
+ simdjson::ondemand::document_reference _original_json_doc;
simdjson::ondemand::value _json_value;
+ simdjson::ondemand::document_stream _json_stream;
+ simdjson::ondemand::document_stream::iterator _json_stream_iterator;
// for strip outer array
// array_iter pointed to _array
simdjson::ondemand::array_iterator _array_iter;
diff --git a/regression-test/data/load_p0/stream_load/iterate_read_json.json
b/regression-test/data/load_p0/stream_load/iterate_read_json.json
new file mode 100644
index 00000000000..f6c0d634efe
--- /dev/null
+++ b/regression-test/data/load_p0/stream_load/iterate_read_json.json
@@ -0,0 +1,3 @@
+{"name":"Name1","age":21,"agent_id":"5fbfefd2-ea1c-44fd-bc54-6eb2582e1525"}
+{"name":"Name2","age":21,"agent_id":"5fbfefd2-ea1c-44fd-bc54-6eb2582e1525"}
+{"name":"Name3","age":21,"agent_id":"5fbfefd2-ea1c-44fd-bc54-6eb2582e1525"}
\ No newline at end of file
diff --git a/regression-test/data/load_p0/stream_load/test_json_load.out
b/regression-test/data/load_p0/stream_load/test_json_load.out
index 9f064572f13..4c5dfc81f87 100644
--- a/regression-test/data/load_p0/stream_load/test_json_load.out
+++ b/regression-test/data/load_p0/stream_load/test_json_load.out
@@ -245,6 +245,11 @@ John 30 New York
{"email":"[email protected]","phone":"+1-123-456-7890"}
android \N \N \N \N \N
android \N \N \N \N \N
+-- !iterate_read_json --
+Name1 21 5fbfefd2-ea1c-44fd-bc54-6eb2582e1525
+Name2 21 5fbfefd2-ea1c-44fd-bc54-6eb2582e1525
+Name3 21 5fbfefd2-ea1c-44fd-bc54-6eb2582e1525
+
-- !select28 --
test k2_value
diff --git a/regression-test/suites/load_p0/stream_load/test_json_load.groovy
b/regression-test/suites/load_p0/stream_load/test_json_load.groovy
index da6bea1afcd..8971b27f662 100644
--- a/regression-test/suites/load_p0/stream_load/test_json_load.groovy
+++ b/regression-test/suites/load_p0/stream_load/test_json_load.groovy
@@ -787,7 +787,6 @@ suite("test_json_load", "p0") {
assertEquals("${reason}", "${out}")
}
}
-
} finally {
try_sql("DROP TABLE IF EXISTS ${testTable}")
}
@@ -823,7 +822,20 @@ suite("test_json_load", "p0") {
assertEquals("${reason}", "${out}")
}
}
+ } finally {
+ try_sql("DROP TABLE IF EXISTS ${testTable}")
+ }
+
+ // iterate read json when read_json_by_line = false
+ try {
+ sql "DROP TABLE IF EXISTS ${testTable}"
+ create_json_test_table.call(testTable)
+ def test_load_label = UUID.randomUUID().toString().replaceAll("-", "")
+ load_json_data.call("${testTable}", test_load_label, 'false', 'false',
'json', '', '', '', '', '', 'iterate_read_json.json')
+ sql "sync"
+
+ qt_iterate_read_json "select * from ${testTable} order by name"
} finally {
try_sql("DROP TABLE IF EXISTS ${testTable}")
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]