This is an automated email from the ASF dual-hosted git repository.
gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 59acf61ec5 [pipelineX](pick) pick 2 PR from pipeline engine (#23463)
59acf61ec5 is described below
commit 59acf61ec5d487ccb98498174fe922318ba6ab4b
Author: Gabriel <[email protected]>
AuthorDate: Fri Aug 25 13:26:05 2023 +0800
[pipelineX](pick) pick 2 PR from pipeline engine (#23463)
---
be/src/pipeline/pipeline_x/dependency.cpp | 2 +-
be/src/pipeline/pipeline_x/pipeline_x_task.cpp | 10 +++++-----
2 files changed, 6 insertions(+), 6 deletions(-)
diff --git a/be/src/pipeline/pipeline_x/dependency.cpp
b/be/src/pipeline/pipeline_x/dependency.cpp
index 49262ce0e5..fab4db32f9 100644
--- a/be/src/pipeline/pipeline_x/dependency.cpp
+++ b/be/src/pipeline/pipeline_x/dependency.cpp
@@ -181,7 +181,7 @@ vectorized::BlockRowPos
AnalyticDependency::compare_row_to_find_end(int idx,
}
//binary search, set start and end pos
int64_t start_pos = start_init_row_num;
- int64_t end_pos = _analytic_state.input_blocks[start.block_num].rows() - 1;
+ int64_t end_pos = _analytic_state.input_blocks[start.block_num].rows();
//if end_block_num haven't moved, only start_block_num go to the end block
//so could use the end.row_num for binary search
if (start.block_num == end.block_num) {
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_task.cpp
b/be/src/pipeline/pipeline_x/pipeline_x_task.cpp
index d6d24d4e09..676414fce5 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_task.cpp
+++ b/be/src/pipeline/pipeline_x/pipeline_x_task.cpp
@@ -142,23 +142,23 @@ Status PipelineXTask::execute(bool* eos) {
set_state(PipelineTaskState::BLOCKED_FOR_DEPENDENCY);
return Status::OK();
}
- if (!_source->can_read(_state)) {
+ if (!source_can_read()) {
set_state(PipelineTaskState::BLOCKED_FOR_SOURCE);
return Status::OK();
}
- if (!_sink->can_write(_state)) {
+ if (!sink_can_write()) {
set_state(PipelineTaskState::BLOCKED_FOR_SINK);
return Status::OK();
}
}
- this->set_begin_execute_time();
+ set_begin_execute_time();
while (!_fragment_context->is_canceled()) {
- if (_data_state != SourceState::MORE_DATA &&
!_source->can_read(_state)) {
+ if (_data_state != SourceState::MORE_DATA && !source_can_read()) {
set_state(PipelineTaskState::BLOCKED_FOR_SOURCE);
break;
}
- if (!_sink->can_write(_state)) {
+ if (!sink_can_write()) {
set_state(PipelineTaskState::BLOCKED_FOR_SINK);
break;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]