Copilot commented on code in PR #60986:
URL: https://github.com/apache/doris/pull/60986#discussion_r2877299872


##########
be/src/vec/columns/column.h:
##########
@@ -227,6 +227,12 @@ class IColumn : public COW<IColumn> {
     virtual void insert_from_multi_column(const std::vector<const IColumn*>& srcs,
                                           const std::vector<size_t>& positions) = 0;
 
+    // Scatter the rows of this column into multiple destination columns.
+    // positions.size() == this->size(), positions[i] is the index into dsts
+    // that row i should be inserted into.

Review Comment:
   The new API comment mentions `positions.size() == this->size()`, but the 
signature now takes a raw `positions` pointer plus an explicit `rows` count. 
Please update the comment to reflect the actual contract (e.g., `positions` has 
length `rows`, and `rows` may be <= `size()`).
   ```suggestion
       // `positions` points to an array of length `rows`, and `rows` is the number
       // of source rows to scatter (rows <= this->size()). positions[i] is the index
       // into `dsts` that row i should be inserted into.
   ```
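
To make the suggested contract concrete, here is a minimal sketch of pointer-plus-count scatter semantics. It does not use the Doris `IColumn` API: `ToyColumn` and `scatter_rows` are hypothetical names introduced only for illustration, with a plain `std::vector<int>` standing in for a column.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

// Hypothetical stand-in for a column (not the Doris IColumn type).
using ToyColumn = std::vector<int>;

// Scatter the first `rows` entries of `src` into `dsts`.
// Contract: `positions` points to at least `rows` entries, rows <= src.size(),
// and positions[i] is the index into `dsts` that source row i is appended to.
inline void scatter_rows(const ToyColumn& src, const std::vector<ToyColumn*>& dsts,
                         const uint32_t* positions, size_t rows) {
    assert(rows <= src.size());
    for (size_t i = 0; i < rows; ++i) {
        assert(positions[i] < dsts.size());
        dsts[positions[i]]->push_back(src[i]);
    }
}
```

Because `rows` is passed explicitly, a caller can scatter only a prefix of the source, which is the case the original `positions.size() == this->size()` wording did not cover.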



##########
be/src/pipeline/shuffle/exchange_writer.cpp:
##########
@@ -184,23 +184,46 @@ Status ExchangeTrivialWriter::_channel_add_rows(
         RuntimeState* state, std::vector<std::shared_ptr<vectorized::Channel>>& channels,
         size_t channel_count, const std::vector<HashValType>& channel_ids, size_t rows,
         vectorized::Block* block, bool eos) {
-    _origin_row_idx.resize(rows);
-    _channel_rows_histogram.assign(channel_count, 0U);
-    _channel_pos_offsets.resize(channel_count);
-    for (size_t i = 0; i < rows; ++i) {
-        _channel_rows_histogram[channel_ids[i]]++;
-    }
-    _channel_pos_offsets[0] = 0;
-    for (size_t i = 1; i < channel_count; ++i) {
-        _channel_pos_offsets[i] = _channel_pos_offsets[i - 1] + _channel_rows_histogram[i - 1];
-    }
-    for (uint32_t i = 0; i < rows; i++) {
-        auto cid = channel_ids[i];
-        auto pos = _channel_pos_offsets[cid]++;
-        _origin_row_idx[pos] = i;
-    }
+    RETURN_IF_CATCH_EXCEPTION({
+        // Ensure each channel's mutable block is initialized.
+        // Even EOF channels need a valid mutable block as a dummy destination,
+        // since insert_to_multi_column scatters unconditionally.
+        for (size_t i = 0; i < channel_count; ++i) {
+            channels[i]->serializer().ensure_mutable_block(block);
+        }

Review Comment:
   This path currently calls `ensure_mutable_block()` for *all* channels
   (including `is_receiver_eof()` ones) because `insert_to_multi_column` scatters
   unconditionally. For EOF channels this re-allocates a mutable block after
   `close()` has reset it, and since EOF channels are skipped in the flush loops,
   rows routed to them accumulate in memory and are not released until query end.
   Consider routing EOF channels to a per-call dummy MutableBlock/columns (or
   teaching scatter to ignore nullptr dsts), and only initializing real channel
   blocks for non-EOF channels that actually receive rows in this batch.
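
One of the options above, a scatter that ignores nullptr destinations, could look roughly like the following. This is a sketch under stated assumptions and not the Doris implementation: `ToyColumn` and `scatter_skip_null` are hypothetical names, and a `nullptr` entry in `dsts` stands for a channel whose receiver has reached EOF.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

// Hypothetical stand-in for a column (not the Doris IColumn type).
using ToyColumn = std::vector<int>;

// Scatter the first `rows` entries of `src` into `dsts`, skipping any
// destination that is nullptr (assumed here to mark an EOF channel).
// Returns the number of rows that were dropped instead of buffered.
inline size_t scatter_skip_null(const ToyColumn& src, const std::vector<ToyColumn*>& dsts,
                                const uint32_t* positions, size_t rows) {
    assert(rows <= src.size());
    size_t dropped = 0;
    for (size_t i = 0; i < rows; ++i) {
        assert(positions[i] < dsts.size());
        ToyColumn* dst = dsts[positions[i]];
        if (dst == nullptr) {
            ++dropped;  // row routed to an EOF channel: nothing is allocated or kept
            continue;
        }
        dst->push_back(src[i]);
    }
    return dropped;
}
```

With this shape, the caller would only need to initialize destinations for non-EOF channels, so rows aimed at closed channels never occupy memory. The per-row null check adds a branch to the hot loop, which is the trade-off against the dummy-destination alternative.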



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

