github-actions[bot] commented on code in PR #60986:
URL: https://github.com/apache/doris/pull/60986#discussion_r2881510775


##########
be/src/vec/sink/vdata_stream_sender.h:
##########
@@ -86,6 +86,28 @@ class BlockSerializer {
 
     MutableBlock* get_block() const { return _mutable_block.get(); }
 
+    // Ensure _mutable_block is initialized (using block's schema).
+    void ensure_mutable_block(Block* block) {
+        if (_mutable_block == nullptr) {
+            _mutable_block = MutableBlock::create_unique(block->clone_empty());
+        }
+    }
+
+    // Check if _mutable_block needs to be flushed (serialized and sent).
+    bool need_flush(bool eos) const {
+        return _mutable_block->rows() >= _batch_size || eos ||
+               (_mutable_block->rows() > 0 &&

Review Comment:
   **Bug: null-pointer dereference in low-memory mode.**
   
   `need_flush` unconditionally dereferences `_mutable_block` without a null check. In low-memory mode, `_serialize_block` calls `reset_block()`, which sets `_mutable_block` to null. Then in `ExchangeTrivialWriter::_channel_add_rows`, after the non-eos flush loop (line 213) triggers a send that resets the block, the subsequent eos flush loop (line 220) calls `try_flush_after_scatter(true)` → `need_flush(true)` → crash.
   
   Scenario:
   1. Scatter inserts enough rows to exceed `_batch_size`
   2. Non-eos `try_flush_after_scatter(false)` flushes → `_serialize_block` → `reset_block()` → `_mutable_block = nullptr`
   3. Eos `try_flush_after_scatter(true)` → `need_flush(true)` dereferences null
   
   Suggested fix:
   ```cpp
   bool need_flush(bool eos) const {
       if (!_mutable_block) return eos;
       return _mutable_block->rows() >= _batch_size || eos ||
              (_mutable_block->rows() > 0 &&
               _mutable_block->allocated_bytes() > _buffer_mem_limit);
   }
   ```
   
   Note: `mem_usage()` on the very next line already guards against a null `_mutable_block`.
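   
   A minimal, self-contained model of the scenario, assuming only what this comment describes. `FakeMutableBlock`, `FakeSerializer`, and `replay_low_memory_scenario` are hypothetical stand-ins (not Doris types) that keep just `rows()`, `allocated_bytes()`, a `reset_block()` that releases the block, and the guarded `need_flush` from the suggested fix. Replaying steps 1-3 shows the EOS check no longer touches a null block.
   
   ```cpp
   #include <cassert>
   #include <cstddef>
   #include <memory>

   // Hypothetical stand-in for vectorized::MutableBlock: only the two
   // accessors used by the flush decision are modeled.
   struct FakeMutableBlock {
       size_t num_rows = 0;
       size_t bytes = 0;
       size_t rows() const { return num_rows; }
       size_t allocated_bytes() const { return bytes; }
   };

   // Hypothetical, reduced stand-in for BlockSerializer.
   class FakeSerializer {
   public:
       FakeSerializer(size_t batch_size, size_t buffer_mem_limit)
               : _batch_size(batch_size), _buffer_mem_limit(buffer_mem_limit) {}

       void ensure_mutable_block() {
           if (_mutable_block == nullptr) {
               _mutable_block = std::make_unique<FakeMutableBlock>();
           }
       }

       // Models reset_block() on the low-memory path: the block is released.
       void reset_block() { _mutable_block.reset(); }

       FakeMutableBlock* get_block() const { return _mutable_block.get(); }

       // Suggested fix: a released block holds no rows, so only EOS forces a flush.
       bool need_flush(bool eos) const {
           if (!_mutable_block) return eos;
           return _mutable_block->rows() >= _batch_size || eos ||
                  (_mutable_block->rows() > 0 &&
                   _mutable_block->allocated_bytes() > _buffer_mem_limit);
       }

   private:
       size_t _batch_size;
       size_t _buffer_mem_limit;
       std::unique_ptr<FakeMutableBlock> _mutable_block;
   };

   // Replays steps 1-3 of the scenario; returns whether the EOS flush is requested.
   bool replay_low_memory_scenario() {
       FakeSerializer s(/*batch_size=*/4, /*buffer_mem_limit=*/1024);
       s.ensure_mutable_block();
       s.get_block()->num_rows = 5;  // 1. scatter exceeds _batch_size
       if (s.need_flush(false)) {    // 2. non-eos flush...
           s.reset_block();          //    ...releases the block in low-memory mode
       }
       assert(s.get_block() == nullptr);
       return s.need_flush(true);    // 3. guarded check, no null dereference
   }
   ```
   
   Returning `eos` for a released block is safe because a null block has no buffered rows, so only an EOS request needs to reach the send path.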



##########
be/src/pipeline/shuffle/exchange_writer.cpp:
##########
@@ -184,23 +184,46 @@ Status ExchangeTrivialWriter::_channel_add_rows(
         RuntimeState* state, std::vector<std::shared_ptr<vectorized::Channel>>& channels,
         size_t channel_count, const std::vector<HashValType>& channel_ids, size_t rows,
         vectorized::Block* block, bool eos) {
-    _origin_row_idx.resize(rows);
-    _channel_rows_histogram.assign(channel_count, 0U);
-    _channel_pos_offsets.resize(channel_count);
-    for (size_t i = 0; i < rows; ++i) {
-        _channel_rows_histogram[channel_ids[i]]++;
-    }
-    _channel_pos_offsets[0] = 0;
-    for (size_t i = 1; i < channel_count; ++i) {
-        _channel_pos_offsets[i] = _channel_pos_offsets[i - 1] + _channel_rows_histogram[i - 1];
-    }
-    for (uint32_t i = 0; i < rows; i++) {
-        auto cid = channel_ids[i];
-        auto pos = _channel_pos_offsets[cid]++;
-        _origin_row_idx[pos] = i;
-    }
+    RETURN_IF_CATCH_EXCEPTION({
+        // Ensure each channel's mutable block is initialized.
+        // Even EOF channels need a valid mutable block as a dummy destination,
+        // since insert_to_multi_column scatters unconditionally.
+        for (size_t i = 0; i < channel_count; ++i) {
+            channels[i]->serializer().ensure_mutable_block(block);
+        }
 
-    return _add_rows_impl(state, channels, channel_count, block, eos);
+        // Collect destination column pointers for each source column.
+        const auto& block_data = block->get_columns_with_type_and_name();
+        auto num_columns = block_data.size();
+        std::vector<vectorized::IColumn*> dst_columns(channel_count);
+
+        for (size_t col_idx = 0; col_idx < num_columns; ++col_idx) {
+            for (size_t ch = 0; ch < channel_count; ++ch) {
+                auto* mutable_block = channels[ch]->serializer().get_block();
+                dst_columns[ch] = mutable_block->mutable_columns()[col_idx].get();
+            }
+            block_data[col_idx].column->insert_to_multi_column(dst_columns, channel_ids.data(),
+                                                               rows);
+        }
+
+        // Check each channel for flush/send.
+        Status status = Status::OK();
+        for (size_t i = 0; i < channel_count; ++i) {
+            if (!channels[i]->is_receiver_eof()) {
+                status = channels[i]->try_flush_after_scatter(false);
+                HANDLE_CHANNEL_STATUS(state, channels[i], status);
+            }
+        }
+        if (eos) {
+            for (size_t i = 0; i < channel_count; ++i) {
+                if (!channels[i]->is_receiver_eof()) {
+                    status = channels[i]->try_flush_after_scatter(true);
+                    HANDLE_CHANNEL_STATUS(state, channels[i], status);

Review Comment:
   **Issue: EOS flush after low-memory reset will crash.**
   
   As described in the `need_flush` comment, the non-eos loop at line 213 may reset `_mutable_block` in low-memory mode, after which this eos loop dereferences the null pointer via `try_flush_after_scatter(true)` → `need_flush(true)`.
   
   Separately, the eos loop must still send an EOS signal even when the mutable block has already been flushed. The current logic would skip the send only if `need_flush` returned false, which cannot happen when `eos` is true; with the null guard suggested above (`if (!_mutable_block) return eos;`), `need_flush(true)` also returns true for a released block, so the send path is still entered correctly.
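   
   A sketch of the expected send sequence under a simplified channel model. `FakeChannel`, `FakeBlock`, `SentPacket`, and `check_eos_after_reset` are hypothetical, and the memory-limit condition is omitted for brevity. With the null guard in place, the eos pass after a low-memory reset still emits an EOS packet, carrying zero rows.
   
   ```cpp
   #include <cassert>
   #include <cstddef>
   #include <memory>
   #include <vector>

   // Hypothetical buffered block: only the row count matters here.
   struct FakeBlock {
       size_t rows = 0;
   };

   // Hypothetical record of one transmitted packet.
   struct SentPacket {
       size_t rows;
       bool eos;
   };

   // Hypothetical channel modeling the flush decision plus the low-memory reset.
   class FakeChannel {
   public:
       explicit FakeChannel(size_t batch_size) : _batch_size(batch_size) {}

       // Stands in for the scatter step filling this channel's mutable block.
       void add_rows(size_t n) {
           if (!_block) _block = std::make_unique<FakeBlock>();
           _block->rows += n;
       }

       // Null-guarded flush check (memory-limit condition omitted).
       bool need_flush(bool eos) const {
           if (!_block) return eos;
           return _block->rows >= _batch_size || eos;
       }

       // Hypothetical analogue of try_flush_after_scatter: send whatever is
       // buffered (possibly nothing) with the eos flag, then release the block.
       void try_flush_after_scatter(bool eos) {
           if (!need_flush(eos)) return;
           size_t rows = _block ? _block->rows : 0;
           _sent.push_back({rows, eos});
           _block.reset();
       }

       const std::vector<SentPacket>& sent() const { return _sent; }

   private:
       size_t _batch_size;
       std::unique_ptr<FakeBlock> _block;
       std::vector<SentPacket> _sent;
   };

   // Usage: the non-eos pass flushes and releases the block; the eos pass
   // must still emit an EOS packet.
   void check_eos_after_reset() {
       FakeChannel ch(/*batch_size=*/4);
       ch.add_rows(5);
       ch.try_flush_after_scatter(false);  // sends 5 rows, block released
       ch.try_flush_after_scatter(true);   // block is null, EOS is still sent
       assert(ch.sent().size() == 2);
       assert(ch.sent()[1].eos && ch.sent()[1].rows == 0);
   }
   ```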



##########
be/test/pipeline/shuffle/exchange_writer_test.cpp:
##########
@@ -60,91 +60,6 @@ static std::vector<std::shared_ptr<Channel>> make_disabled_channels(
     return channels;
 }
 
-TEST(TrivialExchangeWriterTest, BasicDistribution) {
-    MockRuntimeState state;

Review Comment:
   **Tests deleted without replacement.** Three `ExchangeTrivialWriter` tests were removed (`BasicDistribution`, `AllRowsToSingleChannel`, `EmptyInput`), and no new tests cover the replacement `insert_to_multi_column` scatter path. Per project coding standards, all kernel features must have corresponding tests.
   
   Needed:
   1. Unit tests for `ColumnVector::insert_to_multi_column`, `ColumnStr::insert_to_multi_column`, and `ColumnNullable::insert_to_multi_column` that verify correct row scatter with various destination counts and row distributions.
   2. At least one test for the new `ExchangeTrivialWriter::_channel_add_rows` path verifying that rows end up in the correct channel mutable blocks.
   3. Edge case: empty input (0 rows), single channel, all rows to one channel.
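   
   One possible shape for the missing scatter tests, sketched without gtest so it stays self-contained. `scatter_rows` is a hypothetical model of `insert_to_multi_column` for a plain integer column (row `i` goes to `dst[channel_ids[i]]`), and channel ids are assumed to be `uint32_t`. `reference_partition` re-implements the removed histogram/prefix-sum code from the diff, so each case checks that the new scatter path yields the same rows, in the same order, per channel.
   
   ```cpp
   #include <cassert>
   #include <cstddef>
   #include <cstdint>
   #include <vector>

   // Hypothetical model of insert_to_multi_column for an integer column:
   // source row i is appended to dst[channel_ids[i]], preserving source order.
   void scatter_rows(const std::vector<int64_t>& src,
                     const std::vector<std::vector<int64_t>*>& dst,
                     const uint32_t* channel_ids, size_t rows) {
       for (size_t i = 0; i < rows; ++i) {
           dst[channel_ids[i]]->push_back(src[i]);
       }
   }

   // Reference: the removed histogram + prefix-sum permutation, split back
   // into one row list per channel.
   std::vector<std::vector<int64_t>> reference_partition(
           const std::vector<int64_t>& src, const std::vector<uint32_t>& channel_ids,
           size_t channel_count) {
       const size_t rows = src.size();
       std::vector<size_t> histogram(channel_count, 0);
       for (size_t i = 0; i < rows; ++i) {
           histogram[channel_ids[i]]++;
       }
       std::vector<size_t> offsets(channel_count, 0);
       for (size_t c = 1; c < channel_count; ++c) {
           offsets[c] = offsets[c - 1] + histogram[c - 1];
       }
       std::vector<uint32_t> origin_row_idx(rows);
       for (size_t i = 0; i < rows; ++i) {
           origin_row_idx[offsets[channel_ids[i]]++] = static_cast<uint32_t>(i);
       }
       std::vector<std::vector<int64_t>> out(channel_count);
       size_t pos = 0;
       for (size_t c = 0; c < channel_count; ++c) {
           for (size_t k = 0; k < histogram[c]; ++k) {
               out[c].push_back(src[origin_row_idx[pos++]]);
           }
       }
       return out;
   }

   // True if the scatter path and the reference agree on this input.
   bool scatter_matches_reference(const std::vector<int64_t>& src,
                                  const std::vector<uint32_t>& channel_ids,
                                  size_t channel_count) {
       std::vector<std::vector<int64_t>> got(channel_count);
       std::vector<std::vector<int64_t>*> dst;
       for (auto& col : got) {
           dst.push_back(&col);
       }
       scatter_rows(src, dst, channel_ids.data(), src.size());
       return got == reference_partition(src, channel_ids, channel_count);
   }

   // Edge cases named in the review, plus one mixed distribution.
   void run_edge_cases() {
       assert(scatter_matches_reference({}, {}, 3));                // empty input
       assert(scatter_matches_reference({1, 2, 3}, {0, 0, 0}, 1));  // single channel
       assert(scatter_matches_reference({7, 8, 9}, {2, 2, 2}, 4));  // all rows to one channel
       assert(scatter_matches_reference({10, 11, 12, 13, 14}, {1, 0, 1, 2, 0}, 3));
   }
   ```
   
   In the real suite these cases would become `TEST(...)` bodies against `ColumnVector`, `ColumnStr`, and `ColumnNullable`, plus one `_channel_add_rows` test that inspects each channel's mutable block.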



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.