This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push:
new e7c4175 [fix] fix hash table insert() may be failed but not handle this error (#8207)
e7c4175 is described below
commit e7c417505c583fc5f337f19d0df63f0dc59fb804
Author: Zhengguo Yang <[email protected]>
AuthorDate: Thu Mar 3 22:33:05 2022 +0800
[fix] fix hash table insert() may be failed but not handle this error (#8207)
---
be/src/exec/except_node.cpp | 12 ++------
be/src/exec/hash_join_node.cpp | 8 +++---
be/src/exec/hash_table.h | 32 +++++++++++++++++-----
be/src/exec/hash_table.hpp | 4 ++-
be/src/exec/intersect_node.cpp | 12 ++------
be/src/exec/set_operation_node.cpp | 12 ++++----
be/src/exec/set_operation_node.h | 25 +++++++----------
be/test/exec/es_predicate_test.cpp | 1 -
.../rowset/segment_v2/binary_dict_page_test.cpp | 2 +-
9 files changed, 56 insertions(+), 52 deletions(-)
diff --git a/be/src/exec/except_node.cpp b/be/src/exec/except_node.cpp
index 8229b73..d79aceb 100644
--- a/be/src/exec/except_node.cpp
+++ b/be/src/exec/except_node.cpp
@@ -50,7 +50,9 @@ Status ExceptNode::open(RuntimeState* state) {
for (int i = 1; i < _children.size(); ++i) {
// rebuild hash table, for first time will rebuild with the no duplicated _hash_tbl,
- if (i > 1) { refresh_hash_table<false>(i); }
+ if (i > 1) {
+ RETURN_IF_ERROR(refresh_hash_table<false>(i));
+ }
// probe
_probe_batch.reset(
@@ -63,17 +65,12 @@ Status ExceptNode::open(RuntimeState* state) {
RETURN_IF_ERROR(child(i)->get_next(state, _probe_batch.get(),
&eos));
RETURN_IF_LIMIT_EXCEEDED(state, " Except , while probing the hash
table.");
for (int j = 0; j < _probe_batch->num_rows(); ++j) {
- VLOG_ROW << "probe row: "
- << get_row_output_string(_probe_batch->get_row(j),
child(i)->row_desc());
_hash_tbl_iterator = _hash_tbl->find(_probe_batch->get_row(j));
if (_hash_tbl_iterator != _hash_tbl->end()) {
if (!_hash_tbl_iterator.matched()) {
_hash_tbl_iterator.set_matched();
_valid_element_in_hash_tbl--;
}
- VLOG_ROW << "probe matched: "
- <<
get_row_output_string(_hash_tbl_iterator.get_row(),
- child(0)->row_desc());
}
}
_probe_batch->reset();
@@ -101,9 +98,6 @@ Status ExceptNode::get_next(RuntimeState* state, RowBatch*
out_batch, bool* eos)
out_batch->resize_and_allocate_tuple_buffer(state,
&tuple_buf_size, &tuple_buf));
memset(tuple_buf, 0, tuple_buf_size);
while (_hash_tbl_iterator.has_next()) {
- VLOG_ROW << "find row: "
- << get_row_output_string(_hash_tbl_iterator.get_row(),
child(0)->row_desc())
- << " matched: " << _hash_tbl_iterator.matched();
if (!_hash_tbl_iterator.matched()) {
create_output_row(_hash_tbl_iterator.get_row(), out_batch,
tuple_buf);
tuple_buf += _tuple_desc->byte_size();
diff --git a/be/src/exec/hash_join_node.cpp b/be/src/exec/hash_join_node.cpp
index f7e8bbc..291edbc 100644
--- a/be/src/exec/hash_join_node.cpp
+++ b/be/src/exec/hash_join_node.cpp
@@ -144,7 +144,8 @@ Status HashJoinNode::prepare(RuntimeState* state) {
(std::find(_is_null_safe_eq_join.begin(),
_is_null_safe_eq_join.end(), true) !=
_is_null_safe_eq_join.end());
_hash_tbl.reset(new HashTable(_build_expr_ctxs, _probe_expr_ctxs,
_build_tuple_size,
- stores_nulls, _is_null_safe_eq_join, id(),
mem_tracker(), 1024));
+ stores_nulls, _is_null_safe_eq_join, id(),
mem_tracker(),
+ state->batch_size() * 2));
_probe_batch.reset(
new RowBatch(child(0)->row_desc(), state->batch_size(),
mem_tracker().get()));
@@ -762,7 +763,6 @@ Status HashJoinNode::process_build_batch(RuntimeState*
state, RowBatch* build_ba
// insert build row into our hash table
if (_build_unique) {
for (int i = 0; i < build_batch->num_rows(); ++i) {
- // _hash_tbl->insert_unique(build_batch->get_row(i));
TupleRow* tuple_row = nullptr;
if (_hash_tbl->emplace_key(build_batch->get_row(i), &tuple_row)) {
build_batch->get_row(i)->deep_copy(tuple_row,
@@ -775,9 +775,9 @@ Status HashJoinNode::process_build_batch(RuntimeState*
state, RowBatch* build_ba
// take ownership of tuple data of build_batch
_build_pool->acquire_data(build_batch->tuple_data_pool(), false);
RETURN_IF_LIMIT_EXCEEDED(state, "Hash join, while constructing the
hash table.");
-
+
RETURN_IF_ERROR(_hash_tbl->resize_buckets_ahead(build_batch->num_rows()));
for (int i = 0; i < build_batch->num_rows(); ++i) {
- _hash_tbl->insert(build_batch->get_row(i));
+ _hash_tbl->insert_without_check(build_batch->get_row(i));
}
}
return Status::OK();
diff --git a/be/src/exec/hash_table.h b/be/src/exec/hash_table.h
index 973be73..304d936 100644
--- a/be/src/exec/hash_table.h
+++ b/be/src/exec/hash_table.h
@@ -35,8 +35,6 @@ class TupleRow;
class MemTracker;
class RuntimeState;
-using std::vector;
-
// Hash table implementation designed for hash aggregation and hash joins. This is not
// templatized and is tailored to the usage pattern for aggregation and joins. The
// hash table store TupleRows and allows for different exprs for insertions and finds.
@@ -101,20 +99,40 @@ public:
// Insert row into the hash table. Row will be evaluated over _build_expr_ctxs
// This will grow the hash table if necessary
- void insert(TupleRow* row) {
+ Status insert(TupleRow* row) {
if (_num_filled_buckets > _num_buckets_till_resize) {
- // TODO: next prime instead of double?
- resize_buckets(_num_buckets * 2);
+ RETURN_IF_ERROR(resize_buckets(_num_buckets * 2));
}
insert_impl(row);
+ return Status::OK();
}
+ void insert_without_check(TupleRow* row) { insert_impl(row); }
+
// Insert row into the hash table. if the row is already exist will not insert
- void insert_unique(TupleRow* row) {
+ Status insert_unique(TupleRow* row) {
+ if (find(row, false) == end()) {
+ return insert(row);
+ }
+ return Status::OK();
+ }
+
+ void insert_unique_without_check(TupleRow* row) {
if (find(row, false) == end()) {
- insert(row);
+ insert_without_check(row);
+ }
+ }
+
+ Status resize_buckets_ahead(int64_t estimate_buckets) {
+ if (_num_filled_buckets + estimate_buckets > _num_buckets_till_resize)
{
+ int64_t new_bucket_size = _num_buckets * 2;
+ while (new_bucket_size <= _num_filled_buckets + estimate_buckets) {
+ new_bucket_size = new_bucket_size * 2;
+ }
+ return resize_buckets(new_bucket_size);
}
+ return Status::OK();
}
bool emplace_key(TupleRow* row, TupleRow** key_addr);
diff --git a/be/src/exec/hash_table.hpp b/be/src/exec/hash_table.hpp
index 4b679c7..b9deac5 100644
--- a/be/src/exec/hash_table.hpp
+++ b/be/src/exec/hash_table.hpp
@@ -24,7 +24,9 @@ namespace doris {
inline bool HashTable::emplace_key(TupleRow* row, TupleRow** dest_addr) {
if (_num_filled_buckets > _num_buckets_till_resize) {
- resize_buckets(_num_buckets * 2);
+ if (!resize_buckets(_num_buckets * 2).ok()) {
+ return false;
+ }
}
if (_current_used == _current_capacity) {
grow_node_array();
diff --git a/be/src/exec/intersect_node.cpp b/be/src/exec/intersect_node.cpp
index 60481cc..2cbf4db 100644
--- a/be/src/exec/intersect_node.cpp
+++ b/be/src/exec/intersect_node.cpp
@@ -52,7 +52,9 @@ Status IntersectNode::open(RuntimeState* state) {
bool eos = false;
for (int i = 1; i < _children.size(); ++i) {
- if (i > 1) { refresh_hash_table<true>(i); }
+ if (i > 1) {
+ RETURN_IF_ERROR(refresh_hash_table<true>(i));
+ }
_valid_element_in_hash_tbl = 0;
// probe
@@ -66,17 +68,12 @@ Status IntersectNode::open(RuntimeState* state) {
RETURN_IF_ERROR(child(i)->get_next(state, _probe_batch.get(),
&eos));
RETURN_IF_LIMIT_EXCEEDED(state, " Intersect , while probing the
hash table.");
for (int j = 0; j < _probe_batch->num_rows(); ++j) {
- VLOG_ROW << "probe row: "
- << get_row_output_string(_probe_batch->get_row(j),
child(i)->row_desc());
_hash_tbl_iterator = _hash_tbl->find(_probe_batch->get_row(j));
if (_hash_tbl_iterator != _hash_tbl->end()) {
if (!_hash_tbl_iterator.matched()) {
_valid_element_in_hash_tbl++;
_hash_tbl_iterator.set_matched();
}
- VLOG_ROW << "probe matched: "
- <<
get_row_output_string(_hash_tbl_iterator.get_row(),
- child(0)->row_desc());
}
}
_probe_batch->reset();
@@ -100,9 +97,6 @@ Status IntersectNode::get_next(RuntimeState* state,
RowBatch* out_batch, bool* e
out_batch->resize_and_allocate_tuple_buffer(state,
&tuple_buf_size, &tuple_buf));
memset(tuple_buf, 0, tuple_buf_size);
while (_hash_tbl_iterator.has_next()) {
- VLOG_ROW << "find row: "
- << get_row_output_string(_hash_tbl_iterator.get_row(),
child(0)->row_desc())
- << " matched: " << _hash_tbl_iterator.matched();
if (_hash_tbl_iterator.matched()) {
create_output_row(_hash_tbl_iterator.get_row(), out_batch,
tuple_buf);
tuple_buf += _tuple_desc->byte_size();
diff --git a/be/src/exec/set_operation_node.cpp
b/be/src/exec/set_operation_node.cpp
index 0ca6dd5..5958c25 100644
--- a/be/src/exec/set_operation_node.cpp
+++ b/be/src/exec/set_operation_node.cpp
@@ -27,7 +27,10 @@
namespace doris {
SetOperationNode::SetOperationNode(ObjectPool* pool, const TPlanNode& tnode,
const DescriptorTbl& descs, int tuple_id)
- : ExecNode(pool, tnode, descs), _tuple_id(tuple_id),
_tuple_desc(nullptr), _valid_element_in_hash_tbl(0) {}
+ : ExecNode(pool, tnode, descs),
+ _tuple_id(tuple_id),
+ _tuple_desc(nullptr),
+ _valid_element_in_hash_tbl(0) {}
Status SetOperationNode::init(const TPlanNode& tnode, RuntimeState* state) {
RETURN_IF_ERROR(ExecNode::init(tnode, state));
@@ -142,7 +145,7 @@ Status SetOperationNode::open(RuntimeState* state) {
}
// initial build hash table used for remove duplicated
_hash_tbl.reset(new HashTable(_child_expr_lists[0], _child_expr_lists[1],
_build_tuple_size,
- true, _find_nulls, id(), mem_tracker(),
1024));
+ true, _find_nulls, id(), mem_tracker(),
state->batch_size() * 2));
RowBatch build_batch(child(0)->row_desc(), state->batch_size(),
mem_tracker().get());
RETURN_IF_ERROR(child(0)->open(state));
@@ -155,10 +158,9 @@ Status SetOperationNode::open(RuntimeState* state) {
_build_pool->acquire_data(build_batch.tuple_data_pool(), false);
RETURN_IF_LIMIT_EXCEEDED(state, " SetOperation, while constructing the
hash table.");
// build hash table and remove duplicate items
+
RETURN_IF_ERROR(_hash_tbl->resize_buckets_ahead(build_batch.num_rows()));
for (int i = 0; i < build_batch.num_rows(); ++i) {
- VLOG_ROW << "build row: "
- << get_row_output_string(build_batch.get_row(i),
child(0)->row_desc());
- _hash_tbl->insert_unique(build_batch.get_row(i));
+ _hash_tbl->insert_unique_without_check(build_batch.get_row(i));
}
VLOG_ROW << "hash table content: " << _hash_tbl->debug_string(true,
&child(0)->row_desc());
build_batch.reset();
diff --git a/be/src/exec/set_operation_node.h b/be/src/exec/set_operation_node.h
index 40645b1..9fbb5d7 100644
--- a/be/src/exec/set_operation_node.h
+++ b/be/src/exec/set_operation_node.h
@@ -54,7 +54,7 @@ protected:
// TODO: Check whether the hash table should be shrink to reduce necessary refresh
// but may different child has different probe expr which may cause wrong result.
// so we need keep probe expr same in FE to optimize this issue.
- void refresh_hash_table(int child);
+ Status refresh_hash_table(int child);
/// Tuple id resolved in Prepare() to set tuple_desc_;
const int _tuple_id;
@@ -81,33 +81,28 @@ protected:
};
template <bool keep_matched>
-void SetOperationNode::refresh_hash_table(int child_id) {
+Status SetOperationNode::refresh_hash_table(int child_id) {
SCOPED_TIMER(_build_timer);
- std::unique_ptr<HashTable> temp_tbl(
- new HashTable(_child_expr_lists[0], _child_expr_lists[child_id],
_build_tuple_size,
- true, _find_nulls, id(), mem_tracker(),
- _valid_element_in_hash_tbl /
HashTable::MAX_BUCKET_OCCUPANCY_FRACTION + 1));
+ std::unique_ptr<HashTable> temp_tbl(new HashTable(
+ _child_expr_lists[0], _child_expr_lists[child_id],
_build_tuple_size, true, _find_nulls,
+ id(), mem_tracker(),
+ _valid_element_in_hash_tbl /
HashTable::MAX_BUCKET_OCCUPANCY_FRACTION + 1));
_hash_tbl_iterator = _hash_tbl->begin();
while (_hash_tbl_iterator.has_next()) {
if constexpr (keep_matched) {
if (_hash_tbl_iterator.matched()) {
- VLOG_ROW << "rebuild row: "
- << get_row_output_string(_hash_tbl_iterator.get_row(),
- child(0)->row_desc());
- temp_tbl->insert(_hash_tbl_iterator.get_row());
+
RETURN_IF_ERROR(temp_tbl->insert(_hash_tbl_iterator.get_row()));
}
} else {
if (!_hash_tbl_iterator.matched()) {
- VLOG_ROW << "rebuild row: "
- << get_row_output_string(_hash_tbl_iterator.get_row(),
- child(0)->row_desc());
- temp_tbl->insert(_hash_tbl_iterator.get_row());
+
RETURN_IF_ERROR(temp_tbl->insert(_hash_tbl_iterator.get_row()));
}
}
_hash_tbl_iterator.next<false>();
}
_hash_tbl.swap(temp_tbl);
temp_tbl->close();
+ return Status::OK();
}
-}; // namespace doris
\ No newline at end of file
+}; // namespace doris
diff --git a/be/test/exec/es_predicate_test.cpp
b/be/test/exec/es_predicate_test.cpp
index 1e382ba..28ce548 100644
--- a/be/test/exec/es_predicate_test.cpp
+++ b/be/test/exec/es_predicate_test.cpp
@@ -143,7 +143,6 @@ TEST_F(EsPredicateTest, normal) {
std::vector<ExprContext*> conjunct_ctxs;
Status status = build_expr_context_list(conjunct_ctxs);
ASSERT_TRUE(status.ok());
-
TupleDescriptor* tuple_desc = _desc_tbl->get_tuple_descriptor(0);
std::vector<EsPredicate*> predicates;
for (int i = 0; i < conjunct_ctxs.size(); ++i) {
diff --git a/be/test/olap/rowset/segment_v2/binary_dict_page_test.cpp
b/be/test/olap/rowset/segment_v2/binary_dict_page_test.cpp
index b2fb740..1bf2f3d 100644
--- a/be/test/olap/rowset/segment_v2/binary_dict_page_test.cpp
+++ b/be/test/olap/rowset/segment_v2/binary_dict_page_test.cpp
@@ -30,8 +30,8 @@
#include "olap/types.h"
#include "runtime/mem_pool.h"
#include "runtime/mem_tracker.h"
-#include "util/debug_util.h"
#include "test_util/test_util.h"
+#include "util/debug_util.h"
namespace doris {
namespace segment_v2 {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]