github-actions[bot] commented on code in PR #61758:
URL: https://github.com/apache/doris/pull/61758#discussion_r2992972386
##########
be/src/exec/operator/materialization_opertor.cpp:
##########
@@ -53,29 +53,62 @@ void MaterializationSharedState::get_block(Block* block) {
origin_block.clear();
}
+// Merges RPC responses from multiple BEs into `response_blocks` in the original row order.
+//
+// After parallel multiget_data_v2 RPCs complete, each BE's response contains a partial block
+// with only the rows that BE owns (ordered by file_id/row_id). This function reassembles them
+// into the correct TopN output order using `block_order_results` as the ordering guide.
+//
+// Data flow:
+// rpc_struct_map[backend_id].response (per-BE partial blocks, unordered across BEs)
+// + block_order_results[i][j] (maps each output row → its source backend_id)
+// → response_blocks[i] (final merged result in original TopN row order)
Status MaterializationSharedState::merge_multi_response() {
+ // Outer loop: iterate over each relation (i.e., each rowid column / table).
+ // A query with lazy materialization on 2 tables would have block_order_results.size() == 2,
+ // each with its own set of response_blocks and RPC request_block_descs.
for (int i = 0; i < block_order_results.size(); ++i) {
+ // Maps backend_id → (deserialized block from that BE, row cursor into the block).
+ // The cursor tracks how many rows we've consumed from this BE's block so far,
+ // since the rows in the partial block are in the same order as the row_ids we sent.
+
// block_maps must be rebuilt for each relation (each i), because a backend that
// returned a non-empty block for relation i-1 may return an empty block for
// relation i (e.g. it holds rows only from one of the two tables in a UNION ALL).
// Keeping block_maps across iterations would leave stale entries from the previous
// relation and miss entries for the current one, causing the
// "backend_id not found in block_maps" error.
std::unordered_map<int64_t, std::pair<Block, int>> block_maps;
+
+ // Phase 1: Deserialize the i-th response block from every BE into block_maps.
+ // Each BE's response.blocks(i) corresponds to the i-th relation's fetched columns.
for (auto& [backend_id, rpc_struct] : rpc_struct_map) {
Block partial_block;
size_t uncompressed_size = 0;
int64_t uncompressed_time = 0;
DCHECK(rpc_struct.response.blocks_size() > i);
RETURN_IF_ERROR(partial_block.deserialize(rpc_struct.response.blocks(i).block(),
&uncompressed_size,
&uncompressed_time));
+ if (rpc_struct.request.request_block_descs(i).row_id_size() != partial_block.rows()) {
+ return Status::InternalError(
+ fmt::format("merge_multi_response, "
+ "backend_id {} returned block with row count {} not match "
+ "request row id count {}",
+ backend_id, partial_block.rows(),
+ rpc_struct.request.request_block_descs(i).row_id_size()));
+ }
if (rpc_struct.response.blocks(i).has_profile()) {
auto response_profile =
RuntimeProfile::from_proto(rpc_struct.response.blocks(i).profile());
_update_profile_info(backend_id, response_profile.get());
}
+ // Only insert non-empty blocks. A BE may return an empty block if
+ // request.request_block_descs(i).row_id_size() is 0
+ // OR if the id_file_map was GC'd on the BE before it could process the request,
+ // refer 'if (!id_file_map)' in RowIdStorageReader::read_by_rowids
Review Comment:
[Minor] This comment is stale now that the row count validation has been added
at line 92. The second condition, "OR if the id_file_map was GC'd on the BE",
can no longer reach this code path when `row_id_size() > 0`: the new check
`row_id_size() != partial_block.rows()` returns an `InternalError` before this
point, so the only empty block that still gets here is one where
`row_id_size()` is 0.
Suggested fix: remove the "OR if the id_file_map was GC'd" clause from the
comment, since that case is now caught by the earlier validation.
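To illustrate why the second condition is unreachable, here is a minimal, self-contained sketch of the check ordering. It is not the Doris code: `MergeStep` and `check_partial_block` are hypothetical names, and the sketch assumes that a GC'd id_file_map shows up as an empty block for a request with a non-zero row id count.

```cpp
#include <cstddef>
#include <string>

// Hypothetical result type for this sketch only (not a Doris type).
struct MergeStep {
    bool ok;            // false when the row count validation rejects the block
    bool inserted;      // true when the block would be added to block_maps
    std::string error;  // empty when ok is true
};

// Models the per-BE order of checks discussed above: validate first, then
// insert only non-empty blocks.
MergeStep check_partial_block(std::size_t requested_row_ids, std::size_t returned_rows) {
    // Step 1: row count validation. An empty block for a non-empty request
    // (the assumed "id_file_map was GC'd" case) is rejected here.
    if (requested_row_ids != returned_rows) {
        return {false, false,
                "returned block with row count " + std::to_string(returned_rows) +
                        " not match request row id count " +
                        std::to_string(requested_row_ids)};
    }
    // Step 2: past the validation, an empty block implies requested_row_ids == 0,
    // so that is the only case the "non-empty only" comment still has to cover.
    return {true, returned_rows > 0, ""};
}
```

With this ordering, `check_partial_block(3, 0)` fails at step 1, while `check_partial_block(0, 0)` passes validation and is simply not inserted.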
##########
be/test/exec/operator/materialization_shared_state_test.cpp:
##########
@@ -318,4 +341,201 @@ TEST_F(MaterializationSharedStateTest, TestMergeMultiResponseMultiBlocks) {
EXPECT_EQ(merged_value_col2->get_data_at(2).data, nullptr);
}
+// Regression test: when a remote BE returns an empty response block for a relation
+// (e.g., id_file_map was GC'd or the BE restarted), but block_order_results still
+// references that backend_id, merge_multi_response() should return a clear
+// InternalError("backend_id {} not found in block_maps") rather than crashing.
Review Comment:
[Minor] This comment says the test expects `InternalError("backend_id {} not
found in block_maps")`, but the actual assertion on line 418 checks for `"not
match request row id count"`. The comment describes the old (pre-this-PR) error
path, not the current behavior.
Suggested fix: update the comment to reflect the actual expected error:
`InternalError("... not match request row id count ...")`.
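The mismatch can be seen in a small standalone sketch. The message text is copied from the `fmt::format` call in the diff above; `row_count_mismatch_message` and `contains` are hypothetical helpers for illustration and are not part of the test file.

```cpp
#include <cstddef>
#include <cstdint>
#include <string>

// Rebuilds the error text from the new validation in merge_multi_response(),
// using plain string concatenation instead of fmt (an assumption of this sketch).
std::string row_count_mismatch_message(std::int64_t backend_id, std::size_t rows,
                                       std::size_t requested) {
    return "merge_multi_response, backend_id " + std::to_string(backend_id) +
           " returned block with row count " + std::to_string(rows) +
           " not match request row id count " + std::to_string(requested);
}

// Substring check, standing in for whatever matcher the real assertion uses.
bool contains(const std::string& haystack, const std::string& needle) {
    return haystack.find(needle) != std::string::npos;
}
```

A message built this way contains "not match request row id count" but not "not found in block_maps", so a test comment that names the latter describes an error the current code no longer produces for this scenario.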
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]