eldenmoon commented on code in PR #16903:
URL: https://github.com/apache/doris/pull/16903#discussion_r1111253231
##########
be/src/vec/exec/format/json/new_json_reader.cpp:
##########
@@ -112,6 +116,10 @@ Status NewJsonReader::init_reader() {
if (_is_dynamic_schema) {
_json_parser =
std::make_unique<vectorized::JSONDataParser<vectorized::SimdJSONParser>>();
}
+ _ondemand_json_parser = std::make_unique<simdjson::ondemand::parser>();
Review Comment:
Yeah, I will remove the _ondemand_json_parser init when enable_simdjson_reader is false.
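A minimal sketch of the guarded init, assuming the gate is the existing `enable_simdjson_reader` BE config flag (the `config::` qualification here is an assumption):

```cpp
// Sketch only: build the on-demand parser only when the simdjson reader is
// actually enabled; the DOM-based _json_parser stays tied to the
// dynamic-schema path.
if (_is_dynamic_schema) {
    _json_parser = std::make_unique<vectorized::JSONDataParser<vectorized::SimdJSONParser>>();
}
if (config::enable_simdjson_reader) {
    _ondemand_json_parser = std::make_unique<simdjson::ondemand::parser>();
}
```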
##########
be/src/vec/exec/format/json/new_json_reader.cpp:
##########
@@ -112,6 +116,10 @@ Status NewJsonReader::init_reader() {
if (_is_dynamic_schema) {
_json_parser =
std::make_unique<vectorized::JSONDataParser<vectorized::SimdJSONParser>>();
}
+ _ondemand_json_parser = std::make_unique<simdjson::ondemand::parser>();
Review Comment:
_json_parser uses the DOM front-end parser, but _ondemand_json_parser uses the On-Demand front-end parser; they are different.
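For reference, a small self-contained sketch of the two simdjson front ends being contrasted: the DOM parser materializes the whole document into a tree up front, while the On-Demand parser decodes fields lazily as they are accessed.

```cpp
#include <simdjson.h>

#include <cstdint>
#include <iostream>

using namespace simdjson;

int main() {
    padded_string json = R"({"k1": 1, "k2": "v"})"_padded;

    // DOM front end: parse() builds an in-memory tree of the whole document.
    dom::parser dom_parser;
    dom::element dom_doc;
    if (dom_parser.parse(json).get(dom_doc)) return 1;
    int64_t k1_dom = 0;
    if (dom_doc["k1"].get(k1_dom)) return 1;

    // On-Demand front end: iterate() walks the raw JSON lazily, decoding
    // values only when they are requested (mostly in document order).
    ondemand::parser od_parser;
    ondemand::document od_doc;
    if (od_parser.iterate(json).get(od_doc)) return 1;
    int64_t k1_od = 0;
    if (od_doc["k1"].get(k1_od)) return 1;

    std::cout << k1_dom << " " << k1_od << "\n";
    return 0;
}
```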
##########
be/src/vec/exec/format/json/new_json_reader.cpp:
##########
@@ -1007,4 +1015,560 @@ Status NewJsonReader::_read_one_message(std::unique_ptr<uint8_t[]>* file_buf, si
}
return Status::OK();
}
+// ---------SIMDJSON----------
+// simdjson, replace none simdjson function if it is ready
+Status NewJsonReader::_simdjson_init_reader() {
+    RETURN_IF_ERROR(_get_range_params());
+
+    RETURN_IF_ERROR(_open_file_reader());
+    if (_read_json_by_line) {
+        RETURN_IF_ERROR(_open_line_reader());
+    }
+
+    // generate _parsed_jsonpaths and _parsed_json_root
+    RETURN_IF_ERROR(_parse_jsonpath_and_json_root());
+
+    //improve performance
+    if (_parsed_jsonpaths.empty() || _is_dynamic_schema) { // input is a simple json-string
+        _vhandle_json_callback = _is_dynamic_schema ? &NewJsonReader::_vhandle_dynamic_json
+                                                    : &NewJsonReader::_simdjson_handle_simple_json;
+    } else { // input is a complex json-string and a json-path
+        if (_strip_outer_array) {
+            _vhandle_json_callback = &NewJsonReader::_simdjson_handle_flat_array_complex_json;
+        } else {
+            _vhandle_json_callback = &NewJsonReader::_simdjson_handle_nested_complex_json;
+        }
+    }
+    if (_is_dynamic_schema) {
+        _json_parser = std::make_unique<vectorized::JSONDataParser<vectorized::SimdJSONParser>>();
+    }
+    _ondemand_json_parser = std::make_unique<simdjson::ondemand::parser>();
+    for (int i = 0; i < _file_slot_descs.size(); ++i) {
+        _slot_desc_index.emplace(_file_slot_descs[i]->col_name(), i);
+    }
+    return Status::OK();
+}
+
+Status NewJsonReader::_simdjson_handle_simple_json(std::vector<MutableColumnPtr>& columns,
+                                                   const std::vector<SlotDescriptor*>& slot_descs,
+                                                   bool* is_empty_row, bool* eof) {
+    // simple json
+    simdjson::ondemand::object objectValue;
+    size_t num_rows = columns[0]->size();
+    do {
+        bool valid = false;
+        try {
+            if (_next_row >= _total_rows) { // parse json and generic document
+                Status st = _simdjson_parse_json(is_empty_row, eof);
+                if (st.is<DATA_QUALITY_ERROR>()) {
+                    continue; // continue to read next
+                }
+                RETURN_IF_ERROR(st);
+                if (*is_empty_row == true) {
+                    return Status::OK();
+                }
+                if (_json_value.type() == simdjson::ondemand::json_type::array) {
+                    _array = _json_value.get_array();
+                    _array_iter = _array.begin();
+
+                    _total_rows = _array.count_elements();
+                    if (_total_rows == 0) {
+                        // may be passing an empty json, such as "[]"
+                        RETURN_IF_ERROR(_append_error_msg(nullptr, "Empty json line", "", nullptr));
+                        if (*_scanner_eof) {
+                            *is_empty_row = true;
+                            return Status::OK();
+                        }
+                        continue;
+                    }
+                } else {
+                    _total_rows = 1; // only one row
+                    objectValue = _json_value;
+                }
+                _next_row = 0;
+            }
+
+            if (_json_value.type() == simdjson::ondemand::json_type::array) { // handle case 1
+                objectValue = *_array_iter;
+                RETURN_IF_ERROR(
+                        _simdjson_set_column_value(&objectValue, columns, slot_descs, &valid));
+                if (_array_iter == _array.end()) {
+                    // Hint to read next json doc
+                    _next_row = _total_rows + 1;
+                    break;
+                }
+                ++_array_iter;
+            } else { // handle case 2
+                // objectValue = _json_value.get_object();
+                RETURN_IF_ERROR(
+                        _simdjson_set_column_value(&objectValue, columns, slot_descs, &valid));
+            }
+            _next_row++;
+            if (!valid) {
+                if (*_scanner_eof) {
+                    // When _scanner_eof is true and valid is false, it means that we have encountered
+                    // unqualified data and decided to stop the scan.
+                    *is_empty_row = true;
+                    return Status::OK();
+                }
+                continue;
+            }
+            *is_empty_row = false;
+            break; // get a valid row, then break
+        } catch (simdjson::simdjson_error& e) {
+            // prevent from endless loop
+            _next_row = _total_rows + 1;
+            fmt::memory_buffer error_msg;
+            fmt::format_to(error_msg, "Parse json data for array failed. code: {}, error info: {}",
+                           e.error(), e.what());
+            RETURN_IF_ERROR(_state->append_error_msg_to_file(
+                    [&]() -> std::string { return ""; },
+                    [&]() -> std::string { return fmt::to_string(error_msg); }, eof));
+            _counter->num_rows_filtered++;
+            // Before continuing to process other rows, we need to first clean the fail parsed row.
+            for (int i = 0; i < columns.size(); ++i) {
+                if (columns[i]->size() > num_rows) {
+                    columns[i]->pop_back(columns[i]->size() - num_rows);
+                }
+            }
+            if (!valid) {
+                if (*_scanner_eof) {
+                    // When _scanner_eof is true and valid is false, it means that we have encountered
+                    // unqualified data and decided to stop the scan.
+                    *is_empty_row = true;
+                    return Status::OK();
+                }
+                continue;
+            }
+            continue;
+        }
+    } while (_next_row <= _total_rows);
+    return Status::OK();
+}
+
+Status NewJsonReader::_simdjson_handle_flat_array_complex_json(
+        std::vector<MutableColumnPtr>& columns, const std::vector<SlotDescriptor*>& slot_descs,
+        bool* is_empty_row, bool* eof) {
+ // array complex json
+ size_t num_rows = columns[0]->size();
+ simdjson::ondemand::object cur;
+ do {
+ try {
+ if (_next_row >= _total_rows) {
+ Status st = _simdjson_parse_json(is_empty_row, eof);
+ if (st.is<DATA_QUALITY_ERROR>()) {
+ continue; // continue to read next
+ }
+ RETURN_IF_ERROR(st);
+ if (*is_empty_row == true) {
+ if (st == Status::OK()) {
+ return Status::OK();
+ }
+ if (_total_rows == 0) {
+ continue;
+ }
+ }
+ _array = _json_value.get_array();
+ _array_iter = _array.begin();
+ }
+
+ bool valid = true;
+ cur = (*_array_iter).get_object();
Review Comment:
It will throw an exception.
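A sketch of the non-throwing form, using the error-code interface so a bad array element turns into a data-quality error instead of a thrown simdjson_error (member names follow the quoted code):

```cpp
// Sketch only: both *_array_iter and get_object() return simdjson_result,
// so the error can be checked explicitly instead of relying on the
// implicit conversion, which throws on failure.
simdjson::ondemand::object cur;
simdjson::error_code err = (*_array_iter).get_object().get(cur);
if (err != simdjson::SUCCESS) {
    return Status::DataQualityError(fmt::format("array element is not an object: {}",
                                                simdjson::error_message(err)));
}
```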
##########
be/src/exprs/json_functions.cpp:
##########
@@ -282,4 +282,67 @@ void JsonFunctions::get_parsed_paths(const std::vector<std::string>& path_exprs,
}
}
+Status JsonFunctions::extract_from_object(simdjson::ondemand::object& obj,
+                                          const std::vector<JsonPath>& jsonpath,
+                                          simdjson::ondemand::value* value) noexcept {
+#define HANDLE_SIMDJSON_ERROR(err, msg)                                                        \
+    do {                                                                                       \
+        const simdjson::error_code& _err = err;                                                \
+        const std::string& _msg = msg;                                                         \
+        if (UNLIKELY(_err)) {                                                                  \
+            if (_err == simdjson::NO_SUCH_FIELD || _err == simdjson::INDEX_OUT_OF_BOUNDS) {    \
+                return Status::NotFound(                                                       \
+                        fmt::format("err: {}, msg: {}", simdjson::error_message(_err), _msg)); \
+            }                                                                                  \
+            return Status::DataQualityError(                                                   \
+                    fmt::format("err: {}, msg: {}", simdjson::error_message(_err), _msg));     \
+        }                                                                                      \
+    } while (false);
+
+    if (jsonpath.size() <= 1) {
+        // The first elem of json path should be '$'.
+        // A valid json path's size is >= 2.
+        return Status::DataQualityError("empty json path");
+    }
+
+    simdjson::ondemand::value tvalue;
+
+    // Skip the first $.
+    for (int i = 1; i < jsonpath.size(); i++) {
+        if (UNLIKELY(!jsonpath[i].is_valid)) {
+            return Status::DataQualityError(fmt::format("invalid json path: {}", jsonpath[i].key));
+        }
+
+        const std::string& col = jsonpath[i].key;
+        int index = jsonpath[i].idx;
+
+        // Since the simdjson::ondemand::object cannot be converted to simdjson::ondemand::value,
+        // we have to do some special treatment for the second elem of json path.
+        // If the key is not found in json object, simdjson::NO_SUCH_FIELD would be returned.
+        if (i == 1) {
+            HANDLE_SIMDJSON_ERROR(obj.find_field_unordered(col).get(tvalue),
+                                  fmt::format("unable to find field: {}", col));
+        } else {
+            HANDLE_SIMDJSON_ERROR(tvalue.find_field_unordered(col).get(tvalue),
+                                  fmt::format("unable to find field: {}", col));
+        }
+
+        if (index != -1) {
+            // try to access tvalue as array.
+            // If the index is beyond the length of array, simdjson::INDEX_OUT_OF_BOUNDS would be returned.
+            simdjson::ondemand::array arr;
+            HANDLE_SIMDJSON_ERROR(tvalue.get_array().get(arr),
+                                  fmt::format("failed to access field as array, field: {}", col));
+
+            HANDLE_SIMDJSON_ERROR(
+                    arr.at(index).get(tvalue),
Review Comment:
No, index == -2 is not supported in this function for now.
##########
be/src/vec/exec/format/json/new_json_reader.cpp:
##########
@@ -1007,4 +1015,560 @@ Status NewJsonReader::_read_one_message(std::unique_ptr<uint8_t[]>* file_buf, si
}
return Status::OK();
}
+// ---------SIMDJSON----------
+// simdjson, replace none simdjson function if it is ready
+Status NewJsonReader::_simdjson_init_reader() {
+ RETURN_IF_ERROR(_get_range_params());
+
+ RETURN_IF_ERROR(_open_file_reader());
+ if (_read_json_by_line) {
+ RETURN_IF_ERROR(_open_line_reader());
+ }
+
+ // generate _parsed_jsonpaths and _parsed_json_root
+ RETURN_IF_ERROR(_parse_jsonpath_and_json_root());
+
+ //improve performance
+ if (_parsed_jsonpaths.empty() || _is_dynamic_schema) { // input is a
simple json-string
+ _vhandle_json_callback = _is_dynamic_schema ?
&NewJsonReader::_vhandle_dynamic_json
+ :
&NewJsonReader::_simdjson_handle_simple_json;
+ } else { // input is a complex json-string and a json-path
+ if (_strip_outer_array) {
+ _vhandle_json_callback =
&NewJsonReader::_simdjson_handle_flat_array_complex_json;
+ } else {
+ _vhandle_json_callback =
&NewJsonReader::_simdjson_handle_nested_complex_json;
+ }
+ }
+ if (_is_dynamic_schema) {
+ _json_parser =
std::make_unique<vectorized::JSONDataParser<vectorized::SimdJSONParser>>();
+ }
+ _ondemand_json_parser = std::make_unique<simdjson::ondemand::parser>();
+ for (int i = 0; i < _file_slot_descs.size(); ++i) {
+ _slot_desc_index.emplace(_file_slot_descs[i]->col_name(), i);
+ }
+ return Status::OK();
+}
+
+Status
NewJsonReader::_simdjson_handle_simple_json(std::vector<MutableColumnPtr>&
columns,
+ const
std::vector<SlotDescriptor*>& slot_descs,
+ bool* is_empty_row, bool*
eof) {
+ // simple json
+ simdjson::ondemand::object objectValue;
+ size_t num_rows = columns[0]->size();
+ do {
+ bool valid = false;
+ try {
+ if (_next_row >= _total_rows) { // parse json and generic document
+ Status st = _simdjson_parse_json(is_empty_row, eof);
+ if (st.is<DATA_QUALITY_ERROR>()) {
+ continue; // continue to read next
+ }
+ RETURN_IF_ERROR(st);
+ if (*is_empty_row == true) {
+ return Status::OK();
+ }
+ if (_json_value.type() ==
simdjson::ondemand::json_type::array) {
+ _array = _json_value.get_array();
+ _array_iter = _array.begin();
+
+ _total_rows = _array.count_elements();
+ if (_total_rows == 0) {
+ // may be passing an empty json, such as "[]"
+ RETURN_IF_ERROR(_append_error_msg(nullptr, "Empty json
line", "", nullptr));
+ if (*_scanner_eof) {
+ *is_empty_row = true;
+ return Status::OK();
+ }
+ continue;
+ }
+ } else {
+ _total_rows = 1; // only one row
+ objectValue = _json_value;
+ }
+ _next_row = 0;
+ }
+
+ if (_json_value.type() == simdjson::ondemand::json_type::array) {
// handle case 1
+ objectValue = *_array_iter;
+ RETURN_IF_ERROR(
+ _simdjson_set_column_value(&objectValue, columns,
slot_descs, &valid));
+ if (_array_iter == _array.end()) {
+ // Hint to read next json doc
+ _next_row = _total_rows + 1;
+ break;
+ }
+ ++_array_iter;
+ } else { // handle case 2
+ // objectValue = _json_value.get_object();
+ RETURN_IF_ERROR(
+ _simdjson_set_column_value(&objectValue, columns,
slot_descs, &valid));
+ }
+ _next_row++;
+ if (!valid) {
+ if (*_scanner_eof) {
+ // When _scanner_eof is true and valid is false, it means
that we have encountered
+ // unqualified data and decided to stop the scan.
+ *is_empty_row = true;
+ return Status::OK();
+ }
+ continue;
+ }
+ *is_empty_row = false;
+ break; // get a valid row, then break
+ } catch (simdjson::simdjson_error& e) {
+ // prevent from endless loop
+ _next_row = _total_rows + 1;
+ fmt::memory_buffer error_msg;
+ fmt::format_to(error_msg, "Parse json data for array failed. code:
{}, error info: {}",
+ e.error(), e.what());
+ RETURN_IF_ERROR(_state->append_error_msg_to_file(
+ [&]() -> std::string { return ""; },
+ [&]() -> std::string { return fmt::to_string(error_msg);
}, eof));
+ _counter->num_rows_filtered++;
+ // Before continuing to process other rows, we need to first clean
the fail parsed row.
+ for (int i = 0; i < columns.size(); ++i) {
+ if (columns[i]->size() > num_rows) {
+ columns[i]->pop_back(columns[i]->size() - num_rows);
+ }
+ }
+ if (!valid) {
+ if (*_scanner_eof) {
+ // When _scanner_eof is true and valid is false, it means
that we have encountered
+ // unqualified data and decided to stop the scan.
+ *is_empty_row = true;
+ return Status::OK();
+ }
+ continue;
+ }
+ continue;
+ }
+ } while (_next_row <= _total_rows);
+ return Status::OK();
+}
+
+Status NewJsonReader::_simdjson_handle_flat_array_complex_json(
+ std::vector<MutableColumnPtr>& columns, const
std::vector<SlotDescriptor*>& slot_descs,
+ bool* is_empty_row, bool* eof) {
+ // array complex json
+ size_t num_rows = columns[0]->size();
+ simdjson::ondemand::object cur;
+ do {
+ try {
+ if (_next_row >= _total_rows) {
+ Status st = _simdjson_parse_json(is_empty_row, eof);
+ if (st.is<DATA_QUALITY_ERROR>()) {
+ continue; // continue to read next
+ }
+ RETURN_IF_ERROR(st);
+ if (*is_empty_row == true) {
+ if (st == Status::OK()) {
+ return Status::OK();
+ }
+ if (_total_rows == 0) {
+ continue;
+ }
+ }
+ _array = _json_value.get_array();
+ _array_iter = _array.begin();
+ }
+
+ bool valid = true;
+ cur = (*_array_iter).get_object();
+ // extract root
+ if (_parsed_json_root.size() != 0) {
+ simdjson::ondemand::value val;
+ RETURN_IF_ERROR(JsonFunctions::extract_from_object(cur,
_parsed_json_root, &val));
+ // If val is not an object, will throw simdjson::simdjson_error
+ cur = val.get_object();
+ }
+ RETURN_IF_ERROR(_simdjson_write_columns_by_jsonpath(&cur,
slot_descs, columns, &valid));
+ // Reached end point
+ if (_array_iter == _array.end()) {
+ // Hint to read next json doc
+ _next_row = _total_rows + 1;
+ break;
+ }
+ ++_array_iter;
+ ++_next_row;
+ if (!valid) {
+ continue; // process next line
+ }
+ *is_empty_row = false;
+ break; // get a valid row, then break
+ } catch (simdjson::simdjson_error& e) {
+ // prevent from endless loop
+ _next_row = _total_rows + 1;
+ fmt::memory_buffer error_msg;
+ fmt::format_to(error_msg, "Parse json data failed. code: {}, error
info: {}", e.error(),
+ e.what());
+ RETURN_IF_ERROR(_state->append_error_msg_to_file(
+ [&]() -> std::string { return ""; },
+ [&]() -> std::string { return fmt::to_string(error_msg);
}, eof));
+ _counter->num_rows_filtered++;
+ // Before continuing to process other rows, we need to first clean
the fail parsed row.
+ for (int i = 0; i < columns.size(); ++i) {
+ if (columns[i]->size() > num_rows) {
+ columns[i]->pop_back(columns[i]->size() - num_rows);
+ }
+ }
+ if (*_scanner_eof) {
+ // When _scanner_eof is true and valid is false, it means that
we have encountered
+ // unqualified data and decided to stop the scan.
+ *is_empty_row = true;
+ return Status::OK();
+ }
+ continue;
+ }
+ } while (_next_row <= _total_rows);
+ return Status::OK();
+}
+
+Status NewJsonReader::_simdjson_handle_nested_complex_json(
+ std::vector<MutableColumnPtr>& columns, const
std::vector<SlotDescriptor*>& slot_descs,
+ bool* is_empty_row, bool* eof) {
+ // nested complex json
+ while (true) {
+ size_t num_rows = columns[0]->size();
+ simdjson::ondemand::object cur;
+ try {
+ Status st = _simdjson_parse_json(is_empty_row, eof);
+ if (st.is<DATA_QUALITY_ERROR>()) {
+ continue; // continue to read next
+ }
+ RETURN_IF_ERROR(st);
+ if (*is_empty_row == true) {
+ return Status::OK();
+ }
+ *is_empty_row = false;
+ bool valid = true;
+ cur = _json_value.get_object();
+ RETURN_IF_ERROR(_simdjson_write_columns_by_jsonpath(&cur,
slot_descs, columns, &valid));
+ if (!valid) {
+ // there is only one line in this case, so if it return false,
just set is_empty_row true
+ // so that the caller will continue reading next line.
+ *is_empty_row = true;
+ }
+ break; // read a valid row
+ } catch (simdjson::simdjson_error& e) {
+ fmt::memory_buffer error_msg;
+ fmt::format_to(error_msg, "Parse json data failed. code: {}, error
info: {}", e.error(),
+ e.what());
+ RETURN_IF_ERROR(_state->append_error_msg_to_file(
+ [&]() -> std::string { return ""; },
+ [&]() -> std::string { return fmt::to_string(error_msg);
}, eof));
+ _counter->num_rows_filtered++;
+ // Before continuing to process other rows, we need to first clean
the fail parsed row.
+ for (int i = 0; i < columns.size(); ++i) {
+ if (columns[i]->size() > num_rows) {
+ columns[i]->pop_back(columns[i]->size() - num_rows);
+ }
+ }
+ if (*_scanner_eof) {
+ // When _scanner_eof is true and valid is false, it means that
we have encountered
+ // unqualified data and decided to stop the scan.
+ *is_empty_row = true;
+ return Status::OK();
+ }
+ continue;
+ }
+ }
+ return Status::OK();
+}
+
+Status NewJsonReader::_simdjson_set_column_value(simdjson::ondemand::object*
value,
+
std::vector<MutableColumnPtr>& columns,
+ const
std::vector<SlotDescriptor*>& slot_descs,
+ bool* valid) {
+ // set
+ size_t cur_row_count = columns[0]->size();
+ bool has_valid_value = false;
+ // iterate through object, simdjson::ondemond will parsing on the fly
+ for (auto field : *value) {
+ std::string_view key = field.unescaped_key();
+ auto iter = _slot_desc_index.find(std::string(key));
+ if (iter == _slot_desc_index.end()) {
+ // This key is not exist in slot desc, just ignore
+ continue;
+ }
+ simdjson::ondemand::value val = field.value();
+ RETURN_IF_ERROR(_simdjson_write_data_to_column(val,
slot_descs[iter->second],
+
columns[iter->second].get(), valid));
+ if (!(*valid)) {
+ return Status::OK();
+ }
+ has_valid_value = true;
+ }
+ if (!has_valid_value) {
+ RETURN_IF_ERROR(
+ _append_error_msg(value, "All fields is null, this is a
invalid row.", "", valid));
+ return Status::OK();
+ }
+
+ // fill missing slot
+ int nullcount = 0;
+ int ctx_idx = 0;
+ for (auto slot_desc : slot_descs) {
+ if (!slot_desc->is_materialized()) {
+ continue;
+ }
+ int dest_index = ctx_idx++;
+ auto* column_ptr = columns[dest_index].get();
+ if (column_ptr->size() < cur_row_count + 1) {
+ DCHECK(column_ptr->size() == cur_row_count);
+ column_ptr->assume_mutable()->insert_default();
+ ++nullcount;
+ }
+ DCHECK(column_ptr->size() == cur_row_count + 1);
+ }
+ // There is at least one valid value here
+ DCHECK(nullcount < columns.size());
+ *valid = true;
+ return Status::OK();
+}
+
+Status
NewJsonReader::_simdjson_write_data_to_column(simdjson::ondemand::value& value,
+ SlotDescriptor* slot_desc,
+ vectorized::IColumn*
column, bool* valid) {
+ // write
+ vectorized::ColumnNullable* nullable_column = nullptr;
+ vectorized::IColumn* column_ptr = nullptr;
+ if (slot_desc->is_nullable()) {
+ nullable_column = assert_cast<vectorized::ColumnNullable*>(column);
+ column_ptr = &nullable_column->get_nested_column();
+ }
+
+ // TODO: if the vexpr can support another 'slot_desc type' than
'TYPE_VARCHAR',
+ // we need use a function to support these types to insert data in columns.
+ ColumnString* column_string = assert_cast<ColumnString*>(column_ptr);
+ if (value.is_null()) {
+ if (column->is_nullable()) {
+ // insert_default already push 1 to null_map
+ nullable_column->insert_default();
+ } else {
+ RETURN_IF_ERROR(_append_error_msg(
+ nullptr, "Json value is null, but the column `{}` is not
nullable.",
+ slot_desc->col_name(), valid));
+ return Status::OK();
+ }
+ } else if (value.type() == simdjson::ondemand::json_type::boolean) {
+ nullable_column->get_null_map_data().push_back(0);
+ if (value.get_bool()) {
+ column_string->insert_data("1", 1);
+ } else {
+ column_string->insert_data("0", 1);
+ }
+ } else {
+ // just return it's str representation
+ auto str_view = simdjson::to_json_string(value).value();
+ if (str_view[0] == '\"' || str_view[0] == '\'') {
+ str_view = str_view.substr(1, str_view.length() - 2);
+ }
+ nullable_column->get_null_map_data().push_back(0);
+ column_string->insert_data(str_view.data(), str_view.length());
+ }
+
+ *valid = true;
+ return Status::OK();
+}
+
+Status NewJsonReader::_append_error_msg(simdjson::ondemand::object* obj,
std::string error_msg,
+ std::string col_name, bool* valid) {
+ std::string err_msg;
+ if (!col_name.empty()) {
+ fmt::memory_buffer error_buf;
+ fmt::format_to(error_buf, error_msg, col_name);
+ err_msg = fmt::to_string(error_buf);
+ } else {
+ err_msg = error_msg;
+ }
+
+ RETURN_IF_ERROR(_state->append_error_msg_to_file(
+ [&]() -> std::string {
+ if (!obj) {
+ return "";
+ }
+ std::string_view str_view;
+ (void)!obj->raw_json().get(str_view);
+ return std::string(str_view.data(), str_view.size());
+ },
+ [&]() -> std::string { return err_msg; }, _scanner_eof));
+
+ _counter->num_rows_filtered++;
+ if (valid != nullptr) {
+ // current row is invalid
+ *valid = false;
+ }
+ return Status::OK();
+}
+
+Status NewJsonReader::_simdjson_parse_json(bool* is_empty_row, bool* eof) {
+ size_t size = 0;
+ RETURN_IF_ERROR(_simdjson_parse_json_doc(&size, eof));
+
+ // read all data, then return
+ if (size == 0 || *eof) {
+ *is_empty_row = true;
+ return Status::OK();
+ }
+
+ if (!_parsed_jsonpaths.empty() && _strip_outer_array) {
+ _total_rows = _json_value.count_elements().value();
+ _next_row = 0;
+
+ if (_total_rows == 0) {
+ // meet an empty json array.
+ *is_empty_row = true;
+ }
+ }
+ return Status::OK();
+}
+Status NewJsonReader::_simdjson_parse_json_doc(size_t* size, bool* eof) {
+ // read a whole message
+ SCOPED_TIMER(_file_read_timer);
+ const uint8_t* json_str = nullptr;
+ std::unique_ptr<uint8_t[]> json_str_ptr;
+ if (_line_reader != nullptr) {
+ RETURN_IF_ERROR(_line_reader->read_line(&json_str, size, eof));
+ } else {
+ size_t length = 0;
+ RETURN_IF_ERROR(_read_one_message(&json_str_ptr, &length));
+ json_str = json_str_ptr.get();
+ *size = length;
+ if (length == 0) {
+ *eof = true;
+ }
+ }
+
+ _bytes_read_counter += *size;
+ if (*eof) {
+ return Status::OK();
+ }
+ memcpy(_simdjson_ondemand_padding_buffer, json_str, *size);
Review Comment:
Right, I'm handling it.
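For example, a bounds check before the copy, failing the row instead of writing past the fixed buffer (a sketch reusing the member names from the quoted code; the exact recovery policy is an assumption):

```cpp
// Sketch only: make sure the document plus simdjson's required padding fits
// the preallocated buffer before copying, otherwise treat the row as a
// data quality error instead of overflowing.
if (*size + simdjson::SIMDJSON_PADDING > _buffer_size) {
    return Status::DataQualityError(
            fmt::format("json document size {} exceeds the {} byte parse buffer", *size,
                        _buffer_size));
}
memcpy(_simdjson_ondemand_padding_buffer, json_str, *size);
```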
##########
be/src/exprs/json_functions.cpp:
##########
@@ -282,4 +282,67 @@ void JsonFunctions::get_parsed_paths(const std::vector<std::string>& path_exprs,
}
}
+Status JsonFunctions::extract_from_object(simdjson::ondemand::object& obj,
+ const std::vector<JsonPath>&
jsonpath,
+ simdjson::ondemand::value* value)
noexcept {
+#define HANDLE_SIMDJSON_ERROR(err, msg)
\
+ do {
\
+ const simdjson::error_code& _err = err;
\
+ const std::string& _msg = msg;
\
+ if (UNLIKELY(_err)) {
\
+ if (_err == simdjson::NO_SUCH_FIELD || _err ==
simdjson::INDEX_OUT_OF_BOUNDS) { \
+ return Status::NotFound(
\
+ fmt::format("err: {}, msg: {}",
simdjson::error_message(_err), _msg)); \
+ }
\
+ return Status::DataQualityError(
\
Review Comment:
If the path was not found in the object, or an out-of-bounds error was encountered, both indicate that the object does not contain the value referenced by the jsonpath. Otherwise it will return DataQualityError, since the json may be malformed.
##########
be/src/vec/exec/format/json/new_json_reader.h:
##########
@@ -163,7 +194,21 @@ class NewJsonReader : public GenericReader {
RuntimeProfile::Counter* _file_read_timer;
bool _is_dynamic_schema = false;
+ // name mapping
+ phmap::flat_hash_map<String, size_t> _slot_desc_index;
+ // simdjson
+ static constexpr size_t _buffer_size = 1024 * 1024 * 8;
Review Comment:
The size may overflow; a document larger than this buffer would overrun it.
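One way to avoid a fixed ceiling is to let simdjson own a properly padded copy per document (e.g. simdjson::padded_string), at the cost of an allocation per message; the copy must outlive the iteration, so it would need to be a member rather than a local. A sketch:

```cpp
// Sketch only: padded_string copies the input and appends SIMDJSON_PADDING,
// so no fixed-size member buffer and no overflow check are needed.
simdjson::padded_string padded(reinterpret_cast<const char*>(json_str), *size);
simdjson::ondemand::document doc;
auto error = _ondemand_json_parser->iterate(padded).get(doc);
if (error != simdjson::SUCCESS) {
    return Status::DataQualityError(
            fmt::format("parse failed: {}", simdjson::error_message(error)));
}
```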
##########
be/src/exprs/json_functions.cpp:
##########
@@ -282,4 +282,67 @@ void JsonFunctions::get_parsed_paths(const std::vector<std::string>& path_exprs,
}
}
+Status JsonFunctions::extract_from_object(simdjson::ondemand::object& obj,
+ const std::vector<JsonPath>&
jsonpath,
+ simdjson::ondemand::value* value)
noexcept {
+#define HANDLE_SIMDJSON_ERROR(err, msg)
\
+ do {
\
+ const simdjson::error_code& _err = err;
\
+ const std::string& _msg = msg;
\
+ if (UNLIKELY(_err)) {
\
+ if (_err == simdjson::NO_SUCH_FIELD || _err ==
simdjson::INDEX_OUT_OF_BOUNDS) { \
+ return Status::NotFound(
\
+ fmt::format("err: {}, msg: {}",
simdjson::error_message(_err), _msg)); \
Review Comment:
What's the meaning of `err:`? Should it be `simdjson err:`?
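If the goal is just a prefix that says where the code came from, something like this wording would do (only a suggestion):

```cpp
return Status::NotFound(fmt::format("simdjson err: {}, msg: {}",
                                    simdjson::error_message(_err), _msg));
```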
##########
be/src/vec/exec/format/json/new_json_reader.cpp:
##########
@@ -1007,4 +1015,560 @@ Status NewJsonReader::_read_one_message(std::unique_ptr<uint8_t[]>* file_buf, si
}
return Status::OK();
}
+// ---------SIMDJSON----------
+// simdjson, replace none simdjson function if it is ready
+Status NewJsonReader::_simdjson_init_reader() {
+ RETURN_IF_ERROR(_get_range_params());
+
+ RETURN_IF_ERROR(_open_file_reader());
+ if (_read_json_by_line) {
+ RETURN_IF_ERROR(_open_line_reader());
+ }
+
+ // generate _parsed_jsonpaths and _parsed_json_root
+ RETURN_IF_ERROR(_parse_jsonpath_and_json_root());
+
+ //improve performance
+ if (_parsed_jsonpaths.empty() || _is_dynamic_schema) { // input is a
simple json-string
+ _vhandle_json_callback = _is_dynamic_schema ?
&NewJsonReader::_vhandle_dynamic_json
+ :
&NewJsonReader::_simdjson_handle_simple_json;
+ } else { // input is a complex json-string and a json-path
+ if (_strip_outer_array) {
+ _vhandle_json_callback =
&NewJsonReader::_simdjson_handle_flat_array_complex_json;
+ } else {
+ _vhandle_json_callback =
&NewJsonReader::_simdjson_handle_nested_complex_json;
+ }
+ }
+ if (_is_dynamic_schema) {
+ _json_parser =
std::make_unique<vectorized::JSONDataParser<vectorized::SimdJSONParser>>();
+ }
+ _ondemand_json_parser = std::make_unique<simdjson::ondemand::parser>();
+ for (int i = 0; i < _file_slot_descs.size(); ++i) {
+ _slot_desc_index.emplace(_file_slot_descs[i]->col_name(), i);
+ }
+ return Status::OK();
+}
+
+Status
NewJsonReader::_simdjson_handle_simple_json(std::vector<MutableColumnPtr>&
columns,
+ const
std::vector<SlotDescriptor*>& slot_descs,
+ bool* is_empty_row, bool*
eof) {
+ // simple json
+ simdjson::ondemand::object objectValue;
+ size_t num_rows = columns[0]->size();
+ do {
+ bool valid = false;
+ try {
+ if (_next_row >= _total_rows) { // parse json and generic document
+ Status st = _simdjson_parse_json(is_empty_row, eof);
+ if (st.is<DATA_QUALITY_ERROR>()) {
+ continue; // continue to read next
+ }
+ RETURN_IF_ERROR(st);
+ if (*is_empty_row == true) {
+ return Status::OK();
+ }
+ if (_json_value.type() ==
simdjson::ondemand::json_type::array) {
+ _array = _json_value.get_array();
+ _array_iter = _array.begin();
+
+ _total_rows = _array.count_elements();
+ if (_total_rows == 0) {
+ // may be passing an empty json, such as "[]"
+ RETURN_IF_ERROR(_append_error_msg(nullptr, "Empty json
line", "", nullptr));
+ if (*_scanner_eof) {
+ *is_empty_row = true;
+ return Status::OK();
+ }
+ continue;
+ }
+ } else {
+ _total_rows = 1; // only one row
+ objectValue = _json_value;
+ }
+ _next_row = 0;
+ }
+
+ if (_json_value.type() == simdjson::ondemand::json_type::array) {
// handle case 1
+ objectValue = *_array_iter;
+ RETURN_IF_ERROR(
+ _simdjson_set_column_value(&objectValue, columns,
slot_descs, &valid));
+ if (_array_iter == _array.end()) {
+ // Hint to read next json doc
+ _next_row = _total_rows + 1;
+ break;
+ }
+ ++_array_iter;
+ } else { // handle case 2
+ // objectValue = _json_value.get_object();
+ RETURN_IF_ERROR(
+ _simdjson_set_column_value(&objectValue, columns,
slot_descs, &valid));
+ }
+ _next_row++;
+ if (!valid) {
+ if (*_scanner_eof) {
+ // When _scanner_eof is true and valid is false, it means
that we have encountered
+ // unqualified data and decided to stop the scan.
+ *is_empty_row = true;
+ return Status::OK();
+ }
+ continue;
+ }
+ *is_empty_row = false;
+ break; // get a valid row, then break
+ } catch (simdjson::simdjson_error& e) {
+ // prevent from endless loop
+ _next_row = _total_rows + 1;
+ fmt::memory_buffer error_msg;
+ fmt::format_to(error_msg, "Parse json data for array failed. code:
{}, error info: {}",
+ e.error(), e.what());
+ RETURN_IF_ERROR(_state->append_error_msg_to_file(
+ [&]() -> std::string { return ""; },
+ [&]() -> std::string { return fmt::to_string(error_msg);
}, eof));
+ _counter->num_rows_filtered++;
+ // Before continuing to process other rows, we need to first clean
the fail parsed row.
+ for (int i = 0; i < columns.size(); ++i) {
+ if (columns[i]->size() > num_rows) {
+ columns[i]->pop_back(columns[i]->size() - num_rows);
+ }
+ }
+ if (!valid) {
+ if (*_scanner_eof) {
+ // When _scanner_eof is true and valid is false, it means
that we have encountered
+ // unqualified data and decided to stop the scan.
+ *is_empty_row = true;
+ return Status::OK();
+ }
+ continue;
+ }
+ continue;
+ }
+ } while (_next_row <= _total_rows);
+ return Status::OK();
+}
+
+Status NewJsonReader::_simdjson_handle_flat_array_complex_json(
+ std::vector<MutableColumnPtr>& columns, const
std::vector<SlotDescriptor*>& slot_descs,
+ bool* is_empty_row, bool* eof) {
+ // array complex json
+ size_t num_rows = columns[0]->size();
+ simdjson::ondemand::object cur;
+ do {
+ try {
+ if (_next_row >= _total_rows) {
+ Status st = _simdjson_parse_json(is_empty_row, eof);
+ if (st.is<DATA_QUALITY_ERROR>()) {
+ continue; // continue to read next
+ }
+ RETURN_IF_ERROR(st);
+ if (*is_empty_row == true) {
+ if (st == Status::OK()) {
+ return Status::OK();
+ }
+ if (_total_rows == 0) {
+ continue;
+ }
+ }
+ _array = _json_value.get_array();
+ _array_iter = _array.begin();
+ }
+
+ bool valid = true;
+ cur = (*_array_iter).get_object();
+ // extract root
+ if (_parsed_json_root.size() != 0) {
+ simdjson::ondemand::value val;
+ RETURN_IF_ERROR(JsonFunctions::extract_from_object(cur,
_parsed_json_root, &val));
+ // If val is not an object, will throw simdjson::simdjson_error
+ cur = val.get_object();
+ }
+ RETURN_IF_ERROR(_simdjson_write_columns_by_jsonpath(&cur,
slot_descs, columns, &valid));
+ // Reached end point
+ if (_array_iter == _array.end()) {
+ // Hint to read next json doc
+ _next_row = _total_rows + 1;
+ break;
+ }
+ ++_array_iter;
+ ++_next_row;
+ if (!valid) {
+ continue; // process next line
+ }
+ *is_empty_row = false;
+ break; // get a valid row, then break
+ } catch (simdjson::simdjson_error& e) {
+ // prevent from endless loop
+ _next_row = _total_rows + 1;
+ fmt::memory_buffer error_msg;
+ fmt::format_to(error_msg, "Parse json data failed. code: {}, error
info: {}", e.error(),
+ e.what());
+ RETURN_IF_ERROR(_state->append_error_msg_to_file(
+ [&]() -> std::string { return ""; },
+ [&]() -> std::string { return fmt::to_string(error_msg);
}, eof));
+ _counter->num_rows_filtered++;
+ // Before continuing to process other rows, we need to first clean
the fail parsed row.
+ for (int i = 0; i < columns.size(); ++i) {
+ if (columns[i]->size() > num_rows) {
+ columns[i]->pop_back(columns[i]->size() - num_rows);
+ }
+ }
+ if (*_scanner_eof) {
+ // When _scanner_eof is true and valid is false, it means that
we have encountered
+ // unqualified data and decided to stop the scan.
+ *is_empty_row = true;
+ return Status::OK();
+ }
+ continue;
+ }
+ } while (_next_row <= _total_rows);
+ return Status::OK();
+}
+
+Status NewJsonReader::_simdjson_handle_nested_complex_json(
+ std::vector<MutableColumnPtr>& columns, const
std::vector<SlotDescriptor*>& slot_descs,
+ bool* is_empty_row, bool* eof) {
+ // nested complex json
+ while (true) {
+ size_t num_rows = columns[0]->size();
+ simdjson::ondemand::object cur;
+ try {
+ Status st = _simdjson_parse_json(is_empty_row, eof);
+ if (st.is<DATA_QUALITY_ERROR>()) {
+ continue; // continue to read next
+ }
+ RETURN_IF_ERROR(st);
+ if (*is_empty_row == true) {
+ return Status::OK();
+ }
+ *is_empty_row = false;
+ bool valid = true;
+ cur = _json_value.get_object();
Review Comment:
It will throw an exception.
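Same point as in the flat-array handler: the non-throwing form for the top-level object would look roughly like this (a sketch with the surrounding names):

```cpp
// Sketch only: check the conversion explicitly so a non-object top-level
// value becomes a data quality error rather than a thrown simdjson_error.
simdjson::ondemand::object cur;
if (auto err = _json_value.get_object().get(cur); err != simdjson::SUCCESS) {
    return Status::DataQualityError(fmt::format("top-level json value is not an object: {}",
                                                simdjson::error_message(err)));
}
```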
##########
be/src/vec/exec/format/json/new_json_reader.cpp:
##########
@@ -1007,4 +1015,560 @@ Status NewJsonReader::_read_one_message(std::unique_ptr<uint8_t[]>* file_buf, si
}
return Status::OK();
}
+// ---------SIMDJSON----------
+// simdjson, replace none simdjson function if it is ready
+Status NewJsonReader::_simdjson_init_reader() {
+ RETURN_IF_ERROR(_get_range_params());
+
+ RETURN_IF_ERROR(_open_file_reader());
+ if (_read_json_by_line) {
+ RETURN_IF_ERROR(_open_line_reader());
+ }
+
+ // generate _parsed_jsonpaths and _parsed_json_root
+ RETURN_IF_ERROR(_parse_jsonpath_and_json_root());
+
+ //improve performance
+ if (_parsed_jsonpaths.empty() || _is_dynamic_schema) { // input is a
simple json-string
+ _vhandle_json_callback = _is_dynamic_schema ?
&NewJsonReader::_vhandle_dynamic_json
+ :
&NewJsonReader::_simdjson_handle_simple_json;
+ } else { // input is a complex json-string and a json-path
+ if (_strip_outer_array) {
+ _vhandle_json_callback =
&NewJsonReader::_simdjson_handle_flat_array_complex_json;
+ } else {
+ _vhandle_json_callback =
&NewJsonReader::_simdjson_handle_nested_complex_json;
+ }
+ }
+ if (_is_dynamic_schema) {
+ _json_parser =
std::make_unique<vectorized::JSONDataParser<vectorized::SimdJSONParser>>();
+ }
+ _ondemand_json_parser = std::make_unique<simdjson::ondemand::parser>();
+ for (int i = 0; i < _file_slot_descs.size(); ++i) {
+ _slot_desc_index.emplace(_file_slot_descs[i]->col_name(), i);
+ }
+ return Status::OK();
+}
+
+Status
NewJsonReader::_simdjson_handle_simple_json(std::vector<MutableColumnPtr>&
columns,
+ const
std::vector<SlotDescriptor*>& slot_descs,
+ bool* is_empty_row, bool*
eof) {
+ // simple json
+ simdjson::ondemand::object objectValue;
+ size_t num_rows = columns[0]->size();
+ do {
+ bool valid = false;
+ try {
+ if (_next_row >= _total_rows) { // parse json and generic document
+ Status st = _simdjson_parse_json(is_empty_row, eof);
+ if (st.is<DATA_QUALITY_ERROR>()) {
+ continue; // continue to read next
+ }
+ RETURN_IF_ERROR(st);
+ if (*is_empty_row == true) {
+ return Status::OK();
+ }
+ if (_json_value.type() ==
simdjson::ondemand::json_type::array) {
+ _array = _json_value.get_array();
+ _array_iter = _array.begin();
+
+ _total_rows = _array.count_elements();
+ if (_total_rows == 0) {
+ // may be passing an empty json, such as "[]"
+ RETURN_IF_ERROR(_append_error_msg(nullptr, "Empty json
line", "", nullptr));
+ if (*_scanner_eof) {
+ *is_empty_row = true;
+ return Status::OK();
+ }
+ continue;
+ }
+ } else {
+ _total_rows = 1; // only one row
+ objectValue = _json_value;
+ }
+ _next_row = 0;
+ }
+
+ if (_json_value.type() == simdjson::ondemand::json_type::array) {
// handle case 1
+ objectValue = *_array_iter;
+ RETURN_IF_ERROR(
+ _simdjson_set_column_value(&objectValue, columns,
slot_descs, &valid));
+ if (_array_iter == _array.end()) {
+ // Hint to read next json doc
+ _next_row = _total_rows + 1;
+ break;
+ }
+ ++_array_iter;
+ } else { // handle case 2
+ // objectValue = _json_value.get_object();
+ RETURN_IF_ERROR(
+ _simdjson_set_column_value(&objectValue, columns,
slot_descs, &valid));
+ }
+ _next_row++;
+ if (!valid) {
+ if (*_scanner_eof) {
+ // When _scanner_eof is true and valid is false, it means
that we have encountered
+ // unqualified data and decided to stop the scan.
+ *is_empty_row = true;
+ return Status::OK();
+ }
+ continue;
+ }
+ *is_empty_row = false;
+ break; // get a valid row, then break
+ } catch (simdjson::simdjson_error& e) {
+ // prevent from endless loop
+ _next_row = _total_rows + 1;
+ fmt::memory_buffer error_msg;
+ fmt::format_to(error_msg, "Parse json data for array failed. code:
{}, error info: {}",
+ e.error(), e.what());
+ RETURN_IF_ERROR(_state->append_error_msg_to_file(
+ [&]() -> std::string { return ""; },
+ [&]() -> std::string { return fmt::to_string(error_msg);
}, eof));
+ _counter->num_rows_filtered++;
+ // Before continuing to process other rows, we need to first clean
the fail parsed row.
+ for (int i = 0; i < columns.size(); ++i) {
+ if (columns[i]->size() > num_rows) {
+ columns[i]->pop_back(columns[i]->size() - num_rows);
+ }
+ }
+ if (!valid) {
+ if (*_scanner_eof) {
+ // When _scanner_eof is true and valid is false, it means
that we have encountered
+ // unqualified data and decided to stop the scan.
+ *is_empty_row = true;
+ return Status::OK();
+ }
+ continue;
+ }
+ continue;
+ }
+ } while (_next_row <= _total_rows);
+ return Status::OK();
+}
+
+Status NewJsonReader::_simdjson_handle_flat_array_complex_json(
+ std::vector<MutableColumnPtr>& columns, const
std::vector<SlotDescriptor*>& slot_descs,
+ bool* is_empty_row, bool* eof) {
+ // array complex json
+ size_t num_rows = columns[0]->size();
+ simdjson::ondemand::object cur;
+ do {
+ try {
+ if (_next_row >= _total_rows) {
+ Status st = _simdjson_parse_json(is_empty_row, eof);
+ if (st.is<DATA_QUALITY_ERROR>()) {
+ continue; // continue to read next
+ }
+ RETURN_IF_ERROR(st);
+ if (*is_empty_row == true) {
+ if (st == Status::OK()) {
+ return Status::OK();
+ }
+ if (_total_rows == 0) {
+ continue;
+ }
+ }
+ _array = _json_value.get_array();
+ _array_iter = _array.begin();
+ }
+
+ bool valid = true;
+ cur = (*_array_iter).get_object();
+ // extract root
+ if (_parsed_json_root.size() != 0) {
+ simdjson::ondemand::value val;
+ RETURN_IF_ERROR(JsonFunctions::extract_from_object(cur,
_parsed_json_root, &val));
+ // If val is not an object, will throw simdjson::simdjson_error
+ cur = val.get_object();
+ }
+ RETURN_IF_ERROR(_simdjson_write_columns_by_jsonpath(&cur,
slot_descs, columns, &valid));
+ // Reached end point
+ if (_array_iter == _array.end()) {
+ // Hint to read next json doc
+ _next_row = _total_rows + 1;
+ break;
+ }
+ ++_array_iter;
+ ++_next_row;
+ if (!valid) {
+ continue; // process next line
+ }
+ *is_empty_row = false;
+ break; // get a valid row, then break
+ } catch (simdjson::simdjson_error& e) {
+ // prevent from endless loop
+ _next_row = _total_rows + 1;
+ fmt::memory_buffer error_msg;
+ fmt::format_to(error_msg, "Parse json data failed. code: {}, error
info: {}", e.error(),
+ e.what());
+ RETURN_IF_ERROR(_state->append_error_msg_to_file(
+ [&]() -> std::string { return ""; },
+ [&]() -> std::string { return fmt::to_string(error_msg);
}, eof));
+ _counter->num_rows_filtered++;
+ // Before continuing to process other rows, we need to first clean
the fail parsed row.
+ for (int i = 0; i < columns.size(); ++i) {
+ if (columns[i]->size() > num_rows) {
+ columns[i]->pop_back(columns[i]->size() - num_rows);
+ }
+ }
+ if (*_scanner_eof) {
+ // When _scanner_eof is true and valid is false, it means that
we have encountered
+ // unqualified data and decided to stop the scan.
+ *is_empty_row = true;
+ return Status::OK();
+ }
+ continue;
+ }
+ } while (_next_row <= _total_rows);
+ return Status::OK();
+}
+
+Status NewJsonReader::_simdjson_handle_nested_complex_json(
+ std::vector<MutableColumnPtr>& columns, const
std::vector<SlotDescriptor*>& slot_descs,
+ bool* is_empty_row, bool* eof) {
+ // nested complex json
+ while (true) {
+ size_t num_rows = columns[0]->size();
+ simdjson::ondemand::object cur;
+ try {
+ Status st = _simdjson_parse_json(is_empty_row, eof);
+ if (st.is<DATA_QUALITY_ERROR>()) {
+ continue; // continue to read next
+ }
+ RETURN_IF_ERROR(st);
+ if (*is_empty_row == true) {
+ return Status::OK();
+ }
+ *is_empty_row = false;
+ bool valid = true;
+ cur = _json_value.get_object();
+ RETURN_IF_ERROR(_simdjson_write_columns_by_jsonpath(&cur,
slot_descs, columns, &valid));
+ if (!valid) {
+ // there is only one line in this case, so if it return false,
just set is_empty_row true
+ // so that the caller will continue reading next line.
+ *is_empty_row = true;
+ }
+ break; // read a valid row
+ } catch (simdjson::simdjson_error& e) {
+ fmt::memory_buffer error_msg;
+ fmt::format_to(error_msg, "Parse json data failed. code: {}, error
info: {}", e.error(),
+ e.what());
+ RETURN_IF_ERROR(_state->append_error_msg_to_file(
+ [&]() -> std::string { return ""; },
+ [&]() -> std::string { return fmt::to_string(error_msg);
}, eof));
+ _counter->num_rows_filtered++;
+ // Before continuing to process other rows, we need to first clean
the fail parsed row.
+ for (int i = 0; i < columns.size(); ++i) {
+ if (columns[i]->size() > num_rows) {
+ columns[i]->pop_back(columns[i]->size() - num_rows);
+ }
+ }
+ if (*_scanner_eof) {
+ // When _scanner_eof is true and valid is false, it means that
we have encountered
+ // unqualified data and decided to stop the scan.
+ *is_empty_row = true;
+ return Status::OK();
+ }
+ continue;
+ }
+ }
+ return Status::OK();
+}
+
+Status NewJsonReader::_simdjson_set_column_value(simdjson::ondemand::object*
value,
+
std::vector<MutableColumnPtr>& columns,
+ const
std::vector<SlotDescriptor*>& slot_descs,
+ bool* valid) {
+ // set
+ size_t cur_row_count = columns[0]->size();
+ bool has_valid_value = false;
+ // iterate through object, simdjson::ondemond will parsing on the fly
+ for (auto field : *value) {
+ std::string_view key = field.unescaped_key();
+ auto iter = _slot_desc_index.find(std::string(key));
+ if (iter == _slot_desc_index.end()) {
+ // This key is not exist in slot desc, just ignore
+ continue;
+ }
+ simdjson::ondemand::value val = field.value();
+ RETURN_IF_ERROR(_simdjson_write_data_to_column(val,
slot_descs[iter->second],
+
columns[iter->second].get(), valid));
+ if (!(*valid)) {
+ return Status::OK();
+ }
+ has_valid_value = true;
+ }
+ if (!has_valid_value) {
+ RETURN_IF_ERROR(
+ _append_error_msg(value, "All fields is null, this is a
invalid row.", "", valid));
+ return Status::OK();
+ }
+
+ // fill missing slot
+ int nullcount = 0;
+ int ctx_idx = 0;
+ for (auto slot_desc : slot_descs) {
+ if (!slot_desc->is_materialized()) {
+ continue;
+ }
+ int dest_index = ctx_idx++;
+ auto* column_ptr = columns[dest_index].get();
+ if (column_ptr->size() < cur_row_count + 1) {
+ DCHECK(column_ptr->size() == cur_row_count);
+ column_ptr->assume_mutable()->insert_default();
+ ++nullcount;
+ }
+ DCHECK(column_ptr->size() == cur_row_count + 1);
+ }
+ // There is at least one valid value here
+ DCHECK(nullcount < columns.size());
+ *valid = true;
+ return Status::OK();
+}
+
+Status
NewJsonReader::_simdjson_write_data_to_column(simdjson::ondemand::value& value,
+ SlotDescriptor* slot_desc,
+ vectorized::IColumn*
column, bool* valid) {
+ // write
+ vectorized::ColumnNullable* nullable_column = nullptr;
+ vectorized::IColumn* column_ptr = nullptr;
+ if (slot_desc->is_nullable()) {
+ nullable_column = assert_cast<vectorized::ColumnNullable*>(column);
+ column_ptr = &nullable_column->get_nested_column();
+ }
+
+ // TODO: if the vexpr can support another 'slot_desc type' than
'TYPE_VARCHAR',
+ // we need use a function to support these types to insert data in columns.
+ ColumnString* column_string = assert_cast<ColumnString*>(column_ptr);
+ if (value.is_null()) {
+ if (column->is_nullable()) {
+ // insert_default already push 1 to null_map
+ nullable_column->insert_default();
+ } else {
+ RETURN_IF_ERROR(_append_error_msg(
+ nullptr, "Json value is null, but the column `{}` is not
nullable.",
+ slot_desc->col_name(), valid));
+ return Status::OK();
+ }
+ } else if (value.type() == simdjson::ondemand::json_type::boolean) {
+ nullable_column->get_null_map_data().push_back(0);
+ if (value.get_bool()) {
+ column_string->insert_data("1", 1);
+ } else {
+ column_string->insert_data("0", 1);
+ }
+ } else {
+ // just return it's str representation
+ auto str_view = simdjson::to_json_string(value).value();
+ if (str_view[0] == '\"' || str_view[0] == '\'') {
+ str_view = str_view.substr(1, str_view.length() - 2);
+ }
+ nullable_column->get_null_map_data().push_back(0);
+ column_string->insert_data(str_view.data(), str_view.length());
+ }
+
+ *valid = true;
+ return Status::OK();
+}
+
+Status NewJsonReader::_append_error_msg(simdjson::ondemand::object* obj,
std::string error_msg,
+ std::string col_name, bool* valid) {
+ std::string err_msg;
+ if (!col_name.empty()) {
+ fmt::memory_buffer error_buf;
+ fmt::format_to(error_buf, error_msg, col_name);
+ err_msg = fmt::to_string(error_buf);
+ } else {
+ err_msg = error_msg;
+ }
+
+ RETURN_IF_ERROR(_state->append_error_msg_to_file(
+ [&]() -> std::string {
+ if (!obj) {
+ return "";
+ }
+ std::string_view str_view;
+ (void)!obj->raw_json().get(str_view);
+ return std::string(str_view.data(), str_view.size());
+ },
+ [&]() -> std::string { return err_msg; }, _scanner_eof));
+
+ _counter->num_rows_filtered++;
+ if (valid != nullptr) {
+ // current row is invalid
+ *valid = false;
+ }
+ return Status::OK();
+}
+
+Status NewJsonReader::_simdjson_parse_json(bool* is_empty_row, bool* eof) {
+ size_t size = 0;
+ RETURN_IF_ERROR(_simdjson_parse_json_doc(&size, eof));
+
+ // read all data, then return
+ if (size == 0 || *eof) {
+ *is_empty_row = true;
+ return Status::OK();
+ }
+
+ if (!_parsed_jsonpaths.empty() && _strip_outer_array) {
+ _total_rows = _json_value.count_elements().value();
+ _next_row = 0;
+
+ if (_total_rows == 0) {
+ // meet an empty json array.
+ *is_empty_row = true;
+ }
+ }
+ return Status::OK();
+}
+Status NewJsonReader::_simdjson_parse_json_doc(size_t* size, bool* eof) {
+ // read a whole message
+ SCOPED_TIMER(_file_read_timer);
+ const uint8_t* json_str = nullptr;
+ std::unique_ptr<uint8_t[]> json_str_ptr;
+ if (_line_reader != nullptr) {
+ RETURN_IF_ERROR(_line_reader->read_line(&json_str, size, eof));
+ } else {
+ size_t length = 0;
+ RETURN_IF_ERROR(_read_one_message(&json_str_ptr, &length));
+ json_str = json_str_ptr.get();
+ *size = length;
+ if (length == 0) {
+ *eof = true;
+ }
+ }
+
+ _bytes_read_counter += *size;
+ if (*eof) {
+ return Status::OK();
+ }
+ memcpy(_simdjson_ondemand_padding_buffer, json_str, *size);
+ auto error = _ondemand_json_parser
+ ->iterate(std::string_view(reinterpret_cast<const
char*>(
+
_simdjson_ondemand_padding_buffer),
+ *size),
+ _padded_size)
+ .get(_original_json_doc);
+ auto return_quality_error = [&](fmt::memory_buffer& error_msg,
+ const std::string& doc_info) -> Status {
+ RETURN_IF_ERROR(_state->append_error_msg_to_file(
+ [&]() -> std::string { return doc_info; },
+ [&]() -> std::string { return fmt::to_string(error_msg); },
_scanner_eof));
+ _counter->num_rows_filtered++;
+ if (*_scanner_eof) {
+ // Case A: if _scanner_eof is set to true in
"append_error_msg_to_file", which means
+ // we meet enough invalid rows and the scanner should be stopped.
+ // So we set eof to true and return OK, the caller will stop the
process as we meet the end of file.
+ *eof = true;
+ return Status::OK();
+ }
+ return Status::DataQualityError(fmt::to_string(error_msg));
+ };
+ if (error != simdjson::error_code::SUCCESS) {
+ fmt::memory_buffer error_msg;
+ fmt::format_to(error_msg, "Parse json data for JsonDoc failed. code:
{}, error info: {}",
+ error, simdjson::error_message(error));
+ return return_quality_error(error_msg, std::string((char*)json_str,
*size));
+ }
+ try {
+ // set json root
+ // if it is an array at top level, then we should iterate the entire
array in
+ // ::_simdjson_handle_flat_array_complex_json
+ if (_parsed_json_root.size() != 0 &&
+ _original_json_doc.type() ==
simdjson::ondemand::json_type::object) {
Review Comment:
it's handled in _simdjson_handle_flat_array_complex_json
##########
be/src/vec/exec/format/json/new_json_reader.cpp:
##########
@@ -1007,4 +1015,560 @@ Status NewJsonReader::_read_one_message(std::unique_ptr<uint8_t[]>* file_buf, si
}
return Status::OK();
}
+// ---------SIMDJSON----------
+// simdjson, replace none simdjson function if it is ready
+Status NewJsonReader::_simdjson_init_reader() {
+ RETURN_IF_ERROR(_get_range_params());
+
+ RETURN_IF_ERROR(_open_file_reader());
+ if (_read_json_by_line) {
+ RETURN_IF_ERROR(_open_line_reader());
+ }
+
+ // generate _parsed_jsonpaths and _parsed_json_root
+ RETURN_IF_ERROR(_parse_jsonpath_and_json_root());
+
+ //improve performance
+ if (_parsed_jsonpaths.empty() || _is_dynamic_schema) { // input is a
simple json-string
+ _vhandle_json_callback = _is_dynamic_schema ?
&NewJsonReader::_vhandle_dynamic_json
+ :
&NewJsonReader::_simdjson_handle_simple_json;
+ } else { // input is a complex json-string and a json-path
+ if (_strip_outer_array) {
+ _vhandle_json_callback =
&NewJsonReader::_simdjson_handle_flat_array_complex_json;
+ } else {
+ _vhandle_json_callback =
&NewJsonReader::_simdjson_handle_nested_complex_json;
+ }
+ }
+ if (_is_dynamic_schema) {
+ _json_parser =
std::make_unique<vectorized::JSONDataParser<vectorized::SimdJSONParser>>();
+ }
+ _ondemand_json_parser = std::make_unique<simdjson::ondemand::parser>();
+ for (int i = 0; i < _file_slot_descs.size(); ++i) {
+ _slot_desc_index.emplace(_file_slot_descs[i]->col_name(), i);
+ }
+ return Status::OK();
+}
+
+Status
NewJsonReader::_simdjson_handle_simple_json(std::vector<MutableColumnPtr>&
columns,
+ const
std::vector<SlotDescriptor*>& slot_descs,
+ bool* is_empty_row, bool*
eof) {
+ // simple json
+ simdjson::ondemand::object objectValue;
+ size_t num_rows = columns[0]->size();
+ do {
+ bool valid = false;
+ try {
+ if (_next_row >= _total_rows) { // parse json and generic document
+ Status st = _simdjson_parse_json(is_empty_row, eof);
+ if (st.is<DATA_QUALITY_ERROR>()) {
+ continue; // continue to read next
+ }
+ RETURN_IF_ERROR(st);
+ if (*is_empty_row == true) {
+ return Status::OK();
+ }
+ if (_json_value.type() ==
simdjson::ondemand::json_type::array) {
+ _array = _json_value.get_array();
+ _array_iter = _array.begin();
+
+ _total_rows = _array.count_elements();
+ if (_total_rows == 0) {
+ // may be passing an empty json, such as "[]"
+ RETURN_IF_ERROR(_append_error_msg(nullptr, "Empty json
line", "", nullptr));
+ if (*_scanner_eof) {
+ *is_empty_row = true;
+ return Status::OK();
+ }
+ continue;
+ }
+ } else {
+ _total_rows = 1; // only one row
+ objectValue = _json_value;
+ }
+ _next_row = 0;
+ }
+
+ if (_json_value.type() == simdjson::ondemand::json_type::array) {
// handle case 1
+ objectValue = *_array_iter;
+ RETURN_IF_ERROR(
+ _simdjson_set_column_value(&objectValue, columns,
slot_descs, &valid));
+ if (_array_iter == _array.end()) {
+ // Hint to read next json doc
+ _next_row = _total_rows + 1;
+ break;
+ }
+ ++_array_iter;
+ } else { // handle case 2
+ // objectValue = _json_value.get_object();
+ RETURN_IF_ERROR(
+ _simdjson_set_column_value(&objectValue, columns,
slot_descs, &valid));
+ }
+ _next_row++;
+ if (!valid) {
+ if (*_scanner_eof) {
+ // When _scanner_eof is true and valid is false, it means
that we have encountered
+ // unqualified data and decided to stop the scan.
+ *is_empty_row = true;
+ return Status::OK();
+ }
+ continue;
+ }
+ *is_empty_row = false;
+ break; // get a valid row, then break
+ } catch (simdjson::simdjson_error& e) {
+ // prevent from endless loop
+ _next_row = _total_rows + 1;
+ fmt::memory_buffer error_msg;
+ fmt::format_to(error_msg, "Parse json data for array failed. code:
{}, error info: {}",
+ e.error(), e.what());
+ RETURN_IF_ERROR(_state->append_error_msg_to_file(
+ [&]() -> std::string { return ""; },
+ [&]() -> std::string { return fmt::to_string(error_msg);
}, eof));
+ _counter->num_rows_filtered++;
+ // Before continuing to process other rows, we need to first clean
the fail parsed row.
+ for (int i = 0; i < columns.size(); ++i) {
+ if (columns[i]->size() > num_rows) {
+ columns[i]->pop_back(columns[i]->size() - num_rows);
+ }
+ }
+ if (!valid) {
+ if (*_scanner_eof) {
+ // When _scanner_eof is true and valid is false, it means
that we have encountered
+ // unqualified data and decided to stop the scan.
+ *is_empty_row = true;
+ return Status::OK();
+ }
+ continue;
+ }
+ continue;
+ }
+ } while (_next_row <= _total_rows);
+ return Status::OK();
+}
+
+Status NewJsonReader::_simdjson_handle_flat_array_complex_json(
+ std::vector<MutableColumnPtr>& columns, const
std::vector<SlotDescriptor*>& slot_descs,
+ bool* is_empty_row, bool* eof) {
+ // array complex json
+ size_t num_rows = columns[0]->size();
+ simdjson::ondemand::object cur;
+ do {
+ try {
+ if (_next_row >= _total_rows) {
+ Status st = _simdjson_parse_json(is_empty_row, eof);
+ if (st.is<DATA_QUALITY_ERROR>()) {
+ continue; // continue to read next
+ }
+ RETURN_IF_ERROR(st);
+ if (*is_empty_row == true) {
+ if (st == Status::OK()) {
+ return Status::OK();
+ }
+ if (_total_rows == 0) {
+ continue;
+ }
+ }
+ _array = _json_value.get_array();
+ _array_iter = _array.begin();
+ }
+
+ bool valid = true;
+ cur = (*_array_iter).get_object();
+ // extract root
+ if (_parsed_json_root.size() != 0) {
+ simdjson::ondemand::value val;
+ RETURN_IF_ERROR(JsonFunctions::extract_from_object(cur,
_parsed_json_root, &val));
+ // If val is not an object, will throw simdjson::simdjson_error
+ cur = val.get_object();
+ }
+ RETURN_IF_ERROR(_simdjson_write_columns_by_jsonpath(&cur,
slot_descs, columns, &valid));
+ // Reached end point
+ if (_array_iter == _array.end()) {
+ // Hint to read next json doc
+ _next_row = _total_rows + 1;
+ break;
+ }
+ ++_array_iter;
+ ++_next_row;
+ if (!valid) {
+ continue; // process next line
+ }
+ *is_empty_row = false;
+ break; // get a valid row, then break
+ } catch (simdjson::simdjson_error& e) {
+ // prevent from endless loop
+ _next_row = _total_rows + 1;
+ fmt::memory_buffer error_msg;
+ fmt::format_to(error_msg, "Parse json data failed. code: {}, error
info: {}", e.error(),
+ e.what());
+ RETURN_IF_ERROR(_state->append_error_msg_to_file(
+ [&]() -> std::string { return ""; },
+ [&]() -> std::string { return fmt::to_string(error_msg);
}, eof));
+ _counter->num_rows_filtered++;
+ // Before continuing to process other rows, we need to first clean
the fail parsed row.
+ for (int i = 0; i < columns.size(); ++i) {
+ if (columns[i]->size() > num_rows) {
+ columns[i]->pop_back(columns[i]->size() - num_rows);
+ }
+ }
+ if (*_scanner_eof) {
+ // When _scanner_eof is true and valid is false, it means that
we have encountered
+ // unqualified data and decided to stop the scan.
+ *is_empty_row = true;
+ return Status::OK();
+ }
+ continue;
+ }
+ } while (_next_row <= _total_rows);
+ return Status::OK();
+}
+
+Status NewJsonReader::_simdjson_handle_nested_complex_json(
+ std::vector<MutableColumnPtr>& columns, const
std::vector<SlotDescriptor*>& slot_descs,
+ bool* is_empty_row, bool* eof) {
+ // nested complex json
+ while (true) {
+ size_t num_rows = columns[0]->size();
+ simdjson::ondemand::object cur;
+ try {
+ Status st = _simdjson_parse_json(is_empty_row, eof);
+ if (st.is<DATA_QUALITY_ERROR>()) {
+ continue; // continue to read next
+ }
+ RETURN_IF_ERROR(st);
+ if (*is_empty_row == true) {
+ return Status::OK();
+ }
+ *is_empty_row = false;
+ bool valid = true;
+ cur = _json_value.get_object();
+ RETURN_IF_ERROR(_simdjson_write_columns_by_jsonpath(&cur,
slot_descs, columns, &valid));
+ if (!valid) {
+ // there is only one line in this case, so if it return false,
just set is_empty_row true
+ // so that the caller will continue reading next line.
+ *is_empty_row = true;
+ }
+ break; // read a valid row
+ } catch (simdjson::simdjson_error& e) {
+ fmt::memory_buffer error_msg;
+ fmt::format_to(error_msg, "Parse json data failed. code: {}, error
info: {}", e.error(),
+ e.what());
+ RETURN_IF_ERROR(_state->append_error_msg_to_file(
+ [&]() -> std::string { return ""; },
+ [&]() -> std::string { return fmt::to_string(error_msg);
}, eof));
+ _counter->num_rows_filtered++;
+ // Before continuing to process other rows, we need to first clean
the fail parsed row.
+ for (int i = 0; i < columns.size(); ++i) {
+ if (columns[i]->size() > num_rows) {
+ columns[i]->pop_back(columns[i]->size() - num_rows);
+ }
+ }
+ if (*_scanner_eof) {
+ // When _scanner_eof is true and valid is false, it means that
we have encountered
+ // unqualified data and decided to stop the scan.
+ *is_empty_row = true;
+ return Status::OK();
+ }
+ continue;
+ }
+ }
+ return Status::OK();
+}
+
+Status NewJsonReader::_simdjson_set_column_value(simdjson::ondemand::object*
value,
+
std::vector<MutableColumnPtr>& columns,
+ const
std::vector<SlotDescriptor*>& slot_descs,
+ bool* valid) {
+ // set
+ size_t cur_row_count = columns[0]->size();
+ bool has_valid_value = false;
+ // iterate through object, simdjson::ondemond will parsing on the fly
+ for (auto field : *value) {
+ std::string_view key = field.unescaped_key();
+ auto iter = _slot_desc_index.find(std::string(key));
+ if (iter == _slot_desc_index.end()) {
+ // This key is not exist in slot desc, just ignore
+ continue;
+ }
+ simdjson::ondemand::value val = field.value();
+ RETURN_IF_ERROR(_simdjson_write_data_to_column(val,
slot_descs[iter->second],
+
columns[iter->second].get(), valid));
+ if (!(*valid)) {
+ return Status::OK();
+ }
+ has_valid_value = true;
+ }
+ if (!has_valid_value) {
+ RETURN_IF_ERROR(
+ _append_error_msg(value, "All fields is null, this is a
invalid row.", "", valid));
+ return Status::OK();
+ }
+
+ // fill missing slot
+ int nullcount = 0;
+ int ctx_idx = 0;
+ for (auto slot_desc : slot_descs) {
+ if (!slot_desc->is_materialized()) {
+ continue;
+ }
+ int dest_index = ctx_idx++;
+ auto* column_ptr = columns[dest_index].get();
+ if (column_ptr->size() < cur_row_count + 1) {
+ DCHECK(column_ptr->size() == cur_row_count);
+ column_ptr->assume_mutable()->insert_default();
+ ++nullcount;
+ }
+ DCHECK(column_ptr->size() == cur_row_count + 1);
+ }
+ // There is at least one valid value here
+ DCHECK(nullcount < columns.size());
+ *valid = true;
+ return Status::OK();
+}
+
+Status NewJsonReader::_simdjson_write_data_to_column(simdjson::ondemand::value& value,
+ SlotDescriptor* slot_desc,
+ vectorized::IColumn* column, bool* valid) {
+ // write
+ vectorized::ColumnNullable* nullable_column = nullptr;
+ vectorized::IColumn* column_ptr = nullptr;
+ if (slot_desc->is_nullable()) {
+ nullable_column = assert_cast<vectorized::ColumnNullable*>(column);
+ column_ptr = &nullable_column->get_nested_column();
+ }
+
+ // TODO: if the vexpr can support another 'slot_desc type' than 'TYPE_VARCHAR',
+ // we need use a function to support these types to insert data in columns.
+ ColumnString* column_string = assert_cast<ColumnString*>(column_ptr);
+ if (value.is_null()) {
+ if (column->is_nullable()) {
+ // insert_default already push 1 to null_map
+ nullable_column->insert_default();
+ } else {
+ RETURN_IF_ERROR(_append_error_msg(
+ nullptr, "Json value is null, but the column `{}` is not
nullable.",
+ slot_desc->col_name(), valid));
+ return Status::OK();
Review Comment:
_append_error_msg will append the error message and then continue to process the next line.
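For context, the pattern described above can be summarized with a minimal, self-contained sketch (the names `append_error_to_log` and `rows_filtered` below are placeholders, not Doris APIs): the error sink receives the offending row and the message as lazy callbacks, bumps a filtered-rows counter, and clears a `valid` flag so the calling loop simply moves on to the next line.

    // sketch_lazy_error_sink.cpp -- illustrative only; not the PR's code.
    #include <cstddef>
    #include <functional>
    #include <iostream>
    #include <string>

    // Stand-in for an error sink: the line and message are supplied as callbacks,
    // so the strings are only built when a row actually fails.
    void append_error_to_log(const std::function<std::string()>& line_getter,
                             const std::function<std::string()>& msg_getter,
                             std::size_t* rows_filtered, bool* valid) {
        std::cerr << "skipped row [" << line_getter() << "]: " << msg_getter() << '\n';
        ++(*rows_filtered);
        if (valid != nullptr) {
            *valid = false; // mark the current row invalid; the caller keeps scanning
        }
    }

    int main() {
        std::size_t rows_filtered = 0;
        bool valid = true;
        std::string raw = R"({"k1": null})";
        append_error_to_log([&]() -> std::string { return raw; },
                            [&]() -> std::string { return "column `k1` is not nullable"; },
                            &rows_filtered, &valid);
        std::cout << "valid=" << valid << " filtered=" << rows_filtered << '\n'; // valid=0 filtered=1
        return 0;
    }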
##########
be/src/vec/exec/format/json/new_json_reader.cpp:
##########
@@ -1007,4 +1015,560 @@ Status NewJsonReader::_read_one_message(std::unique_ptr<uint8_t[]>* file_buf, si
}
return Status::OK();
}
+// ---------SIMDJSON----------
+// simdjson, replace none simdjson function if it is ready
+Status NewJsonReader::_simdjson_init_reader() {
+ RETURN_IF_ERROR(_get_range_params());
+
+ RETURN_IF_ERROR(_open_file_reader());
+ if (_read_json_by_line) {
+ RETURN_IF_ERROR(_open_line_reader());
+ }
+
+ // generate _parsed_jsonpaths and _parsed_json_root
+ RETURN_IF_ERROR(_parse_jsonpath_and_json_root());
+
+ //improve performance
+ if (_parsed_jsonpaths.empty() || _is_dynamic_schema) { // input is a simple json-string
+ _vhandle_json_callback = _is_dynamic_schema ? &NewJsonReader::_vhandle_dynamic_json
+ : &NewJsonReader::_simdjson_handle_simple_json;
+ } else { // input is a complex json-string and a json-path
+ if (_strip_outer_array) {
+ _vhandle_json_callback = &NewJsonReader::_simdjson_handle_flat_array_complex_json;
+ } else {
+ _vhandle_json_callback = &NewJsonReader::_simdjson_handle_nested_complex_json;
+ }
+ }
+ if (_is_dynamic_schema) {
+ _json_parser = std::make_unique<vectorized::JSONDataParser<vectorized::SimdJSONParser>>();
+ }
+ }
+ _ondemand_json_parser = std::make_unique<simdjson::ondemand::parser>();
+ for (int i = 0; i < _file_slot_descs.size(); ++i) {
+ _slot_desc_index.emplace(_file_slot_descs[i]->col_name(), i);
+ }
+ return Status::OK();
+}
+
+Status NewJsonReader::_simdjson_handle_simple_json(std::vector<MutableColumnPtr>& columns,
+ const std::vector<SlotDescriptor*>& slot_descs,
+ bool* is_empty_row, bool* eof) {
+ // simple json
+ simdjson::ondemand::object objectValue;
+ size_t num_rows = columns[0]->size();
+ do {
+ bool valid = false;
+ try {
+ if (_next_row >= _total_rows) { // parse json and generic document
+ Status st = _simdjson_parse_json(is_empty_row, eof);
+ if (st.is<DATA_QUALITY_ERROR>()) {
+ continue; // continue to read next
+ }
+ RETURN_IF_ERROR(st);
+ if (*is_empty_row == true) {
+ return Status::OK();
+ }
+ if (_json_value.type() == simdjson::ondemand::json_type::array) {
+ _array = _json_value.get_array();
+ _array_iter = _array.begin();
+
+ _total_rows = _array.count_elements();
+ if (_total_rows == 0) {
+ // may be passing an empty json, such as "[]"
+ RETURN_IF_ERROR(_append_error_msg(nullptr, "Empty json line", "", nullptr));
+ if (*_scanner_eof) {
+ *is_empty_row = true;
+ return Status::OK();
+ }
+ continue;
+ }
+ } else {
+ _total_rows = 1; // only one row
+ objectValue = _json_value;
+ }
+ _next_row = 0;
+ }
+
+ if (_json_value.type() == simdjson::ondemand::json_type::array) { // handle case 1
+ objectValue = *_array_iter;
+ RETURN_IF_ERROR(
+ _simdjson_set_column_value(&objectValue, columns, slot_descs, &valid));
+ if (_array_iter == _array.end()) {
+ // Hint to read next json doc
+ _next_row = _total_rows + 1;
+ break;
+ }
+ ++_array_iter;
+ } else { // handle case 2
+ // objectValue = _json_value.get_object();
+ RETURN_IF_ERROR(
+ _simdjson_set_column_value(&objectValue, columns, slot_descs, &valid));
+ }
+ _next_row++;
+ if (!valid) {
+ if (*_scanner_eof) {
+ // When _scanner_eof is true and valid is false, it means that we have encountered
+ // unqualified data and decided to stop the scan.
+ *is_empty_row = true;
+ return Status::OK();
+ }
+ continue;
+ }
+ *is_empty_row = false;
+ break; // get a valid row, then break
+ } catch (simdjson::simdjson_error& e) {
+ // prevent from endless loop
+ _next_row = _total_rows + 1;
+ fmt::memory_buffer error_msg;
+ fmt::format_to(error_msg, "Parse json data for array failed. code:
{}, error info: {}",
+ e.error(), e.what());
+ RETURN_IF_ERROR(_state->append_error_msg_to_file(
+ [&]() -> std::string { return ""; },
+ [&]() -> std::string { return fmt::to_string(error_msg); }, eof));
+ _counter->num_rows_filtered++;
+ // Before continuing to process other rows, we need to first clean the fail parsed row.
+ for (int i = 0; i < columns.size(); ++i) {
+ if (columns[i]->size() > num_rows) {
+ columns[i]->pop_back(columns[i]->size() - num_rows);
+ }
+ }
+ if (!valid) {
+ if (*_scanner_eof) {
+ // When _scanner_eof is true and valid is false, it means that we have encountered
+ // unqualified data and decided to stop the scan.
+ *is_empty_row = true;
+ return Status::OK();
+ }
+ continue;
+ }
+ continue;
+ }
+ } while (_next_row <= _total_rows);
+ return Status::OK();
+}
+
+Status NewJsonReader::_simdjson_handle_flat_array_complex_json(
+ std::vector<MutableColumnPtr>& columns, const std::vector<SlotDescriptor*>& slot_descs,
+ bool* is_empty_row, bool* eof) {
+ // array complex json
+ size_t num_rows = columns[0]->size();
+ simdjson::ondemand::object cur;
+ do {
+ try {
+ if (_next_row >= _total_rows) {
+ Status st = _simdjson_parse_json(is_empty_row, eof);
+ if (st.is<DATA_QUALITY_ERROR>()) {
+ continue; // continue to read next
+ }
+ RETURN_IF_ERROR(st);
+ if (*is_empty_row == true) {
+ if (st == Status::OK()) {
+ return Status::OK();
+ }
+ if (_total_rows == 0) {
+ continue;
+ }
+ }
+ _array = _json_value.get_array();
+ _array_iter = _array.begin();
+ }
+
+ bool valid = true;
+ cur = (*_array_iter).get_object();
+ // extract root
+ if (_parsed_json_root.size() != 0) {
+ simdjson::ondemand::value val;
+ RETURN_IF_ERROR(JsonFunctions::extract_from_object(cur, _parsed_json_root, &val));
+ // If val is not an object, will throw simdjson::simdjson_error
+ cur = val.get_object();
+ }
+ RETURN_IF_ERROR(_simdjson_write_columns_by_jsonpath(&cur, slot_descs, columns, &valid));
+ // Reached end point
+ if (_array_iter == _array.end()) {
+ // Hint to read next json doc
+ _next_row = _total_rows + 1;
+ break;
+ }
+ ++_array_iter;
+ ++_next_row;
+ if (!valid) {
+ continue; // process next line
+ }
+ *is_empty_row = false;
+ break; // get a valid row, then break
+ } catch (simdjson::simdjson_error& e) {
+ // prevent from endless loop
+ _next_row = _total_rows + 1;
+ fmt::memory_buffer error_msg;
+ fmt::format_to(error_msg, "Parse json data failed. code: {}, error
info: {}", e.error(),
+ e.what());
+ RETURN_IF_ERROR(_state->append_error_msg_to_file(
+ [&]() -> std::string { return ""; },
+ [&]() -> std::string { return fmt::to_string(error_msg); }, eof));
+ _counter->num_rows_filtered++;
+ // Before continuing to process other rows, we need to first clean the fail parsed row.
+ for (int i = 0; i < columns.size(); ++i) {
+ if (columns[i]->size() > num_rows) {
+ columns[i]->pop_back(columns[i]->size() - num_rows);
+ }
+ }
+ if (*_scanner_eof) {
+ // When _scanner_eof is true and valid is false, it means that we have encountered
+ // unqualified data and decided to stop the scan.
+ *is_empty_row = true;
+ return Status::OK();
+ }
+ continue;
+ }
+ } while (_next_row <= _total_rows);
+ return Status::OK();
+}
+
+Status NewJsonReader::_simdjson_handle_nested_complex_json(
+ std::vector<MutableColumnPtr>& columns, const std::vector<SlotDescriptor*>& slot_descs,
+ bool* is_empty_row, bool* eof) {
+ // nested complex json
+ while (true) {
+ size_t num_rows = columns[0]->size();
+ simdjson::ondemand::object cur;
+ try {
+ Status st = _simdjson_parse_json(is_empty_row, eof);
+ if (st.is<DATA_QUALITY_ERROR>()) {
+ continue; // continue to read next
+ }
+ RETURN_IF_ERROR(st);
+ if (*is_empty_row == true) {
+ return Status::OK();
+ }
+ *is_empty_row = false;
+ bool valid = true;
+ cur = _json_value.get_object();
+ RETURN_IF_ERROR(_simdjson_write_columns_by_jsonpath(&cur, slot_descs, columns, &valid));
+ if (!valid) {
+ // there is only one line in this case, so if it return false, just set is_empty_row true
+ // so that the caller will continue reading next line.
+ *is_empty_row = true;
+ }
+ break; // read a valid row
+ } catch (simdjson::simdjson_error& e) {
+ fmt::memory_buffer error_msg;
+ fmt::format_to(error_msg, "Parse json data failed. code: {}, error
info: {}", e.error(),
+ e.what());
+ RETURN_IF_ERROR(_state->append_error_msg_to_file(
+ [&]() -> std::string { return ""; },
+ [&]() -> std::string { return fmt::to_string(error_msg); }, eof));
+ _counter->num_rows_filtered++;
+ // Before continuing to process other rows, we need to first clean the fail parsed row.
+ for (int i = 0; i < columns.size(); ++i) {
+ if (columns[i]->size() > num_rows) {
+ columns[i]->pop_back(columns[i]->size() - num_rows);
+ }
+ }
+ if (*_scanner_eof) {
+ // When _scanner_eof is true and valid is false, it means that we have encountered
+ // unqualified data and decided to stop the scan.
+ *is_empty_row = true;
+ return Status::OK();
+ }
+ continue;
+ }
+ }
+ return Status::OK();
+}
+
+Status NewJsonReader::_simdjson_set_column_value(simdjson::ondemand::object* value,
+ std::vector<MutableColumnPtr>& columns,
+ const std::vector<SlotDescriptor*>& slot_descs,
+ bool* valid) {
+ // set
+ size_t cur_row_count = columns[0]->size();
+ bool has_valid_value = false;
+ // iterate through object, simdjson::ondemond will parsing on the fly
+ for (auto field : *value) {
+ std::string_view key = field.unescaped_key();
+ auto iter = _slot_desc_index.find(std::string(key));
+ if (iter == _slot_desc_index.end()) {
+ // This key is not exist in slot desc, just ignore
+ continue;
+ }
+ simdjson::ondemand::value val = field.value();
+ RETURN_IF_ERROR(_simdjson_write_data_to_column(val, slot_descs[iter->second],
+ columns[iter->second].get(), valid));
+ if (!(*valid)) {
+ return Status::OK();
+ }
+ has_valid_value = true;
+ }
+ if (!has_valid_value) {
+ RETURN_IF_ERROR(
+ _append_error_msg(value, "All fields is null, this is a invalid row.", "", valid));
+ return Status::OK();
+ }
+
+ // fill missing slot
+ int nullcount = 0;
+ int ctx_idx = 0;
+ for (auto slot_desc : slot_descs) {
+ if (!slot_desc->is_materialized()) {
+ continue;
+ }
+ int dest_index = ctx_idx++;
+ auto* column_ptr = columns[dest_index].get();
+ if (column_ptr->size() < cur_row_count + 1) {
+ DCHECK(column_ptr->size() == cur_row_count);
+ column_ptr->assume_mutable()->insert_default();
+ ++nullcount;
+ }
+ DCHECK(column_ptr->size() == cur_row_count + 1);
+ }
+ // There is at least one valid value here
+ DCHECK(nullcount < columns.size());
+ *valid = true;
+ return Status::OK();
+}
+
+Status NewJsonReader::_simdjson_write_data_to_column(simdjson::ondemand::value& value,
+ SlotDescriptor* slot_desc,
+ vectorized::IColumn* column, bool* valid) {
+ // write
+ vectorized::ColumnNullable* nullable_column = nullptr;
+ vectorized::IColumn* column_ptr = nullptr;
+ if (slot_desc->is_nullable()) {
+ nullable_column = assert_cast<vectorized::ColumnNullable*>(column);
+ column_ptr = &nullable_column->get_nested_column();
+ }
+
+ // TODO: if the vexpr can support another 'slot_desc type' than 'TYPE_VARCHAR',
+ // we need use a function to support these types to insert data in columns.
+ ColumnString* column_string = assert_cast<ColumnString*>(column_ptr);
+ if (value.is_null()) {
+ if (column->is_nullable()) {
+ // insert_default already push 1 to null_map
+ nullable_column->insert_default();
+ } else {
+ RETURN_IF_ERROR(_append_error_msg(
+ nullptr, "Json value is null, but the column `{}` is not
nullable.",
+ slot_desc->col_name(), valid));
+ return Status::OK();
+ }
+ } else if (value.type() == simdjson::ondemand::json_type::boolean) {
+ nullable_column->get_null_map_data().push_back(0);
+ if (value.get_bool()) {
+ column_string->insert_data("1", 1);
+ } else {
+ column_string->insert_data("0", 1);
+ }
+ } else {
+ // just return it's str representation
+ auto str_view = simdjson::to_json_string(value).value();
+ if (str_view[0] == '\"' || str_view[0] == '\'') {
+ str_view = str_view.substr(1, str_view.length() - 2);
+ }
+ nullable_column->get_null_map_data().push_back(0);
+ column_string->insert_data(str_view.data(), str_view.length());
+ }
+
+ *valid = true;
+ return Status::OK();
+}
+
+Status NewJsonReader::_append_error_msg(simdjson::ondemand::object* obj, std::string error_msg,
+ std::string col_name, bool* valid) {
+ std::string err_msg;
+ if (!col_name.empty()) {
+ fmt::memory_buffer error_buf;
+ fmt::format_to(error_buf, error_msg, col_name);
+ err_msg = fmt::to_string(error_buf);
+ } else {
+ err_msg = error_msg;
+ }
+
+ RETURN_IF_ERROR(_state->append_error_msg_to_file(
+ [&]() -> std::string {
+ if (!obj) {
+ return "";
+ }
+ std::string_view str_view;
+ (void)!obj->raw_json().get(str_view);
+ return std::string(str_view.data(), str_view.size());
+ },
+ [&]() -> std::string { return err_msg; }, _scanner_eof));
+
+ _counter->num_rows_filtered++;
+ if (valid != nullptr) {
+ // current row is invalid
+ *valid = false;
+ }
+ return Status::OK();
+}
+
+Status NewJsonReader::_simdjson_parse_json(bool* is_empty_row, bool* eof) {
+ size_t size = 0;
+ RETURN_IF_ERROR(_simdjson_parse_json_doc(&size, eof));
+
+ // read all data, then return
+ if (size == 0 || *eof) {
+ *is_empty_row = true;
+ return Status::OK();
+ }
+
+ if (!_parsed_jsonpaths.empty() && _strip_outer_array) {
+ _total_rows = _json_value.count_elements().value();
+ _next_row = 0;
+
+ if (_total_rows == 0) {
+ // meet an empty json array.
+ *is_empty_row = true;
+ }
+ }
+ return Status::OK();
+}
+Status NewJsonReader::_simdjson_parse_json_doc(size_t* size, bool* eof) {
+ // read a whole message
+ SCOPED_TIMER(_file_read_timer);
+ const uint8_t* json_str = nullptr;
+ std::unique_ptr<uint8_t[]> json_str_ptr;
+ if (_line_reader != nullptr) {
+ RETURN_IF_ERROR(_line_reader->read_line(&json_str, size, eof));
+ } else {
+ size_t length = 0;
+ RETURN_IF_ERROR(_read_one_message(&json_str_ptr, &length));
+ json_str = json_str_ptr.get();
+ *size = length;
+ if (length == 0) {
+ *eof = true;
+ }
+ }
+
+ _bytes_read_counter += *size;
+ if (*eof) {
+ return Status::OK();
+ }
+ memcpy(_simdjson_ondemand_padding_buffer, json_str, *size);
+ auto error = _ondemand_json_parser
+ ->iterate(std::string_view(reinterpret_cast<const char*>(
+ _simdjson_ondemand_padding_buffer),
+ *size),
+ _padded_size)
+ .get(_original_json_doc);
+ auto return_quality_error = [&](fmt::memory_buffer& error_msg,
+ const std::string& doc_info) -> Status {
+ RETURN_IF_ERROR(_state->append_error_msg_to_file(
+ [&]() -> std::string { return doc_info; },
+ [&]() -> std::string { return fmt::to_string(error_msg); }, _scanner_eof));
+ _counter->num_rows_filtered++;
+ if (*_scanner_eof) {
+ // Case A: if _scanner_eof is set to true in "append_error_msg_to_file", which means
+ // we meet enough invalid rows and the scanner should be stopped.
+ // So we set eof to true and return OK, the caller will stop the process as we meet the end of file.
+ *eof = true;
+ return Status::OK();
+ }
+ return Status::DataQualityError(fmt::to_string(error_msg));
+ };
+ if (error != simdjson::error_code::SUCCESS) {
+ fmt::memory_buffer error_msg;
+ fmt::format_to(error_msg, "Parse json data for JsonDoc failed. code:
{}, error info: {}",
+ error, simdjson::error_message(error));
+ return return_quality_error(error_msg, std::string((char*)json_str,
*size));
+ }
+ try {
+ // set json root
+ // if it is an array at top level, then we should iterate the entire array in
+ // ::_simdjson_handle_flat_array_complex_json
+ if (_parsed_json_root.size() != 0 &&
+ _original_json_doc.type() == simdjson::ondemand::json_type::object) {
+ simdjson::ondemand::object object = _original_json_doc;
+ Status st = JsonFunctions::extract_from_object(object, _parsed_json_root, &_json_value);
+ if (!st.ok()) {
+ fmt::memory_buffer error_msg;
+ fmt::format_to(error_msg, "{}", st.to_string());
+ return return_quality_error(error_msg, std::string((char*)json_str, *size));
+ }
+ } else {
+ _json_value = _original_json_doc;
+ }
+ } catch (simdjson::simdjson_error& e) {
+ fmt::memory_buffer error_msg;
+ fmt::format_to(error_msg, "Encounter error while extract_from_object,
error: {}", e.what());
+ return return_quality_error(error_msg, std::string((char*)json_str,
*size));
+ }
+
+ if (_json_value.type() == simdjson::ondemand::json_type::array && !_strip_outer_array) {
+ fmt::memory_buffer error_msg;
+ fmt::format_to(error_msg, "{}",
+ "JSON data is array-object, `strip_outer_array` must be TRUE.");
+ return return_quality_error(error_msg, std::string((char*)json_str, *size));
+ }
+
+ if (_json_value.type() != simdjson::ondemand::json_type::array && _strip_outer_array) {
+ fmt::memory_buffer error_msg;
+ fmt::format_to(error_msg, "{}",
+ "JSON data is not an array-object, `strip_outer_array` must be FALSE.");
+ return return_quality_error(error_msg, std::string((char*)json_str, *size));
+ }
+ return Status::OK();
+}
+
+Status NewJsonReader::_simdjson_write_columns_by_jsonpath(
+ simdjson::ondemand::object* value, const std::vector<SlotDescriptor*>& slot_descs,
+ std::vector<MutableColumnPtr>& columns, bool* valid) {
+ // write by jsonpath
+ size_t column_num = slot_descs.size();
+ bool has_valid_value = false;
+ size_t cur_row_count = columns[0]->size();
+ for (size_t i = 0; i < column_num; i++) {
+ auto* column_ptr = columns[i].get();
+ simdjson::ondemand::value json_value;
+ Status st;
+ if (i < _parsed_jsonpaths.size()) {
+ st = JsonFunctions::extract_from_object(*value, _parsed_jsonpaths[i], &json_value);
+ if (!st.ok() && !st.is<NOT_FOUND>()) {
+ return st;
+ }
+ }
+ if (i >= _parsed_jsonpaths.size() || st.is<NOT_FOUND>()) {
+ // not match in jsondata.
+ if (!slot_descs[i]->is_nullable()) {
+ RETURN_IF_ERROR(_append_error_msg(
+ value, "The column `{}` is not nullable, but it's not
found in jsondata.",
+ slot_descs[i]->col_name(), valid));
+ return Status::OK();
+ }
+ } else {
+ RETURN_IF_ERROR(
+ _simdjson_write_data_to_column(json_value, slot_descs[i], column_ptr, valid));
+ if (!(*valid)) {
Review Comment:
_append_error_msg will append the error message and then continue to process the next line.
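The same convention applies on the caller side: the per-column writer clears `*valid` on bad data instead of returning an error status, and the row-level loop rolls back the partially written row and continues. A minimal stand-alone sketch of that flow (the names below are placeholders, not the PR's code):

    // sketch_valid_flag.cpp -- illustrative only; not Doris code.
    #include <cstddef>
    #include <iostream>
    #include <string>
    #include <vector>

    // Per-column writer: on bad input it records the problem, clears *valid,
    // and still returns true so the caller decides what to do with the row.
    bool write_column(const std::string& cell, std::vector<int>& column, bool* valid) {
        try {
            column.push_back(std::stoi(cell));
        } catch (const std::exception&) {
            std::cerr << "bad cell: " << cell << '\n';
            *valid = false; // row is invalid, but this is not a hard error
        }
        return true; // hard errors (I/O, OOM, ...) would return false instead
    }

    int main() {
        std::vector<std::vector<std::string>> rows = {{"1", "2"}, {"3", "x"}, {"5", "6"}};
        std::vector<int> col_a, col_b;
        for (const auto& row : rows) {
            std::size_t before_a = col_a.size(), before_b = col_b.size();
            bool valid = true;
            if (!write_column(row[0], col_a, &valid) || !write_column(row[1], col_b, &valid)) {
                return 1; // propagate hard errors
            }
            if (!valid) {
                // roll back the partially written row, then continue with the next one
                col_a.resize(before_a);
                col_b.resize(before_b);
                continue;
            }
        }
        std::cout << "kept " << col_a.size() << " rows\n"; // kept 2 rows
        return 0;
    }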
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]