HappenLee commented on code in PR #59591:
URL: https://github.com/apache/doris/pull/59591#discussion_r2791289404
##########
be/src/pipeline/exec/hashjoin_build_sink.cpp:
##########
@@ -286,15 +294,198 @@ void HashJoinBuildSinkLocalState::init_short_circuit_for_probe() {
             (empty_block &&
              (p._join_op == TJoinOp::INNER_JOIN || p._join_op == TJoinOp::LEFT_SEMI_JOIN ||
               p._join_op == TJoinOp::RIGHT_OUTER_JOIN || p._join_op == TJoinOp::RIGHT_SEMI_JOIN ||
-              p._join_op == TJoinOp::RIGHT_ANTI_JOIN))) &&
+              p._join_op == TJoinOp::RIGHT_ANTI_JOIN ||
+              p._join_op == TJoinOp::ASOF_LEFT_INNER_JOIN))) &&
             !p._is_mark_join;
     //when build table rows is 0 and not have other_join_conjunct and not _is_mark_join and join type is one of LEFT_OUTER_JOIN/FULL_OUTER_JOIN/LEFT_ANTI_JOIN
     //we could get the result is probe table + null-column(if need output)
     _shared_state->empty_right_table_need_probe_dispose =
             (empty_block && !p._have_other_join_conjunct && !p._is_mark_join) &&
             (p._join_op == TJoinOp::LEFT_OUTER_JOIN || p._join_op == TJoinOp::FULL_OUTER_JOIN ||
-             p._join_op == TJoinOp::LEFT_ANTI_JOIN);
+             p._join_op == TJoinOp::LEFT_ANTI_JOIN || p._join_op == TJoinOp::ASOF_LEFT_OUTER_JOIN);
+}
+
+Status HashJoinBuildSinkLocalState::build_asof_index(vectorized::Block& block) {
+    auto& p = _parent->cast<HashJoinBuildSinkOperatorX>();
+
+    // Only for ASOF JOIN types
+    if (p._join_op != TJoinOp::ASOF_LEFT_INNER_JOIN &&
+        p._join_op != TJoinOp::ASOF_LEFT_OUTER_JOIN) {
+        _shared_state->asof_index_ready = true;
+        return Status::OK();
+    }
+
+    if (block.rows() <= 1) {
+        // Empty or only mock row
+        _shared_state->asof_index_ready = true;
+        return Status::OK();
+    }
+
+    // Get hash table's first and next arrays to traverse buckets
+    uint32_t bucket_size = 0;
+    const uint32_t* first_array = nullptr;
+    const uint32_t* next_array = nullptr;
+    size_t build_rows = 0;
+
+    std::visit(vectorized::Overload {[&](std::monostate&) {},
+                                     [&](auto&& hash_table_ctx) {
+                                         auto* hash_table = hash_table_ctx.hash_table.get();
+                                         if (hash_table) {
+                                             bucket_size = hash_table->get_bucket_size();
+                                             first_array = hash_table->get_first().data();
+                                             next_array = hash_table->get_next().data();
+                                             build_rows = hash_table->size();
+                                         }
+                                     }},
+               _shared_state->hash_table_variant_vector.front()->method_variant);
+
+    if (bucket_size == 0 || !first_array || !next_array) {
+        _shared_state->asof_index_ready = true;
+        return Status::OK();
+    }
+
+    // Set inequality direction from opcode (moved from probe open())
+    _shared_state->asof_inequality_is_greater =
+            (p._asof_opcode == TExprOpcode::GE || p._asof_opcode == TExprOpcode::GT);
+    _shared_state->asof_inequality_is_strict =
+            (p._asof_opcode == TExprOpcode::GT || p._asof_opcode == TExprOpcode::LT);
+
+    // Compute build ASOF column by executing build-side expression on build_block
+    // Expression was prepared against intermediate row desc (probe+build layout),
+    // so we create a block with dummy probe columns + actual build columns to match
+    if (p._asof_build_side_expr) {
+        vectorized::Block tmp_block;
+        size_t num_build_rows = block.rows();
+        // Add dummy probe columns to match intermediate layout
+        for (size_t i = 0; i < p._asof_num_probe_slots; ++i) {
+            auto dummy_col = vectorized::ColumnNullable::create(
+                    vectorized::ColumnInt8::create(num_build_rows),
+                    vectorized::ColumnUInt8::create(num_build_rows, 1));
+            tmp_block.insert({std::move(dummy_col),
+                              std::make_shared<vectorized::DataTypeNullable>(
+                                      std::make_shared<vectorized::DataTypeInt8>()),
+                              ""});
+        }
+        // Add actual build columns
+        for (size_t i = 0; i < block.columns(); ++i) {
+            tmp_block.insert(block.get_by_position(i));
+        }
+        int result_col_idx = -1;
+        vectorized::VExprContextSPtr local_expr;
+        RETURN_IF_ERROR(p._asof_build_side_expr->clone(_state, local_expr));
+        auto st = local_expr->execute(&tmp_block, &result_col_idx);
+        if (st.ok() && result_col_idx >= 0 &&
+            result_col_idx < static_cast<int>(tmp_block.columns())) {
+            _shared_state->asof_build_col = tmp_block.get_by_position(result_col_idx)
+                                                    .column->convert_to_full_column_if_const();
+        }
+    }
+
+    // Initialize bucket indices and reverse mapping.
+    // IMPORTANT: rows in the same hash bucket may have different keys (hash collisions).
+    // We must sub-group by actual key equality so the ASOF binary search only considers
+    // rows with matching equality keys.
+    _shared_state->asof_build_row_to_bucket.resize(build_rows + 1, 0);
+
+    std::visit(vectorized::Overload {
+                       [&](std::monostate&) {},
+                       [&](auto&& hash_table_ctx) {
+                           auto* hash_table = hash_table_ctx.hash_table.get();
+                           if (!hash_table) {
+                               return;
+                           }
+                           const auto* build_keys = hash_table->get_build_keys();
+                           uint32_t next_group_id = 0;
+
+                           for (uint32_t bucket = 0; bucket <= bucket_size; ++bucket) {
+                               uint32_t row_idx = first_array[bucket];
+                               if (row_idx == 0) {
+                                   continue;
+                               }
+
+                               // Collect all rows in this hash bucket
+                               std::vector<uint32_t> bucket_rows;
+                               while (row_idx != 0 && row_idx <= build_rows) {
Review Comment:
   why does `row_idx` need to be `<= build_rows`?
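
   For context, a minimal standalone sketch of the first/next chained-bucket layout that loop appears to walk (an assumed toy model for illustration, not Doris's actual hash table): row ids are 1-based and 0 is the end-of-chain sentinel, so with a well-formed chain the `row_idx != 0` test alone terminates the traversal, and the extra `row_idx <= build_rows` guard would only matter if an id ever pointed past the last build row.

   ```cpp
   #include <cstdint>
   #include <iostream>
   #include <vector>

   // Chained hash bucket layout with 1-based row ids:
   // first[bucket] = id of the first row in that bucket (0 = empty bucket),
   // next[row]     = id of the following row in the same bucket (0 = end of chain).
   int main() {
       constexpr uint32_t bucket_size = 4;
       constexpr uint32_t build_rows = 5;

       std::vector<uint32_t> first(bucket_size + 1, 0);
       std::vector<uint32_t> next(build_rows + 1, 0);

       // Insert rows 1..5 into buckets (row 0 is reserved as the sentinel).
       auto insert = [&](uint32_t row, uint32_t bucket) {
           next[row] = first[bucket];
           first[bucket] = row;
       };
       insert(1, 2);
       insert(2, 2);
       insert(3, 0);
       insert(4, 3);
       insert(5, 2);

       // Traverse each bucket's chain. With a correctly built chain the walk
       // ends when next[...] yields 0; the `row_idx <= build_rows` guard is a
       // defensive bound on top of that sentinel check.
       for (uint32_t bucket = 0; bucket <= bucket_size; ++bucket) {
           uint32_t row_idx = first[bucket];
           if (row_idx == 0) {
               continue;
           }
           std::cout << "bucket " << bucket << ":";
           while (row_idx != 0 && row_idx <= build_rows) {
               std::cout << ' ' << row_idx;
               row_idx = next[row_idx];
           }
           std::cout << '\n';
       }
       return 0;
   }
   ```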