This is an automated email from the ASF dual-hosted git repository.
kxiao pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.0 by this push:
new f55f0b353a5 [fix](pipeline) no alloc_resource projections in
VSetOperationNode #26673 (#32945)
f55f0b353a5 is described below
commit f55f0b353a54ebe2192d733b6a5284c748e88010
Author: Mryange <[email protected]>
AuthorDate: Tue Apr 2 11:34:24 2024 +0800
[fix](pipeline) no alloc_resource projections in VSetOperationNode #26673
(#32945)
---
be/src/exec/exec_node.h | 11 +++++++
be/src/pipeline/exec/union_source_operator.cpp | 4 ++-
be/src/vec/exec/vset_operation_node.cpp | 2 ++
be/src/vec/exec/vunion_node.cpp | 2 +-
be/src/vec/exec/vunion_node.h | 2 ++
.../data/correctness_p0/test_vexcept_not_open.out | 4 +++
.../correctness_p0/test_vexcept_not_open.groovy | 35 ++++++++++++++++++++++
7 files changed, 58 insertions(+), 2 deletions(-)
diff --git a/be/src/exec/exec_node.h b/be/src/exec/exec_node.h
index e688fb7f889..f9e457f42a2 100644
--- a/be/src/exec/exec_node.h
+++ b/be/src/exec/exec_node.h
@@ -79,6 +79,17 @@ public:
// If overridden in subclass, must first call superclass's prepare().
[[nodiscard]] virtual Status prepare(RuntimeState* state);
+ /*
+ * For open and alloc_resource:
+ * Base class ExecNode's `open` only calls `alloc_resource`, which opens
some public projections.
+ * If `open` is overridden, it must call the corresponding `alloc_resource`
since that is an (early) part of opening.
+ * Alternatively, the override can simply call `ExecNode::open`.
+ * Then `alloc_resource` calls the parent's version after its own work to
complete the process, including opening the projections.
+ * In Pipeline engine:
+ * PipeContext::prepare -> node::prepare
+ * Task::open -> StreamingOp::open -> node::alloc_resource, for
sink+source splits, only open in SinkOperator.
+ * So in the pipeline engine, things done directly by `open` (like calling the child's `open`)
are not done.
+ */
// Performs any preparatory work prior to calling get_next().
// Can be called repeatedly (after calls to close()).
// Caller must not be holding any io buffers. This will cause deadlock.
diff --git a/be/src/pipeline/exec/union_source_operator.cpp
b/be/src/pipeline/exec/union_source_operator.cpp
index 1b4f4125619..446aeebd2ff 100644
--- a/be/src/pipeline/exec/union_source_operator.cpp
+++ b/be/src/pipeline/exec/union_source_operator.cpp
@@ -52,8 +52,10 @@ bool UnionSourceOperator::_has_data() {
// we assumed it can read to process const expr, Although we don't know
whether there is
// ,and queue have data, could read also
+// Running the source operator depends on the Node's alloc_resource, which is
called in the Sink's open.
+// So wait until the SinkOperator has been scheduled to open.
bool UnionSourceOperator::can_read() {
- return _has_data() || _data_queue->is_all_finish();
+ return _node->resource_allocated() && (_has_data() ||
_data_queue->is_all_finish());
}
Status UnionSourceOperator::pull_data(RuntimeState* state, vectorized::Block*
block, bool* eos) {
diff --git a/be/src/vec/exec/vset_operation_node.cpp
b/be/src/vec/exec/vset_operation_node.cpp
index 6672352deea..40343b96f87 100644
--- a/be/src/vec/exec/vset_operation_node.cpp
+++ b/be/src/vec/exec/vset_operation_node.cpp
@@ -216,6 +216,8 @@ Status VSetOperationNode<is_intersect>::init(const
TPlanNode& tnode, RuntimeStat
template <bool is_intersect>
Status VSetOperationNode<is_intersect>::alloc_resource(RuntimeState* state) {
+ // will open projections
+ RETURN_IF_ERROR(ExecNode::alloc_resource(state));
// open result expr lists.
for (const VExprContextSPtrs& exprs : _child_expr_lists) {
RETURN_IF_ERROR(VExpr::open(exprs, state));
diff --git a/be/src/vec/exec/vunion_node.cpp b/be/src/vec/exec/vunion_node.cpp
index 90cbe5bb7ae..4ca2db0ef94 100644
--- a/be/src/vec/exec/vunion_node.cpp
+++ b/be/src/vec/exec/vunion_node.cpp
@@ -95,7 +95,7 @@ Status VUnionNode::prepare(RuntimeState* state) {
}
Status VUnionNode::open(RuntimeState* state) {
- RETURN_IF_ERROR(alloc_resource(state));
+ RETURN_IF_ERROR(ExecNode::open(state)); // exactly same with
this->alloc_resource()
// Ensures that rows are available for clients to fetch after this open()
has
// succeeded.
if (!_children.empty()) {
diff --git a/be/src/vec/exec/vunion_node.h b/be/src/vec/exec/vunion_node.h
index ac63f9ca632..71078f50543 100644
--- a/be/src/vec/exec/vunion_node.h
+++ b/be/src/vec/exec/vunion_node.h
@@ -64,6 +64,8 @@ public:
/// GetNext() for the constant expression case.
Status get_next_const(RuntimeState* state, Block* block);
+ bool resource_allocated() const { return _resource_allocated; }
+
private:
/// Const exprs materialized by this node. These exprs don't refer to any
children.
/// Only materialized by the first fragment instance to avoid duplication.
diff --git a/regression-test/data/correctness_p0/test_vexcept_not_open.out
b/regression-test/data/correctness_p0/test_vexcept_not_open.out
new file mode 100644
index 00000000000..e31d66067e5
--- /dev/null
+++ b/regression-test/data/correctness_p0/test_vexcept_not_open.out
@@ -0,0 +1,4 @@
+-- This file is automatically generated. You should know what you did if you
want to edit this
+-- !select --
+false
+
diff --git a/regression-test/suites/correctness_p0/test_vexcept_not_open.groovy
b/regression-test/suites/correctness_p0/test_vexcept_not_open.groovy
new file mode 100644
index 00000000000..07e0a2d50a3
--- /dev/null
+++ b/regression-test/suites/correctness_p0/test_vexcept_not_open.groovy
@@ -0,0 +1,35 @@
+
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+
+suite("test_vexcept_not_open") {
+ sql """
+ drop table if EXISTS tbl_vexcept;
+ """
+ sql """
+ CREATE table tbl_vexcept (c1 varchar(31), c2 varchar(10)) ENGINE=OLAP
DUPLICATE KEY( c1 ) COMMENT "OLAP" DISTRIBUTED BY HASH( c1 ) BUCKETS auto
PROPERTIES ( "replication_num" = "1" );
+ """
+
+ sql """
+ insert into tbl_vexcept select 'ab', 'abc';
+ """
+
+ qt_select """
+ SELECT c2 LIKE 'ab' FROM ( SELECT c2 FROM tbl_vexcept EXCEPT SELECT 'ab'
c2 ) AS t1;
+ """
+}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]