github-actions[bot] commented on code in PR #61758:
URL: https://github.com/apache/doris/pull/61758#discussion_r2992972386


##########
be/src/exec/operator/materialization_opertor.cpp:
##########
@@ -53,29 +53,62 @@ void MaterializationSharedState::get_block(Block* block) {
     origin_block.clear();
 }
 
+// Merges RPC responses from multiple BEs into `response_blocks` in the original row order.
+//
+// After parallel multiget_data_v2 RPCs complete, each BE's response contains a partial block
+// with only the rows that BE owns (ordered by file_id/row_id). This function reassembles them
+// into the correct TopN output order using `block_order_results` as the ordering guide.
+//
+// Data flow:
+//   rpc_struct_map[backend_id].response  (per-BE partial blocks, unordered across BEs)
+//       + block_order_results[i][j]      (maps each output row → its source backend_id)
+//       → response_blocks[i]             (final merged result in original TopN row order)
 Status MaterializationSharedState::merge_multi_response() {
+    // Outer loop: iterate over each relation (i.e., each rowid column / table).
+    // A query with lazy materialization on 2 tables would have block_order_results.size() == 2,
+    // each with its own set of response_blocks and RPC request_block_descs.
     for (int i = 0; i < block_order_results.size(); ++i) {
+        // Maps backend_id → (deserialized block from that BE, row cursor into the block).
+        // The cursor tracks how many rows we've consumed from this BE's block so far,
+        // since the rows in the partial block are in the same order as the row_ids we sent.
+
         // block_maps must be rebuilt for each relation (each i), because a backend that
         // returned a non-empty block for relation i-1 may return an empty block for
         // relation i (e.g. it holds rows only from one of the two tables in a UNION ALL).
         // Keeping block_maps across iterations would leave stale entries from the previous
         // relation and miss entries for the current one, causing the
         // "backend_id not found in block_maps" error.
         std::unordered_map<int64_t, std::pair<Block, int>> block_maps;
+
+        // Phase 1: Deserialize the i-th response block from every BE into block_maps.
+        // Each BE's response.blocks(i) corresponds to the i-th relation's fetched columns.
         for (auto& [backend_id, rpc_struct] : rpc_struct_map) {
             Block partial_block;
             size_t uncompressed_size = 0;
             int64_t uncompressed_time = 0;
             DCHECK(rpc_struct.response.blocks_size() > i);
             RETURN_IF_ERROR(partial_block.deserialize(rpc_struct.response.blocks(i).block(),
                                                       &uncompressed_size, &uncompressed_time));
+            if (rpc_struct.request.request_block_descs(i).row_id_size() != partial_block.rows()) {
+                return Status::InternalError(
+                        fmt::format("merge_multi_response, "
+                                    "backend_id {} returned block with row count {} not match "
+                                    "request row id count {}",
+                                    backend_id, partial_block.rows(),
+                                    rpc_struct.request.request_block_descs(i).row_id_size()));
+            }
             if (rpc_struct.response.blocks(i).has_profile()) {
                 auto response_profile =
                         RuntimeProfile::from_proto(rpc_struct.response.blocks(i).profile());
                 _update_profile_info(backend_id, response_profile.get());
             }
 
+            // Only insert non-empty blocks. A BE may return an empty block if
+            // request.request_block_descs(i).row_id_size() is 0
+            // OR if the id_file_map was GC'd on the BE before it could process the request,
+            // refer 'if (!id_file_map)' in RowIdStorageReader::read_by_rowids

Review Comment:
   [Minor] This comment is now stale after the row count validation added at line 92. The second condition, "OR if the id_file_map was GC'd on the BE", describes a case that can no longer reach this code path when `row_id_size() > 0`: the new check `row_id_size() != partial_block.rows()` would already have returned an `InternalError` before this point.
   
   Suggested fix: remove the "OR if the id_file_map was GC'd" clause from the comment, since that case is now caught by the earlier validation.
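The following is a minimal sketch of the merge logic under review. It shows why the GC'd case no longer reaches the insertion step. It is not the Doris implementation. `Rows`, `BackendExchange`, and this standalone `merge_multi_response` are hypothetical simplifications. In them, a block is a `std::vector<std::string>` of row values, and each backend's RPC exchange is reduced to two things per relation: the row ids it was asked for and the rows it returned. Every backend is validated before anything is inserted into `block_maps`. So a backend that was asked for rows but returned an empty block fails the row-count check first.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <optional>
#include <string>
#include <unordered_map>
#include <utility>
#include <vector>

// Hypothetical stand-in for a Block: just the row values, in row_id order.
using Rows = std::vector<std::string>;

// Hypothetical stand-in for one backend's RPC request/response pair.
// Index i in each vector is relation i (one per rowid column / table).
struct BackendExchange {
    std::vector<std::vector<std::int64_t>> requested_row_ids;  // like request_block_descs(i)
    std::vector<Rows> returned_rows;                           // like response.blocks(i)
};

// Merges per-backend partial results into TopN order.
// order[i][j] is the backend id owning output row j of relation i
// (the role block_order_results plays in the diff).
// Returns an error message on failure, std::nullopt on success.
std::optional<std::string> merge_multi_response(
        const std::unordered_map<std::int64_t, BackendExchange>& exchanges,
        const std::vector<std::vector<std::int64_t>>& order, std::vector<Rows>* merged) {
    merged->assign(order.size(), Rows{});
    for (std::size_t i = 0; i < order.size(); ++i) {
        // Rebuilt for every relation so entries from relation i-1 never leak into i.
        std::unordered_map<std::int64_t, std::pair<const Rows*, std::size_t>> block_maps;

        // Phase 1: validate every backend's partial block before using any of them.
        for (const auto& [backend_id, ex] : exchanges) {
            assert(ex.returned_rows.size() > i && ex.requested_row_ids.size() > i);
            const Rows& partial = ex.returned_rows[i];
            const std::size_t requested = ex.requested_row_ids[i].size();
            if (requested != partial.size()) {
                return "backend_id " + std::to_string(backend_id) +
                       " returned block with row count " + std::to_string(partial.size()) +
                       " not match request row id count " + std::to_string(requested);
            }
            // Only non-empty blocks are inserted, as in the diff.
            if (!partial.empty()) {
                block_maps.emplace(backend_id, std::make_pair(&partial, std::size_t{0}));
            }
        }

        // Phase 2: walk the output order, advancing one cursor per backend.
        for (std::int64_t backend_id : order[i]) {
            auto it = block_maps.find(backend_id);
            if (it == block_maps.end()) {
                return "backend_id " + std::to_string(backend_id) + " not found in block_maps";
            }
            auto& [rows, cursor] = it->second;
            if (cursor >= rows->size()) {
                return "backend_id " + std::to_string(backend_id) + " ran out of rows";
            }
            (*merged)[i].push_back((*rows)[cursor++]);
        }
    }
    return std::nullopt;
}
```

In this sketch, the "not found in block_maps" error remains reachable only when the ordering names a backend that was asked for zero rows or has no RPC entry at all.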



##########
be/test/exec/operator/materialization_shared_state_test.cpp:
##########
@@ -318,4 +341,201 @@ TEST_F(MaterializationSharedStateTest, TestMergeMultiResponseMultiBlocks) {
     EXPECT_EQ(merged_value_col2->get_data_at(2).data, nullptr);
 }
 
+// Regression test: when a remote BE returns an empty response block for a relation
+// (e.g., id_file_map was GC'd or the BE restarted), but block_order_results still
+// references that backend_id, merge_multi_response() should return a clear
+// InternalError("backend_id {} not found in block_maps") rather than crashing.

Review Comment:
   [Minor] This comment says the test expects `InternalError("backend_id {} not found in block_maps")`, but the actual assertion on line 418 checks for `"not match request row id count"`. The comment describes the error path from before this PR, not the current behavior.
   
   Suggested fix: update the comment to reflect the actual expected error: `InternalError("... not match request row id count ...")`.
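The check ordering can be reduced further to show which error message the regression test is able to observe. `first_error` is a hypothetical one-backend, one-relation model, not Doris code. Its parameters map onto the real code as follows:
- `requested` stands for `row_id_size()`.
- `returned` stands for `partial_block.rows()`.
- `referenced` indicates whether `block_order_results` still names the backend.

With the row-count validation running first, an empty response from a backend that was asked for rows yields the "not match request row id count" message.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <string>

// Hypothetical one-backend, one-relation model of the two checks in
// merge_multi_response(); returns the first error message, or "" on success.
//   requested:  row ids sent to the backend (row_id_size())
//   returned:   rows in the block it sent back (partial_block.rows())
//   referenced: whether block_order_results still names this backend
std::string first_error(std::int64_t backend_id, std::size_t requested, std::size_t returned,
                        bool referenced) {
    const std::string id = std::to_string(backend_id);
    // Phase 1: the row-count validation added in this PR runs first.
    if (requested != returned) {
        return "backend_id " + id + " returned block with row count " +
               std::to_string(returned) + " not match request row id count " +
               std::to_string(requested);
    }
    // Empty blocks are never inserted into block_maps.
    const bool in_block_maps = returned > 0;
    // Phase 2: the lookup performed while walking block_order_results.
    if (referenced && !in_block_maps) {
        return "backend_id " + id + " not found in block_maps";
    }
    return "";
}
```

Under this model, the old "not found in block_maps" message appears only when the backend was asked for zero rows yet is still referenced. That is not the scenario the regression test sets up.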



-- 
This is an automated message from the Apache Git Service.