This is an automated email from the ASF dual-hosted git repository. panxiaolei pushed a commit to branch dev_join in repository https://gitbox.apache.org/repos/asf/doris.git
commit 7a1d112f3ba3dd3f8138779e7807e69bc7f964be Author: HappenLee <[email protected]> AuthorDate: Tue Oct 24 14:04:33 2023 +0800 support right anti/semi/outer join (#25785) --- be/src/pipeline/exec/hashjoin_build_sink.cpp | 47 ++-- .../aggregate_function_collect.h | 1 - be/src/vec/columns/column_string.cpp | 4 +- be/src/vec/columns/column_vector.cpp | 2 +- be/src/vec/common/hash_table/hash_map.h | 116 +++++++--- be/src/vec/common/hash_table/hash_map_context.h | 27 ++- .../vec/exec/join/process_hash_table_probe_impl.h | 241 +++++++++++---------- be/src/vec/exec/join/vhash_join_node.cpp | 46 ++-- be/src/vec/exec/join/vhash_join_node.h | 16 +- 9 files changed, 287 insertions(+), 213 deletions(-) diff --git a/be/src/pipeline/exec/hashjoin_build_sink.cpp b/be/src/pipeline/exec/hashjoin_build_sink.cpp index 99178064a27..f18cab51c02 100644 --- a/be/src/pipeline/exec/hashjoin_build_sink.cpp +++ b/be/src/pipeline/exec/hashjoin_build_sink.cpp @@ -194,29 +194,30 @@ Status HashJoinBuildSinkLocalState::process_build_block(RuntimeState* state, Status st = _dependency->extract_join_column<true>(block, null_map_val, raw_ptrs, res_col_ids); st = std::visit( - Overload { - [&](std::monostate& arg, auto has_null_value, - auto short_circuit_for_null_in_build_side) -> Status { - LOG(FATAL) << "FATAL: uninited hash table"; - __builtin_unreachable(); - return Status::OK(); - }, - [&](auto&& arg, auto has_null_value, - auto short_circuit_for_null_in_build_side) -> Status { - using HashTableCtxType = std::decay_t<decltype(arg)>; - vectorized::ProcessHashTableBuild<HashTableCtxType, - HashJoinBuildSinkLocalState> - hash_table_build_process(rows, block, raw_ptrs, this, - state->batch_size(), state); - return hash_table_build_process - .template run<has_null_value, short_circuit_for_null_in_build_side>( - arg, - has_null_value || short_circuit_for_null_in_build_side - ? &null_map_val->get_data() - : nullptr, - &_shared_state->_has_null_in_build_side); - }}, - *_shared_state->hash_table_variants, + Overload {[&](std::monostate& arg, auto join_op, auto has_null_value, + auto short_circuit_for_null_in_build_side) -> Status { + LOG(FATAL) << "FATAL: uninited hash table"; + __builtin_unreachable(); + return Status::OK(); + }, + [&](auto&& arg, auto&& join_op, auto has_null_value, + auto short_circuit_for_null_in_build_side) -> Status { + using HashTableCtxType = std::decay_t<decltype(arg)>; + using JoinOpType = std::decay_t<decltype(join_op)>; + vectorized::ProcessHashTableBuild<HashTableCtxType, + HashJoinBuildSinkLocalState> + hash_table_build_process(rows, block, raw_ptrs, this, + state->batch_size(), state); + return hash_table_build_process + .template run<JoinOpType::value, has_null_value, + short_circuit_for_null_in_build_side>( + arg, + has_null_value || short_circuit_for_null_in_build_side + ? &null_map_val->get_data() + : nullptr, + &_shared_state->_has_null_in_build_side); + }}, + *_shared_state->hash_table_variants, _shared_state->join_op_variants, vectorized::make_bool_variant(_build_side_ignore_null), vectorized::make_bool_variant(p._short_circuit_for_null_in_build_side)); diff --git a/be/src/vec/aggregate_functions/aggregate_function_collect.h b/be/src/vec/aggregate_functions/aggregate_function_collect.h index 3a4b0bad20f..734f846a2f5 100644 --- a/be/src/vec/aggregate_functions/aggregate_function_collect.h +++ b/be/src/vec/aggregate_functions/aggregate_function_collect.h @@ -134,7 +134,6 @@ struct AggregateFunctionCollectSetData<StringRef, HasLimit> { if (max_size == -1) { max_size = rhs.max_size; } - max_size = rhs.max_size; for (auto& rhs_elem : rhs.data_set) { if constexpr (HasLimit::value) { diff --git a/be/src/vec/columns/column_string.cpp b/be/src/vec/columns/column_string.cpp index 7d7597e7dc3..2f76e65278b 100644 --- a/be/src/vec/columns/column_string.cpp +++ b/be/src/vec/columns/column_string.cpp @@ -137,7 +137,7 @@ void ColumnString::insert_indices_from(const IColumn& src, const uint32_t* __res auto* dst_offsets_data = offsets.data(); for (auto* __restrict x = indices_begin; x != indices_end; ++x) { - const auto offset = *x; + int64_t offset = *x; total_chars_size += src_offset_data[offset] - src_offset_data[offset - 1]; dst_offsets_data[dst_offsets_pos++] = total_chars_size; } @@ -150,7 +150,7 @@ void ColumnString::insert_indices_from(const IColumn& src, const uint32_t* __res size_t dst_chars_pos = old_char_size; for (auto* __restrict x = indices_begin; x != indices_end; ++x) { - const auto offset = *x; + const int64_t offset = *x; const auto start = src_offset_data[offset - 1]; const auto end = src_offset_data[offset]; diff --git a/be/src/vec/columns/column_vector.cpp b/be/src/vec/columns/column_vector.cpp index 1f053c4ce4f..c4ca97df7a2 100644 --- a/be/src/vec/columns/column_vector.cpp +++ b/be/src/vec/columns/column_vector.cpp @@ -379,7 +379,7 @@ void ColumnVector<T>::insert_indices_from(const IColumn& src, // nullmap : indices_begin[i] == 0 means is null at the here, set true here for (int i = 0; i < new_size; ++i) { data[origin_size + i] = - (indices_begin[i] == 0) + (indices_begin[i] == 0) * src_data[indices_begin[i]]; + (indices_begin[i] == 0) + (indices_begin[i] != 0) * src_data[indices_begin[i]]; } } else { // real data : indices_begin[i] == 0 what at is meaningless diff --git a/be/src/vec/common/hash_table/hash_map.h b/be/src/vec/common/hash_table/hash_map.h index 89dfe7f8aac..ab094d69a67 100644 --- a/be/src/vec/common/hash_table/hash_map.h +++ b/be/src/vec/common/hash_table/hash_map.h @@ -216,53 +216,102 @@ public: return phmap::priv::NormalizeCapacity(expect_bucket_size) + 1; } - void reserve(int num_elem) { + template <int JoinOpType> + void prepare_build(size_t num_elem, int batch_size) { + max_batch_size = batch_size; bucket_size = calc_bucket_size(num_elem + 1); first.resize(bucket_size, 0); next.resize(num_elem); - } - void build(const Key* __restrict keys, const size_t* __restrict hash_values, size_t num_elem, - int batch_size) { - _batch_size = batch_size; - bucket_size = calc_bucket_size(num_elem); - first.resize(bucket_size, 0); - next.resize(num_elem); + if constexpr (JoinOpType == doris::TJoinOp::FULL_OUTER_JOIN || + JoinOpType == doris::TJoinOp::RIGHT_OUTER_JOIN || + JoinOpType == doris::TJoinOp::RIGHT_ANTI_JOIN || + JoinOpType == doris::TJoinOp::RIGHT_SEMI_JOIN) { + visited.resize(num_elem, 0); + } + } + void build(const Key* __restrict keys, const size_t* __restrict bucket_nums, + const size_t num_elem) { build_keys = keys; for (size_t i = 1; i < num_elem; i++) { - next[i] = first[hash_values[i]]; - first[hash_values[i]] = i; + next[i] = first[bucket_nums[i]]; + first[bucket_nums[i]] = i; } } template <int JoinOpType> - auto find_batch(const Key* __restrict keys, const size_t* __restrict hash_values, int probe_idx, + auto find_batch(const Key* __restrict keys, const size_t* __restrict bucket_nums, int probe_idx, int probe_rows, std::vector<uint32_t>& probe_idxs, std::vector<uint32_t>& build_idxs) { if constexpr (JoinOpType == doris::TJoinOp::INNER_JOIN || - JoinOpType == doris::TJoinOp::LEFT_OUTER_JOIN) { - return _find_batch_inner_outer_join<JoinOpType>(keys, hash_values, probe_idx, + JoinOpType == doris::TJoinOp::FULL_OUTER_JOIN || + JoinOpType == doris::TJoinOp::LEFT_OUTER_JOIN || + JoinOpType == doris::TJoinOp::RIGHT_OUTER_JOIN) { + return _find_batch_inner_outer_join<JoinOpType>(keys, bucket_nums, probe_idx, probe_rows, probe_idxs, build_idxs); } if constexpr (JoinOpType == doris::TJoinOp::LEFT_ANTI_JOIN || JoinOpType == doris::TJoinOp::LEFT_SEMI_JOIN) { - return _find_batch_left_semi_anti<JoinOpType>(keys, hash_values, probe_idx, probe_rows, + return _find_batch_left_semi_anti<JoinOpType>(keys, bucket_nums, probe_idx, probe_rows, probe_idxs); } + if constexpr (JoinOpType == doris::TJoinOp::RIGHT_ANTI_JOIN || + JoinOpType == doris::TJoinOp::RIGHT_SEMI_JOIN) { + return _find_batch_right_semi_anti(keys, bucket_nums, probe_idx, probe_rows); + } return std::pair {0, 0}; } size_t get_bucket_mask() { return bucket_size - 1; } + template <int JoinOpType> + bool iterate_map(std::vector<uint32_t>& build_idxs) const { + const auto batch_size = max_batch_size; + const auto elem_num = visited.size(); + int count = 0; + build_idxs.reserve(batch_size); + + while (count < batch_size && iter_idx < elem_num) { + const auto matched = visited[iter_idx]; + build_idxs[count] = iter_idx; + if constexpr (JoinOpType != doris::TJoinOp::RIGHT_ANTI_JOIN) { + count += !matched; + } else { + count += matched; + } + iter_idx++; + } + + build_idxs.resize(count); + return iter_idx == elem_num; + } + private: + auto _find_batch_right_semi_anti(const Key* __restrict keys, + const size_t* __restrict bucket_nums, int probe_idx, + int probe_rows) { + while (LIKELY(probe_idx < probe_rows)) { + auto build_idx = first[bucket_nums[probe_idx]]; + + while (build_idx) { + if (keys[probe_idx] == build_keys[build_idx]) { + visited[build_idx] = 1; + } + build_idx = next[build_idx]; + } + } + return std::pair {probe_rows, 0}; + } + template <int JoinOpType> auto _find_batch_left_semi_anti(const Key* __restrict keys, - const size_t* __restrict hash_values, int probe_idx, + const size_t* __restrict bucket_nums, int probe_idx, int probe_rows, std::vector<uint32_t>& probe_idxs) { + const auto batch_size = max_batch_size; int matched_cnt = 0; - while (LIKELY(probe_idx < probe_rows && matched_cnt < _batch_size)) { - uint32_t build_idx = first[hash_values[probe_idx]]; + while (LIKELY(probe_idx < probe_rows && matched_cnt < batch_size)) { + uint32_t build_idx = first[bucket_nums[probe_idx]]; while (build_idx) { if (keys[probe_idx] == build_keys[build_idx]) { break; @@ -279,31 +328,36 @@ private: template <int JoinOpType> auto _find_batch_inner_outer_join(const Key* __restrict keys, - const size_t* __restrict hash_values, int probe_idx, + const size_t* __restrict bucket_nums, int probe_idx, int probe_rows, std::vector<uint32_t>& probe_idxs, std::vector<uint32_t>& build_idxs) { int matched_cnt = 0; uint32_t build_idx = 0; + const auto batch_size = max_batch_size; auto do_the_probe = [&]() { - while (build_idx && LIKELY(matched_cnt < _batch_size)) { + while (build_idx && LIKELY(matched_cnt < batch_size)) { if (keys[probe_idx] == build_keys[build_idx]) { probe_idxs[matched_cnt] = probe_idx; build_idxs[matched_cnt] = build_idx; + if constexpr (JoinOpType == doris::TJoinOp::RIGHT_OUTER_JOIN || + JoinOpType == doris::TJoinOp::FULL_OUTER_JOIN) { + visited[build_idx] = 1; + } matched_cnt++; } build_idx = next[build_idx]; } - if constexpr (JoinOpType != doris::TJoinOp::INNER_JOIN) { + if constexpr (JoinOpType == doris::TJoinOp::LEFT_OUTER_JOIN || + JoinOpType == doris::TJoinOp::FULL_OUTER_JOIN) { // `(!matched_cnt || probe_idxs[matched_cnt - 1] != probe_idx)` means not match one build side if (!build_idx && (!matched_cnt || probe_idxs[matched_cnt - 1] != probe_idx)) { probe_idxs[matched_cnt] = probe_idx; - build_idxs[matched_cnt] = build_idx; + build_idxs[matched_cnt] = 0; matched_cnt++; } } - probe_idx++; }; @@ -314,25 +368,31 @@ private: current_build_idx = 0; do_the_probe(); } - while (LIKELY(probe_idx < probe_rows && matched_cnt < _batch_size)) { - build_idx = first[hash_values[probe_idx]]; + while (LIKELY(probe_idx < probe_rows && matched_cnt < batch_size)) { + build_idx = first[bucket_nums[probe_idx]]; do_the_probe(); } - if (matched_cnt == _batch_size && build_idx) { - current_probe_idx = probe_idx - 1; + if (matched_cnt == batch_size && build_idx) { + probe_idx--; + current_probe_idx = probe_idx; current_build_idx = build_idx; } return std::pair {probe_idx, matched_cnt}; } const Key* __restrict build_keys; - uint32_t bucket_size = 0; - int _batch_size = 0; + std::vector<uint8_t> visited; + int max_batch_size = 0; + + // use to save the last probe idx if the matched == max_batch_size int current_probe_idx = -1; uint32_t current_build_idx = 0; + mutable uint32_t iter_idx = 1; + + uint32_t bucket_size = 0; std::vector<uint32_t> first; std::vector<uint32_t> next; Cell cell; diff --git a/be/src/vec/common/hash_table/hash_map_context.h b/be/src/vec/common/hash_table/hash_map_context.h index 6e8f9e0746c..549301ae477 100644 --- a/be/src/vec/common/hash_table/hash_map_context.h +++ b/be/src/vec/common/hash_table/hash_map_context.h @@ -69,7 +69,7 @@ struct MethodBase { } } virtual void init_serialized_keys(const ColumnRawPtrs& key_columns, size_t num_rows, - const uint8_t* null_map = nullptr) = 0; + const uint8_t* null_map = nullptr, bool is_build = false) = 0; void init_hash_values(size_t num_rows, const uint8_t* null_map) { if (null_map == nullptr) { @@ -81,7 +81,6 @@ struct MethodBase { if (null_map[k]) { continue; } - hash_values[k] = hash_table->hash(keys[k]); } } @@ -95,7 +94,7 @@ struct MethodBase { void calculate_bucket(size_t num_rows) { size_t mask = hash_table->get_bucket_mask(); - for (int i = 0; i < num_rows; i++) { + for (size_t i = 0; i < num_rows; i++) { hash_values[i] &= mask; } } @@ -169,7 +168,7 @@ struct MethodSerialized : public MethodBase<TData> { } void init_serialized_keys(const ColumnRawPtrs& key_columns, size_t num_rows, - const uint8_t* null_map = nullptr) override { + const uint8_t* null_map = nullptr, bool is_build = false) override { Base::arena.clear(); stored_keys.resize(num_rows); @@ -227,7 +226,7 @@ struct MethodStringNoCache : public MethodBase<TData> { std::vector<StringRef> stored_keys; void init_serialized_keys(const ColumnRawPtrs& key_columns, size_t num_rows, - const uint8_t* null_map = nullptr) override { + const uint8_t* null_map = nullptr, bool is_build = false) override { const IColumn& column = *key_columns[0]; const auto& column_string = assert_cast<const ColumnString&>( column.is_nullable() @@ -263,7 +262,7 @@ struct MethodOneNumber : public MethodBase<TData> { FieldType>; void init_serialized_keys(const ColumnRawPtrs& key_columns, size_t num_rows, - const uint8_t* null_map = nullptr) override { + const uint8_t* null_map = nullptr, bool is_build = false) override { Base::keys = (FieldType*)(key_columns[0]->is_nullable() ? assert_cast<const ColumnNullable*>(key_columns[0]) ->get_nested_column_ptr() @@ -296,7 +295,9 @@ struct MethodKeysFixed : public MethodBase<TData> { using State = ColumnsHashing::HashMethodKeysFixed<typename Base::Value, Key, Mapped, has_nullable_keys>; - + // need keep until the hash probe end. + std::vector<Key> build_stored_keys; + // refresh each time probe std::vector<Key> stored_keys; Sizes key_sizes; @@ -365,7 +366,7 @@ struct MethodKeysFixed : public MethodBase<TData> { } void init_serialized_keys(const ColumnRawPtrs& key_columns, size_t num_rows, - const uint8_t* null_map = nullptr) override { + const uint8_t* null_map = nullptr, bool is_build = false) override { ColumnRawPtrs actual_columns; ColumnRawPtrs null_maps; if (has_nullable_keys) { @@ -383,8 +384,14 @@ struct MethodKeysFixed : public MethodBase<TData> { } else { actual_columns = key_columns; } - stored_keys = pack_fixeds<Key>(num_rows, actual_columns, null_maps); - Base::keys = stored_keys.data(); + + if (is_build) { + build_stored_keys = pack_fixeds<Key>(num_rows, actual_columns, null_maps); + Base::keys = build_stored_keys.data(); + } else { + stored_keys = pack_fixeds<Key>(num_rows, actual_columns, null_maps); + Base::keys = stored_keys.data(); + } Base::init_hash_values(num_rows, null_map); } diff --git a/be/src/vec/exec/join/process_hash_table_probe_impl.h b/be/src/vec/exec/join/process_hash_table_probe_impl.h index f4bef996911..fc408711768 100644 --- a/be/src/vec/exec/join/process_hash_table_probe_impl.h +++ b/be/src/vec/exec/join/process_hash_table_probe_impl.h @@ -85,7 +85,7 @@ void ProcessHashTableProbe<JoinOpType, Parent>::build_side_output_column( _tuple_is_null_right_flags->resize(size); auto* __restrict null_data = _tuple_is_null_right_flags->data(); for (int i = 0; i < size; ++i) { - null_data[i] = _build_block_rows[i] == -1; + null_data[i] = _build_block_rows[i] == 0; } } } @@ -570,131 +570,134 @@ template <typename HashTableType> Status ProcessHashTableProbe<JoinOpType, Parent>::process_data_in_hashtable( HashTableType& hash_table_ctx, MutableBlock& mutable_block, Block* output_block, bool* eos) { - using Mapped = typename HashTableType::Mapped; + // using Mapped = typename HashTableType::Mapped; SCOPED_TIMER(_probe_process_hashtable_timer); - if constexpr (std::is_same_v<Mapped, RowRefListWithFlag> || - std::is_same_v<Mapped, RowRefListWithFlags>) { - hash_table_ctx.init_iterator(); - auto& mcol = mutable_block.mutable_columns(); - - bool right_semi_anti_without_other = _is_right_semi_anti && !_have_other_join_conjunct; - int right_col_idx = - right_semi_anti_without_other ? 0 : _parent->left_table_data_types().size(); - int right_col_len = _parent->right_table_data_types().size(); - - auto& iter = hash_table_ctx.iterator; - auto block_size = 0; - auto& visited_iter = - std::get<ForwardIterator<Mapped>>(_parent->_outer_join_pull_visited_iter); - _build_blocks_locs.resize(_batch_size); - if (visited_iter.ok()) { - if constexpr (std::is_same_v<Mapped, RowRefListWithFlag>) { - for (; visited_iter.ok() && block_size < _batch_size; ++visited_iter) { - _build_blocks_locs[block_size++] = visited_iter->row_num; - } - } else { - for (; visited_iter.ok() && block_size < _batch_size; ++visited_iter) { - if constexpr (JoinOpType == TJoinOp::RIGHT_SEMI_JOIN) { - if (visited_iter->visited) { - _build_blocks_locs[block_size++] = visited_iter->row_num; - } - } else { - if (!visited_iter->visited) { - _build_blocks_locs[block_size++] = visited_iter->row_num; - } - } - } - } - if (!visited_iter.ok()) { - ++iter; - } - } + // if constexpr (std::is_same_v<Mapped, RowRefListWithFlag> || + // std::is_same_v<Mapped, RowRefListWithFlags>) { + // hash_table_ctx.init_iterator(); + // auto& mcol = mutable_block.mutable_columns(); + // + // bool right_semi_anti_without_other = _is_right_semi_anti && !_have_other_join_conjunct; + // int right_col_idx = + // right_semi_anti_without_other ? 0 : _parent->left_table_data_types().size(); + // int right_col_len = _parent->right_table_data_types().size(); + // + // auto& iter = hash_table_ctx.iterator; + // auto block_size = 0; + // auto& visited_iter = + // std::get<ForwardIterator<Mapped>>(_parent->_outer_join_pull_visited_iter); + // _build_blocks_locs.resize(_batch_size); + // if (visited_iter.ok()) { + // if constexpr (std::is_same_v<Mapped, RowRefListWithFlag>) { + // for (; visited_iter.ok() && block_size < _batch_size; ++visited_iter) { + // _build_blocks_locs[block_size++] = visited_iter->row_num; + // } + // } else { + // for (; visited_iter.ok() && block_size < _batch_size; ++visited_iter) { + // if constexpr (JoinOpType == TJoinOp::RIGHT_SEMI_JOIN) { + // if (visited_iter->visited) { + // _build_blocks_locs[block_size++] = visited_iter->row_num; + // } + // } else { + // if (!visited_iter->visited) { + // _build_blocks_locs[block_size++] = visited_iter->row_num; + // } + // } + // } + // } + // if (!visited_iter.ok()) { + // ++iter; + // } + // } + // + // for (; iter != hash_table_ctx.hash_table->end() && block_size < _batch_size; ++iter) { + // auto& mapped = iter->get_second(); + // if constexpr (std::is_same_v<Mapped, RowRefListWithFlag>) { + // if (mapped.visited) { + // if constexpr (JoinOpType == TJoinOp::RIGHT_SEMI_JOIN) { + // visited_iter = mapped.begin(); + // for (; visited_iter.ok() && block_size < _batch_size; ++visited_iter) { + // _build_blocks_locs[block_size++] = visited_iter->row_num; + // } + // if (visited_iter.ok()) { + // // block_size >= _batch_size, quit for loop + // break; + // } + // } + // } else { + // if constexpr (JoinOpType != TJoinOp::RIGHT_SEMI_JOIN) { + // visited_iter = mapped.begin(); + // for (; visited_iter.ok() && block_size < _batch_size; ++visited_iter) { + // _build_blocks_locs[block_size++] = visited_iter->row_num; + // } + // if (visited_iter.ok()) { + // // block_size >= _batch_size, quit for loop + // break; + // } + // } + // } + // } else { + // visited_iter = mapped.begin(); + // for (; visited_iter.ok() && block_size < _batch_size; ++visited_iter) { + // if constexpr (JoinOpType == TJoinOp::RIGHT_SEMI_JOIN) { + // if (visited_iter->visited) { + // _build_blocks_locs[block_size++] = visited_iter->row_num; + // } + // } else { + // if (!visited_iter->visited) { + // _build_blocks_locs[block_size++] = visited_iter->row_num; + // } + // } + // } + // if (visited_iter.ok()) { + // // block_size >= _batch_size, quit for loop + // + // const auto size = _build_blocks_locs.size(); + // _build_block_rows.resize(_build_blocks_locs.size()); + // for (int i = 0; i < size; i++) { + // _build_block_rows[i] = _build_blocks_locs[i]; + // } break; + // } + // } + // } + // _build_blocks_locs.resize(block_size); - for (; iter != hash_table_ctx.hash_table->end() && block_size < _batch_size; ++iter) { - auto& mapped = iter->get_second(); - if constexpr (std::is_same_v<Mapped, RowRefListWithFlag>) { - if (mapped.visited) { - if constexpr (JoinOpType == TJoinOp::RIGHT_SEMI_JOIN) { - visited_iter = mapped.begin(); - for (; visited_iter.ok() && block_size < _batch_size; ++visited_iter) { - _build_blocks_locs[block_size++] = visited_iter->row_num; - } - if (visited_iter.ok()) { - // block_size >= _batch_size, quit for loop - break; - } - } - } else { - if constexpr (JoinOpType != TJoinOp::RIGHT_SEMI_JOIN) { - visited_iter = mapped.begin(); - for (; visited_iter.ok() && block_size < _batch_size; ++visited_iter) { - _build_blocks_locs[block_size++] = visited_iter->row_num; - } - if (visited_iter.ok()) { - // block_size >= _batch_size, quit for loop - break; - } - } - } - } else { - visited_iter = mapped.begin(); - for (; visited_iter.ok() && block_size < _batch_size; ++visited_iter) { - if constexpr (JoinOpType == TJoinOp::RIGHT_SEMI_JOIN) { - if (visited_iter->visited) { - _build_blocks_locs[block_size++] = visited_iter->row_num; - } - } else { - if (!visited_iter->visited) { - _build_blocks_locs[block_size++] = visited_iter->row_num; - } - } - } - if (visited_iter.ok()) { - // block_size >= _batch_size, quit for loop - break; - } - } - } - _build_blocks_locs.resize(block_size); - - const auto size = _build_blocks_locs.size(); - _build_block_rows.resize(_build_blocks_locs.size()); - for (int i = 0; i < size; i++) { - _build_block_rows[i] = _build_blocks_locs[i]; - } - - for (size_t j = 0; j < right_col_len; ++j) { - const auto& column = *_build_block->get_by_position(j).column; - mcol[j + right_col_idx]->insert_indices_from( - column, _build_block_rows.data(), - _build_block_rows.data() + _build_block_rows.size()); - } + auto& mcol = mutable_block.mutable_columns(); + *eos = hash_table_ctx.hash_table->template iterate_map<JoinOpType>(_build_block_rows); + auto block_size = _build_block_rows.size(); + int right_col_idx = _parent->left_table_data_types().size(); + int right_col_len = _parent->right_table_data_types().size(); + + for (size_t j = 0; j < right_col_len; ++j) { + const auto& column = *_build_block->get_by_position(j).column; + mcol[j + right_col_idx]->insert_indices_from( + column, _build_block_rows.data(), + _build_block_rows.data() + _build_block_rows.size()); + } - // just resize the left table column in case with other conjunct to make block size is not zero - if (_is_right_semi_anti && _have_other_join_conjunct) { - auto target_size = mcol[right_col_idx]->size(); - for (int i = 0; i < right_col_idx; ++i) { - mcol[i]->resize(target_size); - } + // just resize the left table column in case with other conjunct to make block size is not zero + if (_is_right_semi_anti && _have_other_join_conjunct) { + auto target_size = mcol[right_col_idx]->size(); + for (int i = 0; i < right_col_idx; ++i) { + mcol[i]->resize(target_size); } + } - // right outer join / full join need insert data of left table - if constexpr (JoinOpType == TJoinOp::RIGHT_OUTER_JOIN || - JoinOpType == TJoinOp::FULL_OUTER_JOIN) { - for (int i = 0; i < right_col_idx; ++i) { - assert_cast<ColumnNullable*>(mcol[i].get())->insert_many_defaults(block_size); - } - _tuple_is_null_left_flags->resize_fill(block_size, 1); + // right outer join / full join need insert data of left table + if constexpr (JoinOpType == TJoinOp::RIGHT_OUTER_JOIN || + JoinOpType == TJoinOp::FULL_OUTER_JOIN) { + for (int i = 0; i < right_col_idx; ++i) { + assert_cast<ColumnNullable*>(mcol[i].get())->insert_many_defaults(block_size); } - *eos = iter == hash_table_ctx.hash_table->end(); - output_block->swap( - mutable_block.to_block(right_semi_anti_without_other ? right_col_idx : 0)); - DCHECK(block_size <= _batch_size); - return Status::OK(); - } else { - LOG(FATAL) << "Invalid RowRefList"; - return Status::InvalidArgument("Invalid RowRefList"); + _tuple_is_null_left_flags->resize_fill(block_size, 1); } + output_block->swap(mutable_block.to_block(0)); + DCHECK(block_size <= _batch_size); + return Status::OK(); + // else { + // LOG(FATAL) << "Invalid RowRefList"; + // return Status::InvalidArgument("Invalid RowRefList"); + // } } template <int JoinOpType, typename Parent> diff --git a/be/src/vec/exec/join/vhash_join_node.cpp b/be/src/vec/exec/join/vhash_join_node.cpp index e765323f39b..cd2d4eb6445 100644 --- a/be/src/vec/exec/join/vhash_join_node.cpp +++ b/be/src/vec/exec/join/vhash_join_node.cpp @@ -972,28 +972,30 @@ Status HashJoinNode::_process_build_block(RuntimeState* state, Block& block) { Status st = _extract_join_column<true>(block, null_map_val, raw_ptrs, res_col_ids); st = std::visit( - Overload { - [&](std::monostate& arg, auto has_null_value, - auto short_circuit_for_null_in_build_side) -> Status { - LOG(FATAL) << "FATAL: uninited hash table"; - __builtin_unreachable(); - return Status::OK(); - }, - [&](auto&& arg, auto has_null_value, - auto short_circuit_for_null_in_build_side) -> Status { - using HashTableCtxType = std::decay_t<decltype(arg)>; - ProcessHashTableBuild<HashTableCtxType, HashJoinNode> - hash_table_build_process(rows, block, raw_ptrs, this, - state->batch_size(), state); - return hash_table_build_process - .template run<has_null_value, short_circuit_for_null_in_build_side>( - arg, - has_null_value || short_circuit_for_null_in_build_side - ? &null_map_val->get_data() - : nullptr, - &_has_null_in_build_side); - }}, - *_hash_table_variants, make_bool_variant(_build_side_ignore_null), + Overload {[&](std::monostate& arg, auto join_op, auto has_null_value, + auto short_circuit_for_null_in_build_side) -> Status { + LOG(FATAL) << "FATAL: uninited hash table"; + __builtin_unreachable(); + return Status::OK(); + }, + [&](auto&& arg, auto&& join_op, auto has_null_value, + auto short_circuit_for_null_in_build_side) -> Status { + using HashTableCtxType = std::decay_t<decltype(arg)>; + using JoinOpType = std::decay_t<decltype(join_op)>; + + ProcessHashTableBuild<HashTableCtxType, HashJoinNode> + hash_table_build_process(rows, block, raw_ptrs, this, + state->batch_size(), state); + return hash_table_build_process + .template run<JoinOpType::value, has_null_value, + short_circuit_for_null_in_build_side>( + arg, + has_null_value || short_circuit_for_null_in_build_side + ? &null_map_val->get_data() + : nullptr, + &_has_null_in_build_side); + }}, + *_hash_table_variants, _join_op_variants, make_bool_variant(_build_side_ignore_null), make_bool_variant(_short_circuit_for_null_in_build_side)); return st; diff --git a/be/src/vec/exec/join/vhash_join_node.h b/be/src/vec/exec/join/vhash_join_node.h index c15e3674642..5fc1c3c6cae 100644 --- a/be/src/vec/exec/join/vhash_join_node.h +++ b/be/src/vec/exec/join/vhash_join_node.h @@ -105,7 +105,7 @@ using ProfileCounter = RuntimeProfile::Counter; template <class HashTableContext, typename Parent> struct ProcessHashTableBuild { - ProcessHashTableBuild(int rows, Block& acquired_block, ColumnRawPtrs& build_raw_ptrs, + ProcessHashTableBuild(uint32_t rows, Block& acquired_block, ColumnRawPtrs& build_raw_ptrs, Parent* parent, int batch_size, RuntimeState* state) : _rows(rows), _acquired_block(acquired_block), @@ -115,7 +115,7 @@ struct ProcessHashTableBuild { _state(state), _build_side_compute_hash_timer(parent->_build_side_compute_hash_timer) {} - template <bool ignore_null, bool short_circuit_for_null> + template <int JoinOpType, bool ignore_null, bool short_circuit_for_null> Status run(HashTableContext& hash_table_ctx, ConstNullMapPtr null_map, bool* has_null_key) { if (short_circuit_for_null || ignore_null) { for (int i = 0; i < _rows; i++) { @@ -131,18 +131,20 @@ struct ProcessHashTableBuild { if (!_parent->runtime_filter_descs().empty()) { _parent->_inserted_blocks.insert(&_acquired_block); } - hash_table_ctx.hash_table->reserve(_rows); + + SCOPED_TIMER(_parent->_build_table_insert_timer); + hash_table_ctx.hash_table->template prepare_build<JoinOpType>(_rows, _state->batch_size()); + hash_table_ctx.init_serialized_keys(_build_raw_ptrs, _rows, - null_map ? null_map->data() : nullptr); + null_map ? null_map->data() : nullptr, true); hash_table_ctx.calculate_bucket(_rows); - SCOPED_TIMER(_parent->_build_table_insert_timer); hash_table_ctx.hash_table->build(hash_table_ctx.keys, hash_table_ctx.hash_values.data(), - _rows, _state->batch_size()); + _rows); return Status::OK(); } private: - const int _rows; + const uint32_t _rows; Block& _acquired_block; ColumnRawPtrs& _build_raw_ptrs; Parent* _parent; --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
