HappenLee commented on a change in pull request #8393: URL: https://github.com/apache/incubator-doris/pull/8393#discussion_r822262714
########## File path: be/src/exprs/shared_hash_table.cpp ########## @@ -0,0 +1,166 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "runtime/shared_hash_table.h" + +#include <mutex> +#include <string> +#include "runtime/exec_env.h" +#include "runtime/mem_tracker.h" +#include "runtime/plan_fragment_executor.h" +#include "util/time.h" +//#include "vec/common/arena.h" + +namespace doris { +Status SharedHashTableVal::get_callback(vectorized::shared_hash_table_operator* hash_table_accessor, + vectorized::shared_hash_table_barrier* hash_table_barrier) { + *hash_table_accessor = std::bind(&SharedHashTableVal::shared_hash_table_operate, + this, std::placeholders::_1); + + *hash_table_barrier = std::bind(&SharedHashTableVal::shared_hash_table_barrier,this); Review comment: code format , this clang format may help you ########## File path: be/src/vec/exec/join/vhash_join_node.cpp ########## @@ -874,8 +890,22 @@ Status HashJoinNode::open(RuntimeState* state) { if (_vother_join_conjunct_ptr) { RETURN_IF_ERROR((*_vother_join_conjunct_ptr)->open(state)); } - - RETURN_IF_ERROR(_hash_table_build(state)); + if (!_shared_hash_table_ctx._use_shared_hash_table) { + RETURN_IF_ERROR(_hash_table_build(state)); + } else { + if 
(_shared_hash_table_ctx._is_leader) { + RETURN_IF_ERROR(_hash_table_build(state)); + LOG(INFO) << "HashJoinNode::open: leader have build hash table"; + _shared_hash_table_ctx._hash_table_operator(_shared_hash_table_ctx); + } else { + // wait for leader build shared hash table. + while (!_shared_hash_table_ctx._hash_table_operator(_shared_hash_table_ctx)) { Review comment: I'm not sure, but could this be an endless loop? ########## File path: be/src/exprs/shared_hash_table.cpp ########## @@ -0,0 +1,166 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include "runtime/shared_hash_table.h" + +#include <mutex> +#include <string> +#include "runtime/exec_env.h" +#include "runtime/mem_tracker.h" +#include "runtime/plan_fragment_executor.h" +#include "util/time.h" +//#include "vec/common/arena.h" + +namespace doris { +Status SharedHashTableVal::get_callback(vectorized::shared_hash_table_operator* hash_table_accessor, Review comment: The function only returns OK? It may be changed to return void. ########## File path: be/src/vec/exec/join/vhash_join_node.cpp ########## @@ -874,8 +890,22 @@ Status HashJoinNode::open(RuntimeState* state) { if (_vother_join_conjunct_ptr) { RETURN_IF_ERROR((*_vother_join_conjunct_ptr)->open(state)); } - - RETURN_IF_ERROR(_hash_table_build(state)); + if (!_shared_hash_table_ctx._use_shared_hash_table) { Review comment: The if/else seems better placed inside _hash_table_build() rather than exposed here. ########## File path: be/src/runtime/shared_hash_table.h ########## @@ -0,0 +1,142 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#pragma once + +#include <condition_variable> +#include <map> +#include <memory> +#include <mutex> +#include <thread> + +#include "common/object_pool.h" +#include "common/status.h" +#include "util/time.h" +#include "util/uid_util.h" +#include "gen_cpp/PaloInternalService_types.h" +#include "gen_cpp/PlanNodes_types.h" +#include "vec/exec/join/vhash_join_node.h" + +namespace doris { +class SharedHashTableVal; +class Arena; + +class SharedHashTableMgr { +public: + SharedHashTableMgr() = default; + ~SharedHashTableMgr() = default; + void set_shared_hash_table_params(TSharedHashTableParams shared_hash_table_params) { + contain_shared_hash_table = shared_hash_table_params.contain_shared_hash_table; + is_leader = shared_hash_table_params.is_leader; + } + bool get_contain_shared_hash_table() { + return contain_shared_hash_table; + } + bool get_is_leader() { + return is_leader; + } +private: + bool contain_shared_hash_table = false; + bool is_leader = false; +}; + +class SharedHashTableVal { +private: + std::condition_variable _condition_setup; + std::condition_variable _condition_barrier; + std::mutex _mutex; + std::shared_ptr<vectorized::HashTableVariants> _hash_table_variants; + std::shared_ptr<std::vector<vectorized::Block>> _build_blocks; + std::shared_ptr<std::unordered_map<const vectorized::Block*, std::vector<int>>> _inserted_rows; + std::shared_ptr<vectorized::Arena> _arena; + std::shared_ptr<vectorized::Sizes> _probe_key_sz; + std::shared_ptr<vectorized::Sizes> _build_key_sz; + int _sharers_count = 0; +public: + SharedHashTableVal() = default; + ~SharedHashTableVal() = default; + void set_sharers_count(int count) {_sharers_count = count;} + int get_sharers_count() {return _sharers_count;} + bool shared_hash_table_operate(vectorized::SharedHashTableContext& shared_hash_table_ctx); + bool shared_hash_table_barrier(); + Status get_callback(vectorized::shared_hash_table_operator* hash_table_operator, + vectorized::shared_hash_table_barrier* hash_table_barrier); +}; 
+// controller -> <query-id, entity> +// SharedHashTableControlEntity is the context used by runtimefilter for merging +// During a query, only the last sink node owns this class, with the end of the query, +// the class is destroyed with the last fragment_exec. +class SharedHashTableControlEntity { +public: + SharedHashTableControlEntity() : _query_id(0, 0) {} + ~SharedHashTableControlEntity() = default; + + Status init(UniqueId query_id, + int instacnces_count_in_same_process, + const std::vector<int32_t>& shared_hash_table_ids); + + UniqueId query_id() { return _query_id; } + Status find_hash_table_val(int hash_table_id, SharedHashTableVal* &val); +private: + UniqueId _query_id; + // protect _shared_hash_table_map + std::mutex _shared_hash_table_mutex; + // hash-table-id -> val + std::map<int, std::shared_ptr<SharedHashTableVal>> _shared_hash_table_map; +}; + +// RuntimeFilterMergeController has a map query-id -> entity Review comment: This is not the RuntimeFilter. ########## File path: be/src/vec/exec/join/vhash_join_node.cpp ########## @@ -715,7 +722,16 @@ Status HashJoinNode::prepare(RuntimeState* state) { _left_table_data_types = VectorizedUtils::get_data_types(child(0)->row_desc()); // Hash Table Init - _hash_table_init(); + if (!_shared_hash_table_ctx._use_shared_hash_table) { Review comment: The if/else seems better placed inside `_hash_table_init()` rather than exposed here. ########## File path: be/src/exprs/shared_hash_table.cpp ########## @@ -0,0 +1,166 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "runtime/shared_hash_table.h" + +#include <mutex> +#include <string> +#include "runtime/exec_env.h" +#include "runtime/mem_tracker.h" +#include "runtime/plan_fragment_executor.h" +#include "util/time.h" +//#include "vec/common/arena.h" Review comment: If this is not used, delete the code. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
