This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new 98cd0ff5cc9 [fix](load) return DataQualityError when filtered rows
exceeds limit (#47617) (#49287)
98cd0ff5cc9 is described below
commit 98cd0ff5cc967e5e55dbb89744c895b220302dd2
Author: Kaijie Chen <[email protected]>
AuthorDate: Tue Mar 25 21:23:58 2025 +0800
[fix](load) return DataQualityError when filtered rows exceeds limit
(#47617) (#49287)
backport #47617
---
.../exec/group_commit_block_sink_operator.cpp | 14 ++--
be/src/runtime/runtime_state.cpp | 8 +-
be/src/runtime/runtime_state.h | 2 +-
be/src/vec/exec/format/csv/csv_reader.cpp | 21 +++--
be/src/vec/exec/format/json/new_json_reader.cpp | 57 ++++++-------
be/src/vec/exec/scan/vfile_scanner.cpp | 12 ++-
be/src/vec/sink/vrow_distribution.cpp | 18 ++---
be/src/vec/sink/vtablet_block_convertor.cpp | 47 +++++------
be/src/vec/sink/vtablet_block_convertor.h | 13 ++-
be/src/vec/sink/vtablet_finder.cpp | 16 ++--
be/src/vec/sink/vtablet_finder.h | 4 +-
.../load_p0/stream_load/test_stream_load.groovy | 2 +-
.../test_stream_load_with_filtered_rows.groovy | 94 ++++++++++++++++++++++
13 files changed, 192 insertions(+), 116 deletions(-)
diff --git a/be/src/pipeline/exec/group_commit_block_sink_operator.cpp
b/be/src/pipeline/exec/group_commit_block_sink_operator.cpp
index 9b059831842..a9201f0302f 100644
--- a/be/src/pipeline/exec/group_commit_block_sink_operator.cpp
+++ b/be/src/pipeline/exec/group_commit_block_sink_operator.cpp
@@ -347,24 +347,24 @@ Status GroupCommitBlockSinkOperatorX::sink(RuntimeState*
state, vectorized::Bloc
local_state._vpartition->find_partition(block.get(), index,
local_state._partitions[index]);
}
- bool stop_processing = false;
for (int row_index = 0; row_index < rows; row_index++) {
if (local_state._partitions[row_index] == nullptr) [[unlikely]] {
local_state._filter_bitmap.Set(row_index, true);
LOG(WARNING) << "no partition for this tuple. tuple="
<< block->dump_data(row_index, 1);
- RETURN_IF_ERROR(state->append_error_msg_to_file(
+ local_state._has_filtered_rows = true;
+ state->update_num_rows_load_filtered(1);
+ state->update_num_rows_load_total(-1);
+ // meiyi: we should ignore this error in group commit,
+ // as errors should no longer occur after the first 20,000
rows.
+ static_cast<void>(state->append_error_msg_to_file(
[]() -> std::string { return ""; },
[&]() -> std::string {
fmt::memory_buffer buf;
fmt::format_to(buf, "no partition for this tuple.
tuple=\n{}",
block->dump_data(row_index, 1));
return fmt::to_string(buf);
- },
- &stop_processing));
- local_state._has_filtered_rows = true;
- state->update_num_rows_load_filtered(1);
- state->update_num_rows_load_total(-1);
+ }));
}
}
}
diff --git a/be/src/runtime/runtime_state.cpp b/be/src/runtime/runtime_state.cpp
index 80b018a4a19..6c495fa79c7 100644
--- a/be/src/runtime/runtime_state.cpp
+++ b/be/src/runtime/runtime_state.cpp
@@ -385,8 +385,7 @@ Status RuntimeState::create_error_log_file() {
Status RuntimeState::append_error_msg_to_file(std::function<std::string()>
line,
std::function<std::string()>
error_msg,
- bool* stop_processing, bool
is_summary) {
- *stop_processing = false;
+ bool is_summary) {
if (query_type() != TQueryType::LOAD) {
return Status::OK();
}
@@ -407,7 +406,10 @@ Status
RuntimeState::append_error_msg_to_file(std::function<std::string()> line,
if (_num_print_error_rows.fetch_add(1, std::memory_order_relaxed) >
MAX_ERROR_NUM &&
!is_summary) {
if (_load_zero_tolerance) {
- *stop_processing = true;
+ return Status::DataQualityError(
+ "Encountered unqualified data, stop processing. Please
check if the source "
+ "data matches the schema, and consider disabling strict
mode or increasing "
+ "max_filter_ratio.");
}
return Status::OK();
}
diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h
index a42f4b5232a..3d239e64911 100644
--- a/be/src/runtime/runtime_state.h
+++ b/be/src/runtime/runtime_state.h
@@ -279,7 +279,7 @@ public:
// is_summary is true, means we are going to write the summary line
// If we need to stop the processing, set stop_processing to true
Status append_error_msg_to_file(std::function<std::string()> line,
- std::function<std::string()> error_msg,
bool* stop_processing,
+ std::function<std::string()> error_msg,
bool is_summary = false);
int64_t num_bytes_load_total() { return _num_bytes_load_total.load(); }
diff --git a/be/src/vec/exec/format/csv/csv_reader.cpp
b/be/src/vec/exec/format/csv/csv_reader.cpp
index 397095590dd..ac892f9b26e 100644
--- a/be/src/vec/exec/format/csv/csv_reader.cpp
+++ b/be/src/vec/exec/format/csv/csv_reader.cpp
@@ -717,6 +717,8 @@ Status CsvReader::_validate_line(const Slice& line, bool*
success) {
if (!_is_load) {
return Status::InternalError<false>("Only support csv data in utf8
codec");
} else {
+ _counter->num_rows_filtered++;
+ *success = false;
RETURN_IF_ERROR(_state->append_error_msg_to_file(
[&]() -> std::string { return std::string(line.data,
line.size); },
[&]() -> std::string {
@@ -725,10 +727,7 @@ Status CsvReader::_validate_line(const Slice& line, bool*
success) {
"Unable to display, only support csv
data in utf8 codec",
", please check the data encoding");
return fmt::to_string(error_msg);
- },
- &_line_reader_eof));
- _counter->num_rows_filtered++;
- *success = false;
+ }));
return Status::OK();
}
}
@@ -752,6 +751,8 @@ Status CsvReader::_line_split_to_values(const Slice& line,
bool* success) {
(ignore_col && _split_values.size() < _file_slot_descs.size())) {
std::string cmp_str =
_split_values.size() > _file_slot_descs.size() ? "more
than" : "less than";
+ _counter->num_rows_filtered++;
+ *success = false;
RETURN_IF_ERROR(_state->append_error_msg_to_file(
[&]() -> std::string { return std::string(line.data,
line.size); },
[&]() -> std::string {
@@ -775,10 +776,7 @@ Status CsvReader::_line_split_to_values(const Slice& line,
bool* success) {
}
fmt::format_to(error_msg, "result values:[{}]",
fmt::to_string(values));
return fmt::to_string(error_msg);
- },
- &_line_reader_eof));
- _counter->num_rows_filtered++;
- *success = false;
+ }));
return Status::OK();
}
}
@@ -801,6 +799,8 @@ Status CsvReader::_check_array_format(std::vector<Slice>&
split_values, bool* is
}
const Slice& value = split_values[j];
if (slot_desc->type().is_array_type() && !_is_null(value) &&
!_is_array(value)) {
+ _counter->num_rows_filtered++;
+ *is_success = false;
RETURN_IF_ERROR(_state->append_error_msg_to_file(
[&]() -> std::string { return std::string(value.data,
value.size); },
[&]() -> std::string {
@@ -808,10 +808,7 @@ Status CsvReader::_check_array_format(std::vector<Slice>&
split_values, bool* is
fmt::format_to(err_msg, "Invalid format for array
column({})",
slot_desc->col_name());
return fmt::to_string(err_msg);
- },
- &_line_reader_eof));
- _counter->num_rows_filtered++;
- *is_success = false;
+ }));
return Status::OK();
}
}
diff --git a/be/src/vec/exec/format/json/new_json_reader.cpp
b/be/src/vec/exec/format/json/new_json_reader.cpp
index ecf17485d4a..623994d0783 100644
--- a/be/src/vec/exec/format/json/new_json_reader.cpp
+++ b/be/src/vec/exec/format/json/new_json_reader.cpp
@@ -679,10 +679,10 @@ Status NewJsonReader::_parse_json_doc(size_t* size, bool*
eof) {
fmt::format_to(error_msg, "Parse json data for JsonDoc failed. code:
{}, error info: {}",
_origin_json_doc.GetParseError(),
rapidjson::GetParseError_En(_origin_json_doc.GetParseError()));
+ _counter->num_rows_filtered++;
RETURN_IF_ERROR(_state->append_error_msg_to_file(
[&]() -> std::string { return std::string((char*)json_str,
*size); },
- [&]() -> std::string { return fmt::to_string(error_msg); },
_scanner_eof));
- _counter->num_rows_filtered++;
+ [&]() -> std::string { return fmt::to_string(error_msg); }));
if (*_scanner_eof) {
// Case A: if _scanner_eof is set to true in
"append_error_msg_to_file", which means
// we meet enough invalid rows and the scanner should be stopped.
@@ -700,10 +700,10 @@ Status NewJsonReader::_parse_json_doc(size_t* size, bool*
eof) {
if (_json_doc == nullptr) {
fmt::memory_buffer error_msg;
fmt::format_to(error_msg, "{}", "JSON Root not found.");
+ _counter->num_rows_filtered++;
RETURN_IF_ERROR(_state->append_error_msg_to_file(
[&]() -> std::string { return
_print_json_value(_origin_json_doc); },
- [&]() -> std::string { return fmt::to_string(error_msg);
}, _scanner_eof));
- _counter->num_rows_filtered++;
+ [&]() -> std::string { return fmt::to_string(error_msg);
}));
if (*_scanner_eof) {
// Same as Case A
*eof = true;
@@ -719,10 +719,10 @@ Status NewJsonReader::_parse_json_doc(size_t* size, bool*
eof) {
fmt::memory_buffer error_msg;
fmt::format_to(error_msg, "{}",
"JSON data is array-object, `strip_outer_array` must be
TRUE.");
+ _counter->num_rows_filtered++;
RETURN_IF_ERROR(_state->append_error_msg_to_file(
[&]() -> std::string { return
_print_json_value(_origin_json_doc); },
- [&]() -> std::string { return fmt::to_string(error_msg); },
_scanner_eof));
- _counter->num_rows_filtered++;
+ [&]() -> std::string { return fmt::to_string(error_msg); }));
if (*_scanner_eof) {
// Same as Case A
*eof = true;
@@ -735,10 +735,10 @@ Status NewJsonReader::_parse_json_doc(size_t* size, bool*
eof) {
fmt::memory_buffer error_msg;
fmt::format_to(error_msg, "{}",
"JSON data is not an array-object, `strip_outer_array`
must be FALSE.");
+ _counter->num_rows_filtered++;
RETURN_IF_ERROR(_state->append_error_msg_to_file(
[&]() -> std::string { return
_print_json_value(_origin_json_doc); },
- [&]() -> std::string { return fmt::to_string(error_msg); },
_scanner_eof));
- _counter->num_rows_filtered++;
+ [&]() -> std::string { return fmt::to_string(error_msg); }));
if (*_scanner_eof) {
// Same as Case A
*eof = true;
@@ -1092,20 +1092,21 @@ Status NewJsonReader::_append_error_msg(const
rapidjson::Value& objectValue, std
err_msg = error_msg;
}
+ _counter->num_rows_filtered++;
+ if (valid != nullptr) {
+ // current row is invalid
+ *valid = false;
+ }
+
RETURN_IF_ERROR(_state->append_error_msg_to_file(
[&]() -> std::string { return
NewJsonReader::_print_json_value(objectValue); },
- [&]() -> std::string { return err_msg; }, _scanner_eof));
+ [&]() -> std::string { return err_msg; }));
// TODO(ftw): check here?
if (*_scanner_eof) {
_reader_eof = true;
}
- _counter->num_rows_filtered++;
- if (valid != nullptr) {
- // current row is invalid
- *valid = false;
- }
return Status::OK();
}
@@ -1226,11 +1227,6 @@ Status
NewJsonReader::_handle_simdjson_error(simdjson::simdjson_error& error, Bl
fmt::memory_buffer error_msg;
fmt::format_to(error_msg, "Parse json data failed. code: {}, error info:
{}", error.error(),
error.what());
- RETURN_IF_ERROR(_state->append_error_msg_to_file(
- [&]() -> std::string {
- return std::string(_simdjson_ondemand_padding_buffer.data(),
_original_doc_size);
- },
- [&]() -> std::string { return fmt::to_string(error_msg); }, eof));
_counter->num_rows_filtered++;
// Before continuing to process other rows, we need to first clean the
fail parsed row.
for (int i = 0; i < block.columns(); ++i) {
@@ -1240,6 +1236,11 @@ Status
NewJsonReader::_handle_simdjson_error(simdjson::simdjson_error& error, Bl
}
}
+ RETURN_IF_ERROR(_state->append_error_msg_to_file(
+ [&]() -> std::string {
+ return std::string(_simdjson_ondemand_padding_buffer.data(),
_original_doc_size);
+ },
+ [&]() -> std::string { return fmt::to_string(error_msg); }));
return Status::OK();
}
@@ -1808,6 +1809,12 @@ Status
NewJsonReader::_append_error_msg(simdjson::ondemand::object* obj, std::st
err_msg = error_msg;
}
+ _counter->num_rows_filtered++;
+ if (valid != nullptr) {
+ // current row is invalid
+ *valid = false;
+ }
+
RETURN_IF_ERROR(_state->append_error_msg_to_file(
[&]() -> std::string {
if (!obj) {
@@ -1817,13 +1824,7 @@ Status
NewJsonReader::_append_error_msg(simdjson::ondemand::object* obj, std::st
(void)!obj->raw_json().get(str_view);
return std::string(str_view.data(), str_view.size());
},
- [&]() -> std::string { return err_msg; }, _scanner_eof));
-
- _counter->num_rows_filtered++;
- if (valid != nullptr) {
- // current row is invalid
- *valid = false;
- }
+ [&]() -> std::string { return err_msg; }));
return Status::OK();
}
@@ -1893,10 +1894,10 @@ Status NewJsonReader::_get_json_value(size_t* size,
bool* eof, simdjson::error_c
SCOPED_TIMER(_file_read_timer);
auto return_quality_error = [&](fmt::memory_buffer& error_msg,
const std::string& doc_info) -> Status {
+ _counter->num_rows_filtered++;
RETURN_IF_ERROR(_state->append_error_msg_to_file(
[&]() -> std::string { return doc_info; },
- [&]() -> std::string { return fmt::to_string(error_msg); },
_scanner_eof));
- _counter->num_rows_filtered++;
+ [&]() -> std::string { return fmt::to_string(error_msg); }));
if (*_scanner_eof) {
// Case A: if _scanner_eof is set to true in
"append_error_msg_to_file", which means
// we meet enough invalid rows and the scanner should be stopped.
diff --git a/be/src/vec/exec/scan/vfile_scanner.cpp
b/be/src/vec/exec/scan/vfile_scanner.cpp
index 48502209622..5917c3bed76 100644
--- a/be/src/vec/exec/scan/vfile_scanner.cpp
+++ b/be/src/vec/exec/scan/vfile_scanner.cpp
@@ -249,7 +249,7 @@ Status VFileScanner::_get_block_impl(RuntimeState* state,
Block* block, bool* eo
Status st = _get_block_wrapped(state, block, eof);
if (!st.ok()) {
// add cur path in error msg for easy debugging
- return std::move(st.prepend("cur path: " +
get_current_scan_range_name() + ". "));
+ return std::move(st.append(". cur path: " +
get_current_scan_range_name()));
}
return st;
}
@@ -572,6 +572,7 @@ Status VFileScanner::_convert_to_output_block(Block* block)
{
if (_strict_mode &&
(_src_slot_descs_order_by_dest[dest_index]) &&
!_src_block_ptr->get_by_position(_dest_slot_to_src_slot_index[dest_index])
.column->is_null_at(i)) {
+ filter_map[i] = false;
RETURN_IF_ERROR(_state->append_error_msg_to_file(
[&]() -> std::string {
return _src_block_ptr->dump_one_line(i,
@@ -591,10 +592,9 @@ Status VFileScanner::_convert_to_output_block(Block*
block) {
"src value is {}",
slot_desc->col_name(),
_strict_mode, raw_string);
return fmt::to_string(error_msg);
- },
- &_scanner_eof));
- filter_map[i] = false;
+ }));
} else if (!slot_desc->is_nullable()) {
+ filter_map[i] = false;
RETURN_IF_ERROR(_state->append_error_msg_to_file(
[&]() -> std::string {
return _src_block_ptr->dump_one_line(i,
@@ -607,9 +607,7 @@ Status VFileScanner::_convert_to_output_block(Block* block)
{
"nullable",
slot_desc->col_name());
return fmt::to_string(error_msg);
- },
- &_scanner_eof));
- filter_map[i] = false;
+ }));
}
}
}
diff --git a/be/src/vec/sink/vrow_distribution.cpp
b/be/src/vec/sink/vrow_distribution.cpp
index 2de21edd80b..34ef9565dba 100644
--- a/be/src/vec/sink/vrow_distribution.cpp
+++ b/be/src/vec/sink/vrow_distribution.cpp
@@ -301,9 +301,8 @@ Status
VRowDistribution::_generate_rows_distribution_for_non_auto_partition(
std::vector<RowPartTabletIds>& row_part_tablet_ids) {
auto num_rows = block->rows();
- bool stop_processing = false;
RETURN_IF_ERROR(_tablet_finder->find_tablets(_state, block, num_rows,
_partitions,
- _tablet_indexes,
stop_processing, _skip));
+ _tablet_indexes, _skip));
if (has_filtered_rows) {
for (int i = 0; i < num_rows; i++) {
_skip[i] = _skip[i] || _block_convertor->filter_map()[i];
@@ -369,11 +368,9 @@ Status
VRowDistribution::_generate_rows_distribution_for_auto_partition(
auto partition_col = block->get_by_position(partition_keys[0]);
_missing_map.clear();
_missing_map.reserve(partition_col.column->size());
- bool stop_processing = false;
RETURN_IF_ERROR(_tablet_finder->find_tablets(_state, block, num_rows,
_partitions,
- _tablet_indexes,
stop_processing, _skip,
- &_missing_map));
+ _tablet_indexes, _skip,
&_missing_map));
// the missing vals for auto partition are also skipped.
if (has_filtered_rows) {
@@ -400,7 +397,6 @@ Status
VRowDistribution::_generate_rows_distribution_for_auto_overwrite(
// for auto-partition's, find and save origins in _partitions and replace
them. at meanwhile save the missing values for auto
// partition. then we find partition again to get replaced partitions in
_partitions. this time _missing_map is ignored cuz
// we already saved missing values.
- bool stop_processing = false;
if (_vpartition->is_auto_partition() &&
_state->query_options().enable_auto_create_when_overwrite) {
// allow auto create partition for missing rows.
@@ -410,8 +406,7 @@ Status
VRowDistribution::_generate_rows_distribution_for_auto_overwrite(
_missing_map.reserve(partition_col.column->size());
RETURN_IF_ERROR(_tablet_finder->find_tablets(_state, block, num_rows,
_partitions,
- _tablet_indexes,
stop_processing, _skip,
- &_missing_map));
+ _tablet_indexes, _skip,
&_missing_map));
// allow and really need to create during auto-detect-overwriting.
if (!_missing_map.empty()) {
@@ -419,7 +414,7 @@ Status
VRowDistribution::_generate_rows_distribution_for_auto_overwrite(
}
} else {
RETURN_IF_ERROR(_tablet_finder->find_tablets(_state, block, num_rows,
_partitions,
- _tablet_indexes,
stop_processing, _skip));
+ _tablet_indexes, _skip));
}
RETURN_IF_ERROR(_replace_overwriting_partition());
@@ -429,8 +424,7 @@ Status
VRowDistribution::_generate_rows_distribution_for_auto_overwrite(
_state->query_options().enable_auto_create_when_overwrite) {
// here _missing_map is just a placeholder
RETURN_IF_ERROR(_tablet_finder->find_tablets(_state, block, num_rows,
_partitions,
- _tablet_indexes,
stop_processing, _skip,
- &_missing_map));
+ _tablet_indexes, _skip,
&_missing_map));
if (VLOG_TRACE_IS_ON) {
std::string tmp;
for (auto v : _missing_map) {
@@ -440,7 +434,7 @@ Status
VRowDistribution::_generate_rows_distribution_for_auto_overwrite(
}
} else {
RETURN_IF_ERROR(_tablet_finder->find_tablets(_state, block, num_rows,
_partitions,
- _tablet_indexes,
stop_processing, _skip));
+ _tablet_indexes, _skip));
}
if (has_filtered_rows) {
for (int i = 0; i < num_rows; i++) {
diff --git a/be/src/vec/sink/vtablet_block_convertor.cpp
b/be/src/vec/sink/vtablet_block_convertor.cpp
index 820759af2e4..17ceebdcd34 100644
--- a/be/src/vec/sink/vtablet_block_convertor.cpp
+++ b/be/src/vec/sink/vtablet_block_convertor.cpp
@@ -92,14 +92,11 @@ Status OlapTableBlockConvertor::validate_and_convert_block(
SCOPED_RAW_TIMER(&_validate_data_ns);
_filter_map.clear();
_filter_map.resize(rows, 0);
- bool stop_processing = false;
- RETURN_IF_ERROR(_validate_data(state, block.get(), rows,
filtered_rows, &stop_processing));
+ auto st = _validate_data(state, block.get(), rows, filtered_rows);
_num_filtered_rows += filtered_rows;
has_filtered_rows = filtered_rows > 0;
- if (stop_processing) {
- // should be returned after updating "_number_filtered_rows", to
make sure that load job can be cancelled
- // because of "data unqualified"
- return Status::DataQualityError("Encountered unqualified data,
stop processing");
+ if (!st.ok()) {
+ return st;
}
_convert_to_dest_desc_block(block.get());
}
@@ -185,9 +182,8 @@ DecimalType
OlapTableBlockConvertor::_get_decimalv3_min_or_max(const TypeDescrip
Status OlapTableBlockConvertor::_internal_validate_column(
RuntimeState* state, const TypeDescriptor& type, bool is_nullable,
- vectorized::ColumnPtr column, size_t slot_index, bool* stop_processing,
- fmt::memory_buffer& error_prefix, const uint32_t row_count,
- vectorized::IColumn::Permutation* rows) {
+ vectorized::ColumnPtr column, size_t slot_index, fmt::memory_buffer&
error_prefix,
+ const uint32_t row_count, vectorized::IColumn::Permutation* rows) {
DCHECK((rows == nullptr) || (rows->size() == row_count));
fmt::memory_buffer error_msg;
auto set_invalid_and_append_error_msg = [&](int row) {
@@ -196,8 +192,7 @@ Status OlapTableBlockConvertor::_internal_validate_column(
[&error_prefix,
&error_msg]() -> std::string {
return
fmt::to_string(error_prefix) +
fmt::to_string(error_msg);
- },
- stop_processing);
+ });
error_msg.clear();
return ret;
};
@@ -384,8 +379,8 @@ Status OlapTableBlockConvertor::_internal_validate_column(
}
fmt::format_to(error_prefix, "ARRAY type failed: ");
RETURN_IF_ERROR(_validate_column(state, nested_type,
type.contains_nulls[0],
- column_array->get_data_ptr(),
slot_index, stop_processing,
- error_prefix, permutation.size(),
&permutation));
+ column_array->get_data_ptr(),
slot_index, error_prefix,
+ permutation.size(), &permutation));
break;
}
case TYPE_MAP: {
@@ -402,11 +397,11 @@ Status OlapTableBlockConvertor::_internal_validate_column(
}
fmt::format_to(error_prefix, "MAP type failed: ");
RETURN_IF_ERROR(_validate_column(state, key_type,
type.contains_nulls[0],
- column_map->get_keys_ptr(),
slot_index, stop_processing,
- error_prefix, permutation.size(),
&permutation));
+ column_map->get_keys_ptr(),
slot_index, error_prefix,
+ permutation.size(), &permutation));
RETURN_IF_ERROR(_validate_column(state, val_type,
type.contains_nulls[1],
- column_map->get_values_ptr(),
slot_index, stop_processing,
- error_prefix, permutation.size(),
&permutation));
+ column_map->get_values_ptr(),
slot_index, error_prefix,
+ permutation.size(), &permutation));
break;
}
case TYPE_STRUCT: {
@@ -417,7 +412,7 @@ Status OlapTableBlockConvertor::_internal_validate_column(
for (size_t sc = 0; sc < column_struct->tuple_size(); ++sc) {
RETURN_IF_ERROR(_validate_column(state, type.children[sc],
type.contains_nulls[sc],
column_struct->get_column_ptr(sc), slot_index,
- stop_processing, error_prefix,
+ error_prefix,
column_struct->get_column_ptr(sc)->size()));
}
break;
@@ -452,8 +447,13 @@ Status OlapTableBlockConvertor::_internal_validate_column(
}
Status OlapTableBlockConvertor::_validate_data(RuntimeState* state,
vectorized::Block* block,
- const uint32_t rows, int&
filtered_rows,
- bool* stop_processing) {
+ const uint32_t rows, int&
filtered_rows) {
+ filtered_rows = 0;
+ Defer defer {[&] {
+ for (int i = 0; i < rows; ++i) {
+ filtered_rows += _filter_map[i];
+ }
+ }};
for (int i = 0; i < _output_tuple_desc->slots().size(); ++i) {
SlotDescriptor* desc = _output_tuple_desc->slots()[i];
block->get_by_position(i).column =
@@ -463,12 +463,7 @@ Status
OlapTableBlockConvertor::_validate_data(RuntimeState* state, vectorized::
fmt::memory_buffer error_prefix;
fmt::format_to(error_prefix, "column_name[{}], ", desc->col_name());
RETURN_IF_ERROR(_validate_column(state, desc->type(),
desc->is_nullable(), column, i,
- stop_processing, error_prefix, rows));
- }
-
- filtered_rows = 0;
- for (int i = 0; i < rows; ++i) {
- filtered_rows += _filter_map[i];
+ error_prefix, rows));
}
return Status::OK();
}
diff --git a/be/src/vec/sink/vtablet_block_convertor.h
b/be/src/vec/sink/vtablet_block_convertor.h
index 7f866c38032..fbeec07870f 100644
--- a/be/src/vec/sink/vtablet_block_convertor.h
+++ b/be/src/vec/sink/vtablet_block_convertor.h
@@ -67,27 +67,26 @@ private:
DecimalType _get_decimalv3_min_or_max(const TypeDescriptor& type);
Status _validate_column(RuntimeState* state, const TypeDescriptor& type,
bool is_nullable,
- vectorized::ColumnPtr column, size_t slot_index,
bool* stop_processing,
+ vectorized::ColumnPtr column, size_t slot_index,
fmt::memory_buffer& error_prefix, const uint32_t
row_count,
vectorized::IColumn::Permutation* rows = nullptr) {
RETURN_IF_CATCH_EXCEPTION({
return _internal_validate_column(state, type, is_nullable, column,
slot_index,
- stop_processing, error_prefix,
row_count, rows);
+ error_prefix, row_count, rows);
});
}
Status _internal_validate_column(RuntimeState* state, const
TypeDescriptor& type,
bool is_nullable, vectorized::ColumnPtr
column,
- size_t slot_index, bool* stop_processing,
- fmt::memory_buffer& error_prefix, const
uint32_t row_count,
+ size_t slot_index, fmt::memory_buffer&
error_prefix,
+ const uint32_t row_count,
vectorized::IColumn::Permutation* rows =
nullptr);
// make input data valid for OLAP table
// return number of invalid/filtered rows.
// invalid row number is set in Bitmap
- // set stop_processing if we want to stop the whole process now.
Status _validate_data(RuntimeState* state, vectorized::Block* block, const
uint32_t rows,
- int& filtered_rows, bool* stop_processing);
+ int& filtered_rows);
// some output column of output expr may have different nullable property
with dest slot desc
// so here need to do the convert operation
@@ -123,4 +122,4 @@ private:
bool _is_partial_update_and_auto_inc = false;
};
-} // namespace doris::vectorized
\ No newline at end of file
+} // namespace doris::vectorized
diff --git a/be/src/vec/sink/vtablet_finder.cpp
b/be/src/vec/sink/vtablet_finder.cpp
index 3bfd5bb4d22..a605a005b5f 100644
--- a/be/src/vec/sink/vtablet_finder.cpp
+++ b/be/src/vec/sink/vtablet_finder.cpp
@@ -34,8 +34,8 @@
namespace doris::vectorized {
Status OlapTabletFinder::find_tablets(RuntimeState* state, Block* block, int
rows,
std::vector<VOlapTablePartition*>&
partitions,
- std::vector<uint32_t>& tablet_index,
bool& stop_processing,
- std::vector<bool>& skip,
std::vector<int64_t>* miss_rows) {
+ std::vector<uint32_t>& tablet_index,
std::vector<bool>& skip,
+ std::vector<int64_t>* miss_rows) {
for (int index = 0; index < rows; index++) {
_vpartition->find_partition(block, index, partitions[index]);
}
@@ -50,6 +50,9 @@ Status OlapTabletFinder::find_tablets(RuntimeState* state,
Block* block, int row
skip[row_index] = true;
continue;
}
+ _num_filtered_rows++;
+ _filter_bitmap.Set(row_index, true);
+ skip[row_index] = true;
RETURN_IF_ERROR(state->append_error_msg_to_file(
[]() -> std::string { return ""; },
[&]() -> std::string {
@@ -57,14 +60,7 @@ Status OlapTabletFinder::find_tablets(RuntimeState* state,
Block* block, int row
fmt::format_to(buf, "no partition for this tuple.
tuple=\n{}",
block->dump_data(row_index, 1));
return fmt::to_string(buf);
- },
- &stop_processing));
- _num_filtered_rows++;
- _filter_bitmap.Set(row_index, true);
- if (stop_processing) {
- return Status::DataQualityError("Encountered unqualified data,
stop processing");
- }
- skip[row_index] = true;
+ }));
continue;
}
if (!partitions[row_index]->is_mutable) [[unlikely]] {
diff --git a/be/src/vec/sink/vtablet_finder.h b/be/src/vec/sink/vtablet_finder.h
index 24f8e357e28..52c2b3db7e5 100644
--- a/be/src/vec/sink/vtablet_finder.h
+++ b/be/src/vec/sink/vtablet_finder.h
@@ -44,8 +44,8 @@ public:
Status find_tablets(RuntimeState* state, vectorized::Block* block, int
rows,
std::vector<VOlapTablePartition*>& partitions,
- std::vector<uint32_t>& tablet_index, bool& filtered,
- std::vector<bool>& skip, std::vector<int64_t>*
miss_rows = nullptr);
+ std::vector<uint32_t>& tablet_index,
std::vector<bool>& skip,
+ std::vector<int64_t>* miss_rows = nullptr);
bool is_find_tablet_every_sink() {
return _find_tablet_mode == FindTabletMode::FIND_TABLET_EVERY_SINK;
diff --git a/regression-test/suites/load_p0/stream_load/test_stream_load.groovy
b/regression-test/suites/load_p0/stream_load/test_stream_load.groovy
index 1bbf6033ef8..a9cd807fe44 100644
--- a/regression-test/suites/load_p0/stream_load/test_stream_load.groovy
+++ b/regression-test/suites/load_p0/stream_load/test_stream_load.groovy
@@ -1615,7 +1615,7 @@ suite("test_stream_load", "p0") {
def json = parseJson(result)
assertEquals("fail", json.Status.toLowerCase())
assertTrue(result.contains("ErrorURL"))
- assertTrue(json.Message.contains("Encountered unqualified
data, stop processing"))
+ assertTrue(json.Message.contains("Encountered unqualified
data, stop processing. Please"))
}
}
} finally {
diff --git
a/regression-test/suites/load_p0/stream_load/test_stream_load_with_filtered_rows.groovy
b/regression-test/suites/load_p0/stream_load/test_stream_load_with_filtered_rows.groovy
new file mode 100644
index 00000000000..1801d2be52d
--- /dev/null
+++
b/regression-test/suites/load_p0/stream_load/test_stream_load_with_filtered_rows.groovy
@@ -0,0 +1,94 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.apache.http.client.methods.RequestBuilder
+import org.apache.http.impl.client.HttpClients
+import org.apache.http.util.EntityUtils
+
+import java.text.SimpleDateFormat
+
+suite("test_stream_load_with_filtered_rows", "p2") {
+ sql "show tables"
+
+ // Stream-load two large files into a 13-column table and expect each load to
+ // fail: status "fail", an ErrorURL in the response, and a message containing
+ // "Encountered unqualified data, stop processing. Please".
+ // Presumably the source files contain enough rows that do not fit this schema
+ // for the error-row limit in RuntimeState::append_error_msg_to_file to be hit,
+ // so it returns DataQualityError -- TODO confirm against the data files.
+ def tableName = "test_large_file_with_many_filtered_rows"
+ try {
+ sql """ DROP TABLE IF EXISTS ${tableName} """
+ sql """
+ CREATE TABLE `${tableName}` (
+ `k1` int NULL,
+ `k2` tinyint NULL,
+ `k3` smallint NULL,
+ `k4` bigint NULL,
+ `k5` largeint NULL,
+ `k6` float NULL,
+ `k7` double NULL,
+ `k8` decimal(9,0) NULL,
+ `k9` char(10) NULL,
+ `k10` varchar(1024) NULL,
+ `k11` text NULL,
+ `k12` date NULL,
+ `k13` datetime NULL
+ ) ENGINE=OLAP
+ DUPLICATE KEY(`k1`, `k2`, `k3`)
+ DISTRIBUTED BY HASH(`k1`) BUCKETS AUTO
+ PROPERTIES (
+ "replication_allocation" = "tag.location.default: 1",
+ "disable_auto_compaction" = "false"
+ );
+ """
+
+ // Case 1: '|'-separated CSV file; the load is expected to abort with DataQualityError.
+ streamLoad {
+ table "${tableName}"
+ set 'column_separator', '|'
+ file
"""${getS3Url()}/regression/load_p2/stream_load/test_stream_load_with_dbgen_progress.csv"""
+
+ check { result, exception, startTime, endTime ->
+ if (exception != null) {
+ throw exception
+ }
+ log.info("Stream load result: ${result}".toString())
+ def json = parseJson(result)
+ assertEquals("fail", json.Status.toLowerCase())
+ assertTrue(result.contains("ErrorURL"))
+ assertTrue(json.Message.contains("Encountered unqualified
data, stop processing. Please"))
+ }
+ }
+
+ // Case 2: the .json file, with the same expected failure.
+ // NOTE(review): only 'column_separator' is set here and no 'format', so this
+ // file is presumably parsed with the default (CSV) format. In that case the
+ // JSON reader error paths are not exercised. Consider `set 'format', 'json'`,
+ // plus `read_json_by_line` / `strip_outer_array` as the file layout requires
+ // -- confirm against the data file.
+ streamLoad {
+ table "${tableName}"
+ set 'column_separator', '|'
+ file
"""${getS3Url()}/regression/load_p2/stream_load/test_stream_load_with_dbgen_progress.json"""
+
+ check { result, exception, startTime, endTime ->
+ if (exception != null) {
+ throw exception
+ }
+ log.info("Stream load result: ${result}".toString())
+ def json = parseJson(result)
+ assertEquals("fail", json.Status.toLowerCase())
+ assertTrue(result.contains("ErrorURL"))
+ assertTrue(json.Message.contains("Encountered unqualified
data, stop processing. Please"))
+ }
+ }
+
+ } finally {
+ // NOTE(review): cleanup is commented out, so the table is left behind after
+ // the suite runs -- confirm this is intentional (e.g. kept for debugging).
+ //sql """ DROP TABLE IF EXISTS ${tableName} FORCE"""
+ }
+
+}
+
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]