github-actions[bot] commented on code in PR #64596:
URL: https://github.com/apache/doris/pull/64596#discussion_r3439787970
##########
regression-test/suites/query_p0/join/test_join2.groovy:
##########
@@ -108,6 +108,15 @@ suite("test_join2", "query,p0,arrow_flight_sql") {
ORDER BY 1,2,3,4,5,6;
"""
+ sql "set batch_size = 1"
+ sql """
Review Comment:
This regression test currently executes the LIMIT query and discards the result,
so it would still pass if incremental NLJ returned too few rows or otherwise
produced wrong output. For a deterministic check without adding `ORDER BY`
above the LIMIT, wrap the limited join in a subquery and assert `count(*) = 5`
with a `qt_` case (and commit the generated `.out` file). That keeps the LIMIT
in the plan while making the test fail on incorrect row production.
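A row count is a sound assertion even though the surviving rows are not deterministic. The minimal C++ model below shows why. All names are hypothetical and this is not Doris code: it is a row-at-a-time nested-loop join with the LIMIT pushed into it, using an assumed join predicate `probe < build`. Changing the probe order, which stands in for nondeterministic scheduling, changes which rows survive the LIMIT. The count still equals the limit as long as the full join has at least that many matches.

```cpp
#include <cassert>
#include <cstddef>
#include <utility>
#include <vector>

using Row = std::pair<int, int>;

// Hypothetical model of an incremental nested-loop join with the LIMIT pushed
// into it: probe rows are visited in the given order, each (probe, build) pair
// with probe < build is emitted one row at a time (like batch_size = 1), and
// production stops as soon as `limit` rows exist.
std::vector<Row> limited_nlj(const std::vector<int>& probe,
                             const std::vector<int>& build,
                             std::size_t limit) {
    std::vector<Row> out;
    for (int p : probe) {
        for (int b : build) {
            if (out.size() == limit) {
                return out;  // LIMIT reached: stop producing rows early
            }
            if (p < b) {
                out.emplace_back(p, b);
            }
        }
    }
    return out;
}

// The property a `count(*) = 5` check pins down: the size of the LIMIT
// result, which does not depend on which matching rows came out first.
void check_limit_result(const std::vector<Row>& rows, std::size_t expected) {
    assert(rows.size() == expected);
}
```

With build rows {2, 3, 4, 5} and `limit = 5`, probing in the order {1, 2, 3} and in the order {3, 2, 1} returns different row sets. Both sets still have exactly 5 rows. A join that stopped producing rows too early would fail the size check.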
##########
be/src/exec/operator/nested_loop_join_build_operator.cpp:
##########
@@ -52,22 +57,35 @@ Status NestedLoopJoinBuildSinkLocalState::open(RuntimeState* state) {
return Status::OK();
}
-Status NestedLoopJoinBuildSinkLocalState::close(RuntimeState* state, Status exec_status) {
- if (!state->is_cancelled()) {
- RETURN_IF_ERROR(
- _runtime_filter_producer_helper->process(state, _shared_state->build_blocks));
+Status NestedLoopJoinBuildSinkLocalState::terminate(RuntimeState* state) {
+ SCOPED_TIMER(exec_time_counter());
+ if (_terminated) {
+ return Status::OK();
+ }
+ if (_runtime_filter_producer_helper && !_shared_state->build_side_eos) {
+ RETURN_IF_ERROR(_runtime_filter_producer_helper->skip_process(state));
}
+ return JoinBuildSinkLocalState::terminate(state);
+}
+
+Status NestedLoopJoinBuildSinkLocalState::close(RuntimeState* state, Status exec_status) {
_runtime_filter_producer_helper->collect_realtime_profile(custom_profile());
RETURN_IF_ERROR(JoinBuildSinkLocalState::close(state, exec_status));
return Status::OK();
}
+bool NestedLoopJoinBuildSinkLocalState::is_finished() const {
Review Comment:
This still waits for one more build-side `sink_impl()` call before the build
pipeline can be stopped. The probe side sets `build_side_no_more_required` as
soon as the LIMIT is satisfied or the probe completes. The cancel worker calls
`PipelineTask::stop_if_finished()` specifically to wake tasks whose sink is
done. However, `is_finished()` returns `_build_side_finished`, which is set only
later, at the top of `sink_impl()`. Suppose the build task is blocked in its
source after publishing a prefix. The cancel worker then keeps seeing `false`,
and the fragment cannot terminate the build side until the build source
produces another block or EOS. That defeats the stop-early behavior for the
slow or blocked build-side case this PR targets. Because
`_shared_state_holder` now keeps the shared state alive for this callback,
`is_finished()` should observe the shared stop flag directly for partial-build
joins. Then `stop_if_finished()` can wake and terminate the build task without
waiting for another input block.
```suggestion
auto& p = _parent->cast<NestedLoopJoinBuildSinkOperatorX>();
return p._enable_partial_build_output &&
_shared_state->should_stop_build();
```
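The timing problem can be reduced to a small single-threaded C++ model. Everything here is a hypothetical stand-in rather than the Doris classes. `SharedState` plays the role of the NLJ shared state, with an assumed `std::atomic<bool>` for `build_side_no_more_required`. `BuildSink` plays `NestedLoopJoinBuildSinkLocalState`. Once the probe side publishes the stop flag, an `is_finished()` that reads only the local flag stays `false` until another `sink_impl()` call. One that reads the shared flag becomes `true` immediately, and that is the signal `stop_if_finished()` needs to observe.

```cpp
#include <atomic>
#include <cassert>

// Hypothetical stand-in for the state shared by the probe and build pipelines.
// The real type and field layout in Doris may differ.
struct SharedState {
    // Set by the probe side once LIMIT / probe completion is reached.
    std::atomic<bool> build_side_no_more_required{false};

    bool should_stop_build() const {
        return build_side_no_more_required.load(std::memory_order_acquire);
    }
};

// Hypothetical stand-in for the build-sink local state.
struct BuildSink {
    SharedState* shared = nullptr;
    bool enable_partial_build_output = false;
    bool build_side_finished = false;  // refreshed only inside sink_impl()

    // Runs only when the build source hands over another block (or EOS).
    void sink_impl() {
        assert(shared != nullptr);
        if (shared->should_stop_build()) {
            build_side_finished = true;
        }
    }

    // Behavior criticized in the review: lags until the next sink_impl() call.
    bool is_finished_via_local_flag() const { return build_side_finished; }

    // Behavior proposed in the suggestion: observe the shared flag directly.
    bool is_finished_via_shared_flag() const {
        return enable_partial_build_output && shared->should_stop_build();
    }
};
```

The model reads the flag with acquire ordering on the assumption that the probe side publishes it with a release store. In a real pipeline the probe task and the cancel worker run on different threads, which this single-threaded model does not exercise.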
--
This is an automated message from the Apache Git Service.