HappenLee commented on code in PR #60179:
URL: https://github.com/apache/doris/pull/60179#discussion_r2762604508


##########
be/src/pipeline/exec/nested_loop_join_probe_operator.cpp:
##########
@@ -88,58 +89,249 @@ void NestedLoopJoinProbeLocalState::_update_additional_flags(vectorized::Block*
 void NestedLoopJoinProbeLocalState::_reset_with_next_probe_row() {
     // TODO: need a vector of left block to register the _probe_row_visited_flags
     _current_build_pos = 0;
-    _left_block_pos++;
+    _probe_block_pos++;
 }
 
-template <typename JoinOpType, bool set_build_side_flag, bool set_probe_side_flag>
-Status NestedLoopJoinProbeLocalState::generate_join_block_data(RuntimeState* state,
-                                                               JoinOpType& join_op_variants) {
+// process_probe_block and process_build_block are similar.
+// One generates build rows based on a probe row; the other generates
+// probe rows based on a build row. Their implementation approach and
+// code structure are similar.
+
+void process_probe_block(int64_t probe_block_pos, vectorized::Block& block,
+                         const vectorized::Block& probe_block, size_t probe_side_columns,
+                         const vectorized::Block& build_block, size_t build_side_columns) {
+    auto dst_columns = block.mutate_columns();
+    const size_t max_added_rows = build_block.rows();
+    for (size_t i = 0; i < probe_side_columns; ++i) {
+        const vectorized::ColumnWithTypeAndName& src_column = probe_block.get_by_position(i);
+        if (!src_column.column->is_nullable() && dst_columns[i]->is_nullable()) {
+            auto origin_sz = dst_columns[i]->size();
+            assert_cast<vectorized::ColumnNullable*>(dst_columns[i].get())
+                    ->get_nested_column_ptr()
+                    ->insert_many_from(*src_column.column, probe_block_pos, max_added_rows);
+            assert_cast<vectorized::ColumnNullable*>(dst_columns[i].get())
+                    ->get_null_map_column()
+                    .get_data()
+                    .resize_fill(origin_sz + max_added_rows, 0);
+        } else {
+            // TODO: for cross join, maybe could insert one row, and wrap for a const column
+            dst_columns[i]->insert_many_from(*src_column.column, probe_block_pos, max_added_rows);
+        }
+    }
+    for (size_t i = 0; i < build_side_columns; ++i) {
+        const vectorized::ColumnWithTypeAndName& src_column = build_block.get_by_position(i);
+        if (!src_column.column->is_nullable() &&
+            dst_columns[probe_side_columns + i]->is_nullable()) {
+            auto origin_sz = dst_columns[probe_side_columns + i]->size();
+            assert_cast<vectorized::ColumnNullable*>(dst_columns[probe_side_columns + i].get())
+                    ->get_nested_column_ptr()
+                    ->insert_range_from(*src_column.column.get(), 0, max_added_rows);
+            assert_cast<vectorized::ColumnNullable*>(dst_columns[probe_side_columns + i].get())
+                    ->get_null_map_column()
+                    .get_data()
+                    .resize_fill(origin_sz + max_added_rows, 0);
+        } else {
+            dst_columns[probe_side_columns + i]->insert_range_from(*src_column.column.get(), 0,
+                                                                   max_added_rows);
+        }
+    }
+    block.set_columns(std::move(dst_columns));
+}
+
+void process_build_block(int64_t build_block_pos, vectorized::Block& block,
+                         const vectorized::Block& build_block, size_t build_side_columns,
+                         const vectorized::Block& probe_block, size_t probe_side_columns) {
+    auto dst_columns = block.mutate_columns();
+    const size_t max_added_rows = probe_block.rows();
+    for (size_t i = 0; i < probe_side_columns; ++i) {
+        const vectorized::ColumnWithTypeAndName& src_column = probe_block.get_by_position(i);
+        if (!src_column.column->is_nullable() && dst_columns[i]->is_nullable()) {
+            auto origin_sz = dst_columns[i]->size();
+            assert_cast<vectorized::ColumnNullable*>(dst_columns[i].get())
+                    ->get_nested_column_ptr()
+                    ->insert_range_from(*src_column.column.get(), 0, max_added_rows);
+            assert_cast<vectorized::ColumnNullable*>(dst_columns[i].get())
+                    ->get_null_map_column()
+                    .get_data()
+                    .resize_fill(origin_sz + max_added_rows, 0);
+        } else {
+            dst_columns[i]->insert_range_from(*src_column.column.get(), 0, max_added_rows);
+        }
+    }
+    for (size_t i = 0; i < build_side_columns; ++i) {
+        const vectorized::ColumnWithTypeAndName& src_column = build_block.get_by_position(i);
+        if (!src_column.column->is_nullable() &&
+            dst_columns[probe_side_columns + i]->is_nullable()) {
+            auto origin_sz = dst_columns[probe_side_columns + i]->size();
+            assert_cast<vectorized::ColumnNullable*>(dst_columns[probe_side_columns + i].get())
+                    ->get_nested_column_ptr()
+                    ->insert_many_from(*src_column.column, build_block_pos, max_added_rows);
+            assert_cast<vectorized::ColumnNullable*>(dst_columns[probe_side_columns + i].get())
+                    ->get_null_map_column()
+                    .get_data()
+                    .resize_fill(origin_sz + max_added_rows, 0);
+        } else {
+            dst_columns[probe_side_columns + i]->insert_many_from(*src_column.column,
+                                                                  build_block_pos, max_added_rows);
+        }
+    }
+    block.set_columns(std::move(dst_columns));
+}
+
+template <bool set_build_side_flag, bool set_probe_side_flag>
+void NestedLoopJoinProbeLocalState::_generate_block_base_probe(RuntimeState* state,
+                                                               vectorized::Block* probe_block) {
     auto& p = _parent->cast<NestedLoopJoinProbeOperatorX>();
-    constexpr bool ignore_null = JoinOpType::value == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN;
-    _left_block_start_pos = _left_block_pos;
-    _left_side_process_count = 0;
-    DCHECK(!_need_more_input_data || !_matched_rows_done);
+    while (_join_block.rows() < state->batch_size()) {
+        while (_current_build_pos == _shared_state->build_blocks.size() ||
+               _probe_block_pos == probe_block->rows()) {
+            // if the probe block is empty, there are no probe rows to dispose of
+            if (probe_block->rows() > _probe_block_pos) {
+                _probe_side_process_count++;
+            }
 
-    if (!_matched_rows_done && !_need_more_input_data) {
-        // We should try to join rows if there still are some rows from probe side.
-        // _probe_offset_stack and _build_offset_stack use u16 for storage
-        // because on the FE side, it is guaranteed that the batch size will not exceed 65535 (the maximum value for u16).s
-        while (_join_block.rows() < state->batch_size()) {
-            while (_current_build_pos == _shared_state->build_blocks.size() ||
-                   _left_block_pos == _child_block->rows()) {
-                // if left block is empty(), do not need disprocess the left block rows
-                if (_child_block->rows() > _left_block_pos) {
-                    _left_side_process_count++;
+            _reset_with_next_probe_row();
+            if (_probe_block_pos < probe_block->rows()) {
+                if constexpr (set_probe_side_flag) {
+                    _probe_offset_stack.push(cast_set<uint16_t, size_t, false>(_join_block.rows()));
                 }
-
-                _reset_with_next_probe_row();
-                if (_left_block_pos < _child_block->rows()) {
-                    if constexpr (set_probe_side_flag) {
-                        _probe_offset_stack.push(
-                                cast_set<uint16_t, size_t, false>(_join_block.rows()));
-                    }
+            } else {
+                if (_shared_state->probe_side_eos) {
+                    _matched_rows_done = true;
                 } else {
-                    if (_shared_state->left_side_eos) {
-                        _matched_rows_done = true;
-                    } else {
-                        _need_more_input_data = true;
-                    }
-                    break;
+                    _need_more_input_data = true;
                 }
+                break;
             }
+        }
+
+        // No probe row left to be disposed
+        if (_matched_rows_done || _need_more_input_data) {
+            break;
+        }
+
+        const auto& now_process_build_block = _shared_state->build_blocks[_current_build_pos++];
+        if constexpr (set_build_side_flag) {
+            _build_offset_stack.push(cast_set<uint16_t, size_t, false>(_join_block.rows()));
+        }
+
+        SCOPED_TIMER(_output_temp_blocks_timer);
+        process_probe_block(_probe_block_pos, _join_block, *probe_block, p._num_probe_side_columns,
+                            now_process_build_block, p._num_build_side_columns);
+    }
+}
+
+// When the build side is small, generate data based on the build side.
+// Currently a simple heuristic is used: check whether build_blocks.size() == 1
+bool NestedLoopJoinProbeLocalState::use_generate_block_base_build() const {
+    return _shared_state->build_blocks.size() == 1;
+}
 
-            // Do not have left row need to be disposed
-            if (_matched_rows_done || _need_more_input_data) {
+// for inner join only
+// Generating data based on the build side follows the same logic
+// as generating data based on the probe side.
+// Only inner join calls this function, so both set_build_side_flag
+// and set_probe_side_flag are false.
+void NestedLoopJoinProbeLocalState::_generate_block_base_build(RuntimeState* state,
+                                                               vectorized::Block* probe_block) {
+    DCHECK(use_generate_block_base_build());
+    auto& p = _parent->cast<NestedLoopJoinProbeOperatorX>();
+    const auto& build_block = _shared_state->build_blocks[0];
+    const size_t build_rows = build_block.rows();
+    const auto probe_rows = static_cast<int>(probe_block->rows());
+
+    // If the probe block is empty, return directly
+    /// TODO: Reconsider this logic; it may need to be handled outside
+    if (probe_rows == 0) {
+        if (_shared_state->probe_side_eos) {
+            _matched_rows_done = true;
+        } else {
+            _need_more_input_data = true;
+        }
+        return;
+    }
+
+    while (_join_block.rows() < state->batch_size()) {

Review Comment:
   Use `_join_block.rows() + probe_rows < state->batch_size()` as the loop condition to make sure the generated block size stays below `state->batch_size()`.
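
   For illustration only, a toy model of the two loop guards using plain integers in place of `_join_block` and `RuntimeState` (the batch size and row count below are made-up values; each iteration stands for one pass that appends `probe_rows` rows to the output block):

   ```cpp
   #include <cstdint>
   #include <iostream>

   int main() {
       const int64_t batch_size = 4096; // stand-in for state->batch_size()
       const int64_t probe_rows = 1000; // rows appended per loop iteration

       // Guard from the diff: the last iteration can overshoot batch_size,
       // because the body still appends probe_rows rows after the check.
       int64_t rows = 0;
       while (rows < batch_size) {
           rows += probe_rows;
       }
       std::cout << "rows < batch_size:              " << rows << "\n"; // 5000

       // Guard from this comment: only iterate while a full probe_rows batch
       // still fits, so the final size stays below batch_size.
       rows = 0;
       while (rows + probe_rows < batch_size) {
           rows += probe_rows;
       }
       std::cout << "rows + probe_rows < batch_size: " << rows << "\n"; // 4000
       return 0;
   }
   ```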



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

