Mryange commented on code in PR #59591:
URL: https://github.com/apache/doris/pull/59591#discussion_r2781115030
##########
be/src/pipeline/exec/join/process_hash_table_probe_impl.h:
##########
@@ -208,6 +223,342 @@ typename HashTableType::State ProcessHashTableProbe<JoinOpType>::_init_probe_sid
     return typename HashTableType::State(_parent->_probe_columns);
 }
+// ASOF JOIN optimized: O(log K) binary search per probe row
+// Key design: execute ASOF expression directly to get column data, no column name matching
+template <int JoinOpType>
+template <typename HashTableType>
+uint32_t ProcessHashTableProbe<JoinOpType>::_find_batch_asof_optimized(
+        HashTableType& hash_table_ctx, const uint8_t* null_map, uint32_t probe_rows) {
+    auto* shared_state = _parent->_shared_state;
+    constexpr bool is_outer_join = (JoinOpType == TJoinOp::ASOF_LEFT_OUTER_JOIN ||
+                                    JoinOpType == TJoinOp::ASOF_RIGHT_OUTER_JOIN);
+    auto& probe_index = _parent->_probe_index;
+
+    // Empty build table handling
+    if (!shared_state->asof_index_ready) {
+        if constexpr (is_outer_join) {
+            uint32_t matched_cnt = 0;
+            for (; probe_index < probe_rows && matched_cnt < _batch_size; ++probe_index) {
+                _probe_indexs.get_element(matched_cnt) = probe_index;
+                _build_indexs.get_element(matched_cnt) = 0;
+                matched_cnt++;
+            }
+            return matched_cnt;
+        }
+        probe_index = probe_rows;
+        return 0;
+    }
+
+    // Get ASOF expression
+    auto& conjuncts = _parent->_other_join_conjuncts;
+    if (conjuncts.empty() || !conjuncts[0] || !conjuncts[0]->root() ||
+        conjuncts[0]->root()->get_num_children() != 2) {
+        probe_index = probe_rows;
+        return 0;
+    }
+
+    // Execute probe side expression to get probe ASOF column.
+    // Use a temporary block for expression execution to avoid modifying original probe_block.
+    int probe_col_idx = -1;
+    auto& probe_block = _parent->_probe_block;
+    vectorized::ColumnPtr probe_col_ptr;
+
+    auto left_child = conjuncts[0]->root()->get_child(0);
+    if (left_child->is_slot_ref()) {
+        // Simple column reference: directly get from probe_block
+        auto* slot_ref = static_cast<vectorized::VSlotRef*>(left_child.get());
+        int col_id = slot_ref->column_id();
+        if (col_id >= 0 && col_id < static_cast<int>(probe_block.columns())) {
+            probe_col_ptr =
+                    probe_block.get_by_position(col_id).column->convert_to_full_column_if_const();
+        }
+    } else {
+        // Expression: execute on a cloned block to avoid modifying the original
+        vectorized::Block tmp_probe_block(probe_block);
+        auto status = left_child->execute(conjuncts[0].get(), &tmp_probe_block, &probe_col_idx);
+        if (status.ok() && probe_col_idx >= 0 &&
+            probe_col_idx < static_cast<int>(tmp_probe_block.columns())) {
+            probe_col_ptr = tmp_probe_block.get_by_position(probe_col_idx)
+                                    .column->convert_to_full_column_if_const();
+        }
+    }
+
+    if (!probe_col_ptr) {
+        probe_index = probe_rows;
+        return 0;
+    }
+    // Remove nullable wrapper for comparison - keep the original for the null check
+    vectorized::ColumnPtr probe_col_for_compare = probe_col_ptr;
+    if (probe_col_ptr->is_nullable()) {
+        probe_col_for_compare = assert_cast<const vectorized::ColumnNullable*>(probe_col_ptr.get())
+                                        ->get_nested_column_ptr();
+    }
+    const auto* probe_col = probe_col_for_compare.get();
+
+    // Get build ASOF column by matching column name from expression label
+    auto* build_block = shared_state->build_block.get();
+    if (!build_block || build_block->rows() <= 1) {
+        if constexpr (is_outer_join) {
+            uint32_t matched_cnt = 0;
+            for (; probe_index < probe_rows && matched_cnt < _batch_size; ++probe_index) {
+                _probe_indexs.get_element(matched_cnt) = probe_index;
+                _build_indexs.get_element(matched_cnt) = 0;
+                matched_cnt++;
+            }
+            return matched_cnt;
+        }
+        probe_index = probe_rows;
+        return 0;
+    }
+
+    // One-time init: compute build ASOF column and sort bucket entries.
+    // std::call_once ensures thread-safe init; after the first call, zero overhead (atomic flag check).
+    std::call_once(shared_state->asof_init_once, [&]() {
+        auto right_child = conjuncts[0]->root()->get_child(1);
+        if (right_child->is_slot_ref()) {
+            // Simple column reference: use column_id offset
+            auto* slot_ref = static_cast<vectorized::VSlotRef*>(right_child.get());
+            int col_id_in_intermediate = slot_ref->column_id();
+            int build_col_idx = col_id_in_intermediate - static_cast<int>(_right_col_idx);
+            if (build_col_idx >= 0 && build_col_idx < static_cast<int>(build_block->columns())) {
+                shared_state->asof_build_col = build_block->get_by_position(build_col_idx)
+                                                       .column->convert_to_full_column_if_const();
+            }
+        } else {
+            // Expression: need to create a block with the correct column layout
+            vectorized::Block tmp_block;
+            size_t build_rows = build_block->rows();
+            for (size_t i = 0; i < _right_col_idx; ++i) {
+                auto dummy_col = vectorized::ColumnNullable::create(
+                        vectorized::ColumnInt8::create(build_rows),
+                        vectorized::ColumnUInt8::create(build_rows, 1));
+                tmp_block.insert({std::move(dummy_col),
+                                  std::make_shared<vectorized::DataTypeNullable>(
+                                          std::make_shared<vectorized::DataTypeInt8>()),
+                                  ""});
+            }
+            for (size_t i = 0; i < build_block->columns(); ++i) {
+                tmp_block.insert(build_block->get_by_position(i));
+            }
+            int tmp_col_idx = -1;
+            auto st = right_child->execute(conjuncts[0].get(), &tmp_block, &tmp_col_idx);
+            if (st.ok() && tmp_col_idx >= 0 &&
+                tmp_col_idx < static_cast<int>(tmp_block.columns())) {
+                shared_state->asof_build_col = tmp_block.get_by_position(tmp_col_idx)
+                                                       .column->convert_to_full_column_if_const();
+            }
+        }
+        if (shared_state->asof_build_col) {
+            // Remove nullable wrapper for the sorting comparator
+            vectorized::ColumnPtr build_col_for_sort = shared_state->asof_build_col;
+            if (shared_state->asof_build_col->is_nullable()) {
+                build_col_for_sort = assert_cast<const vectorized::ColumnNullable*>(
+                                             shared_state->asof_build_col.get())
+                                             ->get_nested_column_ptr();
+            }
+            const auto* sort_col = build_col_for_sort.get();
+            for (auto& bucket : shared_state->asof_bucket_indices) {
+                if (bucket.valid && !bucket.entries.empty()) {
+                    // Filter out entries where the ASOF column is NULL
+                    if (shared_state->asof_build_col->is_nullable()) {
+                        const auto* nullable_col = assert_cast<const vectorized::ColumnNullable*>(
+                                shared_state->asof_build_col.get());
+                        bucket.entries.erase(
+                                std::remove_if(bucket.entries.begin(), bucket.entries.end(),
+                                               [nullable_col](const auto& entry) {
+                                                   return nullable_col->is_null_at(entry.row_index);
+                                               }),
+                                bucket.entries.end());
+                    }
+                    // Sort remaining entries
+                    if (bucket.entries.size() > 1) {
+                        pdqsort(bucket.entries.begin(), bucket.entries.end(),
+                                [sort_col](const auto& a, const auto& b) {
+                                    return sort_col->compare_at(a.row_index, b.row_index, *sort_col,
+                                                                1) < 0;
+                                });
+                    }
+                    bucket.valid = !bucket.entries.empty();
+                }
+            }
+        }
+    });
+    if (!shared_state->asof_build_col) {
+        probe_index = probe_rows;
+        return 0;
+    }
+    // Remove nullable wrapper for comparison (read-only after init, no lock needed)
+    vectorized::ColumnPtr build_col_for_compare = shared_state->asof_build_col;
+    if (shared_state->asof_build_col->is_nullable()) {
+        build_col_for_compare =
+                assert_cast<const vectorized::ColumnNullable*>(shared_state->asof_build_col.get())
+                        ->get_nested_column_ptr();
+    }
+    const auto* build_col = build_col_for_compare.get();
+
+    bool is_greater = shared_state->asof_inequality_is_greater;
+    bool is_strict = shared_state->asof_inequality_is_strict;
+    uint32_t matched_cnt = 0;
+
+    // RIGHT JOIN: append qualifying probe entries to a per-task local cache (O(1) per row).
+    // No comparison or iteration over build entries is needed during the probe phase.
+    // The merge phase (finish_probing) sorts probe entries per bucket and binary-searches
+    // per build row for O(log P) matching.
+    constexpr bool is_right_join = (JoinOpType == TJoinOp::ASOF_RIGHT_INNER_JOIN ||
+                                    JoinOpType == TJoinOp::ASOF_RIGHT_OUTER_JOIN);
+    if constexpr (is_right_join) {
+        // Lazily init the per-task local cache (no lock needed - it's task-private)
+        if (!_asof_right_local_cache) {
+            _asof_right_local_cache = std::make_unique<AsofRightLocalCache>();
+            _asof_right_local_cache->probe_asof_values = probe_col->clone_empty();
+            vectorized::Block cache_schema;
+            size_t cache_cols =
+                    std::min((size_t)probe_block.columns(), (size_t)_left_output_slot_flags.size());
+            for (size_t i = 0; i < cache_cols; ++i) {
+                cache_schema.insert(probe_block.get_by_position(i).clone_empty());
+            }
+            _asof_right_local_cache->probe_cache =
+                    vectorized::MutableBlock::create_unique(std::move(cache_schema));
+        }
+
+        auto& local_bucket_ids = _asof_right_local_cache->bucket_ids;
+        auto& local_probe_asof = _asof_right_local_cache->probe_asof_values;
+        auto& local_probe_cache = _asof_right_local_cache->probe_cache;
+
+        for (; probe_index < probe_rows; ++probe_index) {
+            if ((null_map && null_map[probe_index]) || probe_col_ptr->is_null_at(probe_index)) {
+                continue;
+            }
+
+            uint32_t first_build_row = hash_table_ctx.bucket_nums[probe_index];
+            if (first_build_row == 0 ||
+                first_build_row >= shared_state->asof_build_row_to_bucket.size()) {
+                continue;
+            }
+
+            uint32_t bucket_id = shared_state->asof_build_row_to_bucket[first_build_row];
+            if (bucket_id >= shared_state->asof_bucket_indices.size()) {
+                continue;
+            }
+
+            const auto& bucket = shared_state->asof_bucket_indices[bucket_id];
+            if (!bucket.valid || bucket.entries.empty()) {
+                continue;
+            }
+
+            // Just record this probe entry: O(1), no comparison/iteration needed.
+            // The merge phase will sort these per bucket and binary-search per build row.
+            local_bucket_ids.push_back(bucket_id);
+            local_probe_asof->insert_from(*probe_col, probe_index);
+            for (size_t col = 0; col < local_probe_cache->columns(); ++col) {
+                local_probe_cache->get_column_by_position(col)->insert_from(
+                        *probe_block.get_by_position(col).column, probe_index);
+            }
+        }
+        return 0;
+    }
+
+    // LEFT JOIN: for each probe row, find the best build match
+    for (; probe_index < probe_rows && matched_cnt < _batch_size; ++probe_index) {
+        // Skip NULL probe keys - use the original probe_col_ptr for the null check
+        if ((null_map && null_map[probe_index]) || probe_col_ptr->is_null_at(probe_index)) {
+            if constexpr (is_outer_join) {
+                _probe_indexs.get_element(matched_cnt) = probe_index;
+                _build_indexs.get_element(matched_cnt) = 0;
+                matched_cnt++;
+            }
+            continue;
+        }
+
+        // Get the bucket for this probe row
+        uint32_t first_build_row = hash_table_ctx.bucket_nums[probe_index];
+
+        if (first_build_row == 0 ||
+            first_build_row >= shared_state->asof_build_row_to_bucket.size()) {
+            if constexpr (is_outer_join) {
+                _probe_indexs.get_element(matched_cnt) = probe_index;
+                _build_indexs.get_element(matched_cnt) = 0;
+                matched_cnt++;
+            }
+            continue;
+        }
+
+        uint32_t bucket_id = shared_state->asof_build_row_to_bucket[first_build_row];
+        if (bucket_id >= shared_state->asof_bucket_indices.size()) {
+            if constexpr (is_outer_join) {
+                _probe_indexs.get_element(matched_cnt) = probe_index;
+                _build_indexs.get_element(matched_cnt) = 0;
+                matched_cnt++;
+            }
+            continue;
+        }
+
+        const auto& bucket = shared_state->asof_bucket_indices[bucket_id];
+        if (!bucket.valid || bucket.entries.empty()) {
+            if constexpr (is_outer_join) {
+                _probe_indexs.get_element(matched_cnt) = probe_index;
+                _build_indexs.get_element(matched_cnt) = 0;
+                matched_cnt++;
+            }
+            continue;
+        }
+        // Binary search in sorted entries
+        const auto& entries = bucket.entries;
+        uint32_t best_build_row = 0;
+
+        if (is_greater) {
+            // probe >= build: find the largest build <= probe
+            ssize_t lo = 0, hi = static_cast<ssize_t>(entries.size()) - 1;
+            ssize_t result = -1;
+            while (lo <= hi) {
+                ssize_t mid = lo + (hi - lo) / 2;
+                int cmp = probe_col->compare_at(probe_index, entries[mid].row_index, *build_col, 1);
Review Comment:
   This binary-search code could be written as member functions of the column: find the first element greater than / greater than or equal to / less than / less than or equal to. Moving the code into COWHelper would probably be enough.
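   For illustration, a minimal sketch of the suggested primitive, written as a free function over IColumn rather than a COWHelper member for brevity. The names (`asof_partition_point`, `sorted_rows`, `include_equal`) are hypothetical, not an existing Doris API, and the sketch takes plain row indices instead of the PR's bucket entry structs:

```cpp
// Hypothetical sketch only; relies on the existing IColumn::compare_at,
// which the PR already uses for its inline binary searches.
#include <cstddef>
#include <cstdint>
#include <vector>

#include "vec/columns/column.h" // doris::vectorized::IColumn

namespace doris::vectorized {

// Returns the partition point in sorted_rows (row indices into sorted_col,
// pre-sorted ascending by value): the index of the first entry whose value is
// strictly greater than (include_equal) or greater-or-equal to (!include_equal)
// the probe value. All four bounds reduce to this one primitive by flipping
// include_equal or taking pos - 1.
inline size_t asof_partition_point(const IColumn& sorted_col,
                                   const std::vector<uint32_t>& sorted_rows,
                                   const IColumn& probe_col, size_t probe_row,
                                   bool include_equal) {
    size_t lo = 0;
    size_t hi = sorted_rows.size();
    while (lo < hi) {
        size_t mid = lo + (hi - lo) / 2;
        // compare_at: <0 if sorted value < probe value, 0 if equal, >0 if greater
        int cmp = sorted_col.compare_at(sorted_rows[mid], probe_row, probe_col, 1);
        if (cmp < 0 || (include_equal && cmp == 0)) {
            lo = mid + 1; // still on the matching side of the boundary
        } else {
            hi = mid; // at or past the boundary
        }
    }
    return lo;
}

} // namespace doris::vectorized
```

   With this, the `probe >= build` branch would reduce to `pos = asof_partition_point(*build_col, rows, *probe_col, probe_index, /*include_equal=*/true)` with the best match at `rows[pos - 1]` when `pos > 0`. Hoisting it into COWHelper as suggested would additionally let each concrete column type substitute a typed comparison for the virtual `compare_at` call.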
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]