This is an automated email from the ASF dual-hosted git repository. panxiaolei pushed a commit to branch dev_join in repository https://gitbox.apache.org/repos/asf/doris.git
commit 922134f3ce0e1d756532bb44e096f05c3c231aa5 Author: BiteTheDDDDt <[email protected]> AuthorDate: Thu Oct 26 13:40:52 2023 +0800 update update update fix fix update fix update --- be/src/exprs/runtime_filter_slots.h | 5 +- be/src/pipeline/pipeline_x/dependency.h | 5 +- .../local_exchange_sink_operator.cpp | 2 +- be/src/vec/columns/column_vector.cpp | 13 +--- be/src/vec/common/hash_table/hash_map.h | 14 +++- be/src/vec/common/hash_table/hash_map_context.h | 90 ++++++++++++++-------- be/src/vec/core/block.cpp | 2 +- .../vec/exec/join/process_hash_table_probe_impl.h | 4 +- be/src/vec/exec/join/vhash_join_node.cpp | 8 ++ be/src/vec/exec/join/vhash_join_node.h | 8 +- 10 files changed, 91 insertions(+), 60 deletions(-) diff --git a/be/src/exprs/runtime_filter_slots.h b/be/src/exprs/runtime_filter_slots.h index 63c5665d271..0f841e5a60f 100644 --- a/be/src/exprs/runtime_filter_slots.h +++ b/be/src/exprs/runtime_filter_slots.h @@ -174,6 +174,7 @@ public: auto column = it->get_by_position(result_column_id).column; std::vector<int> indexs; + // indexs start from 1 because the first row is mocked for join hash map if (const auto* nullable = vectorized::check_and_get_column<vectorized::ColumnNullable>(*column)) { column = nullable->get_nested_column_ptr(); @@ -181,14 +182,14 @@ public: nullable->get_null_map_column_ptr().get()) ->get_data() .data(); - for (int i = 0; i < column->size(); i++) { + for (int i = 1; i < column->size(); i++) { if (null_map[i]) { continue; } indexs.push_back(i); } } else { - for (int i = 0; i < column->size(); i++) { + for (int i = 1; i < column->size(); i++) { indexs.push_back(i); } } diff --git a/be/src/pipeline/pipeline_x/dependency.h b/be/src/pipeline/pipeline_x/dependency.h index 290df96cbad..e7f6e3689fe 100644 --- a/be/src/pipeline/pipeline_x/dependency.h +++ b/be/src/pipeline/pipeline_x/dependency.h @@ -839,8 +839,9 @@ private: bool is_set_probe {false}; }; -using PartitionedBlock = std::pair<std::shared_ptr<vectorized::Block>, - std::tuple<std::shared_ptr<std::vector<int>>, size_t, size_t>>; +using PartitionedBlock = + std::pair<std::shared_ptr<vectorized::Block>, + std::tuple<std::shared_ptr<std::vector<uint32_t>>, size_t, size_t>>; struct LocalExchangeSharedState { public: ENABLE_FACTORY_CREATOR(LocalExchangeSharedState); diff --git a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.cpp b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.cpp index b69150ba512..7b45a600ac5 100644 --- a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.cpp +++ b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.cpp @@ -37,7 +37,7 @@ Status LocalExchangeSinkLocalState::split_rows(RuntimeState* state, auto& data_queue = _shared_state->data_queue; const auto num_partitions = data_queue.size(); const auto rows = block->rows(); - auto row_idx = std::make_shared<std::vector<int>>(rows); + auto row_idx = std::make_shared<std::vector<uint32_t>>(rows); { _partition_rows_histogram.assign(num_partitions + 1, 0); for (size_t i = 0; i < rows; ++i) { diff --git a/be/src/vec/columns/column_vector.cpp b/be/src/vec/columns/column_vector.cpp index c4ca97df7a2..69c09dcde80 100644 --- a/be/src/vec/columns/column_vector.cpp +++ b/be/src/vec/columns/column_vector.cpp @@ -375,17 +375,8 @@ void ColumnVector<T>::insert_indices_from(const IColumn& src, const T* src_data = reinterpret_cast<const T*>(src.get_raw_data().data); - if constexpr (std::is_same_v<T, UInt8>) { - // nullmap : indices_begin[i] == 0 means is null at the here, set true here - for (int i = 0; i < new_size; ++i) { - data[origin_size + i] = - (indices_begin[i] == 0) + (indices_begin[i] != 0) * src_data[indices_begin[i]]; - } - } else { - // real data : indices_begin[i] == 0 what at is meaningless - for (int i = 0; i < new_size; ++i) { - data[origin_size + i] = src_data[indices_begin[i]]; - } + for (int i = 0; i < new_size; ++i) { + data[origin_size + i] = src_data[indices_begin[i]]; } } diff --git a/be/src/vec/common/hash_table/hash_map.h b/be/src/vec/common/hash_table/hash_map.h index 1c132fc99ed..dfb01556fc6 100644 --- a/be/src/vec/common/hash_table/hash_map.h +++ b/be/src/vec/common/hash_table/hash_map.h @@ -220,7 +220,7 @@ public: void prepare_build(size_t num_elem, int batch_size) { max_batch_size = batch_size; bucket_size = calc_bucket_size(num_elem + 1); - first.resize(bucket_size, 0); + first.resize(bucket_size + 1, 0); next.resize(num_elem); if constexpr (JoinOpType == doris::TJoinOp::FULL_OUTER_JOIN || @@ -238,6 +238,7 @@ public: next[i] = first[bucket_nums[i]]; first[bucket_nums[i]] = i; } + first[bucket_size] = 0; } template <int JoinOpType> @@ -275,7 +276,7 @@ public: while (count < batch_size && iter_idx < elem_num) { const auto matched = visited[iter_idx]; build_idxs[count] = iter_idx; - if constexpr (JoinOpType == doris::TJoinOp::RIGHT_ANTI_JOIN) { + if constexpr (JoinOpType != doris::TJoinOp::RIGHT_ANTI_JOIN) { count += !matched; } else { count += matched; @@ -336,6 +337,10 @@ private: uint32_t build_idx = 0; const auto batch_size = max_batch_size; + if (!build_keys) { + probe_idx = probe_rows; + } + auto do_the_probe = [&]() { while (build_idx && LIKELY(matched_cnt < batch_size)) { if (keys[probe_idx] == build_keys[build_idx]) { @@ -353,7 +358,7 @@ private: if constexpr (JoinOpType == doris::TJoinOp::LEFT_OUTER_JOIN || JoinOpType == doris::TJoinOp::FULL_OUTER_JOIN) { // `(!matched_cnt || probe_idxs[matched_cnt - 1] != probe_idx)` means not match one build side - if (!build_idx && (!matched_cnt || probe_idxs[matched_cnt - 1] != probe_idx)) { + if (!matched_cnt || probe_idxs[matched_cnt - 1] != probe_idx) { probe_idxs[matched_cnt] = probe_idx; build_idxs[matched_cnt] = 0; matched_cnt++; @@ -381,7 +386,8 @@ private: return std::pair {probe_idx, matched_cnt}; } - const Key* __restrict build_keys; + const Key* __restrict build_keys = nullptr; + std::vector<uint8_t> visited; int max_batch_size = 0; diff --git a/be/src/vec/common/hash_table/hash_map_context.h b/be/src/vec/common/hash_table/hash_map_context.h index 549301ae477..87014787895 100644 --- a/be/src/vec/common/hash_table/hash_map_context.h +++ b/be/src/vec/common/hash_table/hash_map_context.h @@ -72,11 +72,14 @@ struct MethodBase { const uint8_t* null_map = nullptr, bool is_build = false) = 0; void init_hash_values(size_t num_rows, const uint8_t* null_map) { + hash_values.resize(num_rows); if (null_map == nullptr) { - init_hash_values(num_rows); + for (size_t k = 0; k < num_rows; ++k) { + hash_values[k] = hash_table->hash(keys[k]); + } return; } - hash_values.resize(num_rows); + for (size_t k = 0; k < num_rows; ++k) { if (null_map[k]) { continue; @@ -85,20 +88,6 @@ struct MethodBase { } } - void init_hash_values(size_t num_rows) { - hash_values.resize(num_rows); - for (size_t k = 0; k < num_rows; ++k) { - hash_values[k] = hash_table->hash(keys[k]); - } - } - - void calculate_bucket(size_t num_rows) { - size_t mask = hash_table->get_bucket_mask(); - for (size_t i = 0; i < num_rows; i++) { - hash_values[i] &= mask; - } - } - template <bool read> void prefetch(int current) { if (LIKELY(current + HASH_MAP_PREFETCH_DIST < hash_values.size())) { @@ -153,6 +142,10 @@ struct MethodSerialized : public MethodBase<TData> { using State = ColumnsHashing::HashMethodSerialized<typename Base::Value, typename Base::Mapped>; using Base::try_presis_key; + // need keep until the hash probe end. + std::vector<StringRef> build_stored_keys; + Arena build_arena; + // refresh each time probe std::vector<StringRef> stored_keys; StringRef serialize_keys_to_pool_contiguous(size_t i, size_t keys_size, @@ -167,10 +160,10 @@ struct MethodSerialized : public MethodBase<TData> { return {begin, sum_size}; } - void init_serialized_keys(const ColumnRawPtrs& key_columns, size_t num_rows, - const uint8_t* null_map = nullptr, bool is_build = false) override { - Base::arena.clear(); - stored_keys.resize(num_rows); + void init_serialized_keys_impl(const ColumnRawPtrs& key_columns, size_t num_rows, + std::vector<StringRef>& keys, Arena& arena) { + arena.clear(); + keys.resize(num_rows); size_t max_one_row_byte_size = 0; for (const auto& column : key_columns) { @@ -182,25 +175,31 @@ struct MethodSerialized : public MethodBase<TData> { // reach mem limit, don't serialize in batch size_t keys_size = key_columns.size(); for (size_t i = 0; i < num_rows; ++i) { - stored_keys[i] = - serialize_keys_to_pool_contiguous(i, keys_size, key_columns, Base::arena); + keys[i] = serialize_keys_to_pool_contiguous(i, keys_size, key_columns, arena); } } else { - auto* serialized_key_buffer = - reinterpret_cast<uint8_t*>(Base::arena.alloc(total_bytes)); + auto* serialized_key_buffer = reinterpret_cast<uint8_t*>(arena.alloc(total_bytes)); for (size_t i = 0; i < num_rows; ++i) { - stored_keys[i].data = + keys[i].data = reinterpret_cast<char*>(serialized_key_buffer + i * max_one_row_byte_size); - stored_keys[i].size = 0; + keys[i].size = 0; } for (const auto& column : key_columns) { - column->serialize_vec(stored_keys, num_rows, max_one_row_byte_size); + column->serialize_vec(keys, num_rows, max_one_row_byte_size); } } - Base::keys = stored_keys.data(); - Base::init_hash_values(num_rows, null_map); + Base::keys = keys.data(); + } + + void init_serialized_keys(const ColumnRawPtrs& key_columns, size_t num_rows, + const uint8_t* null_map = nullptr, bool is_build = false) override { + init_serialized_keys_impl(key_columns, num_rows, is_build ? build_stored_keys : stored_keys, + is_build ? build_arena : Base::arena); + if (!is_build) { + Base::init_hash_values(num_rows, null_map); + } } void insert_keys_into_columns(std::vector<StringRef>& keys, MutableColumns& key_columns, @@ -241,7 +240,9 @@ struct MethodStringNoCache : public MethodBase<TData> { } Base::keys = stored_keys.data(); - Base::init_hash_values(num_rows, null_map); + if (!is_build) { + Base::init_hash_values(num_rows, null_map); + } } void insert_keys_into_columns(std::vector<StringRef>& keys, MutableColumns& key_columns, @@ -270,7 +271,9 @@ struct MethodOneNumber : public MethodBase<TData> { ->get_raw_data() .data; std::string name = key_columns[0]->get_name(); - Base::init_hash_values(num_rows, null_map); + if (!is_build) { + Base::init_hash_values(num_rows, null_map); + } } void insert_keys_into_columns(std::vector<typename Base::Key>& keys, @@ -392,7 +395,9 @@ struct MethodKeysFixed : public MethodBase<TData> { stored_keys = pack_fixeds<Key>(num_rows, actual_columns, null_maps); Base::keys = stored_keys.data(); } - Base::init_hash_values(num_rows, null_map); + if (!is_build) { + Base::init_hash_values(num_rows, null_map); + } } void insert_keys_into_columns(std::vector<typename Base::Key>& keys, @@ -510,4 +515,25 @@ template <class Key, bool has_null, typename Value> using FixedKeyHashTableContext = MethodKeysFixed<JoinFixedHashMap<Key, Value, HashCRC32<Key>>, has_null>; +template <typename HashTableContext> +void init_bucket_num(HashTableContext& ctx, size_t num_rows, const uint8_t* null_map) { + ctx.hash_values.resize(num_rows); + if (null_map == nullptr) { + size_t mask = ctx.hash_table->get_bucket_mask(); + for (size_t k = 0; k < num_rows; ++k) { + ctx.hash_values[k] = ctx.hash_table->hash(ctx.keys[k]) & mask; + } + return; + } + + size_t mask = ctx.hash_table->get_bucket_mask(); + for (size_t k = 0; k < num_rows; ++k) { + if (null_map[k]) { + ctx.hash_values[k] = mask + 1; + } else { + ctx.hash_values[k] = ctx.hash_table->hash(ctx.keys[k]) & mask; + } + } +} + } // namespace doris::vectorized \ No newline at end of file diff --git a/be/src/vec/core/block.cpp b/be/src/vec/core/block.cpp index 931501d024b..5dbaad805f6 100644 --- a/be/src/vec/core/block.cpp +++ b/be/src/vec/core/block.cpp @@ -1066,7 +1066,7 @@ std::unique_ptr<Block> Block::create_same_struct_block(size_t size, bool is_rese if (is_reserve) { column->reserve(size); } else { - column->resize(size); + column->insert_many_defaults(size); } temp_block->insert({std::move(column), d.type, d.name}); } diff --git a/be/src/vec/exec/join/process_hash_table_probe_impl.h b/be/src/vec/exec/join/process_hash_table_probe_impl.h index a267a96d55f..c0e8c351f46 100644 --- a/be/src/vec/exec/join/process_hash_table_probe_impl.h +++ b/be/src/vec/exec/join/process_hash_table_probe_impl.h @@ -68,7 +68,7 @@ void ProcessHashTableProbe<JoinOpType, Parent>::build_side_output_column( constexpr auto probe_all = JoinOpType == TJoinOp::LEFT_OUTER_JOIN || JoinOpType == TJoinOp::FULL_OUTER_JOIN; - if (!is_semi_anti_join || have_other_join_conjunct) { + if ((!is_semi_anti_join || have_other_join_conjunct) && size) { for (int i = 0; i < _right_col_len; i++) { const auto& column = *_build_block->safe_get_by_position(i).column; if (output_slot_flags[i]) { @@ -134,7 +134,7 @@ typename HashTableType::State ProcessHashTableProbe<JoinOpType, Parent>::_init_p _parent->_ready_probe = true; hash_table_ctx.reset(); hash_table_ctx.init_serialized_keys(_parent->_probe_columns, probe_rows, null_map); - hash_table_ctx.calculate_bucket(probe_rows); + init_bucket_num(hash_table_ctx,probe_rows, null_map); } return typename HashTableType::State(_parent->_probe_columns); } diff --git a/be/src/vec/exec/join/vhash_join_node.cpp b/be/src/vec/exec/join/vhash_join_node.cpp index 9e8c99c8d56..747bcfdc02c 100644 --- a/be/src/vec/exec/join/vhash_join_node.cpp +++ b/be/src/vec/exec/join/vhash_join_node.cpp @@ -955,6 +955,14 @@ Status HashJoinNode::_process_build_block(RuntimeState* state, Block& block) { RETURN_IF_ERROR(_do_evaluate(block, _build_expr_ctxs, *_build_expr_call_timer, res_col_ids)); if (_join_op == TJoinOp::LEFT_OUTER_JOIN || _join_op == TJoinOp::FULL_OUTER_JOIN) { _convert_block_to_null(block); + // first row is mocked + for (int i = 0; i < block.columns(); i++) { + assert_cast<ColumnNullable*>( + (*std::move(block.safe_get_by_position(i).column)).mutate().get()) + ->get_null_map_column() + .get_data() + .data()[0] = 1; + } } // TODO: Now we are not sure whether a column is nullable only by ExecNode's `row_desc` // so we have to initialize this flag by the first build block. diff --git a/be/src/vec/exec/join/vhash_join_node.h b/be/src/vec/exec/join/vhash_join_node.h index 6fb1703120e..3bcdbfae3ba 100644 --- a/be/src/vec/exec/join/vhash_join_node.h +++ b/be/src/vec/exec/join/vhash_join_node.h @@ -87,10 +87,8 @@ struct ProcessRuntimeFilterBuild { state, hash_table_ctx.hash_table->size(), parent->_build_rf_cardinality)); if (!parent->_runtime_filter_slots->empty() && !parent->_inserted_blocks.empty()) { - { - SCOPED_TIMER(parent->_push_compute_timer); - parent->_runtime_filter_slots->insert(parent->_inserted_blocks); - } + SCOPED_TIMER(parent->_push_compute_timer); + parent->_runtime_filter_slots->insert(parent->_inserted_blocks); } { SCOPED_TIMER(parent->_push_down_timer); @@ -137,7 +135,7 @@ struct ProcessHashTableBuild { hash_table_ctx.init_serialized_keys(_build_raw_ptrs, _rows, null_map ? null_map->data() : nullptr, true); - hash_table_ctx.calculate_bucket(_rows); + init_bucket_num(hash_table_ctx, _rows, null_map ? null_map->data() : nullptr); hash_table_ctx.hash_table->build(hash_table_ctx.keys, hash_table_ctx.hash_values.data(), _rows); return Status::OK(); --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
