This is an automated email from the ASF dual-hosted git repository.
gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 37dbda6209c [pipelineX](refactor) Use class template to simplify join
(#25369)
37dbda6209c is described below
commit 37dbda6209c82b3e99efd08eeb73d4e072d8c437
Author: Gabriel <[email protected]>
AuthorDate: Fri Oct 13 16:51:55 2023 +0800
[pipelineX](refactor) Use class template to simplify join (#25369)
---
be/src/pipeline/exec/hashjoin_build_sink.cpp | 67 ++--
be/src/pipeline/exec/hashjoin_build_sink.h | 29 +-
be/src/pipeline/exec/hashjoin_probe_operator.cpp | 67 +++-
be/src/pipeline/exec/hashjoin_probe_operator.h | 43 ++-
.../pipeline/exec/partition_sort_sink_operator.cpp | 2 +-
.../exec/partition_sort_source_operator.cpp | 23 +-
.../pipeline/exec/partition_sort_source_operator.h | 1 +
be/src/pipeline/pipeline_x/dependency.h | 21 +-
be/src/vec/exec/join/process_hash_table_probe.h | 20 +-
.../vec/exec/join/process_hash_table_probe_impl.h | 353 ++++++++++-----------
be/src/vec/exec/join/vhash_join_node.cpp | 134 +-------
be/src/vec/exec/join/vhash_join_node.h | 216 ++++---------
be/src/vec/exec/join/vjoin_node_base.h | 2 +-
be/src/vec/runtime/shared_hash_table_controller.h | 1 +
14 files changed, 444 insertions(+), 535 deletions(-)
diff --git a/be/src/pipeline/exec/hashjoin_build_sink.cpp
b/be/src/pipeline/exec/hashjoin_build_sink.cpp
index c9fe153af08..939363776b0 100644
--- a/be/src/pipeline/exec/hashjoin_build_sink.cpp
+++ b/be/src/pipeline/exec/hashjoin_build_sink.cpp
@@ -52,11 +52,15 @@ Status HashJoinBuildSinkLocalState::init(RuntimeState*
state, LocalSinkStateInfo
auto& p = _parent->cast<HashJoinBuildSinkOperatorX>();
_shared_state->join_op_variants = p._join_op_variants;
_shared_state->probe_key_sz = p._build_key_sz;
- _shared_state->build_blocks.reset(new std::vector<vectorized::Block>());
- // avoid vector expand change block address.
- // one block can store 4g data, _build_blocks can store 128*4g data.
- // if probe data bigger than 512g, runtime filter maybe will core dump
when insert data.
-
_shared_state->build_blocks->reserve(vectorized::HASH_JOIN_MAX_BUILD_BLOCK_COUNT);
+ if (p._is_broadcast_join &&
state->enable_share_hash_table_for_broadcast_join()) {
+ _shared_state->build_blocks = p._shared_hash_table_context->blocks;
+ } else {
+ _shared_state->build_blocks.reset(new
std::vector<vectorized::Block>());
+ // avoid vector expand change block address.
+ // one block can store 4g data, _build_blocks can store 128*4g data.
+ // if probe data bigger than 512g, runtime filter maybe will core dump
when insert data.
+
_shared_state->build_blocks->reserve(vectorized::HASH_JOIN_MAX_BUILD_BLOCK_COUNT);
+ }
_shared_state->is_null_safe_eq_join = p._is_null_safe_eq_join;
_shared_state->store_null_in_hash_table = p._store_null_in_hash_table;
_build_expr_ctxs.resize(p._build_expr_ctxs.size());
@@ -79,6 +83,11 @@ Status HashJoinBuildSinkLocalState::init(RuntimeState*
state, LocalSinkStateInfo
if (!_should_build_hash_table) {
_shared_hash_table_dependency->block_writing();
p._shared_hashtable_controller->append_dependency(p.id(),
_shared_hash_table_dependency);
+ } else if (p._is_broadcast_join) {
+ // avoid vector expand change block address.
+ // one block can store 4g data, _build_blocks can store 128*4g data.
+ // if probe data bigger than 512g, runtime filter maybe will core dump
when insert data.
+
_shared_state->build_blocks->reserve(vectorized::HASH_JOIN_MAX_BUILD_BLOCK_COUNT);
}
_memory_usage_counter = ADD_LABEL_COUNTER(profile(), "MemoryUsage");
@@ -135,6 +144,18 @@ Status HashJoinBuildSinkLocalState::open(RuntimeState*
state) {
return Status::OK();
}
+vectorized::Sizes& HashJoinBuildSinkLocalState::build_key_sz() {
+ return _parent->cast<HashJoinBuildSinkOperatorX>()._build_key_sz;
+}
+
+bool HashJoinBuildSinkLocalState::build_unique() const {
+ return _parent->cast<HashJoinBuildSinkOperatorX>()._build_unique;
+}
+
+std::vector<TRuntimeFilterDesc>&
HashJoinBuildSinkLocalState::runtime_filter_descs() const {
+ return _parent->cast<HashJoinBuildSinkOperatorX>()._runtime_filter_descs;
+}
+
void HashJoinBuildSinkLocalState::init_short_circuit_for_probe() {
auto& p = _parent->cast<HashJoinBuildSinkOperatorX>();
_shared_state->short_circuit_for_probe =
@@ -201,9 +222,9 @@ Status
HashJoinBuildSinkLocalState::process_build_block(RuntimeState* state,
[&](auto&& arg, auto has_null_value,
auto short_circuit_for_null_in_build_side) -> Status {
using HashTableCtxType = std::decay_t<decltype(arg)>;
- vectorized::HashJoinBuildContext context(this);
- vectorized::ProcessHashTableBuild<HashTableCtxType>
- hash_table_build_process(rows, block,
raw_ptrs, &context,
+ vectorized::ProcessHashTableBuild<HashTableCtxType,
+
HashJoinBuildSinkLocalState>
+ hash_table_build_process(rows, block,
raw_ptrs, this,
state->batch_size(),
offset, state);
return hash_table_build_process
.template run<has_null_value,
short_circuit_for_null_in_build_side>(
@@ -246,11 +267,6 @@ void
HashJoinBuildSinkLocalState::_hash_table_init(RuntimeState* state) {
JoinOpType::value ==
TJoinOp::RIGHT_OUTER_JOIN ||
JoinOpType::value ==
TJoinOp::FULL_OUTER_JOIN,
vectorized::RowRefListWithFlag,
vectorized::RowRefList>>;
- _shared_state->probe_row_match_iter
-
.emplace<vectorized::ForwardIterator<RowRefListType>>();
- _shared_state->outer_join_pull_visited_iter
-
.emplace<vectorized::ForwardIterator<RowRefListType>>();
-
if (_build_expr_ctxs.size() == 1 &&
!p._store_null_in_hash_table[0]) {
// Single column optimization
switch (_build_expr_ctxs[0]->root()->result_type()) {
@@ -519,18 +535,16 @@ Status HashJoinBuildSinkOperatorX::sink(RuntimeState*
state, vectorized::Block*
state,
(*local_state._shared_state->build_blocks)[local_state._build_block_idx],
local_state._build_block_idx));
}
- auto ret = std::visit(Overload {[&](std::monostate&) -> Status {
- LOG(FATAL) << "FATAL: uninited
hash table";
- __builtin_unreachable();
- },
- [&](auto&& arg) -> Status {
- using HashTableCtxType =
std::decay_t<decltype(arg)>;
- vectorized::RuntimeFilterContext
context(&local_state);
-
vectorized::ProcessRuntimeFilterBuild<HashTableCtxType>
-
runtime_filter_build_process(&context);
- return
runtime_filter_build_process(state, arg);
- }},
- *local_state._shared_state->hash_table_variants);
+ auto ret = std::visit(
+ Overload {[&](std::monostate&) -> Status {
+ LOG(FATAL) << "FATAL: uninited hash table";
+ __builtin_unreachable();
+ },
+ [&](auto&& arg) -> Status {
+ vectorized::ProcessRuntimeFilterBuild
runtime_filter_build_process;
+ return runtime_filter_build_process(state, arg,
&local_state);
+ }},
+ *local_state._shared_state->hash_table_variants);
if (!ret.ok()) {
if (_shared_hashtable_controller) {
_shared_hash_table_context->status = ret;
@@ -542,7 +556,6 @@ Status HashJoinBuildSinkOperatorX::sink(RuntimeState*
state, vectorized::Block*
_shared_hash_table_context->status = Status::OK();
// arena will be shared with other instances.
_shared_hash_table_context->arena =
local_state._shared_state->arena;
- _shared_hash_table_context->blocks =
local_state._shared_state->build_blocks;
_shared_hash_table_context->hash_table_variants =
local_state._shared_state->hash_table_variants;
_shared_hash_table_context->short_circuit_for_null_in_probe_side =
@@ -578,8 +591,6 @@ Status HashJoinBuildSinkOperatorX::sink(RuntimeState*
state, vectorized::Block*
*std::static_pointer_cast<vectorized::HashTableVariants>(
_shared_hash_table_context->hash_table_variants));
- local_state._shared_state->build_blocks =
_shared_hash_table_context->blocks;
-
if (!_shared_hash_table_context->runtime_filters.empty()) {
auto ret = std::visit(
Overload {
diff --git a/be/src/pipeline/exec/hashjoin_build_sink.h
b/be/src/pipeline/exec/hashjoin_build_sink.h
index 54848f7c868..459b66718c2 100644
--- a/be/src/pipeline/exec/hashjoin_build_sink.h
+++ b/be/src/pipeline/exec/hashjoin_build_sink.h
@@ -70,12 +70,25 @@ public:
void init_short_circuit_for_probe();
HashJoinBuildSinkOperatorX* join_build() { return
(HashJoinBuildSinkOperatorX*)_parent; }
+ vectorized::Sizes& build_key_sz();
+ bool build_unique() const;
+ std::vector<TRuntimeFilterDesc>& runtime_filter_descs() const;
+ std::shared_ptr<vectorized::Arena> arena() { return _shared_state->arena; }
+
+ void add_hash_buckets_info(const std::string& info) const {
+ _profile->add_info_string("HashTableBuckets", info);
+ }
+ void add_hash_buckets_filled_info(const std::string& info) const {
+ _profile->add_info_string("HashTableFilledBuckets", info);
+ }
+
protected:
void _hash_table_init(RuntimeState* state);
void _set_build_ignore_flag(vectorized::Block& block, const
std::vector<int>& res_col_ids);
friend class HashJoinBuildSinkOperatorX;
- friend struct vectorized::HashJoinBuildContext;
- friend struct vectorized::RuntimeFilterContext;
+ template <class HashTableContext, typename Parent>
+ friend struct vectorized::ProcessHashTableBuild;
+ friend struct vectorized::ProcessRuntimeFilterBuild;
// build expr
vectorized::VExprContextSPtrs _build_expr_ctxs;
@@ -139,16 +152,13 @@ public:
}
bool should_dry_run(RuntimeState* state) override {
- auto tmp = _is_broadcast_join && !state->get_sink_local_state(id())
-
->cast<HashJoinBuildSinkLocalState>()
- ._should_build_hash_table;
- return tmp;
+ return _is_broadcast_join && !state->get_sink_local_state(id())
+
->cast<HashJoinBuildSinkLocalState>()
+ ._should_build_hash_table;
}
private:
friend class HashJoinBuildSinkLocalState;
- friend struct vectorized::HashJoinBuildContext;
- friend struct vectorized::RuntimeFilterContext;
// build expr
vectorized::VExprContextSPtrs _build_expr_ctxs;
@@ -165,9 +175,6 @@ private:
vectorized::SharedHashTableContextPtr _shared_hash_table_context = nullptr;
std::vector<TRuntimeFilterDesc> _runtime_filter_descs;
-
- std::atomic_bool _probe_open_finish = false;
- bool _probe_ignore_null = false;
};
} // namespace pipeline
diff --git a/be/src/pipeline/exec/hashjoin_probe_operator.cpp
b/be/src/pipeline/exec/hashjoin_probe_operator.cpp
index 1688b17778e..c4b009f7389 100644
--- a/be/src/pipeline/exec/hashjoin_probe_operator.cpp
+++ b/be/src/pipeline/exec/hashjoin_probe_operator.cpp
@@ -59,12 +59,68 @@ Status HashJoinProbeLocalState::init(RuntimeState* state,
LocalStateInfo& info)
return Status::OK();
}
+Status HashJoinProbeLocalState::open(RuntimeState* state) {
+ SCOPED_TIMER(profile()->total_time_counter());
+ SCOPED_TIMER(_open_timer);
+ RETURN_IF_ERROR(JoinProbeLocalState::open(state));
+
+ _process_hashtable_ctx_variants = std::make_unique<HashTableCtxVariants>();
+ auto& p = _parent->cast<HashJoinProbeOperatorX>();
+ std::visit(
+ [&](auto&& join_op_variants, auto have_other_join_conjunct) {
+ using JoinOpType = std::decay_t<decltype(join_op_variants)>;
+ using RowRefListType = std::conditional_t<
+ have_other_join_conjunct,
vectorized::RowRefListWithFlags,
+ std::conditional_t<JoinOpType::value ==
TJoinOp::RIGHT_ANTI_JOIN ||
+ JoinOpType::value ==
TJoinOp::RIGHT_SEMI_JOIN ||
+ JoinOpType::value ==
TJoinOp::RIGHT_OUTER_JOIN ||
+ JoinOpType::value ==
TJoinOp::FULL_OUTER_JOIN,
+ vectorized::RowRefListWithFlag,
vectorized::RowRefList>>;
+
_probe_row_match_iter.emplace<vectorized::ForwardIterator<RowRefListType>>();
+ _outer_join_pull_visited_iter
+
.emplace<vectorized::ForwardIterator<RowRefListType>>();
+
_process_hashtable_ctx_variants->emplace<vectorized::ProcessHashTableProbe<
+ JoinOpType::value, HashJoinProbeLocalState>>(this,
state->batch_size());
+ },
+ _shared_state->join_op_variants,
+ vectorized::make_bool_variant(p._have_other_join_conjunct));
+ return Status::OK();
+}
+
void HashJoinProbeLocalState::prepare_for_next() {
_probe_index = 0;
_ready_probe = false;
_prepare_probe_block();
}
+bool HashJoinProbeLocalState::have_other_join_conjunct() const {
+ return _parent->cast<HashJoinProbeOperatorX>()._have_other_join_conjunct;
+}
+
+bool HashJoinProbeLocalState::is_right_semi_anti() const {
+ return _parent->cast<HashJoinProbeOperatorX>()._is_right_semi_anti;
+}
+
+bool HashJoinProbeLocalState::is_outer_join() const {
+ return _parent->cast<HashJoinProbeOperatorX>()._is_outer_join;
+}
+
+std::vector<bool>* HashJoinProbeLocalState::left_output_slot_flags() {
+ return &_parent->cast<HashJoinProbeOperatorX>()._left_output_slot_flags;
+}
+
+std::vector<bool>* HashJoinProbeLocalState::right_output_slot_flags() {
+ return &_parent->cast<HashJoinProbeOperatorX>()._right_output_slot_flags;
+}
+
+vectorized::DataTypes HashJoinProbeLocalState::right_table_data_types() {
+ return _parent->cast<HashJoinProbeOperatorX>()._right_table_data_types;
+}
+
+vectorized::DataTypes HashJoinProbeLocalState::left_table_data_types() {
+ return _parent->cast<HashJoinProbeOperatorX>()._left_table_data_types;
+}
+
Status HashJoinProbeLocalState::close(RuntimeState* state) {
SCOPED_TIMER(profile()->total_time_counter());
SCOPED_TIMER(_close_timer);
@@ -111,16 +167,7 @@ void HashJoinProbeLocalState::init_for_probe(RuntimeState*
state) {
if (_probe_inited) {
return;
}
- _process_hashtable_ctx_variants =
std::make_unique<vectorized::HashTableCtxVariants>();
- std::visit(
- [&](auto&& join_op_variants) {
- using JoinOpType = std::decay_t<decltype(join_op_variants)>;
- _probe_context.reset(new
vectorized::HashJoinProbeContext(this));
- _process_hashtable_ctx_variants
-
->emplace<vectorized::ProcessHashTableProbe<JoinOpType::value>>(
- _probe_context.get(), state->batch_size());
- },
- _shared_state->join_op_variants);
+
_probe_inited = true;
}
diff --git a/be/src/pipeline/exec/hashjoin_probe_operator.h
b/be/src/pipeline/exec/hashjoin_probe_operator.h
index 4d96b68b7d4..ed241141e5d 100644
--- a/be/src/pipeline/exec/hashjoin_probe_operator.h
+++ b/be/src/pipeline/exec/hashjoin_probe_operator.h
@@ -45,6 +45,22 @@ public:
Status open(RuntimeState*) override { return Status::OK(); }
};
+class HashJoinProbeLocalState;
+
+using HashTableCtxVariants = std::variant<
+ std::monostate,
+ vectorized::ProcessHashTableProbe<TJoinOp::INNER_JOIN,
HashJoinProbeLocalState>,
+ vectorized::ProcessHashTableProbe<TJoinOp::LEFT_SEMI_JOIN,
HashJoinProbeLocalState>,
+ vectorized::ProcessHashTableProbe<TJoinOp::LEFT_ANTI_JOIN,
HashJoinProbeLocalState>,
+ vectorized::ProcessHashTableProbe<TJoinOp::LEFT_OUTER_JOIN,
HashJoinProbeLocalState>,
+ vectorized::ProcessHashTableProbe<TJoinOp::FULL_OUTER_JOIN,
HashJoinProbeLocalState>,
+ vectorized::ProcessHashTableProbe<TJoinOp::RIGHT_OUTER_JOIN,
HashJoinProbeLocalState>,
+ vectorized::ProcessHashTableProbe<TJoinOp::CROSS_JOIN,
HashJoinProbeLocalState>,
+ vectorized::ProcessHashTableProbe<TJoinOp::RIGHT_SEMI_JOIN,
HashJoinProbeLocalState>,
+ vectorized::ProcessHashTableProbe<TJoinOp::RIGHT_ANTI_JOIN,
HashJoinProbeLocalState>,
+ vectorized::ProcessHashTableProbe<TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN,
+ HashJoinProbeLocalState>>;
+
class HashJoinProbeOperatorX;
class HashJoinProbeLocalState final
: public JoinProbeLocalState<HashJoinDependency,
HashJoinProbeLocalState> {
@@ -55,6 +71,7 @@ public:
~HashJoinProbeLocalState() override = default;
Status init(RuntimeState* state, LocalStateInfo& info) override;
+ Status open(RuntimeState* state) override;
Status close(RuntimeState* state) override;
void prepare_for_next();
@@ -64,13 +81,25 @@ public:
SourceState& source_state,
vectorized::Block* temp_block,
bool check_rows_count = true);
- HashJoinProbeOperatorX* join_probe() { return
(HashJoinProbeOperatorX*)_parent; }
+ bool have_other_join_conjunct() const;
+ bool is_right_semi_anti() const;
+ bool is_outer_join() const;
+ std::vector<bool>* left_output_slot_flags();
+ std::vector<bool>* right_output_slot_flags();
+ vectorized::DataTypes right_table_data_types();
+ vectorized::DataTypes left_table_data_types();
+ bool* has_null_in_build_side() { return
&_shared_state->_has_null_in_build_side; }
+ std::shared_ptr<std::vector<vectorized::Block>> build_blocks() const {
+ return _shared_state->build_blocks;
+ }
+ vectorized::Sizes probe_key_sz() const { return
_shared_state->probe_key_sz; }
private:
void _prepare_probe_block();
bool _need_probe_null_map(vectorized::Block& block, const
std::vector<int>& res_col_ids);
friend class HashJoinProbeOperatorX;
- friend struct vectorized::HashJoinProbeContext;
+ template <int JoinOpType, typename Parent>
+ friend struct vectorized::ProcessHashTableProbe;
int _probe_index = -1;
bool _ready_probe = false;
@@ -88,12 +117,15 @@ private:
bool _need_null_map_for_probe = false;
bool _has_set_need_null_map_for_probe = false;
- std::unique_ptr<vectorized::HashJoinProbeContext> _probe_context;
vectorized::ColumnUInt8::MutablePtr _null_map_column;
// for cases when a probe row matches more than batch size build rows.
bool _is_any_probe_match_row_output = false;
- std::unique_ptr<vectorized::HashTableCtxVariants>
_process_hashtable_ctx_variants =
- std::make_unique<vectorized::HashTableCtxVariants>();
+ std::unique_ptr<HashTableCtxVariants> _process_hashtable_ctx_variants =
+ std::make_unique<HashTableCtxVariants>();
+
+ // for full/right outer join
+ vectorized::HashTableIteratorVariants _outer_join_pull_visited_iter;
+ vectorized::HashTableIteratorVariants _probe_row_match_iter;
RuntimeProfile::Counter* _probe_expr_call_timer;
RuntimeProfile::Counter* _probe_next_timer;
@@ -125,7 +157,6 @@ private:
RuntimeProfile::Counter& expr_call_timer,
std::vector<int>& res_col_ids) const;
friend class HashJoinProbeLocalState;
- friend struct vectorized::HashJoinProbeContext;
// other expr
vectorized::VExprContextSPtrs _other_join_conjuncts;
diff --git a/be/src/pipeline/exec/partition_sort_sink_operator.cpp
b/be/src/pipeline/exec/partition_sort_sink_operator.cpp
index 907114829ee..4bd66be9dbf 100644
--- a/be/src/pipeline/exec/partition_sort_sink_operator.cpp
+++ b/be/src/pipeline/exec/partition_sort_sink_operator.cpp
@@ -152,7 +152,7 @@ Status PartitionSortSinkOperatorX::sink(RuntimeState*
state, vectorized::Block*
COUNTER_SET(local_state._hash_table_size_counter,
int64_t(local_state._num_partition));
//so all data from child have sink completed
- local_state._dependency->set_ready_for_read();
+ local_state._dependency->set_eos();
}
return Status::OK();
diff --git a/be/src/pipeline/exec/partition_sort_source_operator.cpp
b/be/src/pipeline/exec/partition_sort_source_operator.cpp
index a80d3277006..f9003c65268 100644
--- a/be/src/pipeline/exec/partition_sort_source_operator.cpp
+++ b/be/src/pipeline/exec/partition_sort_source_operator.cpp
@@ -52,8 +52,8 @@ Status PartitionSortSourceOperatorX::get_block(RuntimeState*
state, vectorized::
local_state._shared_state->blocks_buffer.front().swap(*output_block);
local_state._shared_state->blocks_buffer.pop();
//if buffer have no data, block reading and wait for signal again
- RETURN_IF_ERROR(vectorized::VExprContext::filter_block(_conjuncts,
output_block,
-
output_block->columns()));
+ RETURN_IF_ERROR(vectorized::VExprContext::filter_block(
+ local_state._conjuncts, output_block,
output_block->columns()));
if (local_state._shared_state->blocks_buffer.empty()) {
local_state._dependency->block_reading();
}
@@ -67,13 +67,12 @@ Status
PartitionSortSourceOperatorX::get_block(RuntimeState* state, vectorized::
// if we move the _blocks_buffer output at last(behind 286 line),
// it's maybe eos but not output all data: when _blocks_buffer.empty() and
_can_read = false (this: _sort_idx && _partition_sorts.size() are 0)
RETURN_IF_ERROR(get_sorted_block(state, output_block, local_state));
- RETURN_IF_ERROR(vectorized::VExprContext::filter_block(_conjuncts,
output_block,
+
RETURN_IF_ERROR(vectorized::VExprContext::filter_block(local_state._conjuncts,
output_block,
output_block->columns()));
{
std::lock_guard<std::mutex>
lock(local_state._shared_state->buffer_mutex);
if (local_state._shared_state->blocks_buffer.empty() &&
- local_state._shared_state->sort_idx >=
- local_state._shared_state->partition_sorts.size()) {
+ local_state._sort_idx >=
local_state._shared_state->partition_sorts.size()) {
source_state = SourceState::FINISHED;
}
}
@@ -91,20 +90,18 @@ Status
PartitionSortSourceOperatorX::get_sorted_block(RuntimeState* state,
SCOPED_TIMER(local_state._get_sorted_timer);
//sorter output data one by one
bool current_eos = false;
- if (local_state._shared_state->sort_idx <
local_state._shared_state->partition_sorts.size()) {
- RETURN_IF_ERROR(
-
local_state._shared_state->partition_sorts[local_state._shared_state->sort_idx]
- ->get_next(state, output_block, &current_eos));
+ if (local_state._sort_idx <
local_state._shared_state->partition_sorts.size()) {
+
RETURN_IF_ERROR(local_state._shared_state->partition_sorts[local_state._sort_idx]->get_next(
+ state, output_block, &current_eos));
}
if (current_eos) {
//current sort have eos, so get next idx
local_state._shared_state->previous_row->reset();
- auto rows =
local_state._shared_state->partition_sorts[local_state._shared_state->sort_idx]
+ auto rows =
local_state._shared_state->partition_sorts[local_state._sort_idx]
->get_output_rows();
local_state._num_rows_returned += rows;
-
local_state._shared_state->partition_sorts[local_state._shared_state->sort_idx].reset(
- nullptr);
- local_state._shared_state->sort_idx++;
+
local_state._shared_state->partition_sorts[local_state._sort_idx].reset(nullptr);
+ local_state._sort_idx++;
}
return Status::OK();
diff --git a/be/src/pipeline/exec/partition_sort_source_operator.h
b/be/src/pipeline/exec/partition_sort_source_operator.h
index f7d950838c6..23393a870a7 100644
--- a/be/src/pipeline/exec/partition_sort_source_operator.h
+++ b/be/src/pipeline/exec/partition_sort_source_operator.h
@@ -68,6 +68,7 @@ private:
RuntimeProfile::Counter* _get_sorted_timer;
RuntimeProfile::Counter* _get_next_timer;
int64_t _num_rows_returned;
+ int _sort_idx = 0;
};
class PartitionSortSourceOperatorX final : public
OperatorX<PartitionSortSourceLocalState> {
diff --git a/be/src/pipeline/pipeline_x/dependency.h
b/be/src/pipeline/pipeline_x/dependency.h
index c5abc5142cc..c1ed4074772 100644
--- a/be/src/pipeline/pipeline_x/dependency.h
+++ b/be/src/pipeline/pipeline_x/dependency.h
@@ -565,14 +565,10 @@ struct HashJoinSharedState : public JoinSharedState {
// maybe share hash table with other fragment instances
std::shared_ptr<vectorized::HashTableVariants> hash_table_variants =
std::make_shared<vectorized::HashTableVariants>();
- // for full/right outer join
- vectorized::HashTableIteratorVariants outer_join_pull_visited_iter;
- vectorized::HashTableIteratorVariants probe_row_match_iter;
vectorized::Sizes probe_key_sz;
const std::vector<TupleDescriptor*> build_side_child_desc;
size_t build_exprs_size = 0;
- std::shared_ptr<std::vector<vectorized::Block>> build_blocks =
- std::make_shared<std::vector<vectorized::Block>>();
+ std::shared_ptr<std::vector<vectorized::Block>> build_blocks = nullptr;
bool probe_ignore_null = false;
};
@@ -626,20 +622,31 @@ public:
std::mutex buffer_mutex;
std::vector<std::unique_ptr<vectorized::PartitionSorter>> partition_sorts;
std::unique_ptr<vectorized::SortCursorCmp> previous_row = nullptr;
- int sort_idx = 0;
};
class PartitionSortDependency final : public WriteDependency {
public:
using SharedState = PartitionSortNodeSharedState;
- PartitionSortDependency(int id) : WriteDependency(id,
"PartitionSortDependency") {}
+ PartitionSortDependency(int id) : WriteDependency(id,
"PartitionSortDependency"), _eos(false) {}
~PartitionSortDependency() override = default;
void* shared_state() override { return (void*)&_partition_sort_state; };
void set_ready_for_write() override {}
void block_writing() override {}
+ [[nodiscard]] Dependency* read_blocked_by() override {
+ if (config::enable_fuzzy_mode && !(_ready_for_read || _eos) &&
+ _read_dependency_watcher.elapsed_time() >
SLOW_DEPENDENCY_THRESHOLD) {
+ LOG(WARNING) << "========Dependency may be blocked by some
reasons: " << name() << " "
+ << id();
+ }
+ return _ready_for_read || _eos ? nullptr : this;
+ }
+
+ void set_eos() { _eos = true; }
+
private:
PartitionSortNodeSharedState _partition_sort_state;
+ std::atomic<bool> _eos;
};
class AsyncWriterDependency final : public WriteDependency {
diff --git a/be/src/vec/exec/join/process_hash_table_probe.h
b/be/src/vec/exec/join/process_hash_table_probe.h
index bf6a68f690a..e20e7c83189 100644
--- a/be/src/vec/exec/join/process_hash_table_probe.h
+++ b/be/src/vec/exec/join/process_hash_table_probe.h
@@ -37,9 +37,9 @@ using MutableColumns = std::vector<MutableColumnPtr>;
using NullMap = ColumnUInt8::Container;
using ConstNullMapPtr = const NullMap*;
-template <int JoinOpType>
+template <int JoinOpType, typename Parent>
struct ProcessHashTableProbe {
- ProcessHashTableProbe(HashJoinProbeContext* join_context, int batch_size);
+ ProcessHashTableProbe(Parent* parent, int batch_size);
~ProcessHashTableProbe() = default;
// output build side result column
@@ -78,8 +78,9 @@ struct ProcessHashTableProbe {
void _emplace_element(int8_t block_offset, int32_t block_row, int&
current_offset);
template <typename HashTableType>
- HashTableType::State _init_probe_side(HashTableType& hash_table_ctx,
size_t probe_rows,
- bool with_other_join_conjuncts,
const uint8_t* null_map);
+ typename HashTableType::State _init_probe_side(HashTableType&
hash_table_ctx, size_t probe_rows,
+ bool
with_other_join_conjuncts,
+ const uint8_t* null_map);
template <typename Mapped, bool with_other_join_conjuncts>
ForwardIterator<Mapped>& _probe_row_match(int& current_offset, int&
probe_index,
@@ -91,9 +92,9 @@ struct ProcessHashTableProbe {
Status process_data_in_hashtable(HashTableType& hash_table_ctx,
MutableBlock& mutable_block,
Block* output_block, bool* eos);
- vectorized::HashJoinProbeContext* _join_context;
+ Parent* _parent;
const int _batch_size;
- const std::vector<Block>& _build_blocks;
+ std::shared_ptr<std::vector<Block>> _build_blocks;
std::unique_ptr<Arena> _arena;
std::vector<StringRef> _probe_keys;
@@ -118,6 +119,13 @@ struct ProcessHashTableProbe {
int _right_col_len;
int _row_count_from_last_probe;
+ bool _have_other_join_conjunct;
+ bool _is_right_semi_anti;
+ Sizes _probe_key_sz;
+ std::vector<bool>* _left_output_slot_flags;
+ std::vector<bool>* _right_output_slot_flags;
+ bool* _has_null_in_build_side;
+
RuntimeProfile::Counter* _rows_returned_counter;
RuntimeProfile::Counter* _search_hashtable_timer;
RuntimeProfile::Counter* _build_side_output_timer;
diff --git a/be/src/vec/exec/join/process_hash_table_probe_impl.h
b/be/src/vec/exec/join/process_hash_table_probe_impl.h
index 54ca59b6eae..16ef76d02c5 100644
--- a/be/src/vec/exec/join/process_hash_table_probe_impl.h
+++ b/be/src/vec/exec/join/process_hash_table_probe_impl.h
@@ -18,6 +18,7 @@
#pragma once
#include "common/status.h"
+#include "pipeline/exec/hashjoin_probe_operator.h"
#include "process_hash_table_probe.h"
#include "runtime/thread_context.h" // IWYU pragma: keep
#include "util/simd/bits.h"
@@ -27,32 +28,35 @@
namespace doris::vectorized {
-template <int JoinOpType>
-ProcessHashTableProbe<JoinOpType>::ProcessHashTableProbe(HashJoinProbeContext*
join_context,
- int batch_size)
- : _join_context(join_context),
+template <int JoinOpType, typename Parent>
+ProcessHashTableProbe<JoinOpType, Parent>::ProcessHashTableProbe(Parent*
parent, int batch_size)
+ : _parent(parent),
_batch_size(batch_size),
- _build_blocks(*join_context->_build_blocks),
- _tuple_is_null_left_flags(
- join_context->_is_outer_join
- ? &(reinterpret_cast<ColumnUInt8&>(
-
**join_context->_tuple_is_null_left_flag_column)
- .get_data())
- : nullptr),
- _tuple_is_null_right_flags(
- join_context->_is_outer_join
- ? &(reinterpret_cast<ColumnUInt8&>(
-
**join_context->_tuple_is_null_right_flag_column)
- .get_data())
- : nullptr),
- _rows_returned_counter(join_context->_rows_returned_counter),
- _search_hashtable_timer(join_context->_search_hashtable_timer),
- _build_side_output_timer(join_context->_build_side_output_timer),
- _probe_side_output_timer(join_context->_probe_side_output_timer),
-
_probe_process_hashtable_timer(join_context->_probe_process_hashtable_timer) {}
-
-template <int JoinOpType>
-void ProcessHashTableProbe<JoinOpType>::build_side_output_column(
+ _build_blocks(parent->build_blocks()),
+ _tuple_is_null_left_flags(parent->is_outer_join()
+ ? &(reinterpret_cast<ColumnUInt8&>(
+
*parent->_tuple_is_null_left_flag_column)
+ .get_data())
+ : nullptr),
+ _tuple_is_null_right_flags(parent->is_outer_join()
+ ?
&(reinterpret_cast<ColumnUInt8&>(
+
*parent->_tuple_is_null_right_flag_column)
+ .get_data())
+ : nullptr),
+ _have_other_join_conjunct(parent->have_other_join_conjunct()),
+ _is_right_semi_anti(parent->is_right_semi_anti()),
+ _probe_key_sz(parent->probe_key_sz()),
+ _left_output_slot_flags(parent->left_output_slot_flags()),
+ _right_output_slot_flags(parent->right_output_slot_flags()),
+ _has_null_in_build_side(parent->has_null_in_build_side()),
+ _rows_returned_counter(parent->_rows_returned_counter),
+ _search_hashtable_timer(parent->_search_hashtable_timer),
+ _build_side_output_timer(parent->_build_side_output_timer),
+ _probe_side_output_timer(parent->_probe_side_output_timer),
+
_probe_process_hashtable_timer(parent->_probe_process_hashtable_timer) {}
+
+template <int JoinOpType, typename Parent>
+void ProcessHashTableProbe<JoinOpType, Parent>::build_side_output_column(
MutableColumns& mcol, const std::vector<bool>& output_slot_flags, int
size,
bool have_other_join_conjunct) {
SCOPED_TIMER(_build_side_output_timer);
@@ -66,9 +70,9 @@ void
ProcessHashTableProbe<JoinOpType>::build_side_output_column(
JoinOpType == TJoinOp::LEFT_OUTER_JOIN || JoinOpType ==
TJoinOp::FULL_OUTER_JOIN;
if (!is_semi_anti_join || have_other_join_conjunct) {
- if (_build_blocks.size() == 1) {
+ if (_build_blocks->size() == 1) {
for (int i = 0; i < _right_col_len; i++) {
- auto& column = *_build_blocks[0].get_by_position(i).column;
+ auto& column = *(*_build_blocks)[0].get_by_position(i).column;
if (output_slot_flags[i]) {
mcol[i + _right_col_idx]->insert_indices_from(column,
_build_block_rows.data(),
_build_block_rows.data() + size);
@@ -86,7 +90,7 @@ void
ProcessHashTableProbe<JoinOpType>::build_side_output_column(
assert_cast<ColumnNullable*>(mcol[i +
_right_col_idx].get())
->insert_default();
} else {
- auto& column =
*_build_blocks[_build_block_offsets[j]]
+ auto& column =
*(*_build_blocks)[_build_block_offsets[j]]
.get_by_position(i)
.column;
mcol[i + _right_col_idx]->insert_from(column,
_build_block_rows[j]);
@@ -101,7 +105,7 @@ void
ProcessHashTableProbe<JoinOpType>::build_side_output_column(
// just insert default value
mcol[i + _right_col_idx]->insert_default();
} else {
- auto& column =
*_build_blocks[_build_block_offsets[j]]
+ auto& column =
*(*_build_blocks)[_build_block_offsets[j]]
.get_by_position(i)
.column;
mcol[i + _right_col_idx]->insert_from(column,
_build_block_rows[j]);
@@ -125,13 +129,13 @@ void
ProcessHashTableProbe<JoinOpType>::build_side_output_column(
}
}
-template <int JoinOpType>
-void ProcessHashTableProbe<JoinOpType>::probe_side_output_column(
+template <int JoinOpType, typename Parent>
+void ProcessHashTableProbe<JoinOpType, Parent>::probe_side_output_column(
MutableColumns& mcol, const std::vector<bool>& output_slot_flags, int
size,
int last_probe_index, size_t probe_size, bool all_match_one,
bool have_other_join_conjunct) {
SCOPED_TIMER(_probe_side_output_timer);
- auto& probe_block = *_join_context->_probe_block;
+ auto& probe_block = _parent->_probe_block;
for (int i = 0; i < output_slot_flags.size(); ++i) {
if (output_slot_flags[i]) {
auto& column = probe_block.get_by_position(i).column;
@@ -152,15 +156,15 @@ void
ProcessHashTableProbe<JoinOpType>::probe_side_output_column(
}
}
-template <int JoinOpType>
+template <int JoinOpType, typename Parent>
template <typename HashTableType>
-HashTableType::State ProcessHashTableProbe<JoinOpType>::_init_probe_side(
+typename HashTableType::State ProcessHashTableProbe<JoinOpType,
Parent>::_init_probe_side(
HashTableType& hash_table_ctx, size_t probe_rows, bool
with_other_join_conjuncts,
const uint8_t* null_map) {
- _right_col_idx = _join_context->_is_right_semi_anti &&
!with_other_join_conjuncts
+ _right_col_idx = _is_right_semi_anti && !with_other_join_conjuncts
? 0
- : _join_context->_left_table_data_types->size();
- _right_col_len = _join_context->_right_table_data_types->size();
+ : _parent->left_table_data_types().size();
+ _right_col_len = _parent->right_table_data_types().size();
_row_count_from_last_probe = 0;
_build_block_rows.clear();
@@ -177,24 +181,20 @@ HashTableType::State
ProcessHashTableProbe<JoinOpType>::_init_probe_side(
_build_block_rows.reserve(_batch_size * PROBE_SIDE_EXPLODE_RATE);
_build_block_offsets.reserve(_batch_size * PROBE_SIDE_EXPLODE_RATE);
- if (!*_join_context->_ready_probe) {
- *_join_context->_ready_probe = true;
+ if (!_parent->_ready_probe) {
+ _parent->_ready_probe = true;
hash_table_ctx.reset();
- hash_table_ctx.init_serialized_keys(*_join_context->_probe_columns,
- _join_context->_probe_key_sz,
probe_rows, null_map);
+ hash_table_ctx.init_serialized_keys(_parent->_probe_columns,
_probe_key_sz, probe_rows,
+ null_map);
}
- return typename HashTableType::State(*_join_context->_probe_columns,
- _join_context->_probe_key_sz);
+ return typename HashTableType::State(_parent->_probe_columns,
_probe_key_sz);
}
-template <int JoinOpType>
+template <int JoinOpType, typename Parent>
template <typename Mapped, bool with_other_join_conjuncts>
-ForwardIterator<Mapped>&
ProcessHashTableProbe<JoinOpType>::_probe_row_match(int& current_offset,
-
int& probe_index,
-
size_t& probe_size,
-
bool& all_match_one) {
- auto& probe_row_match_iter =
-
std::get<ForwardIterator<Mapped>>(*_join_context->_probe_row_match_iter);
+ForwardIterator<Mapped>& ProcessHashTableProbe<JoinOpType,
Parent>::_probe_row_match(
+ int& current_offset, int& probe_index, size_t& probe_size, bool&
all_match_one) {
+ auto& probe_row_match_iter =
std::get<ForwardIterator<Mapped>>(_parent->_probe_row_match_iter);
if (!probe_row_match_iter.ok()) {
return probe_row_match_iter;
}
@@ -219,22 +219,24 @@ ForwardIterator<Mapped>&
ProcessHashTableProbe<JoinOpType>::_probe_row_match(int
return probe_row_match_iter;
}
-template <int JoinOpType>
-void ProcessHashTableProbe<JoinOpType>::_emplace_element(int8_t block_offset,
int32_t block_row,
- int& current_offset) {
+template <int JoinOpType, typename Parent>
+void ProcessHashTableProbe<JoinOpType, Parent>::_emplace_element(int8_t
block_offset,
+ int32_t
block_row,
+ int&
current_offset) {
_build_block_offsets.emplace_back(block_offset);
_build_block_rows.emplace_back(block_row);
current_offset++;
}
-template <int JoinOpType>
+template <int JoinOpType, typename Parent>
template <bool need_null_map_for_probe, bool ignore_null, typename
HashTableType,
bool with_other_conjuncts, bool is_mark_join>
-Status ProcessHashTableProbe<JoinOpType>::do_process(HashTableType&
hash_table_ctx,
- ConstNullMapPtr null_map,
- MutableBlock&
mutable_block,
- Block* output_block,
size_t probe_rows) {
- auto& probe_index = *_join_context->_probe_index;
+Status ProcessHashTableProbe<JoinOpType, Parent>::do_process(HashTableType&
hash_table_ctx,
+ ConstNullMapPtr
null_map,
+ MutableBlock&
mutable_block,
+ Block*
output_block,
+ size_t
probe_rows) {
+ auto& probe_index = _parent->_probe_index;
using KeyGetter = typename HashTableType::State;
using Mapped = typename HashTableType::Mapped;
@@ -318,9 +320,8 @@ Status
ProcessHashTableProbe<JoinOpType>::do_process(HashTableType& hash_table_c
(JoinOpType != TJoinOp::LEFT_SEMI_JOIN) ^
find_result.is_found();
if constexpr (is_mark_join) {
++current_offset;
- bool null_result =
- (need_null_map_for_probe &&
(*null_map)[probe_index]) ||
- (!need_go_ahead &&
_join_context->_has_null_value_in_build_side);
+ bool null_result = (need_null_map_for_probe &&
(*null_map)[probe_index]) ||
+ (!need_go_ahead &&
*_has_null_in_build_side);
if (null_result) {
mark_column->insert_null();
} else {
@@ -401,14 +402,13 @@ Status
ProcessHashTableProbe<JoinOpType>::do_process(HashTableType& hash_table_c
probe_size = probe_index - last_probe_index +
probe_row_match_iter.ok();
}
- build_side_output_column(mcol, *_join_context->_right_output_slot_flags,
current_offset,
- with_other_conjuncts);
+ build_side_output_column(mcol, *_right_output_slot_flags, current_offset,
with_other_conjuncts);
if constexpr (with_other_conjuncts || (JoinOpType !=
TJoinOp::RIGHT_SEMI_JOIN &&
JoinOpType !=
TJoinOp::RIGHT_ANTI_JOIN)) {
RETURN_IF_CATCH_EXCEPTION(probe_side_output_column(
- mcol, *_join_context->_left_output_slot_flags, current_offset,
last_probe_index,
- probe_size, all_match_one, with_other_conjuncts));
+ mcol, *_left_output_slot_flags, current_offset,
last_probe_index, probe_size,
+ all_match_one, with_other_conjuncts));
}
output_block->swap(mutable_block.to_block());
@@ -421,8 +421,8 @@ Status
ProcessHashTableProbe<JoinOpType>::do_process(HashTableType& hash_table_c
return Status::OK();
}
-template <int JoinOpType>
-Status ProcessHashTableProbe<JoinOpType>::do_other_join_conjuncts(
+template <int JoinOpType, typename Parent>
+Status ProcessHashTableProbe<JoinOpType, Parent>::do_other_join_conjuncts(
Block* output_block, bool is_mark_join, int
multi_matched_output_row_count,
bool is_the_last_sub_block) {
// dispose the other join conjunct exec
@@ -431,14 +431,14 @@ Status
ProcessHashTableProbe<JoinOpType>::do_other_join_conjuncts(
return Status::OK();
}
- SCOPED_TIMER(_join_context->_process_other_join_conjunct_timer);
+ SCOPED_TIMER(_parent->_process_other_join_conjunct_timer);
int orig_columns = output_block->columns();
IColumn::Filter other_conjunct_filter(row_count, 1);
{
bool can_be_filter_all = false;
- RETURN_IF_ERROR(VExprContext::execute_conjuncts(
- *_join_context->_other_join_conjuncts, nullptr, output_block,
- &other_conjunct_filter, &can_be_filter_all));
+
RETURN_IF_ERROR(VExprContext::execute_conjuncts(_parent->_other_join_conjuncts,
nullptr,
+ output_block,
&other_conjunct_filter,
+ &can_be_filter_all));
}
auto filter_column = ColumnUInt8::create();
@@ -465,7 +465,7 @@ Status
ProcessHashTableProbe<JoinOpType>::do_other_join_conjuncts(
null_map_data, filter_map,
output_block);
// This is the last sub block of splitted block, and no
equal-conjuncts-matched tuple
// is output in all sub blocks, need to output a tuple for this
probe row
- if (is_the_last_sub_block &&
!*_join_context->_is_any_probe_match_row_output) {
+ if (is_the_last_sub_block &&
!_parent->_is_any_probe_match_row_output) {
filter_map[0] = true;
null_map_data[0] = true;
}
@@ -514,7 +514,7 @@ Status
ProcessHashTableProbe<JoinOpType>::do_other_join_conjuncts(
// It contains the first sub block of splited equal-conjuncts-matched
tuples of the current probe row
if (multi_matched_output_row_count > 0) {
- *_join_context->_is_any_probe_match_row_output = false;
+ _parent->_is_any_probe_match_row_output = false;
_process_splited_equal_matched_tuples(row_count -
multi_matched_output_row_count,
multi_matched_output_row_count, filter_column_ptr,
null_map_data, filter_map,
output_block);
@@ -534,7 +534,7 @@ Status
ProcessHashTableProbe<JoinOpType>::do_other_join_conjuncts(
size_t start_row_idx = 1;
// We are handling euqual-conjuncts matched tuples that are splitted
into multiple blocks
if (_row_count_from_last_probe > 0) {
- if (*_join_context->_is_any_probe_match_row_output) {
+ if (_parent->_is_any_probe_match_row_output) {
// if any matched tuple for this probe row is output,
// ignore all the following tuples for this probe row.
for (int row_idx = 0; row_idx < _row_count_from_last_probe;
++row_idx) {
@@ -563,14 +563,13 @@ Status
ProcessHashTableProbe<JoinOpType>::do_other_join_conjuncts(
if (multi_matched_output_row_count > 0) {
// If a matched row is output, all the equal-matched tuples in
// the following sub blocks should be ignored
- *_join_context->_is_any_probe_match_row_output =
filter_map[row_count - 1];
- } else if (_row_count_from_last_probe > 0 &&
- !*_join_context->_is_any_probe_match_row_output) {
+ _parent->_is_any_probe_match_row_output = filter_map[row_count -
1];
+ } else if (_row_count_from_last_probe > 0 &&
!_parent->_is_any_probe_match_row_output) {
// We are handling euqual-conjuncts matched tuples that are
splitted into multiple blocks,
// and no matched tuple has been output in all previous run.
// If a tuple is output in this run, all the following mathced
tuples should be ignored
if (filter_map[_row_count_from_last_probe - 1]) {
- *_join_context->_is_any_probe_match_row_output = true;
+ _parent->_is_any_probe_match_row_output = true;
}
}
@@ -609,7 +608,7 @@ Status
ProcessHashTableProbe<JoinOpType>::do_other_join_conjuncts(
size_t start_row_idx = 1;
// We are handling euqual-conjuncts matched tuples that are splitted
into multiple blocks
- if (_row_count_from_last_probe > 0 &&
*_join_context->_is_any_probe_match_row_output) {
+ if (_row_count_from_last_probe > 0 &&
_parent->_is_any_probe_match_row_output) {
// if any matched tuple for this probe row is output,
// ignore all the following tuples for this probe row.
for (int row_idx = 0; row_idx < _row_count_from_last_probe;
++row_idx) {
@@ -658,15 +657,15 @@ Status
ProcessHashTableProbe<JoinOpType>::do_other_join_conjuncts(
int end_row_idx = 0;
if (_row_count_from_last_probe > 0) {
end_row_idx = row_count - multi_matched_output_row_count;
- if (!*_join_context->_is_any_probe_match_row_output) {
+ if (!_parent->_is_any_probe_match_row_output) {
// We are handling euqual-conjuncts matched tuples that
are splitted into multiple blocks,
// and no matched tuple has been output in all previous
run.
// If a tuple is output in this run, all the following
mathced tuples should be ignored
if (filter_map[_row_count_from_last_probe - 1]) {
- *_join_context->_is_any_probe_match_row_output = true;
+ _parent->_is_any_probe_match_row_output = true;
filter_map[_row_count_from_last_probe - 1] = false;
}
- if (is_the_last_sub_block &&
!*_join_context->_is_any_probe_match_row_output) {
+ if (is_the_last_sub_block &&
!_parent->_is_any_probe_match_row_output) {
// This is the last sub block of splitted block, and
no equal-conjuncts-matched tuple
// is output in all sub blocks, output a tuple for
this probe row
filter_map[0] = true;
@@ -676,7 +675,7 @@ Status
ProcessHashTableProbe<JoinOpType>::do_other_join_conjuncts(
// It contains the first sub block of splited
equal-conjuncts-matched tuples of the current probe row
// If a matched row is output, all the equal-matched
tuples in
// the following sub blocks should be ignored
- *_join_context->_is_any_probe_match_row_output =
filter_map[row_count - 1];
+ _parent->_is_any_probe_match_row_output =
filter_map[row_count - 1];
filter_map[row_count - 1] = false;
}
} else if (multi_matched_output_row_count > 0) {
@@ -684,7 +683,7 @@ Status
ProcessHashTableProbe<JoinOpType>::do_other_join_conjuncts(
// It contains the first sub block of splited
equal-conjuncts-matched tuples of the current probe row
// If a matched row is output, all the equal-matched tuples in
// the following sub blocks should be ignored
- *_join_context->_is_any_probe_match_row_output =
filter_map[row_count - 1];
+ _parent->_is_any_probe_match_row_output = filter_map[row_count
- 1];
filter_map[row_count - 1] = false;
} else {
end_row_idx = row_count;
@@ -744,8 +743,8 @@ Status
ProcessHashTableProbe<JoinOpType>::do_other_join_conjuncts(
// and when processing the last sub block, check whether there are any
// equal-conjuncts-matched tuple is output in all sub blocks,
// if not, just pick a tuple and output.
-template <int JoinOpType>
-void ProcessHashTableProbe<JoinOpType>::_process_splited_equal_matched_tuples(
+template <int JoinOpType, typename Parent>
+void ProcessHashTableProbe<JoinOpType,
Parent>::_process_splited_equal_matched_tuples(
int start_row_idx, int row_count, const UInt8* __restrict
other_hit_column,
UInt8* __restrict null_map_data, UInt8* __restrict filter_map, Block*
output_block) {
int end_row_idx = start_row_idx + row_count;
@@ -770,16 +769,15 @@ void
ProcessHashTableProbe<JoinOpType>::_process_splited_equal_matched_tuples(
*_visited_map[i] |= other_hit;
}
}
- *_join_context->_is_any_probe_match_row_output |=
+ _parent->_is_any_probe_match_row_output |=
simd::contain_byte(filter_map + start_row_idx, row_count, 1);
}
-template <int JoinOpType>
+template <int JoinOpType, typename Parent>
template <typename HashTableType>
-Status
ProcessHashTableProbe<JoinOpType>::process_data_in_hashtable(HashTableType&
hash_table_ctx,
-
MutableBlock& mutable_block,
- Block*
output_block,
- bool* eos)
{
+Status ProcessHashTableProbe<JoinOpType, Parent>::process_data_in_hashtable(
+ HashTableType& hash_table_ctx, MutableBlock& mutable_block, Block*
output_block,
+ bool* eos) {
using Mapped = typename HashTableType::Mapped;
SCOPED_TIMER(_probe_process_hashtable_timer);
if constexpr (std::is_same_v<Mapped, RowRefListWithFlag> ||
@@ -787,16 +785,15 @@ Status
ProcessHashTableProbe<JoinOpType>::process_data_in_hashtable(HashTableTyp
hash_table_ctx.init_iterator();
auto& mcol = mutable_block.mutable_columns();
- bool right_semi_anti_without_other =
- _join_context->_is_right_semi_anti &&
!_join_context->_have_other_join_conjunct;
+ bool right_semi_anti_without_other = _is_right_semi_anti &&
!_have_other_join_conjunct;
int right_col_idx =
- right_semi_anti_without_other ? 0 :
_join_context->_left_table_data_types->size();
- int right_col_len = _join_context->_right_table_data_types->size();
+ right_semi_anti_without_other ? 0 :
_parent->left_table_data_types().size();
+ int right_col_len = _parent->right_table_data_types().size();
auto& iter = hash_table_ctx.iterator;
auto block_size = 0;
auto& visited_iter =
-
std::get<ForwardIterator<Mapped>>(*_join_context->_outer_join_pull_visited_iter);
+
std::get<ForwardIterator<Mapped>>(_parent->_outer_join_pull_visited_iter);
_build_blocks_locs.resize(_batch_size);
auto register_build_loc = [&](int8_t offset, int32_t row_nums) {
_build_blocks_locs[block_size++] = std::pair<int8_t, int>(offset,
row_nums);
@@ -874,13 +871,13 @@ Status
ProcessHashTableProbe<JoinOpType>::process_data_in_hashtable(HashTableTyp
auto insert_build_rows = [&](int8_t offset) {
for (size_t j = 0; j < right_col_len; ++j) {
- auto& column =
*_build_blocks[offset].get_by_position(j).column;
+ auto& column =
*(*_build_blocks)[offset].get_by_position(j).column;
mcol[j + right_col_idx]->insert_indices_from(
column, _build_block_rows.data(),
_build_block_rows.data() + _build_block_rows.size());
}
};
- if (_build_blocks.size() > 1) {
+ if (_build_blocks->size() > 1) {
std::sort(_build_blocks_locs.begin(), _build_blocks_locs.end(),
[](const auto a, const auto b) { return a.first >
b.first; });
auto start = 0, end = 0;
@@ -897,7 +894,7 @@ Status
ProcessHashTableProbe<JoinOpType>::process_data_in_hashtable(HashTableTyp
start = end;
insert_build_rows(offset);
}
- } else if (_build_blocks.size() == 1) {
+ } else if (_build_blocks->size() == 1) {
const auto size = _build_blocks_locs.size();
_build_block_rows.resize(_build_blocks_locs.size());
for (int i = 0; i < size; i++) {
@@ -907,7 +904,7 @@ Status
ProcessHashTableProbe<JoinOpType>::process_data_in_hashtable(HashTableTyp
}
// just resize the left table column in case with other conjunct to
make block size is not zero
- if (_join_context->_is_right_semi_anti &&
_join_context->_have_other_join_conjunct) {
+ if (_is_right_semi_anti && _have_other_join_conjunct) {
auto target_size = mcol[right_col_idx]->size();
for (int i = 0; i < right_col_idx; ++i) {
mcol[i]->resize(target_size);
@@ -933,13 +930,11 @@ Status
ProcessHashTableProbe<JoinOpType>::process_data_in_hashtable(HashTableTyp
}
}
-template <int JoinOpType>
+template <int JoinOpType, typename Parent>
template <bool need_null_map_for_probe, bool ignore_null, typename
HashTableType>
-Status ProcessHashTableProbe<JoinOpType>::process(HashTableType&
hash_table_ctx,
- ConstNullMapPtr null_map,
- MutableBlock& mutable_block,
Block* output_block,
- size_t probe_rows, bool
is_mark_join,
- bool
have_other_join_conjunct) {
+Status ProcessHashTableProbe<JoinOpType, Parent>::process(
+ HashTableType& hash_table_ctx, ConstNullMapPtr null_map, MutableBlock&
mutable_block,
+ Block* output_block, size_t probe_rows, bool is_mark_join, bool
have_other_join_conjunct) {
Status res;
if constexpr (!std::is_same_v<typename HashTableType::Mapped,
RowRefListWithFlags>) {
if (have_other_join_conjunct) {
@@ -973,74 +968,78 @@ struct ExtractType<T(U)> {
using Type = U;
};
-#define INSTANTIATION(JoinOpType, T)
\
- template Status
\
- ProcessHashTableProbe<JoinOpType>::process<false, false,
ExtractType<void(T)>::Type>( \
- ExtractType<void(T)>::Type & hash_table_ctx, ConstNullMapPtr
null_map, \
- MutableBlock & mutable_block, Block * output_block, size_t
probe_rows, \
- bool is_mark_join, bool have_other_join_conjunct);
\
- template Status
\
- ProcessHashTableProbe<JoinOpType>::process<false, true,
ExtractType<void(T)>::Type>( \
- ExtractType<void(T)>::Type & hash_table_ctx, ConstNullMapPtr
null_map, \
- MutableBlock & mutable_block, Block * output_block, size_t
probe_rows, \
- bool is_mark_join, bool have_other_join_conjunct);
\
- template Status
\
- ProcessHashTableProbe<JoinOpType>::process<true, false,
ExtractType<void(T)>::Type>( \
- ExtractType<void(T)>::Type & hash_table_ctx, ConstNullMapPtr
null_map, \
- MutableBlock & mutable_block, Block * output_block, size_t
probe_rows, \
- bool is_mark_join, bool have_other_join_conjunct);
\
- template Status
\
- ProcessHashTableProbe<JoinOpType>::process<true, true,
ExtractType<void(T)>::Type>( \
- ExtractType<void(T)>::Type & hash_table_ctx, ConstNullMapPtr
null_map, \
- MutableBlock & mutable_block, Block * output_block, size_t
probe_rows, \
- bool is_mark_join, bool have_other_join_conjunct);
\
-
\
- template Status
\
-
ProcessHashTableProbe<JoinOpType>::process_data_in_hashtable<ExtractType<void(T)>::Type>(
\
- ExtractType<void(T)>::Type & hash_table_ctx, MutableBlock &
mutable_block, \
- Block * output_block, bool* eos)
-
-#define INSTANTIATION_FOR(JoinOpType)
\
- template struct ProcessHashTableProbe<JoinOpType>;
\
-
\
- INSTANTIATION(JoinOpType, (SerializedHashTableContext<RowRefList>));
\
- INSTANTIATION(JoinOpType, (I8HashTableContext<RowRefList>));
\
- INSTANTIATION(JoinOpType, (I16HashTableContext<RowRefList>));
\
- INSTANTIATION(JoinOpType, (I32HashTableContext<RowRefList>));
\
- INSTANTIATION(JoinOpType, (I64HashTableContext<RowRefList>));
\
- INSTANTIATION(JoinOpType, (I128HashTableContext<RowRefList>));
\
- INSTANTIATION(JoinOpType, (I256HashTableContext<RowRefList>));
\
- INSTANTIATION(JoinOpType, (I64FixedKeyHashTableContext<true,
RowRefList>)); \
- INSTANTIATION(JoinOpType, (I64FixedKeyHashTableContext<false,
RowRefList>)); \
- INSTANTIATION(JoinOpType, (I128FixedKeyHashTableContext<true,
RowRefList>)); \
- INSTANTIATION(JoinOpType, (I128FixedKeyHashTableContext<false,
RowRefList>)); \
- INSTANTIATION(JoinOpType, (I256FixedKeyHashTableContext<true,
RowRefList>)); \
- INSTANTIATION(JoinOpType, (I256FixedKeyHashTableContext<false,
RowRefList>)); \
- INSTANTIATION(JoinOpType,
(SerializedHashTableContext<RowRefListWithFlag>)); \
- INSTANTIATION(JoinOpType, (I8HashTableContext<RowRefListWithFlag>));
\
- INSTANTIATION(JoinOpType, (I16HashTableContext<RowRefListWithFlag>));
\
- INSTANTIATION(JoinOpType, (I32HashTableContext<RowRefListWithFlag>));
\
- INSTANTIATION(JoinOpType, (I64HashTableContext<RowRefListWithFlag>));
\
- INSTANTIATION(JoinOpType, (I128HashTableContext<RowRefListWithFlag>));
\
- INSTANTIATION(JoinOpType, (I256HashTableContext<RowRefListWithFlag>));
\
- INSTANTIATION(JoinOpType, (I64FixedKeyHashTableContext<true,
RowRefListWithFlag>)); \
- INSTANTIATION(JoinOpType, (I64FixedKeyHashTableContext<false,
RowRefListWithFlag>)); \
- INSTANTIATION(JoinOpType, (I128FixedKeyHashTableContext<true,
RowRefListWithFlag>)); \
- INSTANTIATION(JoinOpType, (I128FixedKeyHashTableContext<false,
RowRefListWithFlag>)); \
- INSTANTIATION(JoinOpType, (I256FixedKeyHashTableContext<true,
RowRefListWithFlag>)); \
- INSTANTIATION(JoinOpType, (I256FixedKeyHashTableContext<false,
RowRefListWithFlag>)); \
- INSTANTIATION(JoinOpType,
(SerializedHashTableContext<RowRefListWithFlags>)); \
- INSTANTIATION(JoinOpType, (I8HashTableContext<RowRefListWithFlags>));
\
- INSTANTIATION(JoinOpType, (I16HashTableContext<RowRefListWithFlags>));
\
- INSTANTIATION(JoinOpType, (I32HashTableContext<RowRefListWithFlags>));
\
- INSTANTIATION(JoinOpType, (I64HashTableContext<RowRefListWithFlags>));
\
- INSTANTIATION(JoinOpType, (I128HashTableContext<RowRefListWithFlags>));
\
- INSTANTIATION(JoinOpType, (I256HashTableContext<RowRefListWithFlags>));
\
- INSTANTIATION(JoinOpType, (I64FixedKeyHashTableContext<true,
RowRefListWithFlags>)); \
- INSTANTIATION(JoinOpType, (I64FixedKeyHashTableContext<false,
RowRefListWithFlags>)); \
- INSTANTIATION(JoinOpType, (I128FixedKeyHashTableContext<true,
RowRefListWithFlags>)); \
- INSTANTIATION(JoinOpType, (I128FixedKeyHashTableContext<false,
RowRefListWithFlags>)); \
- INSTANTIATION(JoinOpType, (I256FixedKeyHashTableContext<true,
RowRefListWithFlags>)); \
- INSTANTIATION(JoinOpType, (I256FixedKeyHashTableContext<false,
RowRefListWithFlags>))
+#define INSTANTIATION(JoinOpType, Parent, T)
\
+ template Status
\
+ ProcessHashTableProbe<JoinOpType, Parent>::process<false, false,
ExtractType<void(T)>::Type>( \
+ ExtractType<void(T)>::Type & hash_table_ctx, ConstNullMapPtr
null_map, \
+ MutableBlock & mutable_block, Block * output_block, size_t
probe_rows, \
+ bool is_mark_join, bool have_other_join_conjunct);
\
+ template Status
\
+ ProcessHashTableProbe<JoinOpType, Parent>::process<false, true,
ExtractType<void(T)>::Type>( \
+ ExtractType<void(T)>::Type & hash_table_ctx, ConstNullMapPtr
null_map, \
+ MutableBlock & mutable_block, Block * output_block, size_t
probe_rows, \
+ bool is_mark_join, bool have_other_join_conjunct);
\
+ template Status
\
+ ProcessHashTableProbe<JoinOpType, Parent>::process<true, false,
ExtractType<void(T)>::Type>( \
+ ExtractType<void(T)>::Type & hash_table_ctx, ConstNullMapPtr
null_map, \
+ MutableBlock & mutable_block, Block * output_block, size_t
probe_rows, \
+ bool is_mark_join, bool have_other_join_conjunct);
\
+ template Status
\
+ ProcessHashTableProbe<JoinOpType, Parent>::process<true, true,
ExtractType<void(T)>::Type>( \
+ ExtractType<void(T)>::Type & hash_table_ctx, ConstNullMapPtr
null_map, \
+ MutableBlock & mutable_block, Block * output_block, size_t
probe_rows, \
+ bool is_mark_join, bool have_other_join_conjunct);
\
+
\
+ template Status ProcessHashTableProbe<JoinOpType,
Parent>::process_data_in_hashtable< \
+ ExtractType<void(T)>::Type>(ExtractType<void(T)>::Type &
hash_table_ctx, \
+ MutableBlock & mutable_block, Block *
output_block, \
+ bool* eos)
+
+#define INSTANTIATION_FOR1(JoinOpType, Parent)
\
+ template struct ProcessHashTableProbe<JoinOpType, Parent>;
\
+
\
+ INSTANTIATION(JoinOpType, Parent,
(SerializedHashTableContext<RowRefList>)); \
+ INSTANTIATION(JoinOpType, Parent, (I8HashTableContext<RowRefList>));
\
+ INSTANTIATION(JoinOpType, Parent, (I16HashTableContext<RowRefList>));
\
+ INSTANTIATION(JoinOpType, Parent, (I32HashTableContext<RowRefList>));
\
+ INSTANTIATION(JoinOpType, Parent, (I64HashTableContext<RowRefList>));
\
+ INSTANTIATION(JoinOpType, Parent, (I128HashTableContext<RowRefList>));
\
+ INSTANTIATION(JoinOpType, Parent, (I256HashTableContext<RowRefList>));
\
+ INSTANTIATION(JoinOpType, Parent, (I64FixedKeyHashTableContext<true,
RowRefList>)); \
+ INSTANTIATION(JoinOpType, Parent, (I64FixedKeyHashTableContext<false,
RowRefList>)); \
+ INSTANTIATION(JoinOpType, Parent, (I128FixedKeyHashTableContext<true,
RowRefList>)); \
+ INSTANTIATION(JoinOpType, Parent, (I128FixedKeyHashTableContext<false,
RowRefList>)); \
+ INSTANTIATION(JoinOpType, Parent, (I256FixedKeyHashTableContext<true,
RowRefList>)); \
+ INSTANTIATION(JoinOpType, Parent, (I256FixedKeyHashTableContext<false,
RowRefList>)); \
+ INSTANTIATION(JoinOpType, Parent,
(SerializedHashTableContext<RowRefListWithFlag>)); \
+ INSTANTIATION(JoinOpType, Parent,
(I8HashTableContext<RowRefListWithFlag>)); \
+ INSTANTIATION(JoinOpType, Parent,
(I16HashTableContext<RowRefListWithFlag>)); \
+ INSTANTIATION(JoinOpType, Parent,
(I32HashTableContext<RowRefListWithFlag>)); \
+ INSTANTIATION(JoinOpType, Parent,
(I64HashTableContext<RowRefListWithFlag>)); \
+ INSTANTIATION(JoinOpType, Parent,
(I128HashTableContext<RowRefListWithFlag>)); \
+ INSTANTIATION(JoinOpType, Parent,
(I256HashTableContext<RowRefListWithFlag>)); \
+ INSTANTIATION(JoinOpType, Parent, (I64FixedKeyHashTableContext<true,
RowRefListWithFlag>)); \
+ INSTANTIATION(JoinOpType, Parent, (I64FixedKeyHashTableContext<false,
RowRefListWithFlag>)); \
+ INSTANTIATION(JoinOpType, Parent, (I128FixedKeyHashTableContext<true,
RowRefListWithFlag>)); \
+ INSTANTIATION(JoinOpType, Parent, (I128FixedKeyHashTableContext<false,
RowRefListWithFlag>)); \
+ INSTANTIATION(JoinOpType, Parent, (I256FixedKeyHashTableContext<true,
RowRefListWithFlag>)); \
+ INSTANTIATION(JoinOpType, Parent, (I256FixedKeyHashTableContext<false,
RowRefListWithFlag>)); \
+ INSTANTIATION(JoinOpType, Parent,
(SerializedHashTableContext<RowRefListWithFlags>)); \
+ INSTANTIATION(JoinOpType, Parent,
(I8HashTableContext<RowRefListWithFlags>)); \
+ INSTANTIATION(JoinOpType, Parent,
(I16HashTableContext<RowRefListWithFlags>)); \
+ INSTANTIATION(JoinOpType, Parent,
(I32HashTableContext<RowRefListWithFlags>)); \
+ INSTANTIATION(JoinOpType, Parent,
(I64HashTableContext<RowRefListWithFlags>)); \
+ INSTANTIATION(JoinOpType, Parent,
(I128HashTableContext<RowRefListWithFlags>)); \
+ INSTANTIATION(JoinOpType, Parent,
(I256HashTableContext<RowRefListWithFlags>)); \
+ INSTANTIATION(JoinOpType, Parent, (I64FixedKeyHashTableContext<true,
RowRefListWithFlags>)); \
+ INSTANTIATION(JoinOpType, Parent, (I64FixedKeyHashTableContext<false,
RowRefListWithFlags>)); \
+ INSTANTIATION(JoinOpType, Parent, (I128FixedKeyHashTableContext<true,
RowRefListWithFlags>)); \
+ INSTANTIATION(JoinOpType, Parent, (I128FixedKeyHashTableContext<false,
RowRefListWithFlags>)); \
+ INSTANTIATION(JoinOpType, Parent, (I256FixedKeyHashTableContext<true,
RowRefListWithFlags>)); \
+ INSTANTIATION(JoinOpType, Parent, (I256FixedKeyHashTableContext<false,
RowRefListWithFlags>))
+
+#define INSTANTIATION_FOR(JoinOpType) \
+ INSTANTIATION_FOR1(JoinOpType, HashJoinNode); \
+ INSTANTIATION_FOR1(JoinOpType, pipeline::HashJoinProbeLocalState)
} // namespace doris::vectorized
diff --git a/be/src/vec/exec/join/vhash_join_node.cpp
b/be/src/vec/exec/join/vhash_join_node.cpp
index e3647762d46..f48a7e278fe 100644
--- a/be/src/vec/exec/join/vhash_join_node.cpp
+++ b/be/src/vec/exec/join/vhash_join_node.cpp
@@ -84,120 +84,6 @@ template Status HashJoinNode::_extract_join_column<false>(
std::vector<IColumn const*, std::allocator<IColumn const*>>&,
std::vector<int, std::allocator<int>> const&);
-RuntimeFilterContext::RuntimeFilterContext(HashJoinNode* join_node)
- : _runtime_filter_descs(join_node->_runtime_filter_descs),
- _runtime_filter_slots(join_node->_runtime_filter_slots),
- _build_expr_ctxs(join_node->_build_expr_ctxs),
- _build_rf_cardinality(join_node->_build_rf_cardinality),
- _inserted_rows(join_node->_inserted_rows),
- _push_down_timer(join_node->_push_down_timer),
- _push_compute_timer(join_node->_push_compute_timer) {}
-
-RuntimeFilterContext::RuntimeFilterContext(pipeline::HashJoinBuildSinkLocalState*
local_state)
- :
_runtime_filter_descs(local_state->join_build()->_runtime_filter_descs),
- _runtime_filter_slots(local_state->_runtime_filter_slots),
- _build_expr_ctxs(local_state->_build_expr_ctxs),
- _build_rf_cardinality(local_state->_build_rf_cardinality),
- _inserted_rows(local_state->_inserted_rows),
- _push_down_timer(local_state->_push_down_timer),
- _push_compute_timer(local_state->_push_compute_timer) {}
-
-HashJoinProbeContext::HashJoinProbeContext(HashJoinNode* join_node)
- : _have_other_join_conjunct(join_node->_have_other_join_conjunct),
- _is_right_semi_anti(join_node->_is_right_semi_anti),
- _is_outer_join(join_node->_is_outer_join),
-
_tuple_is_null_left_flag_column(&join_node->_tuple_is_null_left_flag_column),
-
_tuple_is_null_right_flag_column(&join_node->_tuple_is_null_right_flag_column),
- _other_join_conjuncts(&join_node->_other_join_conjuncts),
- _right_table_data_types(&join_node->_right_table_data_types),
- _left_table_data_types(&join_node->_left_table_data_types),
- _search_hashtable_timer(join_node->_search_hashtable_timer),
- _build_side_output_timer(join_node->_build_side_output_timer),
- _probe_side_output_timer(join_node->_probe_side_output_timer),
-
_probe_process_hashtable_timer(join_node->_probe_process_hashtable_timer),
-
_process_other_join_conjunct_timer(join_node->_process_other_join_conjunct_timer),
- _rows_returned_counter(join_node->_rows_returned_counter),
- _probe_arena_memory_usage(join_node->_probe_arena_memory_usage),
- _arena(join_node->_arena),
-
_outer_join_pull_visited_iter(&join_node->_outer_join_pull_visited_iter),
- _probe_row_match_iter(&join_node->_probe_row_match_iter),
- _build_blocks(join_node->_build_blocks),
- _probe_block(&join_node->_probe_block),
- _probe_columns(&join_node->_probe_columns),
- _probe_index(&join_node->_probe_index),
- _ready_probe(&join_node->_ready_probe),
- _probe_key_sz(join_node->_probe_key_sz),
- _left_output_slot_flags(&join_node->_left_output_slot_flags),
- _right_output_slot_flags(&join_node->_right_output_slot_flags),
-
_is_any_probe_match_row_output(&join_node->_is_any_probe_match_row_output),
- _has_null_value_in_build_side(join_node->_has_null_in_build_side) {}
-
-HashJoinProbeContext::HashJoinProbeContext(pipeline::HashJoinProbeLocalState*
local_state)
- :
_have_other_join_conjunct(local_state->join_probe()->_have_other_join_conjunct),
- _is_right_semi_anti(local_state->join_probe()->_is_right_semi_anti),
- _is_outer_join(local_state->join_probe()->_is_outer_join),
-
_tuple_is_null_left_flag_column(&local_state->_tuple_is_null_left_flag_column),
-
_tuple_is_null_right_flag_column(&local_state->_tuple_is_null_right_flag_column),
- _other_join_conjuncts(&local_state->_other_join_conjuncts),
-
_right_table_data_types(&local_state->join_probe()->_right_table_data_types),
-
_left_table_data_types(&local_state->join_probe()->_left_table_data_types),
- _search_hashtable_timer(local_state->_search_hashtable_timer),
- _build_side_output_timer(local_state->_build_side_output_timer),
- _probe_side_output_timer(local_state->_probe_side_output_timer),
-
_probe_process_hashtable_timer(local_state->_probe_process_hashtable_timer),
-
_process_other_join_conjunct_timer(local_state->_process_other_join_conjunct_timer),
- _rows_returned_counter(local_state->_rows_returned_counter),
- _probe_arena_memory_usage(local_state->_probe_arena_memory_usage),
- _arena(local_state->_shared_state->arena),
-
_outer_join_pull_visited_iter(&local_state->_shared_state->outer_join_pull_visited_iter),
-
_probe_row_match_iter(&local_state->_shared_state->probe_row_match_iter),
- _build_blocks(local_state->_shared_state->build_blocks),
- _probe_block(&local_state->_probe_block),
- _probe_columns(&local_state->_probe_columns),
- _probe_index(&local_state->_probe_index),
- _ready_probe(&local_state->_ready_probe),
- _probe_key_sz(local_state->_shared_state->probe_key_sz),
-
_left_output_slot_flags(&local_state->join_probe()->_left_output_slot_flags),
-
_right_output_slot_flags(&local_state->join_probe()->_right_output_slot_flags),
-
_is_any_probe_match_row_output(&local_state->_is_any_probe_match_row_output),
-
_has_null_value_in_build_side(local_state->_shared_state->_has_null_in_build_side)
{}
-
-HashJoinBuildContext::HashJoinBuildContext(HashJoinNode* join_node)
- : _hash_table_memory_usage(join_node->_hash_table_memory_usage),
- _build_buckets_counter(join_node->_build_buckets_counter),
- _build_collisions_counter(join_node->_build_collisions_counter),
- _build_buckets_fill_counter(join_node->_build_buckets_fill_counter),
- _build_table_insert_timer(join_node->_build_table_insert_timer),
- _build_table_expanse_timer(join_node->_build_table_expanse_timer),
- _build_table_convert_timer(join_node->_build_table_convert_timer),
-
_build_side_compute_hash_timer(join_node->_build_side_compute_hash_timer),
- _build_arena_memory_usage(join_node->_build_arena_memory_usage),
- _profile(join_node->runtime_profile()),
- _build_key_sz(join_node->_build_key_sz),
- _build_unique(join_node->_build_unique),
- _runtime_filter_descs(join_node->_runtime_filter_descs),
- _inserted_rows(join_node->_inserted_rows),
- _arena(join_node->_arena),
- _build_rf_cardinality(join_node->_build_rf_cardinality) {}
-
-HashJoinBuildContext::HashJoinBuildContext(pipeline::HashJoinBuildSinkLocalState*
local_state)
- : _hash_table_memory_usage(local_state->_hash_table_memory_usage),
- _build_buckets_counter(local_state->_build_buckets_counter),
- _build_collisions_counter(local_state->_build_collisions_counter),
-
_build_buckets_fill_counter(local_state->_build_buckets_fill_counter),
- _build_table_insert_timer(local_state->_build_table_insert_timer),
- _build_table_expanse_timer(local_state->_build_table_expanse_timer),
- _build_table_convert_timer(local_state->_build_table_convert_timer),
-
_build_side_compute_hash_timer(local_state->_build_side_compute_hash_timer),
- _build_arena_memory_usage(local_state->_build_arena_memory_usage),
- _profile(local_state->profile()),
- _build_key_sz(local_state->join_build()->_build_key_sz),
- _build_unique(local_state->join_build()->_build_unique),
-
_runtime_filter_descs(local_state->join_build()->_runtime_filter_descs),
- _inserted_rows(local_state->_inserted_rows),
- _arena(local_state->_shared_state->arena),
- _build_rf_cardinality(local_state->_build_rf_cardinality) {}
-
HashJoinNode::HashJoinNode(ObjectPool* pool, const TPlanNode& tnode, const
DescriptorTbl& descs)
: VJoinNodeBase(pool, tnode, descs),
_is_broadcast_join(tnode.hash_join_node.__isset.is_broadcast_join &&
@@ -896,11 +782,8 @@ Status HashJoinNode::sink(doris::RuntimeState* state,
vectorized::Block* in_bloc
__builtin_unreachable();
},
[&](auto&& arg) -> Status {
- using HashTableCtxType =
std::decay_t<decltype(arg)>;
- RuntimeFilterContext context(this);
-
ProcessRuntimeFilterBuild<HashTableCtxType>
-
runtime_filter_build_process(&context);
- return
runtime_filter_build_process(state, arg);
+ ProcessRuntimeFilterBuild
runtime_filter_build_process;
+ return
runtime_filter_build_process(state, arg, this);
}},
*_hash_table_variants);
if (!ret.ok()) {
@@ -1119,10 +1002,9 @@ Status HashJoinNode::_process_build_block(RuntimeState*
state, Block& block, uin
[&](auto&& arg, auto has_null_value,
auto short_circuit_for_null_in_build_side) -> Status {
using HashTableCtxType = std::decay_t<decltype(arg)>;
- HashJoinBuildContext context(this);
- ProcessHashTableBuild<HashTableCtxType>
hash_table_build_process(
- rows, block, raw_ptrs, &context,
state->batch_size(), offset,
- state);
+ ProcessHashTableBuild<HashTableCtxType, HashJoinNode>
+ hash_table_build_process(rows, block,
raw_ptrs, this,
+ state->batch_size(),
offset, state);
return hash_table_build_process
.template run<has_null_value,
short_circuit_for_null_in_build_side>(
arg,
@@ -1277,9 +1159,9 @@ void
HashJoinNode::_process_hashtable_ctx_variants_init(RuntimeState* state) {
std::visit(
[&](auto&& join_op_variants) {
using JoinOpType = std::decay_t<decltype(join_op_variants)>;
- _probe_context.reset(new HashJoinProbeContext(this));
-
_process_hashtable_ctx_variants->emplace<ProcessHashTableProbe<JoinOpType::value>>(
- _probe_context.get(), state->batch_size());
+ _process_hashtable_ctx_variants
+ ->emplace<ProcessHashTableProbe<JoinOpType::value,
HashJoinNode>>(
+ this, state->batch_size());
},
_join_op_variants);
}
diff --git a/be/src/vec/exec/join/vhash_join_node.h
b/be/src/vec/exec/join/vhash_join_node.h
index 19ddfc4626a..b9264bc1457 100644
--- a/be/src/vec/exec/join/vhash_join_node.h
+++ b/be/src/vec/exec/join/vhash_join_node.h
@@ -69,99 +69,52 @@ constexpr size_t CHECK_FRECUENCY = 65536;
struct UInt128;
struct UInt256;
-template <int JoinOpType>
+template <int JoinOpType, typename Parent>
struct ProcessHashTableProbe;
class HashJoinNode;
-struct RuntimeFilterContext {
- RuntimeFilterContext(HashJoinNode* join_node);
- RuntimeFilterContext(pipeline::HashJoinBuildSinkLocalState* local_state);
- std::vector<TRuntimeFilterDesc>& _runtime_filter_descs;
- std::shared_ptr<VRuntimeFilterSlots>& _runtime_filter_slots;
- VExprContextSPtrs& _build_expr_ctxs;
- size_t& _build_rf_cardinality;
- std::unordered_map<const Block*, std::vector<int>>& _inserted_rows;
- RuntimeProfile::Counter* _push_down_timer;
- RuntimeProfile::Counter* _push_compute_timer;
-};
-
-template <class HashTableContext>
struct ProcessRuntimeFilterBuild {
- ProcessRuntimeFilterBuild(RuntimeFilterContext* context) :
_context(context) {}
-
- Status operator()(RuntimeState* state, HashTableContext& hash_table_ctx) {
- if (_context->_runtime_filter_descs.empty()) {
+ template <class HashTableContext, typename Parent>
+ Status operator()(RuntimeState* state, HashTableContext& hash_table_ctx,
Parent* parent) {
+ if (parent->runtime_filter_descs().empty()) {
return Status::OK();
}
- _context->_runtime_filter_slots =
std::make_shared<VRuntimeFilterSlots>(
- _context->_build_expr_ctxs, _context->_runtime_filter_descs);
+ parent->_runtime_filter_slots = std::make_shared<VRuntimeFilterSlots>(
+ parent->_build_expr_ctxs, parent->runtime_filter_descs());
- RETURN_IF_ERROR(_context->_runtime_filter_slots->init(
- state, hash_table_ctx.hash_table->size(),
_context->_build_rf_cardinality));
+ RETURN_IF_ERROR(parent->_runtime_filter_slots->init(
+ state, hash_table_ctx.hash_table->size(),
parent->_build_rf_cardinality));
- if (!_context->_runtime_filter_slots->empty() &&
!_context->_inserted_rows.empty()) {
+ if (!parent->_runtime_filter_slots->empty() &&
!parent->_inserted_rows.empty()) {
{
- SCOPED_TIMER(_context->_push_compute_timer);
-
_context->_runtime_filter_slots->insert(_context->_inserted_rows);
+ SCOPED_TIMER(parent->_push_compute_timer);
+ parent->_runtime_filter_slots->insert(parent->_inserted_rows);
}
}
{
- SCOPED_TIMER(_context->_push_down_timer);
- RETURN_IF_ERROR(_context->_runtime_filter_slots->publish());
+ SCOPED_TIMER(parent->_push_down_timer);
+ RETURN_IF_ERROR(parent->_runtime_filter_slots->publish());
}
return Status::OK();
}
-
-private:
- RuntimeFilterContext* _context;
-};
-
-struct HashJoinBuildContext {
- HashJoinBuildContext(HashJoinNode* join_node);
- HashJoinBuildContext(pipeline::HashJoinBuildSinkLocalState* local_state);
-
- void add_hash_buckets_info(const std::string& info) const {
- _profile->add_info_string("HashTableBuckets", info);
- }
- void add_hash_buckets_filled_info(const std::string& info) const {
- _profile->add_info_string("HashTableFilledBuckets", info);
- }
- RuntimeProfile::Counter* _hash_table_memory_usage;
- RuntimeProfile::Counter* _build_buckets_counter;
- RuntimeProfile::Counter* _build_collisions_counter;
- RuntimeProfile::Counter* _build_buckets_fill_counter;
- RuntimeProfile::Counter* _build_table_insert_timer;
- RuntimeProfile::Counter* _build_table_expanse_timer;
- RuntimeProfile::Counter* _build_table_convert_timer;
- RuntimeProfile::Counter* _build_side_compute_hash_timer;
- RuntimeProfile::HighWaterMarkCounter* _build_arena_memory_usage;
- RuntimeProfile* _profile;
-
- Sizes& _build_key_sz;
- bool& _build_unique;
- std::vector<TRuntimeFilterDesc>& _runtime_filter_descs;
- std::unordered_map<const Block*, std::vector<int>>& _inserted_rows;
- std::shared_ptr<Arena>& _arena;
- size_t& _build_rf_cardinality;
};
using ProfileCounter = RuntimeProfile::Counter;
-template <class HashTableContext>
+template <class HashTableContext, typename Parent>
struct ProcessHashTableBuild {
ProcessHashTableBuild(int rows, Block& acquired_block, ColumnRawPtrs&
build_raw_ptrs,
- HashJoinBuildContext* join_context, int batch_size,
uint8_t offset,
- RuntimeState* state)
+ Parent* parent, int batch_size, uint8_t offset,
RuntimeState* state)
: _rows(rows),
_skip_rows(0),
_acquired_block(acquired_block),
_build_raw_ptrs(build_raw_ptrs),
- _join_context(join_context),
+ _parent(parent),
_batch_size(batch_size),
_offset(offset),
_state(state),
-
_build_side_compute_hash_timer(join_context->_build_side_compute_hash_timer) {}
+
_build_side_compute_hash_timer(parent->_build_side_compute_hash_timer) {}
template <bool ignore_null, bool short_circuit_for_null>
Status run(HashTableContext& hash_table_ctx, ConstNullMapPtr null_map,
bool* has_null_key) {
@@ -172,30 +125,30 @@ struct ProcessHashTableBuild {
int64_t bucket_size =
hash_table_ctx.hash_table->get_buffer_size_in_cells();
int64_t filled_bucket_size = hash_table_ctx.hash_table->size();
int64_t bucket_bytes =
hash_table_ctx.hash_table->get_buffer_size_in_bytes();
- COUNTER_SET(_join_context->_hash_table_memory_usage, bucket_bytes);
- COUNTER_SET(_join_context->_build_buckets_counter, bucket_size);
- COUNTER_SET(_join_context->_build_collisions_counter,
+ COUNTER_SET(_parent->_hash_table_memory_usage, bucket_bytes);
+ COUNTER_SET(_parent->_build_buckets_counter, bucket_size);
+ COUNTER_SET(_parent->_build_collisions_counter,
hash_table_ctx.hash_table->get_collisions());
- COUNTER_SET(_join_context->_build_buckets_fill_counter,
filled_bucket_size);
+ COUNTER_SET(_parent->_build_buckets_fill_counter,
filled_bucket_size);
auto hash_table_buckets =
hash_table_ctx.hash_table->get_buffer_sizes_in_cells();
std::string hash_table_buckets_info;
for (auto bucket_count : hash_table_buckets) {
hash_table_buckets_info += std::to_string(bucket_count) + ", ";
}
- _join_context->add_hash_buckets_info(hash_table_buckets_info);
+ _parent->add_hash_buckets_info(hash_table_buckets_info);
auto hash_table_sizes = hash_table_ctx.hash_table->sizes();
hash_table_buckets_info.clear();
for (auto table_size : hash_table_sizes) {
hash_table_buckets_info += std::to_string(table_size) + ", ";
}
-
_join_context->add_hash_buckets_filled_info(hash_table_buckets_info);
+ _parent->add_hash_buckets_filled_info(hash_table_buckets_info);
}};
- KeyGetter key_getter(_build_raw_ptrs, _join_context->_build_key_sz);
+ KeyGetter key_getter(_build_raw_ptrs, _parent->build_key_sz());
- SCOPED_TIMER(_join_context->_build_table_insert_timer);
+ SCOPED_TIMER(_parent->_build_table_insert_timer);
hash_table_ctx.hash_table->reset_resize_timer();
// only not build_unique, we need expanse hash table before insert data
@@ -204,21 +157,21 @@ struct ProcessHashTableBuild {
// 2. There are many duplicate keys, and the hash table filled bucket
is far less than
// the hash table build bucket, which may waste a lot of memory.
// TODO, use the NDV expansion of the key column in the optimizer
statistics
- if (!_join_context->_build_unique) {
+ if (!_parent->build_unique()) {
RETURN_IF_CATCH_EXCEPTION(hash_table_ctx.hash_table->expanse_for_add_elem(
std::min<int>(_rows,
config::hash_table_pre_expanse_max_rows)));
}
- vector<int>& inserted_rows =
_join_context->_inserted_rows[&_acquired_block];
- bool has_runtime_filter =
!_join_context->_runtime_filter_descs.empty();
+ vector<int>& inserted_rows = _parent->_inserted_rows[&_acquired_block];
+ bool has_runtime_filter = !_parent->runtime_filter_descs().empty();
if (has_runtime_filter) {
inserted_rows.reserve(_batch_size);
}
- hash_table_ctx.init_serialized_keys(_build_raw_ptrs,
_join_context->_build_key_sz, _rows,
+ hash_table_ctx.init_serialized_keys(_build_raw_ptrs,
_parent->build_key_sz(), _rows,
null_map ? null_map->data() :
nullptr);
- auto& arena = *(_join_context->_arena);
+ auto& arena = *_parent->arena();
auto old_build_arena_memory = arena.size();
size_t k = 0;
@@ -229,7 +182,7 @@ struct ProcessHashTableBuild {
ctor(key, Mapped {k, _offset});
};
- bool build_unique = _join_context->_build_unique;
+ bool build_unique = _parent->build_unique();
#define EMPLACE_IMPL(stmt) \
for (; k < _rows; ++k) { \
if (k % CHECK_FRECUENCY == 0) { \
@@ -258,21 +211,21 @@ struct ProcessHashTableBuild {
} else if (has_runtime_filter && !build_unique) {
EMPLACE_IMPL(
if (inserted) { inserted_rows.push_back(k); } else {
- mapped.insert({k, _offset}, *(_join_context->_arena));
+ mapped.insert({k, _offset}, *_parent->arena());
inserted_rows.push_back(k);
});
} else if (!has_runtime_filter && build_unique) {
EMPLACE_IMPL(if (!inserted) { _skip_rows++; });
} else {
- EMPLACE_IMPL(if (!inserted) { mapped.insert({k, _offset},
*(_join_context->_arena)); });
+ EMPLACE_IMPL(if (!inserted) { mapped.insert({k, _offset},
*_parent->arena()); });
}
- _join_context->_build_rf_cardinality += inserted_rows.size();
+ _parent->_build_rf_cardinality += inserted_rows.size();
- _join_context->_build_arena_memory_usage->add(arena.size() -
old_build_arena_memory);
+ _parent->_build_arena_memory_usage->add(arena.size() -
old_build_arena_memory);
- COUNTER_UPDATE(_join_context->_build_table_expanse_timer,
+ COUNTER_UPDATE(_parent->_build_table_expanse_timer,
hash_table_ctx.hash_table->get_resize_timer_value());
- COUNTER_UPDATE(_join_context->_build_table_convert_timer,
+ COUNTER_UPDATE(_parent->_build_table_convert_timer,
hash_table_ctx.hash_table->get_convert_timer_value());
return Status::OK();
@@ -283,7 +236,7 @@ private:
int _skip_rows;
Block& _acquired_block;
ColumnRawPtrs& _build_raw_ptrs;
- HashJoinBuildContext* _join_context;
+ Parent* _parent;
int _batch_size;
uint8_t _offset;
RuntimeState* _state;
@@ -348,16 +301,16 @@ using HashTableVariants = std::variant<
class VExprContext;
using HashTableCtxVariants =
- std::variant<std::monostate,
ProcessHashTableProbe<TJoinOp::INNER_JOIN>,
- ProcessHashTableProbe<TJoinOp::LEFT_SEMI_JOIN>,
- ProcessHashTableProbe<TJoinOp::LEFT_ANTI_JOIN>,
- ProcessHashTableProbe<TJoinOp::LEFT_OUTER_JOIN>,
- ProcessHashTableProbe<TJoinOp::FULL_OUTER_JOIN>,
- ProcessHashTableProbe<TJoinOp::RIGHT_OUTER_JOIN>,
- ProcessHashTableProbe<TJoinOp::CROSS_JOIN>,
- ProcessHashTableProbe<TJoinOp::RIGHT_SEMI_JOIN>,
- ProcessHashTableProbe<TJoinOp::RIGHT_ANTI_JOIN>,
-
ProcessHashTableProbe<TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN>>;
+ std::variant<std::monostate,
ProcessHashTableProbe<TJoinOp::INNER_JOIN, HashJoinNode>,
+ ProcessHashTableProbe<TJoinOp::LEFT_SEMI_JOIN,
HashJoinNode>,
+ ProcessHashTableProbe<TJoinOp::LEFT_ANTI_JOIN,
HashJoinNode>,
+ ProcessHashTableProbe<TJoinOp::LEFT_OUTER_JOIN,
HashJoinNode>,
+ ProcessHashTableProbe<TJoinOp::FULL_OUTER_JOIN,
HashJoinNode>,
+ ProcessHashTableProbe<TJoinOp::RIGHT_OUTER_JOIN,
HashJoinNode>,
+ ProcessHashTableProbe<TJoinOp::CROSS_JOIN, HashJoinNode>,
+ ProcessHashTableProbe<TJoinOp::RIGHT_SEMI_JOIN,
HashJoinNode>,
+ ProcessHashTableProbe<TJoinOp::RIGHT_ANTI_JOIN,
HashJoinNode>,
+ ProcessHashTableProbe<TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN,
HashJoinNode>>;
using HashTableIteratorVariants =
std::variant<std::monostate, ForwardIterator<RowRefList>,
@@ -365,52 +318,6 @@ using HashTableIteratorVariants =
static constexpr auto HASH_JOIN_MAX_BUILD_BLOCK_COUNT = 128;
-struct HashJoinProbeContext {
- HashJoinProbeContext(HashJoinNode* join_node);
- HashJoinProbeContext(pipeline::HashJoinProbeLocalState* local_state);
- bool _have_other_join_conjunct;
- const bool _is_right_semi_anti;
- const bool _is_outer_join;
-
- MutableColumnPtr* _tuple_is_null_left_flag_column;
- MutableColumnPtr* _tuple_is_null_right_flag_column;
-
- // other expr
- VExprContextSPtrs* _other_join_conjuncts;
-
- DataTypes* _right_table_data_types;
- DataTypes* _left_table_data_types;
-
- RuntimeProfile::Counter* _search_hashtable_timer;
- RuntimeProfile::Counter* _build_side_output_timer;
- RuntimeProfile::Counter* _probe_side_output_timer;
- RuntimeProfile::Counter* _probe_process_hashtable_timer;
- RuntimeProfile::Counter* _process_other_join_conjunct_timer;
- RuntimeProfile::Counter* _rows_returned_counter;
- RuntimeProfile::HighWaterMarkCounter* _probe_arena_memory_usage;
-
- std::shared_ptr<Arena> _arena;
-
- // for full/right outer join
- HashTableIteratorVariants* _outer_join_pull_visited_iter;
- HashTableIteratorVariants* _probe_row_match_iter;
-
- std::shared_ptr<std::vector<Block>> _build_blocks;
- Block* _probe_block;
- ColumnRawPtrs* _probe_columns;
- int* _probe_index;
- bool* _ready_probe;
-
- Sizes _probe_key_sz;
-
- std::vector<bool>* _left_output_slot_flags;
- std::vector<bool>* _right_output_slot_flags;
-
- // for cases when a probe row matches more than batch size build rows.
- bool* _is_any_probe_match_row_output;
- bool _has_null_value_in_build_side {};
-};
-
class HashJoinNode final : public VJoinNodeBase {
public:
HashJoinNode(ObjectPool* pool, const TPlanNode& tnode, const
DescriptorTbl& descs);
@@ -450,13 +357,27 @@ public:
return _runtime_filter_slots->ready_finish_publish();
}
+ bool have_other_join_conjunct() const { return _have_other_join_conjunct; }
+ bool is_right_semi_anti() const { return _is_right_semi_anti; }
+ bool is_outer_join() const { return _is_outer_join; }
+ std::shared_ptr<std::vector<Block>> build_blocks() const { return
_build_blocks; }
+ Sizes probe_key_sz() const { return _probe_key_sz; }
+ std::vector<bool>* left_output_slot_flags() { return
&_left_output_slot_flags; }
+ std::vector<bool>* right_output_slot_flags() { return
&_right_output_slot_flags; }
+ bool* has_null_in_build_side() { return &_has_null_in_build_side; }
+ DataTypes right_table_data_types() { return _right_table_data_types; }
+ DataTypes left_table_data_types() { return _left_table_data_types; }
+ vectorized::Sizes& build_key_sz() { return _build_key_sz; }
+ bool build_unique() const { return _build_unique; }
+ std::vector<TRuntimeFilterDesc>& runtime_filter_descs() { return
_runtime_filter_descs; }
+ std::shared_ptr<vectorized::Arena> arena() { return _arena; }
+
protected:
void _probe_side_open_thread(RuntimeState* state, std::promise<Status>*
status) override;
private:
- friend struct HashJoinProbeContext;
- friend struct HashJoinBuildContext;
- friend struct RuntimeFilterContext;
+ template <int JoinOpType, typename Parent>
+ friend struct ProcessHashTableProbe;
void _init_short_circuit_for_probe() override {
_short_circuit_for_probe =
@@ -605,13 +526,12 @@ private:
bool* eos, Block* temp_block,
bool check_rows_count = true);
- template <class HashTableContext>
+ template <class HashTableContext, typename Parent>
friend struct ProcessHashTableBuild;
- template <int JoinOpType>
+ template <int JoinOpType, typename Parent>
friend struct ProcessHashTableProbe;
- template <class HashTableContext>
friend struct ProcessRuntimeFilterBuild;
std::vector<TRuntimeFilterDesc> _runtime_filter_descs;
@@ -620,8 +540,6 @@ private:
std::vector<IRuntimeFilter*> _runtime_filters;
size_t _build_rf_cardinality = 0;
std::atomic_bool _probe_open_finish = false;
-
- std::unique_ptr<HashJoinProbeContext> _probe_context;
};
} // namespace vectorized
} // namespace doris
diff --git a/be/src/vec/exec/join/vjoin_node_base.h
b/be/src/vec/exec/join/vjoin_node_base.h
index 9bb946bc0ef..b3758407f88 100644
--- a/be/src/vec/exec/join/vjoin_node_base.h
+++ b/be/src/vec/exec/join/vjoin_node_base.h
@@ -105,7 +105,7 @@ protected:
TJoinOp::type _join_op;
JoinOpVariants _join_op_variants;
- bool _have_other_join_conjunct;
+ const bool _have_other_join_conjunct;
const bool _match_all_probe; // output all rows coming from the probe
input. Full/Left Join
const bool _match_all_build; // output all rows coming from the build
input. Full/Right Join
bool _build_unique; // build a hash table without duplicated
rows. Left semi/anti Join
diff --git a/be/src/vec/runtime/shared_hash_table_controller.h
b/be/src/vec/runtime/shared_hash_table_controller.h
index fae87c3731e..05c0bc609eb 100644
--- a/be/src/vec/runtime/shared_hash_table_controller.h
+++ b/be/src/vec/runtime/shared_hash_table_controller.h
@@ -54,6 +54,7 @@ struct SharedRuntimeFilterContext {
struct SharedHashTableContext {
SharedHashTableContext()
: hash_table_variants(nullptr),
+ blocks(new std::vector<vectorized::Block>()),
signaled(false),
short_circuit_for_null_in_probe_side(false) {}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]