Copilot commented on code in PR #61739:
URL: https://github.com/apache/doris/pull/61739#discussion_r2992162719
##########
be/src/exec/operator/result_sink_operator.cpp:
##########
@@ -199,9 +199,13 @@ Status ResultSinkLocalState::close(RuntimeState* state, Status exec_status) {
         }
         RETURN_IF_ERROR(_sender->close(state->fragment_instance_id(), final_status, written_rows));
     }
+    // In parallel result sink mode, the buffer is registered under query_id; otherwise
+    // it is registered under fragment_instance_id. Pass the matching key so the
+    // deferred cancel actually finds and removes the buffer entry.
     state->exec_env()->result_mgr()->cancel_at_time(
             time(nullptr) + config::result_buffer_cancelled_interval_time,
-            state->fragment_instance_id());
+            state->query_options().enable_parallel_result_sink ? state->query_id()
+                                                               : state->fragment_instance_id());
Review Comment:
In parallel result sink mode, multiple fragment instances share the same
ResultBlockBuffer keyed by query_id, and `close()` is invoked per fragment
instance (removing one dependency each time). Scheduling `cancel_at_time(...,
query_id)` on every instance close can cancel/erase the shared buffer after the
interval even if other instances are still producing results (i.e., before the
buffer is actually fully closed), which can break result fetching and/or cancel
in-flight execution. Consider scheduling the deferred cancel only when the
shared buffer transitions to fully closed (after the last dependency is
removed), e.g., by moving this scheduling into `ResultBlockBuffer::close` when
`_result_sink_dependencies` becomes empty, or by extending the
`ResultBlockBufferBase::close` contract to indicate “final close” so callers
can safely schedule cancel once.
##########
be/src/exec/operator/result_file_sink_operator.cpp:
##########
@@ -135,9 +135,13 @@ Status ResultFileSinkLocalState::close(RuntimeState* state, Status exec_status)
         state->get_query_ctx()->resource_ctx()->io_context()->update_returned_rows(written_rows);
         RETURN_IF_ERROR(_sender->close(state->fragment_instance_id(), final_status, written_rows));
     }
+    // In parallel outfile mode, the buffer is registered under query_id; otherwise
+    // it is registered under fragment_instance_id. Pass the matching key so the
+    // deferred cancel actually finds and removes the buffer entry.
     state->exec_env()->result_mgr()->cancel_at_time(
             time(nullptr) + config::result_buffer_cancelled_interval_time,
-            state->fragment_instance_id());
+            state->query_options().enable_parallel_outfile ? state->query_id()
+                                                           : state->fragment_instance_id());
Review Comment:
Same concern as result sink: in parallel outfile mode the buffer is shared
under `query_id` and `ResultBlockBuffer::close` only fully closes once all
fragment-instance dependencies have been removed. Calling `cancel_at_time(...,
query_id)` from every fragment instance close can cancel the shared buffer
after the interval even if some instances are still running, potentially
breaking client fetch and/or canceling remaining producers. Consider deferring
scheduling until the buffer is fully closed (e.g., schedule inside
`ResultBlockBuffer::close` when the last dependency is removed, or change
`close()` to return/indicate when it performed the final close so this can be
scheduled once).
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]