HappenLee commented on code in PR #59591:
URL: https://github.com/apache/doris/pull/59591#discussion_r2791289404


##########
be/src/pipeline/exec/hashjoin_build_sink.cpp:
##########
@@ -286,15 +294,198 @@ void 
HashJoinBuildSinkLocalState::init_short_circuit_for_probe() {
              (empty_block &&
               (p._join_op == TJoinOp::INNER_JOIN || p._join_op == 
TJoinOp::LEFT_SEMI_JOIN ||
                p._join_op == TJoinOp::RIGHT_OUTER_JOIN || p._join_op == 
TJoinOp::RIGHT_SEMI_JOIN ||
-               p._join_op == TJoinOp::RIGHT_ANTI_JOIN))) &&
+               p._join_op == TJoinOp::RIGHT_ANTI_JOIN ||
+               p._join_op == TJoinOp::ASOF_LEFT_INNER_JOIN))) &&
             !p._is_mark_join;
 
     //when build table rows is 0 and not have other_join_conjunct and not 
_is_mark_join and join type is one of 
LEFT_OUTER_JOIN/FULL_OUTER_JOIN/LEFT_ANTI_JOIN
     //we could get the result is probe table + null-column(if need output)
     _shared_state->empty_right_table_need_probe_dispose =
             (empty_block && !p._have_other_join_conjunct && !p._is_mark_join) 
&&
             (p._join_op == TJoinOp::LEFT_OUTER_JOIN || p._join_op == 
TJoinOp::FULL_OUTER_JOIN ||
-             p._join_op == TJoinOp::LEFT_ANTI_JOIN);
+             p._join_op == TJoinOp::LEFT_ANTI_JOIN || p._join_op == 
TJoinOp::ASOF_LEFT_OUTER_JOIN);
+}
+
+Status HashJoinBuildSinkLocalState::build_asof_index(vectorized::Block& block) 
{
+    auto& p = _parent->cast<HashJoinBuildSinkOperatorX>();
+
+    // Only for ASOF JOIN types
+    if (p._join_op != TJoinOp::ASOF_LEFT_INNER_JOIN &&
+        p._join_op != TJoinOp::ASOF_LEFT_OUTER_JOIN) {
+        _shared_state->asof_index_ready = true;
+        return Status::OK();
+    }
+
+    if (block.rows() <= 1) {
+        // Empty or only mock row
+        _shared_state->asof_index_ready = true;
+        return Status::OK();
+    }
+
+    // Get hash table's first and next arrays to traverse buckets
+    uint32_t bucket_size = 0;
+    const uint32_t* first_array = nullptr;
+    const uint32_t* next_array = nullptr;
+    size_t build_rows = 0;
+
+    std::visit(vectorized::Overload {[&](std::monostate&) {},
+                                     [&](auto&& hash_table_ctx) {
+                                         auto* hash_table = 
hash_table_ctx.hash_table.get();
+                                         if (hash_table) {
+                                             bucket_size = 
hash_table->get_bucket_size();
+                                             first_array = 
hash_table->get_first().data();
+                                             next_array = 
hash_table->get_next().data();
+                                             build_rows = hash_table->size();
+                                         }
+                                     }},
+               
_shared_state->hash_table_variant_vector.front()->method_variant);
+
+    if (bucket_size == 0 || !first_array || !next_array) {
+        _shared_state->asof_index_ready = true;
+        return Status::OK();
+    }
+
+    // Set inequality direction from opcode (moved from probe open())
+    _shared_state->asof_inequality_is_greater =
+            (p._asof_opcode == TExprOpcode::GE || p._asof_opcode == 
TExprOpcode::GT);
+    _shared_state->asof_inequality_is_strict =
+            (p._asof_opcode == TExprOpcode::GT || p._asof_opcode == 
TExprOpcode::LT);
+
+    // Compute build ASOF column by executing build-side expression on 
build_block
+    // Expression was prepared against intermediate row desc (probe+build 
layout),
+    // so we create a block with dummy probe columns + actual build columns to 
match
+    if (p._asof_build_side_expr) {
+        vectorized::Block tmp_block;
+        size_t num_build_rows = block.rows();
+        // Add dummy probe columns to match intermediate layout
+        for (size_t i = 0; i < p._asof_num_probe_slots; ++i) {
+            auto dummy_col = vectorized::ColumnNullable::create(
+                    vectorized::ColumnInt8::create(num_build_rows),
+                    vectorized::ColumnUInt8::create(num_build_rows, 1));
+            tmp_block.insert({std::move(dummy_col),
+                              std::make_shared<vectorized::DataTypeNullable>(
+                                      
std::make_shared<vectorized::DataTypeInt8>()),
+                              ""});
+        }
+        // Add actual build columns
+        for (size_t i = 0; i < block.columns(); ++i) {
+            tmp_block.insert(block.get_by_position(i));
+        }
+        int result_col_idx = -1;
+        vectorized::VExprContextSPtr local_expr;
+        RETURN_IF_ERROR(p._asof_build_side_expr->clone(_state, local_expr));
+        auto st = local_expr->execute(&tmp_block, &result_col_idx);
+        if (st.ok() && result_col_idx >= 0 &&
+            result_col_idx < static_cast<int>(tmp_block.columns())) {
+            _shared_state->asof_build_col = 
tmp_block.get_by_position(result_col_idx)
+                                                    
.column->convert_to_full_column_if_const();
+        }
+    }
+
+    // Initialize bucket indices and reverse mapping.
+    // IMPORTANT: rows in the same hash bucket may have different keys (hash 
collisions).
+    // We must sub-group by actual key equality so the ASOF binary search only 
considers
+    // rows with matching equality keys.
+    _shared_state->asof_build_row_to_bucket.resize(build_rows + 1, 0);
+
+    std::visit(vectorized::Overload {
+                       [&](std::monostate&) {},
+                       [&](auto&& hash_table_ctx) {
+                           auto* hash_table = hash_table_ctx.hash_table.get();
+                           if (!hash_table) {
+                               return;
+                           }
+                           const auto* build_keys = 
hash_table->get_build_keys();
+                           uint32_t next_group_id = 0;
+
+                           for (uint32_t bucket = 0; bucket <= bucket_size; 
++bucket) {
+                               uint32_t row_idx = first_array[bucket];
+                               if (row_idx == 0) {
+                                   continue;
+                               }
+
+                               // Collect all rows in this hash bucket
+                               std::vector<uint32_t> bucket_rows;
+                               while (row_idx != 0 && row_idx <= build_rows) {

Review Comment:
   Why does `row_idx` need to be checked against `<= build_rows` here?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to