This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch dev-1.0.1 in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
commit bb20da4b3c3695fece584fdea286e47c7496d469 Author: HappenLee <[email protected]> AuthorDate: Thu Mar 31 23:00:07 2022 +0800 [improvement](join) Support join project in query engine (#8722) --- be/src/vec/columns/column_string.cpp | 7 +- be/src/vec/exec/join/vhash_join_node.cpp | 151 ++++++++++++++++++++----------- be/src/vec/exec/join/vhash_join_node.h | 4 + gensrc/thrift/PlanNodes.thrift | 6 ++ 4 files changed, 112 insertions(+), 56 deletions(-) diff --git a/be/src/vec/columns/column_string.cpp b/be/src/vec/columns/column_string.cpp index 832bbbe..98ebf67 100644 --- a/be/src/vec/columns/column_string.cpp +++ b/be/src/vec/columns/column_string.cpp @@ -331,7 +331,12 @@ void ColumnString::reserve(size_t n) { } void ColumnString::resize(size_t n) { - offsets.resize(n); + auto origin_size = size(); + if (origin_size > n) { + offsets.resize(n); + } else if (origin_size < n) { + insert_many_defaults(n - origin_size); + } } void ColumnString::get_extremes(Field& min, Field& max) const { diff --git a/be/src/vec/exec/join/vhash_join_node.cpp b/be/src/vec/exec/join/vhash_join_node.cpp index 628ea91..9335028 100644 --- a/be/src/vec/exec/join/vhash_join_node.cpp +++ b/be/src/vec/exec/join/vhash_join_node.cpp @@ -172,7 +172,9 @@ struct ProcessHashTableProbe { _probe_side_output_timer(join_node->_probe_side_output_timer) {} // output build side result column - void build_side_output_column(MutableColumns& mcol, int column_offset, int column_length, int size) { + template <bool have_other_join_conjunct = false> + void build_side_output_column(MutableColumns& mcol, int column_offset, int column_length, + const std::vector<bool>& output_slot_flags, int size) { constexpr auto is_semi_anti_join = JoinOpType::value == TJoinOp::RIGHT_ANTI_JOIN || JoinOpType::value == TJoinOp::RIGHT_SEMI_JOIN || JoinOpType::value == TJoinOp::LEFT_ANTI_JOIN || @@ -181,28 +183,37 @@ struct ProcessHashTableProbe { constexpr auto probe_all = JoinOpType::value == TJoinOp::LEFT_OUTER_JOIN || JoinOpType::value == TJoinOp::FULL_OUTER_JOIN; - if constexpr (!is_semi_anti_join) { + if constexpr (!is_semi_anti_join || have_other_join_conjunct) { if (_build_blocks.size() == 1) { for (int i = 0; i < column_length; i++) { auto& column = *_build_blocks[0].get_by_position(i).column; - mcol[i + column_offset]->insert_indices_from(column, - _build_block_rows.data(), _build_block_rows.data() + size); + if (output_slot_flags[i]) { + mcol[i + column_offset]->insert_indices_from(column, _build_block_rows.data(), + _build_block_rows.data() + size); + } else { + mcol[i + column_offset]->resize(size); + } } } else { for (int i = 0; i < column_length; i++) { - for (int j = 0; j < size; j++) { - if constexpr (probe_all) { - if (_build_block_offsets[j] == -1) { - DCHECK(mcol[i + column_offset]->is_nullable()); - assert_cast<ColumnNullable *>(mcol[i + column_offset].get())->insert_join_null_data(); + if (output_slot_flags[i]) { + for (int j = 0; j < size; j++) { + if constexpr (probe_all) { + if (_build_block_offsets[j] == -1) { + DCHECK(mcol[i + column_offset]->is_nullable()); + assert_cast<ColumnNullable *>( + mcol[i + column_offset].get())->insert_join_null_data(); + } else { + auto &column = *_build_blocks[_build_block_offsets[j]].get_by_position(i).column; + mcol[i + column_offset]->insert_from(column, _build_block_rows[j]); + } } else { - auto& column = *_build_blocks[_build_block_offsets[j]].get_by_position(i).column; + auto &column = *_build_blocks[_build_block_offsets[j]].get_by_position(i).column; mcol[i + column_offset]->insert_from(column, _build_block_rows[j]); } - } else { - auto& column = *_build_blocks[_build_block_offsets[j]].get_by_position(i).column; - mcol[i + column_offset]->insert_from(column, _build_block_rows[j]); } + } else { + mcol[i + column_offset]->resize(size); } } } @@ -210,10 +221,14 @@ struct ProcessHashTableProbe { } // output probe side result column - void probe_side_output_column(MutableColumns& mcol, int column_length, int size) { - for (int i = 0; i < column_length; ++i) { - auto& column = _probe_block.get_by_position(i).column; - column->replicate(&_items_counts[0], size, *mcol[i]); + void probe_side_output_column(MutableColumns& mcol, const std::vector<bool>& output_slot_flags, int size) { + for (int i = 0; i < output_slot_flags.size(); ++i) { + if (output_slot_flags[i]) { + auto& column = _probe_block.get_by_position(i).column; + column->replicate(&_items_counts[0], size, *mcol[i]); + } else { + mcol[i]->resize(size); + } } } // Only process the join with no other join conjunt, because of no other join conjunt @@ -325,12 +340,13 @@ struct ProcessHashTableProbe { { SCOPED_TIMER(_build_side_output_timer); - build_side_output_column(mcol, right_col_idx, right_col_len, current_offset); + build_side_output_column(mcol, right_col_idx, right_col_len, + _join_node->_right_output_slot_flags, current_offset); } - { + if constexpr (JoinOpType::value != TJoinOp::RIGHT_SEMI_JOIN && JoinOpType::value != TJoinOp::RIGHT_ANTI_JOIN) { SCOPED_TIMER(_probe_side_output_timer); - probe_side_output_column(mcol, right_col_idx, current_offset); + probe_side_output_column(mcol, _join_node->_left_output_slot_flags, current_offset); } output_block->swap(mutable_block.to_block()); @@ -351,10 +367,7 @@ struct ProcessHashTableProbe { int right_col_idx = _join_node->_left_table_data_types.size(); int right_col_len = _join_node->_right_table_data_types.size(); - IColumn::Offsets offset_data; auto& mcol = mutable_block.mutable_columns(); - offset_data.assign(_probe_rows, (uint32_t)0); - // use in right join to change visited state after // exec the vother join conjunt std::vector<bool*> visited_map; @@ -363,16 +376,23 @@ struct ProcessHashTableProbe { std::vector<bool> same_to_prev; same_to_prev.reserve(1.2 * _batch_size); + _items_counts.resize(_probe_rows); + _build_block_offsets.resize(_batch_size); + _build_block_rows.resize(_batch_size); + memset(_items_counts.data(), 0, sizeof(uint32_t) * _probe_rows); + int current_offset = 0; for (; _probe_index < _probe_rows;) { // ignore null rows if constexpr (ignore_null) { if ((*null_map)[_probe_index]) { - offset_data[_probe_index++] = current_offset; + _items_counts[_probe_index++] = (uint32_t)0; continue; } } + + auto last_offset = current_offset; auto find_result = (*null_map)[_probe_index] ? decltype(key_getter.find_key(hash_table_ctx.hash_table, _probe_index, @@ -382,15 +402,25 @@ struct ProcessHashTableProbe { if (find_result.is_found()) { auto& mapped = find_result.get_mapped(); auto origin_offset = current_offset; - - for (auto it = mapped.begin(); it.ok(); ++it) { + // TODO: Iterators are currently considered to be a heavy operation and have a certain impact on performance. + // We should rethink whether to use this iterator mode in the future. Now just opt the one row case + if (mapped.get_row_count() == 1) { + _build_block_offsets[current_offset] = mapped.block_offset; + _build_block_rows[current_offset] = mapped.row_num; ++current_offset; - const Block& cur_blk = _build_blocks[it->block_offset]; - for (size_t j = 0; j < right_col_len; ++j) { - auto& column = *cur_blk.get_by_position(j).column; - mcol[j + right_col_idx]->insert_from(column, it->row_num); + visited_map.emplace_back(&mapped.visited); + } else { + for (auto it = mapped.begin(); it.ok(); ++it) { + if (current_offset < _batch_size) { + _build_block_offsets[current_offset] = it->block_offset; + _build_block_rows[current_offset] = it->row_num; + } else { + _build_block_offsets.emplace_back(it->block_offset); + _build_block_rows.emplace_back(it->row_num); + } + ++current_offset; + visited_map.emplace_back(&it->visited); } - visited_map.emplace_back(&it->visited); } same_to_prev.emplace_back(false); for (int i = 0; i < current_offset - origin_offset - 1; ++i) { @@ -399,43 +429,36 @@ struct ProcessHashTableProbe { } else if constexpr (JoinOpType::value == TJoinOp::LEFT_OUTER_JOIN || JoinOpType::value == TJoinOp::FULL_OUTER_JOIN || JoinOpType::value == TJoinOp::LEFT_ANTI_JOIN) { - ++current_offset; same_to_prev.emplace_back(false); visited_map.emplace_back(nullptr); // only full outer / left outer need insert the data of right table - if constexpr (JoinOpType::value == TJoinOp::LEFT_OUTER_JOIN || - JoinOpType::value == TJoinOp::FULL_OUTER_JOIN) { - for (size_t j = 0; j < right_col_len; ++j) { - DCHECK(mcol[j + right_col_idx]->is_nullable()); - assert_cast<ColumnNullable *>(mcol[j + right_col_idx].get())->insert_join_null_data(); - } - } else { - for (size_t j = 0; j < right_col_len; ++j) { - mcol[j + right_col_idx]->insert_default(); - } - } + // left anti use -1 use a default value + _build_block_offsets[current_offset] = -1; + _build_block_rows[current_offset] = -1; + ++current_offset; } else { // other join, no nothing } - offset_data[_probe_index++] = current_offset; - + _items_counts[_probe_index++] = (uint32_t)(current_offset - last_offset); if (current_offset >= _batch_size) { break; } } - for (int i = _probe_index; i < _probe_rows; ++i) { - offset_data[i] = current_offset; + { + SCOPED_TIMER(_build_side_output_timer); + build_side_output_column<true>(mcol, right_col_idx, right_col_len, + _join_node->_right_output_slot_flags, current_offset); } - - output_block->swap(mutable_block.to_block()); - for (int i = 0; i < right_col_idx; ++i) { - auto& column = _probe_block.get_by_position(i).column; - output_block->get_by_position(i).column = column->replicate(offset_data); + { + SCOPED_TIMER(_probe_side_output_timer); + probe_side_output_column(mcol, _join_node->_left_output_slot_flags, current_offset); } + output_block->swap(mutable_block.to_block()); - if (_join_node->_vother_join_conjunct_ptr) { + // dispose the other join conjunt exec + { int result_column_id = -1; int orig_columns = output_block->columns(); (*_join_node->_vother_join_conjunct_ptr)->execute(output_block, &result_column_id); @@ -613,7 +636,6 @@ private: ProfileCounter* _probe_side_output_timer; }; -// now we only support inner join HashJoinNode::HashJoinNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs) : ExecNode(pool, tnode, descs), _join_op(tnode.hash_join_node.join_op), @@ -627,7 +649,9 @@ HashJoinNode::HashJoinNode(ObjectPool* pool, const TPlanNode& tnode, const Descr _build_unique(_join_op == TJoinOp::LEFT_ANTI_JOIN || _join_op == TJoinOp::LEFT_SEMI_JOIN), _is_right_semi_anti(_join_op == TJoinOp::RIGHT_ANTI_JOIN || _join_op == TJoinOp::RIGHT_SEMI_JOIN), - _is_outer_join(_match_all_build || _match_all_probe) { + _is_outer_join(_match_all_build || _match_all_probe), + _hash_output_slot_ids(tnode.hash_join_node.__isset.hash_output_slot_ids ? tnode.hash_join_node.hash_output_slot_ids : + std::vector<SlotId>{}) { _runtime_filter_descs = tnode.runtime_filters; init_join_op(); @@ -705,6 +729,23 @@ Status HashJoinNode::init(const TPlanNode& tnode, RuntimeState* state) { filter_desc, state->query_options())); } + // init left/right output slots flags, only column of slot_id in _hash_output_slot_ids need + // insert to output block of hash join. + // _left_output_slots_flags : column of left table need to output set flag = true + // _rgiht_output_slots_flags : column of right table need to output set flag = true + // if _hash_output_slot_ids is empty, means all column of left/right table need to output. + auto init_output_slots_flags = [this](auto& tuple_descs, auto& output_slot_flags) { + for (const auto& tuple_desc : tuple_descs) { + for (const auto& slot_desc : tuple_desc->slots()) { + output_slot_flags.emplace_back(_hash_output_slot_ids.empty() || + std::find(_hash_output_slot_ids.begin(), _hash_output_slot_ids.end(), + slot_desc->id()) != _hash_output_slot_ids.end()); + } + } + }; + init_output_slots_flags(child(0)->row_desc().tuple_descriptors(), _left_output_slot_flags); + init_output_slots_flags(child(1)->row_desc().tuple_descriptors(), _right_output_slot_flags); + return Status::OK(); } diff --git a/be/src/vec/exec/join/vhash_join_node.h b/be/src/vec/exec/join/vhash_join_node.h index ca93aaa..e2762e6 100644 --- a/be/src/vec/exec/join/vhash_join_node.h +++ b/be/src/vec/exec/join/vhash_join_node.h @@ -230,6 +230,10 @@ private: std::vector<uint32_t> _items_counts; std::vector<int8_t> _build_block_offsets; std::vector<int> _build_block_rows; + + std::vector<SlotId> _hash_output_slot_ids; + std::vector<bool> _left_output_slot_flags; + std::vector<bool> _right_output_slot_flags; private: Status _hash_table_build(RuntimeState* state); Status _process_build_block(RuntimeState* state, Block& block, uint8_t offset); diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift index 0fb74a5..ef599e4 100644 --- a/gensrc/thrift/PlanNodes.thrift +++ b/gensrc/thrift/PlanNodes.thrift @@ -394,6 +394,9 @@ struct THashJoinNode { // anything from the ON or USING clauses (but *not* the WHERE clause) that's not an // equi-join predicate, only use in vec exec engine 5: optional Exprs.TExpr vother_join_conjunct + + // hash output column + 6: optional list<Types.TSlotId> hash_output_slot_ids } struct TMergeJoinNode { @@ -789,6 +792,9 @@ struct TPlanNode { 40: optional Exprs.TExpr vconjunct 41: optional TTableFunctionNode table_function_node + + // output column + 42: optional list<Types.TSlotId> output_slot_ids } // A flattened representation of a tree of PlanNodes, obtained by depth-first --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
