xiaokang commented on code in PR #16903:
URL: https://github.com/apache/doris/pull/16903#discussion_r1111242055
##########
be/src/exprs/json_functions.cpp:
##########
@@ -282,4 +282,67 @@ void JsonFunctions::get_parsed_paths(const
std::vector<std::string>& path_exprs,
}
}
+Status JsonFunctions::extract_from_object(simdjson::ondemand::object& obj,
+ const std::vector<JsonPath>&
jsonpath,
+ simdjson::ondemand::value* value)
noexcept {
+#define HANDLE_SIMDJSON_ERROR(err, msg)
\
+ do {
\
+ const simdjson::error_code& _err = err;
\
+ const std::string& _msg = msg;
\
+ if (UNLIKELY(_err)) {
\
+ if (_err == simdjson::NO_SUCH_FIELD || _err ==
simdjson::INDEX_OUT_OF_BOUNDS) { \
+ return Status::NotFound(
\
+ fmt::format("err: {}, msg: {}",
simdjson::error_message(_err), _msg)); \
+ }
\
+ return Status::DataQualityError(
\
+ fmt::format("err: {}, msg: {}",
simdjson::error_message(_err), _msg)); \
+ }
\
+ } while (false);
+
+ if (jsonpath.size() <= 1) {
+ // The first elem of json path should be '$'.
+ // A valid json path's size is >= 2.
+ return Status::DataQualityError("empty json path");
+ }
+
+ simdjson::ondemand::value tvalue;
+
+ // Skip the first $.
+ for (int i = 1; i < jsonpath.size(); i++) {
+ if (UNLIKELY(!jsonpath[i].is_valid)) {
+ return Status::DataQualityError(fmt::format("invalid json path:
{}", jsonpath[i].key));
+ }
+
+ const std::string& col = jsonpath[i].key;
+ int index = jsonpath[i].idx;
+
+ // Since the simdjson::ondemand::object cannot be converted to
simdjson::ondemand::value,
+ // we have to do some special treatment for the second elem of json
path.
+ // If the key is not found in json object, simdjson::NO_SUCH_FIELD
would be returned.
+ if (i == 1) {
+ HANDLE_SIMDJSON_ERROR(obj.find_field_unordered(col).get(tvalue),
+ fmt::format("unable to find field: {}",
col));
+ } else {
+ HANDLE_SIMDJSON_ERROR(tvalue.find_field_unordered(col).get(tvalue),
+ fmt::format("unable to find field: {}",
col));
+ }
+
+ if (index != -1) {
+ // try to access tvalue as array.
+ // If the index is beyond the length of array,
simdjson::INDEX_OUT_OF_BOUNDS would be returned.
+ simdjson::ondemand::array arr;
+ HANDLE_SIMDJSON_ERROR(tvalue.get_array().get(arr),
+ fmt::format("failed to access field as
array, field: {}", col));
+
+ HANDLE_SIMDJSON_ERROR(
+ arr.at(index).get(tvalue),
Review Comment:
Can simdjson properly handle index == -2, which means `*` (wildcard)?
##########
be/src/exprs/json_functions.cpp:
##########
@@ -282,4 +282,67 @@ void JsonFunctions::get_parsed_paths(const
std::vector<std::string>& path_exprs,
}
}
+Status JsonFunctions::extract_from_object(simdjson::ondemand::object& obj,
+ const std::vector<JsonPath>&
jsonpath,
+ simdjson::ondemand::value* value)
noexcept {
+#define HANDLE_SIMDJSON_ERROR(err, msg)
\
+ do {
\
+ const simdjson::error_code& _err = err;
\
+ const std::string& _msg = msg;
\
+ if (UNLIKELY(_err)) {
\
+ if (_err == simdjson::NO_SUCH_FIELD || _err ==
simdjson::INDEX_OUT_OF_BOUNDS) { \
+ return Status::NotFound(
\
+ fmt::format("err: {}, msg: {}",
simdjson::error_message(_err), _msg)); \
Review Comment:
Suggest changing the `err:` prefix in this message to `simdjson err:`.
##########
be/src/vec/exec/format/json/new_json_reader.cpp:
##########
@@ -1007,4 +1015,560 @@ Status
NewJsonReader::_read_one_message(std::unique_ptr<uint8_t[]>* file_buf, si
}
return Status::OK();
}
+// ---------SIMDJSON----------
+// simdjson, replace none simdjson function if it is ready
+Status NewJsonReader::_simdjson_init_reader() {
+ RETURN_IF_ERROR(_get_range_params());
+
+ RETURN_IF_ERROR(_open_file_reader());
+ if (_read_json_by_line) {
+ RETURN_IF_ERROR(_open_line_reader());
+ }
+
+ // generate _parsed_jsonpaths and _parsed_json_root
+ RETURN_IF_ERROR(_parse_jsonpath_and_json_root());
+
+ //improve performance
+ if (_parsed_jsonpaths.empty() || _is_dynamic_schema) { // input is a
simple json-string
+ _vhandle_json_callback = _is_dynamic_schema ?
&NewJsonReader::_vhandle_dynamic_json
+ :
&NewJsonReader::_simdjson_handle_simple_json;
+ } else { // input is a complex json-string and a json-path
+ if (_strip_outer_array) {
+ _vhandle_json_callback =
&NewJsonReader::_simdjson_handle_flat_array_complex_json;
+ } else {
+ _vhandle_json_callback =
&NewJsonReader::_simdjson_handle_nested_complex_json;
+ }
+ }
+ if (_is_dynamic_schema) {
+ _json_parser =
std::make_unique<vectorized::JSONDataParser<vectorized::SimdJSONParser>>();
+ }
+ _ondemand_json_parser = std::make_unique<simdjson::ondemand::parser>();
+ for (int i = 0; i < _file_slot_descs.size(); ++i) {
+ _slot_desc_index.emplace(_file_slot_descs[i]->col_name(), i);
+ }
+ return Status::OK();
+}
+
+Status
NewJsonReader::_simdjson_handle_simple_json(std::vector<MutableColumnPtr>&
columns,
+ const
std::vector<SlotDescriptor*>& slot_descs,
+ bool* is_empty_row, bool*
eof) {
+ // simple json
+ simdjson::ondemand::object objectValue;
+ size_t num_rows = columns[0]->size();
+ do {
+ bool valid = false;
+ try {
+ if (_next_row >= _total_rows) { // parse json and generic document
+ Status st = _simdjson_parse_json(is_empty_row, eof);
+ if (st.is<DATA_QUALITY_ERROR>()) {
+ continue; // continue to read next
+ }
+ RETURN_IF_ERROR(st);
+ if (*is_empty_row == true) {
+ return Status::OK();
+ }
+ if (_json_value.type() ==
simdjson::ondemand::json_type::array) {
+ _array = _json_value.get_array();
+ _array_iter = _array.begin();
+
+ _total_rows = _array.count_elements();
+ if (_total_rows == 0) {
+ // may be passing an empty json, such as "[]"
+ RETURN_IF_ERROR(_append_error_msg(nullptr, "Empty json
line", "", nullptr));
+ if (*_scanner_eof) {
+ *is_empty_row = true;
+ return Status::OK();
+ }
+ continue;
+ }
+ } else {
+ _total_rows = 1; // only one row
+ objectValue = _json_value;
+ }
+ _next_row = 0;
+ }
+
+ if (_json_value.type() == simdjson::ondemand::json_type::array) {
// handle case 1
+ objectValue = *_array_iter;
+ RETURN_IF_ERROR(
+ _simdjson_set_column_value(&objectValue, columns,
slot_descs, &valid));
+ if (_array_iter == _array.end()) {
+ // Hint to read next json doc
+ _next_row = _total_rows + 1;
+ break;
+ }
+ ++_array_iter;
+ } else { // handle case 2
+ // objectValue = _json_value.get_object();
+ RETURN_IF_ERROR(
+ _simdjson_set_column_value(&objectValue, columns,
slot_descs, &valid));
+ }
+ _next_row++;
+ if (!valid) {
+ if (*_scanner_eof) {
+ // When _scanner_eof is true and valid is false, it means
that we have encountered
+ // unqualified data and decided to stop the scan.
+ *is_empty_row = true;
+ return Status::OK();
+ }
+ continue;
+ }
+ *is_empty_row = false;
+ break; // get a valid row, then break
+ } catch (simdjson::simdjson_error& e) {
+ // prevent from endless loop
+ _next_row = _total_rows + 1;
+ fmt::memory_buffer error_msg;
+ fmt::format_to(error_msg, "Parse json data for array failed. code:
{}, error info: {}",
+ e.error(), e.what());
+ RETURN_IF_ERROR(_state->append_error_msg_to_file(
+ [&]() -> std::string { return ""; },
+ [&]() -> std::string { return fmt::to_string(error_msg);
}, eof));
+ _counter->num_rows_filtered++;
+ // Before continuing to process other rows, we need to first clean
the fail parsed row.
+ for (int i = 0; i < columns.size(); ++i) {
+ if (columns[i]->size() > num_rows) {
+ columns[i]->pop_back(columns[i]->size() - num_rows);
+ }
+ }
+ if (!valid) {
+ if (*_scanner_eof) {
+ // When _scanner_eof is true and valid is false, it means
that we have encountered
+ // unqualified data and decided to stop the scan.
+ *is_empty_row = true;
+ return Status::OK();
+ }
+ continue;
+ }
+ continue;
+ }
+ } while (_next_row <= _total_rows);
+ return Status::OK();
+}
+
+Status NewJsonReader::_simdjson_handle_flat_array_complex_json(
+ std::vector<MutableColumnPtr>& columns, const
std::vector<SlotDescriptor*>& slot_descs,
+ bool* is_empty_row, bool* eof) {
+ // array complex json
+ size_t num_rows = columns[0]->size();
+ simdjson::ondemand::object cur;
+ do {
+ try {
+ if (_next_row >= _total_rows) {
+ Status st = _simdjson_parse_json(is_empty_row, eof);
+ if (st.is<DATA_QUALITY_ERROR>()) {
+ continue; // continue to read next
+ }
+ RETURN_IF_ERROR(st);
+ if (*is_empty_row == true) {
+ if (st == Status::OK()) {
+ return Status::OK();
+ }
+ if (_total_rows == 0) {
+ continue;
+ }
+ }
+ _array = _json_value.get_array();
+ _array_iter = _array.begin();
+ }
+
+ bool valid = true;
+ cur = (*_array_iter).get_object();
+ // extract root
+ if (_parsed_json_root.size() != 0) {
+ simdjson::ondemand::value val;
+ RETURN_IF_ERROR(JsonFunctions::extract_from_object(cur,
_parsed_json_root, &val));
+ // If val is not an object, will throw simdjson::simdjson_error
+ cur = val.get_object();
+ }
+ RETURN_IF_ERROR(_simdjson_write_columns_by_jsonpath(&cur,
slot_descs, columns, &valid));
+ // Reached end point
+ if (_array_iter == _array.end()) {
+ // Hint to read next json doc
+ _next_row = _total_rows + 1;
+ break;
+ }
+ ++_array_iter;
+ ++_next_row;
+ if (!valid) {
+ continue; // process next line
+ }
+ *is_empty_row = false;
+ break; // get a valid row, then break
+ } catch (simdjson::simdjson_error& e) {
+ // prevent from endless loop
+ _next_row = _total_rows + 1;
+ fmt::memory_buffer error_msg;
+ fmt::format_to(error_msg, "Parse json data failed. code: {}, error
info: {}", e.error(),
+ e.what());
+ RETURN_IF_ERROR(_state->append_error_msg_to_file(
+ [&]() -> std::string { return ""; },
+ [&]() -> std::string { return fmt::to_string(error_msg);
}, eof));
+ _counter->num_rows_filtered++;
+ // Before continuing to process other rows, we need to first clean
the fail parsed row.
+ for (int i = 0; i < columns.size(); ++i) {
+ if (columns[i]->size() > num_rows) {
+ columns[i]->pop_back(columns[i]->size() - num_rows);
+ }
+ }
+ if (*_scanner_eof) {
+ // When _scanner_eof is true and valid is false, it means that
we have encountered
+ // unqualified data and decided to stop the scan.
+ *is_empty_row = true;
+ return Status::OK();
+ }
+ continue;
+ }
+ } while (_next_row <= _total_rows);
+ return Status::OK();
+}
+
+Status NewJsonReader::_simdjson_handle_nested_complex_json(
+ std::vector<MutableColumnPtr>& columns, const
std::vector<SlotDescriptor*>& slot_descs,
+ bool* is_empty_row, bool* eof) {
+ // nested complex json
+ while (true) {
+ size_t num_rows = columns[0]->size();
+ simdjson::ondemand::object cur;
+ try {
+ Status st = _simdjson_parse_json(is_empty_row, eof);
+ if (st.is<DATA_QUALITY_ERROR>()) {
+ continue; // continue to read next
+ }
+ RETURN_IF_ERROR(st);
+ if (*is_empty_row == true) {
+ return Status::OK();
+ }
+ *is_empty_row = false;
+ bool valid = true;
+ cur = _json_value.get_object();
+ RETURN_IF_ERROR(_simdjson_write_columns_by_jsonpath(&cur,
slot_descs, columns, &valid));
+ if (!valid) {
+ // there is only one line in this case, so if it return false,
just set is_empty_row true
+ // so that the caller will continue reading next line.
+ *is_empty_row = true;
+ }
+ break; // read a valid row
+ } catch (simdjson::simdjson_error& e) {
+ fmt::memory_buffer error_msg;
+ fmt::format_to(error_msg, "Parse json data failed. code: {}, error
info: {}", e.error(),
+ e.what());
+ RETURN_IF_ERROR(_state->append_error_msg_to_file(
+ [&]() -> std::string { return ""; },
+ [&]() -> std::string { return fmt::to_string(error_msg);
}, eof));
+ _counter->num_rows_filtered++;
+ // Before continuing to process other rows, we need to first clean
the fail parsed row.
+ for (int i = 0; i < columns.size(); ++i) {
+ if (columns[i]->size() > num_rows) {
+ columns[i]->pop_back(columns[i]->size() - num_rows);
+ }
+ }
+ if (*_scanner_eof) {
+ // When _scanner_eof is true and valid is false, it means that
we have encountered
+ // unqualified data and decided to stop the scan.
+ *is_empty_row = true;
+ return Status::OK();
+ }
+ continue;
+ }
+ }
+ return Status::OK();
+}
+
+Status NewJsonReader::_simdjson_set_column_value(simdjson::ondemand::object*
value,
+
std::vector<MutableColumnPtr>& columns,
+ const
std::vector<SlotDescriptor*>& slot_descs,
+ bool* valid) {
+ // set
+ size_t cur_row_count = columns[0]->size();
+ bool has_valid_value = false;
+ // iterate through object, simdjson::ondemond will parsing on the fly
+ for (auto field : *value) {
+ std::string_view key = field.unescaped_key();
+ auto iter = _slot_desc_index.find(std::string(key));
+ if (iter == _slot_desc_index.end()) {
+ // This key is not exist in slot desc, just ignore
+ continue;
+ }
+ simdjson::ondemand::value val = field.value();
+ RETURN_IF_ERROR(_simdjson_write_data_to_column(val,
slot_descs[iter->second],
+
columns[iter->second].get(), valid));
+ if (!(*valid)) {
+ return Status::OK();
+ }
+ has_valid_value = true;
+ }
+ if (!has_valid_value) {
+ RETURN_IF_ERROR(
+ _append_error_msg(value, "All fields is null, this is a
invalid row.", "", valid));
+ return Status::OK();
+ }
+
+ // fill missing slot
+ int nullcount = 0;
+ int ctx_idx = 0;
+ for (auto slot_desc : slot_descs) {
+ if (!slot_desc->is_materialized()) {
+ continue;
+ }
+ int dest_index = ctx_idx++;
+ auto* column_ptr = columns[dest_index].get();
+ if (column_ptr->size() < cur_row_count + 1) {
+ DCHECK(column_ptr->size() == cur_row_count);
+ column_ptr->assume_mutable()->insert_default();
+ ++nullcount;
+ }
+ DCHECK(column_ptr->size() == cur_row_count + 1);
+ }
+ // There is at least one valid value here
+ DCHECK(nullcount < columns.size());
+ *valid = true;
+ return Status::OK();
+}
+
+Status
NewJsonReader::_simdjson_write_data_to_column(simdjson::ondemand::value& value,
+ SlotDescriptor* slot_desc,
+ vectorized::IColumn*
column, bool* valid) {
+ // write
+ vectorized::ColumnNullable* nullable_column = nullptr;
+ vectorized::IColumn* column_ptr = nullptr;
+ if (slot_desc->is_nullable()) {
+ nullable_column = assert_cast<vectorized::ColumnNullable*>(column);
+ column_ptr = &nullable_column->get_nested_column();
+ }
+
+ // TODO: if the vexpr can support another 'slot_desc type' than
'TYPE_VARCHAR',
+ // we need use a function to support these types to insert data in columns.
+ ColumnString* column_string = assert_cast<ColumnString*>(column_ptr);
+ if (value.is_null()) {
+ if (column->is_nullable()) {
+ // insert_default already push 1 to null_map
+ nullable_column->insert_default();
+ } else {
+ RETURN_IF_ERROR(_append_error_msg(
+ nullptr, "Json value is null, but the column `{}` is not
nullable.",
+ slot_desc->col_name(), valid));
+ return Status::OK();
+ }
+ } else if (value.type() == simdjson::ondemand::json_type::boolean) {
+ nullable_column->get_null_map_data().push_back(0);
+ if (value.get_bool()) {
+ column_string->insert_data("1", 1);
+ } else {
+ column_string->insert_data("0", 1);
+ }
+ } else {
+ // just return it's str representation
+ auto str_view = simdjson::to_json_string(value).value();
+ if (str_view[0] == '\"' || str_view[0] == '\'') {
+ str_view = str_view.substr(1, str_view.length() - 2);
+ }
+ nullable_column->get_null_map_data().push_back(0);
+ column_string->insert_data(str_view.data(), str_view.length());
+ }
+
+ *valid = true;
+ return Status::OK();
+}
+
+Status NewJsonReader::_append_error_msg(simdjson::ondemand::object* obj,
std::string error_msg,
+ std::string col_name, bool* valid) {
+ std::string err_msg;
+ if (!col_name.empty()) {
+ fmt::memory_buffer error_buf;
+ fmt::format_to(error_buf, error_msg, col_name);
+ err_msg = fmt::to_string(error_buf);
+ } else {
+ err_msg = error_msg;
+ }
+
+ RETURN_IF_ERROR(_state->append_error_msg_to_file(
+ [&]() -> std::string {
+ if (!obj) {
+ return "";
+ }
+ std::string_view str_view;
+ (void)!obj->raw_json().get(str_view);
+ return std::string(str_view.data(), str_view.size());
+ },
+ [&]() -> std::string { return err_msg; }, _scanner_eof));
+
+ _counter->num_rows_filtered++;
+ if (valid != nullptr) {
+ // current row is invalid
+ *valid = false;
+ }
+ return Status::OK();
+}
+
+Status NewJsonReader::_simdjson_parse_json(bool* is_empty_row, bool* eof) {
+ size_t size = 0;
+ RETURN_IF_ERROR(_simdjson_parse_json_doc(&size, eof));
+
+ // read all data, then return
+ if (size == 0 || *eof) {
+ *is_empty_row = true;
+ return Status::OK();
+ }
+
+ if (!_parsed_jsonpaths.empty() && _strip_outer_array) {
+ _total_rows = _json_value.count_elements().value();
+ _next_row = 0;
+
+ if (_total_rows == 0) {
+ // meet an empty json array.
+ *is_empty_row = true;
+ }
+ }
+ return Status::OK();
+}
+Status NewJsonReader::_simdjson_parse_json_doc(size_t* size, bool* eof) {
+ // read a whole message
+ SCOPED_TIMER(_file_read_timer);
+ const uint8_t* json_str = nullptr;
+ std::unique_ptr<uint8_t[]> json_str_ptr;
+ if (_line_reader != nullptr) {
+ RETURN_IF_ERROR(_line_reader->read_line(&json_str, size, eof));
+ } else {
+ size_t length = 0;
+ RETURN_IF_ERROR(_read_one_message(&json_str_ptr, &length));
+ json_str = json_str_ptr.get();
+ *size = length;
+ if (length == 0) {
+ *eof = true;
+ }
+ }
+
+ _bytes_read_counter += *size;
+ if (*eof) {
+ return Status::OK();
+ }
+ memcpy(_simdjson_ondemand_padding_buffer, json_str, *size);
+ auto error = _ondemand_json_parser
+ ->iterate(std::string_view(reinterpret_cast<const
char*>(
+
_simdjson_ondemand_padding_buffer),
+ *size),
+ _padded_size)
+ .get(_original_json_doc);
+ auto return_quality_error = [&](fmt::memory_buffer& error_msg,
+ const std::string& doc_info) -> Status {
+ RETURN_IF_ERROR(_state->append_error_msg_to_file(
+ [&]() -> std::string { return doc_info; },
+ [&]() -> std::string { return fmt::to_string(error_msg); },
_scanner_eof));
+ _counter->num_rows_filtered++;
+ if (*_scanner_eof) {
+ // Case A: if _scanner_eof is set to true in
"append_error_msg_to_file", which means
+ // we meet enough invalid rows and the scanner should be stopped.
+ // So we set eof to true and return OK, the caller will stop the
process as we meet the end of file.
+ *eof = true;
+ return Status::OK();
+ }
+ return Status::DataQualityError(fmt::to_string(error_msg));
+ };
+ if (error != simdjson::error_code::SUCCESS) {
+ fmt::memory_buffer error_msg;
+ fmt::format_to(error_msg, "Parse json data for JsonDoc failed. code:
{}, error info: {}",
+ error, simdjson::error_message(error));
+ return return_quality_error(error_msg, std::string((char*)json_str,
*size));
+ }
+ try {
+ // set json root
+ // if it is an array at top level, then we should iterate the entire
array in
+ // ::_simdjson_handle_flat_array_complex_json
+ if (_parsed_json_root.size() != 0 &&
+ _original_json_doc.type() ==
simdjson::ondemand::json_type::object) {
Review Comment:
If _parsed_json_root.size() != 0 but _original_json_doc.type() is not an
object, that seems to be an error.
##########
be/src/vec/exec/format/json/new_json_reader.cpp:
##########
@@ -1007,4 +1015,560 @@ Status
NewJsonReader::_read_one_message(std::unique_ptr<uint8_t[]>* file_buf, si
}
return Status::OK();
}
+// ---------SIMDJSON----------
+// simdjson, replace none simdjson function if it is ready
+Status NewJsonReader::_simdjson_init_reader() {
+ RETURN_IF_ERROR(_get_range_params());
+
+ RETURN_IF_ERROR(_open_file_reader());
+ if (_read_json_by_line) {
+ RETURN_IF_ERROR(_open_line_reader());
+ }
+
+ // generate _parsed_jsonpaths and _parsed_json_root
+ RETURN_IF_ERROR(_parse_jsonpath_and_json_root());
+
+ //improve performance
+ if (_parsed_jsonpaths.empty() || _is_dynamic_schema) { // input is a
simple json-string
+ _vhandle_json_callback = _is_dynamic_schema ?
&NewJsonReader::_vhandle_dynamic_json
+ :
&NewJsonReader::_simdjson_handle_simple_json;
+ } else { // input is a complex json-string and a json-path
+ if (_strip_outer_array) {
+ _vhandle_json_callback =
&NewJsonReader::_simdjson_handle_flat_array_complex_json;
+ } else {
+ _vhandle_json_callback =
&NewJsonReader::_simdjson_handle_nested_complex_json;
+ }
+ }
+ if (_is_dynamic_schema) {
+ _json_parser =
std::make_unique<vectorized::JSONDataParser<vectorized::SimdJSONParser>>();
+ }
+ _ondemand_json_parser = std::make_unique<simdjson::ondemand::parser>();
+ for (int i = 0; i < _file_slot_descs.size(); ++i) {
+ _slot_desc_index.emplace(_file_slot_descs[i]->col_name(), i);
+ }
+ return Status::OK();
+}
+
+Status
NewJsonReader::_simdjson_handle_simple_json(std::vector<MutableColumnPtr>&
columns,
+ const
std::vector<SlotDescriptor*>& slot_descs,
+ bool* is_empty_row, bool*
eof) {
+ // simple json
+ simdjson::ondemand::object objectValue;
+ size_t num_rows = columns[0]->size();
+ do {
+ bool valid = false;
+ try {
+ if (_next_row >= _total_rows) { // parse json and generic document
+ Status st = _simdjson_parse_json(is_empty_row, eof);
+ if (st.is<DATA_QUALITY_ERROR>()) {
+ continue; // continue to read next
+ }
+ RETURN_IF_ERROR(st);
+ if (*is_empty_row == true) {
+ return Status::OK();
+ }
+ if (_json_value.type() ==
simdjson::ondemand::json_type::array) {
+ _array = _json_value.get_array();
+ _array_iter = _array.begin();
+
+ _total_rows = _array.count_elements();
+ if (_total_rows == 0) {
+ // may be passing an empty json, such as "[]"
+ RETURN_IF_ERROR(_append_error_msg(nullptr, "Empty json
line", "", nullptr));
+ if (*_scanner_eof) {
+ *is_empty_row = true;
+ return Status::OK();
+ }
+ continue;
+ }
+ } else {
+ _total_rows = 1; // only one row
+ objectValue = _json_value;
+ }
+ _next_row = 0;
+ }
+
+ if (_json_value.type() == simdjson::ondemand::json_type::array) {
// handle case 1
+ objectValue = *_array_iter;
+ RETURN_IF_ERROR(
+ _simdjson_set_column_value(&objectValue, columns,
slot_descs, &valid));
+ if (_array_iter == _array.end()) {
+ // Hint to read next json doc
+ _next_row = _total_rows + 1;
+ break;
+ }
+ ++_array_iter;
+ } else { // handle case 2
+ // objectValue = _json_value.get_object();
+ RETURN_IF_ERROR(
+ _simdjson_set_column_value(&objectValue, columns,
slot_descs, &valid));
+ }
+ _next_row++;
+ if (!valid) {
+ if (*_scanner_eof) {
+ // When _scanner_eof is true and valid is false, it means
that we have encountered
+ // unqualified data and decided to stop the scan.
+ *is_empty_row = true;
+ return Status::OK();
+ }
+ continue;
+ }
+ *is_empty_row = false;
+ break; // get a valid row, then break
+ } catch (simdjson::simdjson_error& e) {
+ // prevent from endless loop
+ _next_row = _total_rows + 1;
+ fmt::memory_buffer error_msg;
+ fmt::format_to(error_msg, "Parse json data for array failed. code:
{}, error info: {}",
+ e.error(), e.what());
+ RETURN_IF_ERROR(_state->append_error_msg_to_file(
+ [&]() -> std::string { return ""; },
+ [&]() -> std::string { return fmt::to_string(error_msg);
}, eof));
+ _counter->num_rows_filtered++;
+ // Before continuing to process other rows, we need to first clean
the fail parsed row.
+ for (int i = 0; i < columns.size(); ++i) {
+ if (columns[i]->size() > num_rows) {
+ columns[i]->pop_back(columns[i]->size() - num_rows);
+ }
+ }
+ if (!valid) {
+ if (*_scanner_eof) {
+ // When _scanner_eof is true and valid is false, it means
that we have encountered
+ // unqualified data and decided to stop the scan.
+ *is_empty_row = true;
+ return Status::OK();
+ }
+ continue;
+ }
+ continue;
+ }
+ } while (_next_row <= _total_rows);
+ return Status::OK();
+}
+
+Status NewJsonReader::_simdjson_handle_flat_array_complex_json(
+ std::vector<MutableColumnPtr>& columns, const
std::vector<SlotDescriptor*>& slot_descs,
+ bool* is_empty_row, bool* eof) {
+ // array complex json
+ size_t num_rows = columns[0]->size();
+ simdjson::ondemand::object cur;
+ do {
+ try {
+ if (_next_row >= _total_rows) {
+ Status st = _simdjson_parse_json(is_empty_row, eof);
+ if (st.is<DATA_QUALITY_ERROR>()) {
+ continue; // continue to read next
+ }
+ RETURN_IF_ERROR(st);
+ if (*is_empty_row == true) {
+ if (st == Status::OK()) {
+ return Status::OK();
+ }
+ if (_total_rows == 0) {
+ continue;
+ }
+ }
+ _array = _json_value.get_array();
+ _array_iter = _array.begin();
+ }
+
+ bool valid = true;
+ cur = (*_array_iter).get_object();
+ // extract root
+ if (_parsed_json_root.size() != 0) {
+ simdjson::ondemand::value val;
+ RETURN_IF_ERROR(JsonFunctions::extract_from_object(cur,
_parsed_json_root, &val));
+ // If val is not an object, will throw simdjson::simdjson_error
+ cur = val.get_object();
+ }
+ RETURN_IF_ERROR(_simdjson_write_columns_by_jsonpath(&cur,
slot_descs, columns, &valid));
+ // Reached end point
+ if (_array_iter == _array.end()) {
+ // Hint to read next json doc
+ _next_row = _total_rows + 1;
+ break;
+ }
+ ++_array_iter;
+ ++_next_row;
+ if (!valid) {
+ continue; // process next line
+ }
+ *is_empty_row = false;
+ break; // get a valid row, then break
+ } catch (simdjson::simdjson_error& e) {
+ // prevent from endless loop
+ _next_row = _total_rows + 1;
+ fmt::memory_buffer error_msg;
+ fmt::format_to(error_msg, "Parse json data failed. code: {}, error
info: {}", e.error(),
+ e.what());
+ RETURN_IF_ERROR(_state->append_error_msg_to_file(
+ [&]() -> std::string { return ""; },
+ [&]() -> std::string { return fmt::to_string(error_msg);
}, eof));
+ _counter->num_rows_filtered++;
+ // Before continuing to process other rows, we need to first clean
the fail parsed row.
+ for (int i = 0; i < columns.size(); ++i) {
+ if (columns[i]->size() > num_rows) {
+ columns[i]->pop_back(columns[i]->size() - num_rows);
+ }
+ }
+ if (*_scanner_eof) {
+ // When _scanner_eof is true and valid is false, it means that
we have encountered
+ // unqualified data and decided to stop the scan.
+ *is_empty_row = true;
+ return Status::OK();
+ }
+ continue;
+ }
+ } while (_next_row <= _total_rows);
+ return Status::OK();
+}
+
+Status NewJsonReader::_simdjson_handle_nested_complex_json(
+ std::vector<MutableColumnPtr>& columns, const
std::vector<SlotDescriptor*>& slot_descs,
+ bool* is_empty_row, bool* eof) {
+ // nested complex json
+ while (true) {
+ size_t num_rows = columns[0]->size();
+ simdjson::ondemand::object cur;
+ try {
+ Status st = _simdjson_parse_json(is_empty_row, eof);
+ if (st.is<DATA_QUALITY_ERROR>()) {
+ continue; // continue to read next
+ }
+ RETURN_IF_ERROR(st);
+ if (*is_empty_row == true) {
+ return Status::OK();
+ }
+ *is_empty_row = false;
+ bool valid = true;
+ cur = _json_value.get_object();
Review Comment:
Is _json_value always an object? If not, is it valid to call get_object()?
##########
be/src/exprs/json_functions.cpp:
##########
@@ -282,4 +282,67 @@ void JsonFunctions::get_parsed_paths(const
std::vector<std::string>& path_exprs,
}
}
+Status JsonFunctions::extract_from_object(simdjson::ondemand::object& obj,
+ const std::vector<JsonPath>&
jsonpath,
+ simdjson::ondemand::value* value)
noexcept {
+#define HANDLE_SIMDJSON_ERROR(err, msg)
\
+ do {
\
+ const simdjson::error_code& _err = err;
\
+ const std::string& _msg = msg;
\
+ if (UNLIKELY(_err)) {
\
+ if (_err == simdjson::NO_SUCH_FIELD || _err ==
simdjson::INDEX_OUT_OF_BOUNDS) { \
+ return Status::NotFound(
\
+ fmt::format("err: {}, msg: {}",
simdjson::error_message(_err), _msg)); \
+ }
\
+ return Status::DataQualityError(
\
Review Comment:
What are the other types of errors, and what is the principle for DataQualityError?
##########
be/src/vec/exec/format/json/new_json_reader.cpp:
##########
@@ -112,6 +116,10 @@ Status NewJsonReader::init_reader() {
if (_is_dynamic_schema) {
_json_parser =
std::make_unique<vectorized::JSONDataParser<vectorized::SimdJSONParser>>();
}
+ _ondemand_json_parser = std::make_unique<simdjson::ondemand::parser>();
Review Comment:
_ondemand_json_parser and _json_parser both use simdjson, which looks
duplicated. Is it necessary to have two?
##########
be/src/vec/exec/format/json/new_json_reader.cpp:
##########
@@ -1007,4 +1015,560 @@ Status
NewJsonReader::_read_one_message(std::unique_ptr<uint8_t[]>* file_buf, si
}
return Status::OK();
}
+// ---------SIMDJSON----------
+// simdjson, replace none simdjson function if it is ready
+Status NewJsonReader::_simdjson_init_reader() {
+ RETURN_IF_ERROR(_get_range_params());
+
+ RETURN_IF_ERROR(_open_file_reader());
+ if (_read_json_by_line) {
+ RETURN_IF_ERROR(_open_line_reader());
+ }
+
+ // generate _parsed_jsonpaths and _parsed_json_root
+ RETURN_IF_ERROR(_parse_jsonpath_and_json_root());
+
+ //improve performance
+ if (_parsed_jsonpaths.empty() || _is_dynamic_schema) { // input is a
simple json-string
+ _vhandle_json_callback = _is_dynamic_schema ?
&NewJsonReader::_vhandle_dynamic_json
+ :
&NewJsonReader::_simdjson_handle_simple_json;
+ } else { // input is a complex json-string and a json-path
+ if (_strip_outer_array) {
+ _vhandle_json_callback =
&NewJsonReader::_simdjson_handle_flat_array_complex_json;
+ } else {
+ _vhandle_json_callback =
&NewJsonReader::_simdjson_handle_nested_complex_json;
+ }
+ }
+ if (_is_dynamic_schema) {
+ _json_parser =
std::make_unique<vectorized::JSONDataParser<vectorized::SimdJSONParser>>();
+ }
+ _ondemand_json_parser = std::make_unique<simdjson::ondemand::parser>();
+ for (int i = 0; i < _file_slot_descs.size(); ++i) {
+ _slot_desc_index.emplace(_file_slot_descs[i]->col_name(), i);
+ }
+ return Status::OK();
+}
+
+Status
NewJsonReader::_simdjson_handle_simple_json(std::vector<MutableColumnPtr>&
columns,
+ const
std::vector<SlotDescriptor*>& slot_descs,
+ bool* is_empty_row, bool*
eof) {
+ // simple json
+ simdjson::ondemand::object objectValue;
+ size_t num_rows = columns[0]->size();
+ do {
+ bool valid = false;
+ try {
+ if (_next_row >= _total_rows) { // parse json and generic document
+ Status st = _simdjson_parse_json(is_empty_row, eof);
+ if (st.is<DATA_QUALITY_ERROR>()) {
+ continue; // continue to read next
+ }
+ RETURN_IF_ERROR(st);
+ if (*is_empty_row == true) {
+ return Status::OK();
+ }
+ if (_json_value.type() ==
simdjson::ondemand::json_type::array) {
+ _array = _json_value.get_array();
+ _array_iter = _array.begin();
+
+ _total_rows = _array.count_elements();
+ if (_total_rows == 0) {
+ // may be passing an empty json, such as "[]"
+ RETURN_IF_ERROR(_append_error_msg(nullptr, "Empty json
line", "", nullptr));
+ if (*_scanner_eof) {
+ *is_empty_row = true;
+ return Status::OK();
+ }
+ continue;
+ }
+ } else {
+ _total_rows = 1; // only one row
+ objectValue = _json_value;
+ }
+ _next_row = 0;
+ }
+
+ if (_json_value.type() == simdjson::ondemand::json_type::array) {
// handle case 1
+ objectValue = *_array_iter;
+ RETURN_IF_ERROR(
+ _simdjson_set_column_value(&objectValue, columns,
slot_descs, &valid));
+ if (_array_iter == _array.end()) {
+ // Hint to read next json doc
+ _next_row = _total_rows + 1;
+ break;
+ }
+ ++_array_iter;
+ } else { // handle case 2
+ // objectValue = _json_value.get_object();
+ RETURN_IF_ERROR(
+ _simdjson_set_column_value(&objectValue, columns,
slot_descs, &valid));
+ }
+ _next_row++;
+ if (!valid) {
+ if (*_scanner_eof) {
+ // When _scanner_eof is true and valid is false, it means
that we have encountered
+ // unqualified data and decided to stop the scan.
+ *is_empty_row = true;
+ return Status::OK();
+ }
+ continue;
+ }
+ *is_empty_row = false;
+ break; // get a valid row, then break
+ } catch (simdjson::simdjson_error& e) {
+ // prevent from endless loop
+ _next_row = _total_rows + 1;
+ fmt::memory_buffer error_msg;
+ fmt::format_to(error_msg, "Parse json data for array failed. code:
{}, error info: {}",
+ e.error(), e.what());
+ RETURN_IF_ERROR(_state->append_error_msg_to_file(
+ [&]() -> std::string { return ""; },
+ [&]() -> std::string { return fmt::to_string(error_msg);
}, eof));
+ _counter->num_rows_filtered++;
+ // Before continuing to process other rows, we need to first clean
the fail parsed row.
+ for (int i = 0; i < columns.size(); ++i) {
+ if (columns[i]->size() > num_rows) {
+ columns[i]->pop_back(columns[i]->size() - num_rows);
+ }
+ }
+ if (!valid) {
+ if (*_scanner_eof) {
+ // When _scanner_eof is true and valid is false, it means
that we have encountered
+ // unqualified data and decided to stop the scan.
+ *is_empty_row = true;
+ return Status::OK();
+ }
+ continue;
+ }
+ continue;
+ }
+ } while (_next_row <= _total_rows);
+ return Status::OK();
+}
+
+Status NewJsonReader::_simdjson_handle_flat_array_complex_json(
+ std::vector<MutableColumnPtr>& columns, const
std::vector<SlotDescriptor*>& slot_descs,
+ bool* is_empty_row, bool* eof) {
+ // array complex json
+ size_t num_rows = columns[0]->size();
+ simdjson::ondemand::object cur;
+ do {
+ try {
+ if (_next_row >= _total_rows) {
+ Status st = _simdjson_parse_json(is_empty_row, eof);
+ if (st.is<DATA_QUALITY_ERROR>()) {
+ continue; // continue to read next
+ }
+ RETURN_IF_ERROR(st);
+ if (*is_empty_row == true) {
+ if (st == Status::OK()) {
+ return Status::OK();
+ }
+ if (_total_rows == 0) {
+ continue;
+ }
+ }
+ _array = _json_value.get_array();
+ _array_iter = _array.begin();
+ }
+
+ bool valid = true;
+ cur = (*_array_iter).get_object();
+ // extract root
+ if (_parsed_json_root.size() != 0) {
+ simdjson::ondemand::value val;
+ RETURN_IF_ERROR(JsonFunctions::extract_from_object(cur,
_parsed_json_root, &val));
+ // If val is not an object, will throw simdjson::simdjson_error
+ cur = val.get_object();
+ }
+ RETURN_IF_ERROR(_simdjson_write_columns_by_jsonpath(&cur,
slot_descs, columns, &valid));
+ // Reached end point
+ if (_array_iter == _array.end()) {
+ // Hint to read next json doc
+ _next_row = _total_rows + 1;
+ break;
+ }
+ ++_array_iter;
+ ++_next_row;
+ if (!valid) {
+ continue; // process next line
+ }
+ *is_empty_row = false;
+ break; // get a valid row, then break
+ } catch (simdjson::simdjson_error& e) {
+ // prevent from endless loop
+ _next_row = _total_rows + 1;
+ fmt::memory_buffer error_msg;
+ fmt::format_to(error_msg, "Parse json data failed. code: {}, error
info: {}", e.error(),
+ e.what());
+ RETURN_IF_ERROR(_state->append_error_msg_to_file(
+ [&]() -> std::string { return ""; },
+ [&]() -> std::string { return fmt::to_string(error_msg);
}, eof));
+ _counter->num_rows_filtered++;
+ // Before continuing to process other rows, we need to first clean
the fail parsed row.
+ for (int i = 0; i < columns.size(); ++i) {
+ if (columns[i]->size() > num_rows) {
+ columns[i]->pop_back(columns[i]->size() - num_rows);
+ }
+ }
+ if (*_scanner_eof) {
+ // When _scanner_eof is true and valid is false, it means that
we have encountered
+ // unqualified data and decided to stop the scan.
+ *is_empty_row = true;
+ return Status::OK();
+ }
+ continue;
+ }
+ } while (_next_row <= _total_rows);
+ return Status::OK();
+}
+
+Status NewJsonReader::_simdjson_handle_nested_complex_json(
+ std::vector<MutableColumnPtr>& columns, const
std::vector<SlotDescriptor*>& slot_descs,
+ bool* is_empty_row, bool* eof) {
+ // nested complex json
+ while (true) {
+ size_t num_rows = columns[0]->size();
+ simdjson::ondemand::object cur;
+ try {
+ Status st = _simdjson_parse_json(is_empty_row, eof);
+ if (st.is<DATA_QUALITY_ERROR>()) {
+ continue; // continue to read next
+ }
+ RETURN_IF_ERROR(st);
+ if (*is_empty_row == true) {
+ return Status::OK();
+ }
+ *is_empty_row = false;
+ bool valid = true;
+ cur = _json_value.get_object();
+ RETURN_IF_ERROR(_simdjson_write_columns_by_jsonpath(&cur,
slot_descs, columns, &valid));
+ if (!valid) {
+ // there is only one line in this case, so if it return false,
just set is_empty_row true
+ // so that the caller will continue reading next line.
+ *is_empty_row = true;
+ }
+ break; // read a valid row
+ } catch (simdjson::simdjson_error& e) {
+ fmt::memory_buffer error_msg;
+ fmt::format_to(error_msg, "Parse json data failed. code: {}, error
info: {}", e.error(),
+ e.what());
+ RETURN_IF_ERROR(_state->append_error_msg_to_file(
+ [&]() -> std::string { return ""; },
+ [&]() -> std::string { return fmt::to_string(error_msg);
}, eof));
+ _counter->num_rows_filtered++;
+ // Before continuing to process other rows, we need to first clean
the fail parsed row.
+ for (int i = 0; i < columns.size(); ++i) {
+ if (columns[i]->size() > num_rows) {
+ columns[i]->pop_back(columns[i]->size() - num_rows);
+ }
+ }
+ if (*_scanner_eof) {
+ // When _scanner_eof is true and valid is false, it means that
we have encountered
+ // unqualified data and decided to stop the scan.
+ *is_empty_row = true;
+ return Status::OK();
+ }
+ continue;
+ }
+ }
+ return Status::OK();
+}
+
+Status NewJsonReader::_simdjson_set_column_value(simdjson::ondemand::object*
value,
+
std::vector<MutableColumnPtr>& columns,
+ const
std::vector<SlotDescriptor*>& slot_descs,
+ bool* valid) {
+ // set
+ size_t cur_row_count = columns[0]->size();
+ bool has_valid_value = false;
+ // iterate through object, simdjson::ondemond will parsing on the fly
+ for (auto field : *value) {
+ std::string_view key = field.unescaped_key();
+ auto iter = _slot_desc_index.find(std::string(key));
+ if (iter == _slot_desc_index.end()) {
+ // This key is not exist in slot desc, just ignore
+ continue;
+ }
+ simdjson::ondemand::value val = field.value();
+ RETURN_IF_ERROR(_simdjson_write_data_to_column(val,
slot_descs[iter->second],
+
columns[iter->second].get(), valid));
+ if (!(*valid)) {
+ return Status::OK();
+ }
+ has_valid_value = true;
+ }
+ if (!has_valid_value) {
+ RETURN_IF_ERROR(
+ _append_error_msg(value, "All fields is null, this is a
invalid row.", "", valid));
+ return Status::OK();
+ }
+
+ // fill missing slot
+ int nullcount = 0;
+ int ctx_idx = 0;
+ for (auto slot_desc : slot_descs) {
+ if (!slot_desc->is_materialized()) {
+ continue;
+ }
+ int dest_index = ctx_idx++;
+ auto* column_ptr = columns[dest_index].get();
+ if (column_ptr->size() < cur_row_count + 1) {
+ DCHECK(column_ptr->size() == cur_row_count);
+ column_ptr->assume_mutable()->insert_default();
+ ++nullcount;
+ }
+ DCHECK(column_ptr->size() == cur_row_count + 1);
+ }
+ // There is at least one valid value here
+ DCHECK(nullcount < columns.size());
+ *valid = true;
+ return Status::OK();
+}
+
+Status
NewJsonReader::_simdjson_write_data_to_column(simdjson::ondemand::value& value,
+ SlotDescriptor* slot_desc,
+ vectorized::IColumn*
column, bool* valid) {
+ // write
+ vectorized::ColumnNullable* nullable_column = nullptr;
+ vectorized::IColumn* column_ptr = nullptr;
+ if (slot_desc->is_nullable()) {
+ nullable_column = assert_cast<vectorized::ColumnNullable*>(column);
+ column_ptr = &nullable_column->get_nested_column();
+ }
+
+ // TODO: if the vexpr can support another 'slot_desc type' than
'TYPE_VARCHAR',
+ // we need use a function to support these types to insert data in columns.
+ ColumnString* column_string = assert_cast<ColumnString*>(column_ptr);
+ if (value.is_null()) {
+ if (column->is_nullable()) {
+ // insert_default already push 1 to null_map
+ nullable_column->insert_default();
+ } else {
+ RETURN_IF_ERROR(_append_error_msg(
+ nullptr, "Json value is null, but the column `{}` is not
nullable.",
+ slot_desc->col_name(), valid));
+ return Status::OK();
+ }
+ } else if (value.type() == simdjson::ondemand::json_type::boolean) {
+ nullable_column->get_null_map_data().push_back(0);
+ if (value.get_bool()) {
+ column_string->insert_data("1", 1);
+ } else {
+ column_string->insert_data("0", 1);
+ }
+ } else {
+ // just return it's str representation
+ auto str_view = simdjson::to_json_string(value).value();
+ if (str_view[0] == '\"' || str_view[0] == '\'') {
+ str_view = str_view.substr(1, str_view.length() - 2);
+ }
+ nullable_column->get_null_map_data().push_back(0);
+ column_string->insert_data(str_view.data(), str_view.length());
+ }
+
+ *valid = true;
+ return Status::OK();
+}
+
+Status NewJsonReader::_append_error_msg(simdjson::ondemand::object* obj,
std::string error_msg,
+ std::string col_name, bool* valid) {
+ std::string err_msg;
+ if (!col_name.empty()) {
+ fmt::memory_buffer error_buf;
+ fmt::format_to(error_buf, error_msg, col_name);
+ err_msg = fmt::to_string(error_buf);
+ } else {
+ err_msg = error_msg;
+ }
+
+ RETURN_IF_ERROR(_state->append_error_msg_to_file(
+ [&]() -> std::string {
+ if (!obj) {
+ return "";
+ }
+ std::string_view str_view;
+ (void)!obj->raw_json().get(str_view);
+ return std::string(str_view.data(), str_view.size());
+ },
+ [&]() -> std::string { return err_msg; }, _scanner_eof));
+
+ _counter->num_rows_filtered++;
+ if (valid != nullptr) {
+ // current row is invalid
+ *valid = false;
+ }
+ return Status::OK();
+}
+
+Status NewJsonReader::_simdjson_parse_json(bool* is_empty_row, bool* eof) {
+ size_t size = 0;
+ RETURN_IF_ERROR(_simdjson_parse_json_doc(&size, eof));
+
+ // read all data, then return
+ if (size == 0 || *eof) {
+ *is_empty_row = true;
+ return Status::OK();
+ }
+
+ if (!_parsed_jsonpaths.empty() && _strip_outer_array) {
+ _total_rows = _json_value.count_elements().value();
+ _next_row = 0;
+
+ if (_total_rows == 0) {
+ // meet an empty json array.
+ *is_empty_row = true;
+ }
+ }
+ return Status::OK();
+}
+Status NewJsonReader::_simdjson_parse_json_doc(size_t* size, bool* eof) {
+ // read a whole message
+ SCOPED_TIMER(_file_read_timer);
+ const uint8_t* json_str = nullptr;
+ std::unique_ptr<uint8_t[]> json_str_ptr;
+ if (_line_reader != nullptr) {
+ RETURN_IF_ERROR(_line_reader->read_line(&json_str, size, eof));
+ } else {
+ size_t length = 0;
+ RETURN_IF_ERROR(_read_one_message(&json_str_ptr, &length));
+ json_str = json_str_ptr.get();
+ *size = length;
+ if (length == 0) {
+ *eof = true;
+ }
+ }
+
+ _bytes_read_counter += *size;
+ if (*eof) {
+ return Status::OK();
+ }
+ memcpy(_simdjson_ondemand_padding_buffer, json_str, *size);
Review Comment:
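This should handle the case where `size` is greater than the capacity of `_simdjson_ondemand_padding_buffer`; the `memcpy` above copies `*size` bytes without a bounds check.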
##########
be/src/vec/exec/format/json/new_json_reader.cpp:
##########
@@ -1007,4 +1015,560 @@ Status
NewJsonReader::_read_one_message(std::unique_ptr<uint8_t[]>* file_buf, si
}
return Status::OK();
}
+// ---------SIMDJSON----------
+// simdjson, replace none simdjson function if it is ready
+Status NewJsonReader::_simdjson_init_reader() {
+ RETURN_IF_ERROR(_get_range_params());
+
+ RETURN_IF_ERROR(_open_file_reader());
+ if (_read_json_by_line) {
+ RETURN_IF_ERROR(_open_line_reader());
+ }
+
+ // generate _parsed_jsonpaths and _parsed_json_root
+ RETURN_IF_ERROR(_parse_jsonpath_and_json_root());
+
+ //improve performance
+ if (_parsed_jsonpaths.empty() || _is_dynamic_schema) { // input is a
simple json-string
+ _vhandle_json_callback = _is_dynamic_schema ?
&NewJsonReader::_vhandle_dynamic_json
+ :
&NewJsonReader::_simdjson_handle_simple_json;
+ } else { // input is a complex json-string and a json-path
+ if (_strip_outer_array) {
+ _vhandle_json_callback =
&NewJsonReader::_simdjson_handle_flat_array_complex_json;
+ } else {
+ _vhandle_json_callback =
&NewJsonReader::_simdjson_handle_nested_complex_json;
+ }
+ }
+ if (_is_dynamic_schema) {
+ _json_parser =
std::make_unique<vectorized::JSONDataParser<vectorized::SimdJSONParser>>();
+ }
+ _ondemand_json_parser = std::make_unique<simdjson::ondemand::parser>();
+ for (int i = 0; i < _file_slot_descs.size(); ++i) {
+ _slot_desc_index.emplace(_file_slot_descs[i]->col_name(), i);
+ }
+ return Status::OK();
+}
+
+Status
NewJsonReader::_simdjson_handle_simple_json(std::vector<MutableColumnPtr>&
columns,
+ const
std::vector<SlotDescriptor*>& slot_descs,
+ bool* is_empty_row, bool*
eof) {
+ // simple json
+ simdjson::ondemand::object objectValue;
+ size_t num_rows = columns[0]->size();
+ do {
+ bool valid = false;
+ try {
+ if (_next_row >= _total_rows) { // parse json and generic document
+ Status st = _simdjson_parse_json(is_empty_row, eof);
+ if (st.is<DATA_QUALITY_ERROR>()) {
+ continue; // continue to read next
+ }
+ RETURN_IF_ERROR(st);
+ if (*is_empty_row == true) {
+ return Status::OK();
+ }
+ if (_json_value.type() ==
simdjson::ondemand::json_type::array) {
+ _array = _json_value.get_array();
+ _array_iter = _array.begin();
+
+ _total_rows = _array.count_elements();
+ if (_total_rows == 0) {
+ // may be passing an empty json, such as "[]"
+ RETURN_IF_ERROR(_append_error_msg(nullptr, "Empty json
line", "", nullptr));
+ if (*_scanner_eof) {
+ *is_empty_row = true;
+ return Status::OK();
+ }
+ continue;
+ }
+ } else {
+ _total_rows = 1; // only one row
+ objectValue = _json_value;
+ }
+ _next_row = 0;
+ }
+
+ if (_json_value.type() == simdjson::ondemand::json_type::array) {
// handle case 1
+ objectValue = *_array_iter;
+ RETURN_IF_ERROR(
+ _simdjson_set_column_value(&objectValue, columns,
slot_descs, &valid));
+ if (_array_iter == _array.end()) {
+ // Hint to read next json doc
+ _next_row = _total_rows + 1;
+ break;
+ }
+ ++_array_iter;
+ } else { // handle case 2
+ // objectValue = _json_value.get_object();
+ RETURN_IF_ERROR(
+ _simdjson_set_column_value(&objectValue, columns,
slot_descs, &valid));
+ }
+ _next_row++;
+ if (!valid) {
+ if (*_scanner_eof) {
+ // When _scanner_eof is true and valid is false, it means
that we have encountered
+ // unqualified data and decided to stop the scan.
+ *is_empty_row = true;
+ return Status::OK();
+ }
+ continue;
+ }
+ *is_empty_row = false;
+ break; // get a valid row, then break
+ } catch (simdjson::simdjson_error& e) {
+ // prevent from endless loop
+ _next_row = _total_rows + 1;
+ fmt::memory_buffer error_msg;
+ fmt::format_to(error_msg, "Parse json data for array failed. code:
{}, error info: {}",
+ e.error(), e.what());
+ RETURN_IF_ERROR(_state->append_error_msg_to_file(
+ [&]() -> std::string { return ""; },
+ [&]() -> std::string { return fmt::to_string(error_msg);
}, eof));
+ _counter->num_rows_filtered++;
+ // Before continuing to process other rows, we need to first clean
the fail parsed row.
+ for (int i = 0; i < columns.size(); ++i) {
+ if (columns[i]->size() > num_rows) {
+ columns[i]->pop_back(columns[i]->size() - num_rows);
+ }
+ }
+ if (!valid) {
+ if (*_scanner_eof) {
+ // When _scanner_eof is true and valid is false, it means
that we have encountered
+ // unqualified data and decided to stop the scan.
+ *is_empty_row = true;
+ return Status::OK();
+ }
+ continue;
+ }
+ continue;
+ }
+ } while (_next_row <= _total_rows);
+ return Status::OK();
+}
+
+Status NewJsonReader::_simdjson_handle_flat_array_complex_json(
+ std::vector<MutableColumnPtr>& columns, const
std::vector<SlotDescriptor*>& slot_descs,
+ bool* is_empty_row, bool* eof) {
+ // array complex json
+ size_t num_rows = columns[0]->size();
+ simdjson::ondemand::object cur;
+ do {
+ try {
+ if (_next_row >= _total_rows) {
+ Status st = _simdjson_parse_json(is_empty_row, eof);
+ if (st.is<DATA_QUALITY_ERROR>()) {
+ continue; // continue to read next
+ }
+ RETURN_IF_ERROR(st);
+ if (*is_empty_row == true) {
+ if (st == Status::OK()) {
+ return Status::OK();
+ }
+ if (_total_rows == 0) {
+ continue;
+ }
+ }
+ _array = _json_value.get_array();
+ _array_iter = _array.begin();
+ }
+
+ bool valid = true;
+ cur = (*_array_iter).get_object();
+ // extract root
+ if (_parsed_json_root.size() != 0) {
+ simdjson::ondemand::value val;
+ RETURN_IF_ERROR(JsonFunctions::extract_from_object(cur,
_parsed_json_root, &val));
+ // If val is not an object, will throw simdjson::simdjson_error
+ cur = val.get_object();
+ }
+ RETURN_IF_ERROR(_simdjson_write_columns_by_jsonpath(&cur,
slot_descs, columns, &valid));
+ // Reached end point
+ if (_array_iter == _array.end()) {
+ // Hint to read next json doc
+ _next_row = _total_rows + 1;
+ break;
+ }
+ ++_array_iter;
+ ++_next_row;
+ if (!valid) {
+ continue; // process next line
+ }
+ *is_empty_row = false;
+ break; // get a valid row, then break
+ } catch (simdjson::simdjson_error& e) {
+ // prevent from endless loop
+ _next_row = _total_rows + 1;
+ fmt::memory_buffer error_msg;
+ fmt::format_to(error_msg, "Parse json data failed. code: {}, error
info: {}", e.error(),
+ e.what());
+ RETURN_IF_ERROR(_state->append_error_msg_to_file(
+ [&]() -> std::string { return ""; },
+ [&]() -> std::string { return fmt::to_string(error_msg);
}, eof));
+ _counter->num_rows_filtered++;
+ // Before continuing to process other rows, we need to first clean
the fail parsed row.
+ for (int i = 0; i < columns.size(); ++i) {
+ if (columns[i]->size() > num_rows) {
+ columns[i]->pop_back(columns[i]->size() - num_rows);
+ }
+ }
+ if (*_scanner_eof) {
+ // When _scanner_eof is true and valid is false, it means that
we have encountered
+ // unqualified data and decided to stop the scan.
+ *is_empty_row = true;
+ return Status::OK();
+ }
+ continue;
+ }
+ } while (_next_row <= _total_rows);
+ return Status::OK();
+}
+
+Status NewJsonReader::_simdjson_handle_nested_complex_json(
+ std::vector<MutableColumnPtr>& columns, const
std::vector<SlotDescriptor*>& slot_descs,
+ bool* is_empty_row, bool* eof) {
+ // nested complex json
+ while (true) {
+ size_t num_rows = columns[0]->size();
+ simdjson::ondemand::object cur;
+ try {
+ Status st = _simdjson_parse_json(is_empty_row, eof);
+ if (st.is<DATA_QUALITY_ERROR>()) {
+ continue; // continue to read next
+ }
+ RETURN_IF_ERROR(st);
+ if (*is_empty_row == true) {
+ return Status::OK();
+ }
+ *is_empty_row = false;
+ bool valid = true;
+ cur = _json_value.get_object();
+ RETURN_IF_ERROR(_simdjson_write_columns_by_jsonpath(&cur,
slot_descs, columns, &valid));
+ if (!valid) {
+ // there is only one line in this case, so if it return false,
just set is_empty_row true
+ // so that the caller will continue reading next line.
+ *is_empty_row = true;
+ }
+ break; // read a valid row
+ } catch (simdjson::simdjson_error& e) {
+ fmt::memory_buffer error_msg;
+ fmt::format_to(error_msg, "Parse json data failed. code: {}, error
info: {}", e.error(),
+ e.what());
+ RETURN_IF_ERROR(_state->append_error_msg_to_file(
+ [&]() -> std::string { return ""; },
+ [&]() -> std::string { return fmt::to_string(error_msg);
}, eof));
+ _counter->num_rows_filtered++;
+ // Before continuing to process other rows, we need to first clean
the fail parsed row.
+ for (int i = 0; i < columns.size(); ++i) {
+ if (columns[i]->size() > num_rows) {
+ columns[i]->pop_back(columns[i]->size() - num_rows);
+ }
+ }
+ if (*_scanner_eof) {
+ // When _scanner_eof is true and valid is false, it means that
we have encountered
+ // unqualified data and decided to stop the scan.
+ *is_empty_row = true;
+ return Status::OK();
+ }
+ continue;
+ }
+ }
+ return Status::OK();
+}
+
+Status NewJsonReader::_simdjson_set_column_value(simdjson::ondemand::object*
value,
+
std::vector<MutableColumnPtr>& columns,
+ const
std::vector<SlotDescriptor*>& slot_descs,
+ bool* valid) {
+ // set
+ size_t cur_row_count = columns[0]->size();
+ bool has_valid_value = false;
+ // iterate through object, simdjson::ondemond will parsing on the fly
+ for (auto field : *value) {
+ std::string_view key = field.unescaped_key();
+ auto iter = _slot_desc_index.find(std::string(key));
+ if (iter == _slot_desc_index.end()) {
+ // This key is not exist in slot desc, just ignore
+ continue;
+ }
+ simdjson::ondemand::value val = field.value();
+ RETURN_IF_ERROR(_simdjson_write_data_to_column(val,
slot_descs[iter->second],
+
columns[iter->second].get(), valid));
+ if (!(*valid)) {
+ return Status::OK();
+ }
+ has_valid_value = true;
+ }
+ if (!has_valid_value) {
+ RETURN_IF_ERROR(
+ _append_error_msg(value, "All fields is null, this is a
invalid row.", "", valid));
+ return Status::OK();
+ }
+
+ // fill missing slot
+ int nullcount = 0;
+ int ctx_idx = 0;
+ for (auto slot_desc : slot_descs) {
+ if (!slot_desc->is_materialized()) {
+ continue;
+ }
+ int dest_index = ctx_idx++;
+ auto* column_ptr = columns[dest_index].get();
+ if (column_ptr->size() < cur_row_count + 1) {
+ DCHECK(column_ptr->size() == cur_row_count);
+ column_ptr->assume_mutable()->insert_default();
+ ++nullcount;
+ }
+ DCHECK(column_ptr->size() == cur_row_count + 1);
+ }
+ // There is at least one valid value here
+ DCHECK(nullcount < columns.size());
+ *valid = true;
+ return Status::OK();
+}
+
+Status
NewJsonReader::_simdjson_write_data_to_column(simdjson::ondemand::value& value,
+ SlotDescriptor* slot_desc,
+ vectorized::IColumn*
column, bool* valid) {
+ // write
+ vectorized::ColumnNullable* nullable_column = nullptr;
+ vectorized::IColumn* column_ptr = nullptr;
+ if (slot_desc->is_nullable()) {
+ nullable_column = assert_cast<vectorized::ColumnNullable*>(column);
+ column_ptr = &nullable_column->get_nested_column();
+ }
+
+ // TODO: if the vexpr can support another 'slot_desc type' than
'TYPE_VARCHAR',
+ // we need use a function to support these types to insert data in columns.
+ ColumnString* column_string = assert_cast<ColumnString*>(column_ptr);
+ if (value.is_null()) {
+ if (column->is_nullable()) {
+ // insert_default already push 1 to null_map
+ nullable_column->insert_default();
+ } else {
+ RETURN_IF_ERROR(_append_error_msg(
+ nullptr, "Json value is null, but the column `{}` is not
nullable.",
+ slot_desc->col_name(), valid));
+ return Status::OK();
Review Comment:
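Why does this return `Status::OK()` here, right after recording an error for a null JSON value in a non-nullable column?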
##########
be/src/vec/exec/format/json/new_json_reader.cpp:
##########
@@ -1007,4 +1015,560 @@ Status
NewJsonReader::_read_one_message(std::unique_ptr<uint8_t[]>* file_buf, si
}
return Status::OK();
}
+// ---------SIMDJSON----------
+// simdjson, replace none simdjson function if it is ready
+Status NewJsonReader::_simdjson_init_reader() {
+ RETURN_IF_ERROR(_get_range_params());
+
+ RETURN_IF_ERROR(_open_file_reader());
+ if (_read_json_by_line) {
+ RETURN_IF_ERROR(_open_line_reader());
+ }
+
+ // generate _parsed_jsonpaths and _parsed_json_root
+ RETURN_IF_ERROR(_parse_jsonpath_and_json_root());
+
+ //improve performance
+ if (_parsed_jsonpaths.empty() || _is_dynamic_schema) { // input is a
simple json-string
+ _vhandle_json_callback = _is_dynamic_schema ?
&NewJsonReader::_vhandle_dynamic_json
+ :
&NewJsonReader::_simdjson_handle_simple_json;
+ } else { // input is a complex json-string and a json-path
+ if (_strip_outer_array) {
+ _vhandle_json_callback =
&NewJsonReader::_simdjson_handle_flat_array_complex_json;
+ } else {
+ _vhandle_json_callback =
&NewJsonReader::_simdjson_handle_nested_complex_json;
+ }
+ }
+ if (_is_dynamic_schema) {
+ _json_parser =
std::make_unique<vectorized::JSONDataParser<vectorized::SimdJSONParser>>();
+ }
+ _ondemand_json_parser = std::make_unique<simdjson::ondemand::parser>();
+ for (int i = 0; i < _file_slot_descs.size(); ++i) {
+ _slot_desc_index.emplace(_file_slot_descs[i]->col_name(), i);
+ }
+ return Status::OK();
+}
+
+Status
NewJsonReader::_simdjson_handle_simple_json(std::vector<MutableColumnPtr>&
columns,
+ const
std::vector<SlotDescriptor*>& slot_descs,
+ bool* is_empty_row, bool*
eof) {
+ // simple json
+ simdjson::ondemand::object objectValue;
+ size_t num_rows = columns[0]->size();
+ do {
+ bool valid = false;
+ try {
+ if (_next_row >= _total_rows) { // parse json and generic document
+ Status st = _simdjson_parse_json(is_empty_row, eof);
+ if (st.is<DATA_QUALITY_ERROR>()) {
+ continue; // continue to read next
+ }
+ RETURN_IF_ERROR(st);
+ if (*is_empty_row == true) {
+ return Status::OK();
+ }
+ if (_json_value.type() ==
simdjson::ondemand::json_type::array) {
+ _array = _json_value.get_array();
+ _array_iter = _array.begin();
+
+ _total_rows = _array.count_elements();
+ if (_total_rows == 0) {
+ // may be passing an empty json, such as "[]"
+ RETURN_IF_ERROR(_append_error_msg(nullptr, "Empty json
line", "", nullptr));
+ if (*_scanner_eof) {
+ *is_empty_row = true;
+ return Status::OK();
+ }
+ continue;
+ }
+ } else {
+ _total_rows = 1; // only one row
+ objectValue = _json_value;
+ }
+ _next_row = 0;
+ }
+
+ if (_json_value.type() == simdjson::ondemand::json_type::array) {
// handle case 1
+ objectValue = *_array_iter;
+ RETURN_IF_ERROR(
+ _simdjson_set_column_value(&objectValue, columns,
slot_descs, &valid));
+ if (_array_iter == _array.end()) {
+ // Hint to read next json doc
+ _next_row = _total_rows + 1;
+ break;
+ }
+ ++_array_iter;
+ } else { // handle case 2
+ // objectValue = _json_value.get_object();
+ RETURN_IF_ERROR(
+ _simdjson_set_column_value(&objectValue, columns,
slot_descs, &valid));
+ }
+ _next_row++;
+ if (!valid) {
+ if (*_scanner_eof) {
+ // When _scanner_eof is true and valid is false, it means
that we have encountered
+ // unqualified data and decided to stop the scan.
+ *is_empty_row = true;
+ return Status::OK();
+ }
+ continue;
+ }
+ *is_empty_row = false;
+ break; // get a valid row, then break
+ } catch (simdjson::simdjson_error& e) {
+ // prevent from endless loop
+ _next_row = _total_rows + 1;
+ fmt::memory_buffer error_msg;
+ fmt::format_to(error_msg, "Parse json data for array failed. code:
{}, error info: {}",
+ e.error(), e.what());
+ RETURN_IF_ERROR(_state->append_error_msg_to_file(
+ [&]() -> std::string { return ""; },
+ [&]() -> std::string { return fmt::to_string(error_msg);
}, eof));
+ _counter->num_rows_filtered++;
+ // Before continuing to process other rows, we need to first clean
the fail parsed row.
+ for (int i = 0; i < columns.size(); ++i) {
+ if (columns[i]->size() > num_rows) {
+ columns[i]->pop_back(columns[i]->size() - num_rows);
+ }
+ }
+ if (!valid) {
+ if (*_scanner_eof) {
+ // When _scanner_eof is true and valid is false, it means
that we have encountered
+ // unqualified data and decided to stop the scan.
+ *is_empty_row = true;
+ return Status::OK();
+ }
+ continue;
+ }
+ continue;
+ }
+ } while (_next_row <= _total_rows);
+ return Status::OK();
+}
+
+Status NewJsonReader::_simdjson_handle_flat_array_complex_json(
+ std::vector<MutableColumnPtr>& columns, const
std::vector<SlotDescriptor*>& slot_descs,
+ bool* is_empty_row, bool* eof) {
+ // array complex json
+ size_t num_rows = columns[0]->size();
+ simdjson::ondemand::object cur;
+ do {
+ try {
+ if (_next_row >= _total_rows) {
+ Status st = _simdjson_parse_json(is_empty_row, eof);
+ if (st.is<DATA_QUALITY_ERROR>()) {
+ continue; // continue to read next
+ }
+ RETURN_IF_ERROR(st);
+ if (*is_empty_row == true) {
+ if (st == Status::OK()) {
+ return Status::OK();
+ }
+ if (_total_rows == 0) {
+ continue;
+ }
+ }
+ _array = _json_value.get_array();
+ _array_iter = _array.begin();
+ }
+
+ bool valid = true;
+ cur = (*_array_iter).get_object();
Review Comment:
An element of the array may not be an object, in which case `get_object()` fails here.
##########
be/src/vec/exec/format/json/new_json_reader.cpp:
##########
@@ -1007,4 +1015,560 @@ Status
NewJsonReader::_read_one_message(std::unique_ptr<uint8_t[]>* file_buf, si
}
return Status::OK();
}
+// ---------SIMDJSON----------
+// simdjson, replace none simdjson function if it is ready
+Status NewJsonReader::_simdjson_init_reader() {
+ RETURN_IF_ERROR(_get_range_params());
+
+ RETURN_IF_ERROR(_open_file_reader());
+ if (_read_json_by_line) {
+ RETURN_IF_ERROR(_open_line_reader());
+ }
+
+ // generate _parsed_jsonpaths and _parsed_json_root
+ RETURN_IF_ERROR(_parse_jsonpath_and_json_root());
+
+ //improve performance
+ if (_parsed_jsonpaths.empty() || _is_dynamic_schema) { // input is a
simple json-string
+ _vhandle_json_callback = _is_dynamic_schema ?
&NewJsonReader::_vhandle_dynamic_json
+ :
&NewJsonReader::_simdjson_handle_simple_json;
+ } else { // input is a complex json-string and a json-path
+ if (_strip_outer_array) {
+ _vhandle_json_callback =
&NewJsonReader::_simdjson_handle_flat_array_complex_json;
+ } else {
+ _vhandle_json_callback =
&NewJsonReader::_simdjson_handle_nested_complex_json;
+ }
+ }
+ if (_is_dynamic_schema) {
+ _json_parser =
std::make_unique<vectorized::JSONDataParser<vectorized::SimdJSONParser>>();
+ }
+ _ondemand_json_parser = std::make_unique<simdjson::ondemand::parser>();
+ for (int i = 0; i < _file_slot_descs.size(); ++i) {
+ _slot_desc_index.emplace(_file_slot_descs[i]->col_name(), i);
+ }
+ return Status::OK();
+}
+
+Status
NewJsonReader::_simdjson_handle_simple_json(std::vector<MutableColumnPtr>&
columns,
+ const
std::vector<SlotDescriptor*>& slot_descs,
+ bool* is_empty_row, bool*
eof) {
+ // simple json
+ simdjson::ondemand::object objectValue;
+ size_t num_rows = columns[0]->size();
+ do {
+ bool valid = false;
+ try {
+ if (_next_row >= _total_rows) { // parse json and generic document
+ Status st = _simdjson_parse_json(is_empty_row, eof);
+ if (st.is<DATA_QUALITY_ERROR>()) {
+ continue; // continue to read next
+ }
+ RETURN_IF_ERROR(st);
+ if (*is_empty_row == true) {
+ return Status::OK();
+ }
+ if (_json_value.type() ==
simdjson::ondemand::json_type::array) {
+ _array = _json_value.get_array();
+ _array_iter = _array.begin();
+
+ _total_rows = _array.count_elements();
+ if (_total_rows == 0) {
+ // may be passing an empty json, such as "[]"
+ RETURN_IF_ERROR(_append_error_msg(nullptr, "Empty json
line", "", nullptr));
+ if (*_scanner_eof) {
+ *is_empty_row = true;
+ return Status::OK();
+ }
+ continue;
+ }
+ } else {
+ _total_rows = 1; // only one row
+ objectValue = _json_value;
+ }
+ _next_row = 0;
+ }
+
+ if (_json_value.type() == simdjson::ondemand::json_type::array) {
// handle case 1
+ objectValue = *_array_iter;
+ RETURN_IF_ERROR(
+ _simdjson_set_column_value(&objectValue, columns,
slot_descs, &valid));
+ if (_array_iter == _array.end()) {
+ // Hint to read next json doc
+ _next_row = _total_rows + 1;
+ break;
+ }
+ ++_array_iter;
+ } else { // handle case 2
+ // objectValue = _json_value.get_object();
+ RETURN_IF_ERROR(
+ _simdjson_set_column_value(&objectValue, columns,
slot_descs, &valid));
+ }
+ _next_row++;
+ if (!valid) {
+ if (*_scanner_eof) {
+ // When _scanner_eof is true and valid is false, it means
that we have encountered
+ // unqualified data and decided to stop the scan.
+ *is_empty_row = true;
+ return Status::OK();
+ }
+ continue;
+ }
+ *is_empty_row = false;
+ break; // get a valid row, then break
+ } catch (simdjson::simdjson_error& e) {
+ // prevent from endless loop
+ _next_row = _total_rows + 1;
+ fmt::memory_buffer error_msg;
+ fmt::format_to(error_msg, "Parse json data for array failed. code:
{}, error info: {}",
+ e.error(), e.what());
+ RETURN_IF_ERROR(_state->append_error_msg_to_file(
+ [&]() -> std::string { return ""; },
+ [&]() -> std::string { return fmt::to_string(error_msg);
}, eof));
+ _counter->num_rows_filtered++;
+ // Before continuing to process other rows, we need to first clean
the fail parsed row.
+ for (int i = 0; i < columns.size(); ++i) {
+ if (columns[i]->size() > num_rows) {
+ columns[i]->pop_back(columns[i]->size() - num_rows);
+ }
+ }
+ if (!valid) {
+ if (*_scanner_eof) {
+ // When _scanner_eof is true and valid is false, it means
that we have encountered
+ // unqualified data and decided to stop the scan.
+ *is_empty_row = true;
+ return Status::OK();
+ }
+ continue;
+ }
+ continue;
+ }
+ } while (_next_row <= _total_rows);
+ return Status::OK();
+}
+
+Status NewJsonReader::_simdjson_handle_flat_array_complex_json(
+ std::vector<MutableColumnPtr>& columns, const
std::vector<SlotDescriptor*>& slot_descs,
+ bool* is_empty_row, bool* eof) {
+ // array complex json
+ size_t num_rows = columns[0]->size();
+ simdjson::ondemand::object cur;
+ do {
+ try {
+ if (_next_row >= _total_rows) {
+ Status st = _simdjson_parse_json(is_empty_row, eof);
+ if (st.is<DATA_QUALITY_ERROR>()) {
+ continue; // continue to read next
+ }
+ RETURN_IF_ERROR(st);
+ if (*is_empty_row == true) {
+ if (st == Status::OK()) {
+ return Status::OK();
+ }
+ if (_total_rows == 0) {
+ continue;
+ }
+ }
+ _array = _json_value.get_array();
+ _array_iter = _array.begin();
+ }
+
+ bool valid = true;
+ cur = (*_array_iter).get_object();
+ // extract root
+ if (_parsed_json_root.size() != 0) {
+ simdjson::ondemand::value val;
+ RETURN_IF_ERROR(JsonFunctions::extract_from_object(cur,
_parsed_json_root, &val));
+ // If val is not an object, will throw simdjson::simdjson_error
+ cur = val.get_object();
+ }
+ RETURN_IF_ERROR(_simdjson_write_columns_by_jsonpath(&cur,
slot_descs, columns, &valid));
+ // Reached end point
+ if (_array_iter == _array.end()) {
+ // Hint to read next json doc
+ _next_row = _total_rows + 1;
+ break;
+ }
+ ++_array_iter;
+ ++_next_row;
+ if (!valid) {
+ continue; // process next line
+ }
+ *is_empty_row = false;
+ break; // get a valid row, then break
+ } catch (simdjson::simdjson_error& e) {
+ // prevent from endless loop
+ _next_row = _total_rows + 1;
+ fmt::memory_buffer error_msg;
+ fmt::format_to(error_msg, "Parse json data failed. code: {}, error
info: {}", e.error(),
+ e.what());
+ RETURN_IF_ERROR(_state->append_error_msg_to_file(
+ [&]() -> std::string { return ""; },
+ [&]() -> std::string { return fmt::to_string(error_msg);
}, eof));
+ _counter->num_rows_filtered++;
+ // Before continuing to process other rows, we need to first clean
the fail parsed row.
+ for (int i = 0; i < columns.size(); ++i) {
+ if (columns[i]->size() > num_rows) {
+ columns[i]->pop_back(columns[i]->size() - num_rows);
+ }
+ }
+ if (*_scanner_eof) {
+ // When _scanner_eof is true and valid is false, it means that
we have encountered
+ // unqualified data and decided to stop the scan.
+ *is_empty_row = true;
+ return Status::OK();
+ }
+ continue;
+ }
+ } while (_next_row <= _total_rows);
+ return Status::OK();
+}
+
+Status NewJsonReader::_simdjson_handle_nested_complex_json(
+ std::vector<MutableColumnPtr>& columns, const
std::vector<SlotDescriptor*>& slot_descs,
+ bool* is_empty_row, bool* eof) {
+ // nested complex json
+ while (true) {
+ size_t num_rows = columns[0]->size();
+ simdjson::ondemand::object cur;
+ try {
+ Status st = _simdjson_parse_json(is_empty_row, eof);
+ if (st.is<DATA_QUALITY_ERROR>()) {
+ continue; // continue to read next
+ }
+ RETURN_IF_ERROR(st);
+ if (*is_empty_row == true) {
+ return Status::OK();
+ }
+ *is_empty_row = false;
+ bool valid = true;
+ cur = _json_value.get_object();
+ RETURN_IF_ERROR(_simdjson_write_columns_by_jsonpath(&cur,
slot_descs, columns, &valid));
+ if (!valid) {
+ // there is only one line in this case, so if it return false,
just set is_empty_row true
+ // so that the caller will continue reading next line.
+ *is_empty_row = true;
+ }
+ break; // read a valid row
+ } catch (simdjson::simdjson_error& e) {
+ fmt::memory_buffer error_msg;
+ fmt::format_to(error_msg, "Parse json data failed. code: {}, error
info: {}", e.error(),
+ e.what());
+ RETURN_IF_ERROR(_state->append_error_msg_to_file(
+ [&]() -> std::string { return ""; },
+ [&]() -> std::string { return fmt::to_string(error_msg);
}, eof));
+ _counter->num_rows_filtered++;
+ // Before continuing to process other rows, we need to first clean
the fail parsed row.
+ for (int i = 0; i < columns.size(); ++i) {
+ if (columns[i]->size() > num_rows) {
+ columns[i]->pop_back(columns[i]->size() - num_rows);
+ }
+ }
+ if (*_scanner_eof) {
+ // When _scanner_eof is true and valid is false, it means that
we have encountered
+ // unqualified data and decided to stop the scan.
+ *is_empty_row = true;
+ return Status::OK();
+ }
+ continue;
+ }
+ }
+ return Status::OK();
+}
+
+Status NewJsonReader::_simdjson_set_column_value(simdjson::ondemand::object*
value,
+
std::vector<MutableColumnPtr>& columns,
+ const
std::vector<SlotDescriptor*>& slot_descs,
+ bool* valid) {
+ // set
+ size_t cur_row_count = columns[0]->size();
+ bool has_valid_value = false;
+ // iterate through object, simdjson::ondemond will parsing on the fly
+ for (auto field : *value) {
+ std::string_view key = field.unescaped_key();
+ auto iter = _slot_desc_index.find(std::string(key));
+ if (iter == _slot_desc_index.end()) {
+ // This key is not exist in slot desc, just ignore
+ continue;
+ }
+ simdjson::ondemand::value val = field.value();
+ RETURN_IF_ERROR(_simdjson_write_data_to_column(val,
slot_descs[iter->second],
+
columns[iter->second].get(), valid));
+ if (!(*valid)) {
+ return Status::OK();
+ }
+ has_valid_value = true;
+ }
+ if (!has_valid_value) {
+ RETURN_IF_ERROR(
+ _append_error_msg(value, "All fields is null, this is a
invalid row.", "", valid));
+ return Status::OK();
+ }
+
+ // fill missing slot
+ int nullcount = 0;
+ int ctx_idx = 0;
+ for (auto slot_desc : slot_descs) {
+ if (!slot_desc->is_materialized()) {
+ continue;
+ }
+ int dest_index = ctx_idx++;
+ auto* column_ptr = columns[dest_index].get();
+ if (column_ptr->size() < cur_row_count + 1) {
+ DCHECK(column_ptr->size() == cur_row_count);
+ column_ptr->assume_mutable()->insert_default();
+ ++nullcount;
+ }
+ DCHECK(column_ptr->size() == cur_row_count + 1);
+ }
+ // There is at least one valid value here
+ DCHECK(nullcount < columns.size());
+ *valid = true;
+ return Status::OK();
+}
+
+Status
NewJsonReader::_simdjson_write_data_to_column(simdjson::ondemand::value& value,
+ SlotDescriptor* slot_desc,
+ vectorized::IColumn*
column, bool* valid) {
+ // write
+ vectorized::ColumnNullable* nullable_column = nullptr;
+ vectorized::IColumn* column_ptr = nullptr;
+ if (slot_desc->is_nullable()) {
+ nullable_column = assert_cast<vectorized::ColumnNullable*>(column);
+ column_ptr = &nullable_column->get_nested_column();
+ }
+
+ // TODO: if the vexpr can support another 'slot_desc type' than
'TYPE_VARCHAR',
+ // we need use a function to support these types to insert data in columns.
+ ColumnString* column_string = assert_cast<ColumnString*>(column_ptr);
+ if (value.is_null()) {
+ if (column->is_nullable()) {
+ // insert_default already push 1 to null_map
+ nullable_column->insert_default();
+ } else {
+ RETURN_IF_ERROR(_append_error_msg(
+ nullptr, "Json value is null, but the column `{}` is not
nullable.",
+ slot_desc->col_name(), valid));
+ return Status::OK();
+ }
+ } else if (value.type() == simdjson::ondemand::json_type::boolean) {
+ nullable_column->get_null_map_data().push_back(0);
+ if (value.get_bool()) {
+ column_string->insert_data("1", 1);
+ } else {
+ column_string->insert_data("0", 1);
+ }
+ } else {
+ // just return it's str representation
+ auto str_view = simdjson::to_json_string(value).value();
+ if (str_view[0] == '\"' || str_view[0] == '\'') {
+ str_view = str_view.substr(1, str_view.length() - 2);
+ }
+ nullable_column->get_null_map_data().push_back(0);
+ column_string->insert_data(str_view.data(), str_view.length());
+ }
+
+ *valid = true;
+ return Status::OK();
+}
+
+Status NewJsonReader::_append_error_msg(simdjson::ondemand::object* obj,
std::string error_msg,
+ std::string col_name, bool* valid) {
+ std::string err_msg;
+ if (!col_name.empty()) {
+ fmt::memory_buffer error_buf;
+ fmt::format_to(error_buf, error_msg, col_name);
+ err_msg = fmt::to_string(error_buf);
+ } else {
+ err_msg = error_msg;
+ }
+
+ RETURN_IF_ERROR(_state->append_error_msg_to_file(
+ [&]() -> std::string {
+ if (!obj) {
+ return "";
+ }
+ std::string_view str_view;
+ (void)!obj->raw_json().get(str_view);
+ return std::string(str_view.data(), str_view.size());
+ },
+ [&]() -> std::string { return err_msg; }, _scanner_eof));
+
+ _counter->num_rows_filtered++;
+ if (valid != nullptr) {
+ // current row is invalid
+ *valid = false;
+ }
+ return Status::OK();
+}
+
+Status NewJsonReader::_simdjson_parse_json(bool* is_empty_row, bool* eof) {
+ size_t size = 0;
+ RETURN_IF_ERROR(_simdjson_parse_json_doc(&size, eof));
+
+ // read all data, then return
+ if (size == 0 || *eof) {
+ *is_empty_row = true;
+ return Status::OK();
+ }
+
+ if (!_parsed_jsonpaths.empty() && _strip_outer_array) {
+ _total_rows = _json_value.count_elements().value();
+ _next_row = 0;
+
+ if (_total_rows == 0) {
+ // meet an empty json array.
+ *is_empty_row = true;
+ }
+ }
+ return Status::OK();
+}
+Status NewJsonReader::_simdjson_parse_json_doc(size_t* size, bool* eof) {
+ // read a whole message
+ SCOPED_TIMER(_file_read_timer);
+ const uint8_t* json_str = nullptr;
+ std::unique_ptr<uint8_t[]> json_str_ptr;
+ if (_line_reader != nullptr) {
+ RETURN_IF_ERROR(_line_reader->read_line(&json_str, size, eof));
+ } else {
+ size_t length = 0;
+ RETURN_IF_ERROR(_read_one_message(&json_str_ptr, &length));
+ json_str = json_str_ptr.get();
+ *size = length;
+ if (length == 0) {
+ *eof = true;
+ }
+ }
+
+ _bytes_read_counter += *size;
+ if (*eof) {
+ return Status::OK();
+ }
+ memcpy(_simdjson_ondemand_padding_buffer, json_str, *size);
+ auto error = _ondemand_json_parser
+ ->iterate(std::string_view(reinterpret_cast<const
char*>(
+
_simdjson_ondemand_padding_buffer),
+ *size),
+ _padded_size)
+ .get(_original_json_doc);
+ auto return_quality_error = [&](fmt::memory_buffer& error_msg,
+ const std::string& doc_info) -> Status {
+ RETURN_IF_ERROR(_state->append_error_msg_to_file(
+ [&]() -> std::string { return doc_info; },
+ [&]() -> std::string { return fmt::to_string(error_msg); },
_scanner_eof));
+ _counter->num_rows_filtered++;
+ if (*_scanner_eof) {
+ // Case A: if _scanner_eof is set to true in
"append_error_msg_to_file", which means
+ // we meet enough invalid rows and the scanner should be stopped.
+ // So we set eof to true and return OK, the caller will stop the
process as we meet the end of file.
+ *eof = true;
+ return Status::OK();
+ }
+ return Status::DataQualityError(fmt::to_string(error_msg));
+ };
+ if (error != simdjson::error_code::SUCCESS) {
+ fmt::memory_buffer error_msg;
+ fmt::format_to(error_msg, "Parse json data for JsonDoc failed. code:
{}, error info: {}",
+ error, simdjson::error_message(error));
+ return return_quality_error(error_msg, std::string((char*)json_str,
*size));
+ }
+ try {
+ // set json root
+ // if it is an array at top level, then we should iterate the entire
array in
+ // ::_simdjson_handle_flat_array_complex_json
+ if (_parsed_json_root.size() != 0 &&
+ _original_json_doc.type() ==
simdjson::ondemand::json_type::object) {
+ simdjson::ondemand::object object = _original_json_doc;
+ Status st = JsonFunctions::extract_from_object(object,
_parsed_json_root, &_json_value);
+ if (!st.ok()) {
+ fmt::memory_buffer error_msg;
+ fmt::format_to(error_msg, "{}", st.to_string());
+ return return_quality_error(error_msg,
std::string((char*)json_str, *size));
+ }
+ } else {
+ _json_value = _original_json_doc;
+ }
+ } catch (simdjson::simdjson_error& e) {
+ fmt::memory_buffer error_msg;
+ fmt::format_to(error_msg, "Encounter error while extract_from_object,
error: {}", e.what());
+ return return_quality_error(error_msg, std::string((char*)json_str,
*size));
+ }
+
+ if (_json_value.type() == simdjson::ondemand::json_type::array &&
!_strip_outer_array) {
+ fmt::memory_buffer error_msg;
+ fmt::format_to(error_msg, "{}",
+ "JSON data is array-object, `strip_outer_array` must be
TRUE.");
+ return return_quality_error(error_msg, std::string((char*)json_str,
*size));
+ }
+
+ if (_json_value.type() != simdjson::ondemand::json_type::array &&
_strip_outer_array) {
+ fmt::memory_buffer error_msg;
+ fmt::format_to(error_msg, "{}",
+ "JSON data is not an array-object, `strip_outer_array`
must be FALSE.");
+ return return_quality_error(error_msg, std::string((char*)json_str,
*size));
+ }
+ return Status::OK();
+}
+
+Status NewJsonReader::_simdjson_write_columns_by_jsonpath(
+ simdjson::ondemand::object* value, const std::vector<SlotDescriptor*>&
slot_descs,
+ std::vector<MutableColumnPtr>& columns, bool* valid) {
+ // write by jsonpath
+ size_t column_num = slot_descs.size();
+ bool has_valid_value = false;
+ size_t cur_row_count = columns[0]->size();
+ for (size_t i = 0; i < column_num; i++) {
+ auto* column_ptr = columns[i].get();
+ simdjson::ondemand::value json_value;
+ Status st;
+ if (i < _parsed_jsonpaths.size()) {
+ st = JsonFunctions::extract_from_object(*value,
_parsed_jsonpaths[i], &json_value);
+ if (!st.ok() && !st.is<NOT_FOUND>()) {
+ return st;
+ }
+ }
+ if (i >= _parsed_jsonpaths.size() || st.is<NOT_FOUND>()) {
+ // not match in jsondata.
+ if (!slot_descs[i]->is_nullable()) {
+ RETURN_IF_ERROR(_append_error_msg(
+ value, "The column `{}` is not nullable, but it's not
found in jsondata.",
+ slot_descs[i]->col_name(), valid));
+ return Status::OK();
+ }
+ } else {
+ RETURN_IF_ERROR(
+ _simdjson_write_data_to_column(json_value, slot_descs[i],
column_ptr, valid));
+ if (!(*valid)) {
Review Comment:
Why return OK if not valid?
##########
be/src/vec/exec/format/json/new_json_reader.cpp:
##########
@@ -112,6 +116,10 @@ Status NewJsonReader::init_reader() {
if (_is_dynamic_schema) {
_json_parser =
std::make_unique<vectorized::JSONDataParser<vectorized::SimdJSONParser>>();
}
+ _ondemand_json_parser = std::make_unique<simdjson::ondemand::parser>();
Review Comment:
BTW, _simdjson_init_reader also creates _ondemand_json_parser if
enable_simdjson_reader is true. So no matter whether enable_simdjson_reader is
true or false, _ondemand_json_parser is created.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]