This is an automated email from the ASF dual-hosted git repository.
jianliangqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 0ffd6cf35c6 [Improve](topn opt) avoid crash when rpc returned row
contains duplicated row entry (#29872)
0ffd6cf35c6 is described below
commit 0ffd6cf35c693bc1a1a5af708c7348135aba184e
Author: lihangyu <[email protected]>
AuthorDate: Mon Jan 15 14:32:14 2024 +0800
[Improve](topn opt) avoid crash when rpc returned row contains duplicated
row entry (#29872)
1. Add more diagnostic info to help trace a potential bug, and avoid a crash.
2. Use the correct permutation size when calling `column->permute`.
---
be/src/exec/rowid_fetcher.cpp | 15 ++++++++++++---
1 file changed, 12 insertions(+), 3 deletions(-)
diff --git a/be/src/exec/rowid_fetcher.cpp b/be/src/exec/rowid_fetcher.cpp
index c7519c5b05a..d527a3ac055 100644
--- a/be/src/exec/rowid_fetcher.cpp
+++ b/be/src/exec/rowid_fetcher.cpp
@@ -39,6 +39,7 @@
#include "bthread/countdown_event.h"
#include "common/config.h"
#include "common/consts.h"
+#include "common/exception.h"
#include "exec/tablet_info.h" // DorisNodesInfo
#include "olap/olap_common.h"
#include "olap/tablet_schema.h"
@@ -230,7 +231,10 @@ Status RowIDFetcher::fetch(const vectorized::ColumnPtr&
column_row_ids,
std::vector<PRowLocation> rows_locs;
rows_locs.reserve(rows_locs.size());
RETURN_IF_ERROR(_merge_rpc_results(mget_req, resps, cntls, res_block,
&rows_locs));
-
+ if (rows_locs.size() != res_block->rows()) {
+ return Status::InternalError("Miss matched return row loc count {},
expected {}, input {}",
+ rows_locs.size(), res_block->rows(),
column_row_ids->size());
+ }
// Final sort by row_ids sequence, since row_ids is already sorted if need
std::map<GlobalRowLoacation, size_t> positions;
for (size_t i = 0; i < rows_locs.size(); ++i) {
@@ -240,6 +244,10 @@ Status RowIDFetcher::fetch(const vectorized::ColumnPtr&
column_row_ids,
rows_locs[i].ordinal_id());
positions[grl] = i;
};
+ // TODO remove this warning code
+ if (positions.size() < rows_locs.size()) {
+ LOG(WARNING) << "contains duplicated row entry";
+ }
vectorized::IColumn::Permutation permutation;
permutation.reserve(column_row_ids->size());
for (size_t i = 0; i < column_row_ids->size(); ++i) {
@@ -247,10 +255,11 @@ Status RowIDFetcher::fetch(const vectorized::ColumnPtr&
column_row_ids,
reinterpret_cast<const
GlobalRowLoacation*>(column_row_ids->get_data_at(i).data);
permutation.push_back(positions[*location]);
}
- size_t num_rows = res_block->rows();
+ // Check row consistency
+ RETURN_IF_CATCH_EXCEPTION(res_block->check_number_of_rows());
for (size_t i = 0; i < res_block->columns(); ++i) {
res_block->get_by_position(i).column =
- res_block->get_by_position(i).column->permute(permutation,
num_rows);
+ res_block->get_by_position(i).column->permute(permutation,
permutation.size());
}
// shrink for char type
std::vector<size_t> char_type_idx;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]