This is an automated email from the ASF dual-hosted git repository.
morrysnow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 9a74ad1702 [feature](Nereids)add the ability of projection on each
ExecNode and add column prune on OlapScan (#11842)
9a74ad1702 is described below
commit 9a74ad1702fdf850e758934d8237402fac60b0d6
Author: Kikyou1997 <[email protected]>
AuthorDate: Tue Aug 30 16:17:10 2022 +0800
[feature](Nereids)add the ability of projection on each ExecNode and add
column prune on OlapScan (#11842)
We added the logical project earlier, but to actually complete column pruning
and reduce data IO, we need corresponding support in the translator and the BE.
This PR:
- adds projections to each ExecNode in the BE
- translates PhysicalProject into projections on the PlanNode in the FE
- performs column pruning on the ScanNode in the FE
Co-authored-by: HappenLee <[email protected]>
---
be/src/exec/exec_node.cpp | 57 ++++++++++++++-
be/src/exec/exec_node.h | 14 +++-
be/src/runtime/plan_fragment_executor.cpp | 5 +-
be/src/vec/exec/join/vhash_join_node.cpp | 8 ++-
be/src/vec/exec/scan/new_olap_scanner.cpp | 5 +-
be/src/vec/exec/vaggregation_node.cpp | 4 +-
be/src/vec/exec/vanalytic_eval_node.cpp | 5 +-
be/src/vec/exec/vassert_num_rows_node.cpp | 4 +-
be/src/vec/exec/vblocking_join_node.cpp | 5 +-
be/src/vec/exec/vcross_join_node.cpp | 5 +-
be/src/vec/exec/vrepeat_node.cpp | 2 +-
be/src/vec/exec/vselect_node.cpp | 5 +-
be/src/vec/exec/vset_operation_node.cpp | 7 +-
be/src/vec/exec/vsort_node.cpp | 2 +-
be/src/vec/exec/vtable_function_node.cpp | 2 +-
be/src/vec/exec/vunion_node.cpp | 7 +-
be/src/vec/exprs/vexpr.cpp | 8 +--
be/src/vec/functions/like.cpp | 11 ++-
.../org/apache/doris/analysis/SlotDescriptor.java | 6 ++
.../glue/translator/ExpressionTranslator.java | 3 +-
.../glue/translator/PhysicalPlanTranslator.java | 78 +++++++++++++-------
.../glue/translator/PlanTranslatorContext.java | 15 ++--
.../org/apache/doris/nereids/jobs/JobType.java | 2 +-
.../doris/nereids/jobs/batch/RewriteJob.java | 2 +
.../doris/nereids/stats/StatsCalculator.java | 2 +-
.../nereids/trees/expressions/Expression.java | 23 ------
.../nereids/trees/expressions/SlotReference.java | 4 --
.../doris/nereids/trees/plans/AbstractPlan.java | 12 ++++
.../doris/nereids/trees/plans/GroupPlan.java | 1 +
.../org/apache/doris/nereids/trees/plans/Plan.java | 4 --
.../trees/plans/physical/AbstractPhysicalPlan.java | 7 --
.../nereids/trees/plans/visitor/PlanVisitor.java | 2 +-
.../org/apache/doris/planner/ExchangeNode.java | 13 +++-
.../org/apache/doris/planner/HashJoinNode.java | 29 +++++++-
.../org/apache/doris/planner/OlapScanNode.java | 1 -
.../java/org/apache/doris/planner/PlanNode.java | 31 +++++++-
.../apache/doris/planner/SingleNodePlanner.java | 2 +-
.../translator/PhysicalPlanTranslatorTest.java | 82 ++++++++++++++++++++++
.../apache/doris/nereids/util/PlanRewriter.java | 4 +-
.../doris/planner/DistributedPlannerTest.java | 2 +
40 files changed, 352 insertions(+), 129 deletions(-)
diff --git a/be/src/exec/exec_node.cpp b/be/src/exec/exec_node.cpp
index 0b643135b0..9a2b335728 100644
--- a/be/src/exec/exec_node.cpp
+++ b/be/src/exec/exec_node.cpp
@@ -150,9 +150,13 @@ ExecNode::ExecNode(ObjectPool* pool, const TPlanNode&
tnode, const DescriptorTbl
_rows_returned_rate(nullptr),
_memory_used_counter(nullptr),
_get_next_span(),
- _is_closed(false) {}
+ _is_closed(false) {
+ if (tnode.__isset.output_tuple_id) {
+ _output_row_descriptor.reset(new RowDescriptor(descs,
{tnode.output_tuple_id}, {true}));
+ }
+}
-ExecNode::~ExecNode() {}
+ExecNode::~ExecNode() = default;
void ExecNode::push_down_predicate(RuntimeState* state,
std::list<ExprContext*>* expr_ctxs) {
if (_type != TPlanNodeType::AGGREGATION_NODE) {
@@ -194,6 +198,13 @@ Status ExecNode::init(const TPlanNode& tnode,
RuntimeState* state) {
}
RETURN_IF_ERROR(Expr::create_expr_trees(_pool, tnode.conjuncts,
&_conjunct_ctxs));
+ // create the projections expr
+ if (tnode.__isset.projections) {
+ DCHECK(tnode.__isset.output_tuple_id);
+ RETURN_IF_ERROR(
+ vectorized::VExpr::create_expr_trees(_pool, tnode.projections,
&_projections));
+ }
+
return Status::OK();
}
@@ -220,6 +231,7 @@ Status ExecNode::prepare(RuntimeState* state) {
typeid(*this) != typeid(doris::vectorized::NewOlapScanNode)) {
RETURN_IF_ERROR(Expr::prepare(_conjunct_ctxs, state, _row_descriptor));
}
+ RETURN_IF_ERROR(vectorized::VExpr::prepare(_projections, state,
_row_descriptor));
for (int i = 0; i < _children.size(); ++i) {
RETURN_IF_ERROR(_children[i]->prepare(state));
@@ -239,6 +251,8 @@ Status ExecNode::open(RuntimeState* state) {
} else {
return Status::OK();
}
+ RETURN_IF_ERROR(Expr::open(_conjunct_ctxs, state));
+ return vectorized::VExpr::open(_projections, state);
}
Status ExecNode::reset(RuntimeState* state) {
@@ -282,6 +296,7 @@ Status ExecNode::close(RuntimeState* state) {
typeid(*this) != typeid(doris::vectorized::NewOlapScanNode)) {
Expr::close(_conjunct_ctxs, state);
}
+ vectorized::VExpr::close(_projections, state);
if (_buffer_pool_client.is_registered()) {
state->exec_env()->buffer_pool()->DeregisterClient(&_buffer_pool_client);
@@ -769,4 +784,42 @@ std::string ExecNode::get_name() {
return (_is_vec ? "V" : "") + print_plan_node_type(_type);
}
+Status ExecNode::do_projections(vectorized::Block* origin_block,
vectorized::Block* output_block) {
+ using namespace vectorized;
+ auto is_mem_reuse = output_block->mem_reuse();
+ MutableBlock mutable_block =
+ is_mem_reuse ? MutableBlock(output_block)
+ :
MutableBlock(VectorizedUtils::create_empty_columnswithtypename(
+ *_output_row_descriptor));
+ auto rows = origin_block->rows();
+
+ if (rows != 0) {
+ auto& mutable_columns = mutable_block.mutable_columns();
+ DCHECK(mutable_columns.size() == _projections.size());
+ for (int i = 0; i < mutable_columns.size(); ++i) {
+ auto result_column_id = -1;
+ RETURN_IF_ERROR(_projections[i]->execute(origin_block,
&result_column_id));
+ auto column_ptr = origin_block->get_by_position(result_column_id)
+
.column->convert_to_full_column_if_const();
+ mutable_columns[i]->insert_range_from(*column_ptr, 0, rows);
+ }
+
+ if (!is_mem_reuse) output_block->swap(mutable_block.to_block());
+ DCHECK(output_block->rows() == rows);
+ }
+
+ return Status::OK();
+}
+
+Status ExecNode::get_next_after_projects(RuntimeState* state,
vectorized::Block* block, bool* eos) {
+ // delete the UNLIKELY after support new optimizers
+ if (UNLIKELY(_output_row_descriptor)) {
+
_origin_block.clear_column_data(_row_descriptor.num_materialized_slots());
+ auto status = get_next(state, &_origin_block, eos);
+ if (UNLIKELY(!status.ok())) return status;
+ return do_projections(&_origin_block, block);
+ }
+ return get_next(state, block, eos);
+}
+
} // namespace doris
diff --git a/be/src/exec/exec_node.h b/be/src/exec/exec_node.h
index 5b700090b2..c9f4323dca 100644
--- a/be/src/exec/exec_node.h
+++ b/be/src/exec/exec_node.h
@@ -105,6 +105,9 @@ public:
virtual Status get_next(RuntimeState* state, RowBatch* row_batch, bool*
eos);
virtual Status get_next(RuntimeState* state, vectorized::Block* block,
bool* eos);
+ // new interface to compatible new optimizers in FE
+ Status get_next_after_projects(RuntimeState* state, vectorized::Block*
block, bool* eos);
+
// Resets the stream of row batches to be retrieved by subsequent
GetNext() calls.
// Clears all internal state, returning this node to the state it was in
after calling
// Prepare() and before calling Open(). This function must not clear memory
@@ -179,7 +182,9 @@ public:
int id() const { return _id; }
TPlanNodeType::type type() const { return _type; }
- virtual const RowDescriptor& row_desc() const { return _row_descriptor; }
+ virtual const RowDescriptor& row_desc() const {
+ return _output_row_descriptor ? *_output_row_descriptor :
_row_descriptor;
+ }
int64_t rows_returned() const { return _num_rows_returned; }
int64_t limit() const { return _limit; }
bool reached_limit() const { return _limit != -1 && _num_rows_returned >=
_limit; }
@@ -221,6 +226,9 @@ protected:
// and add block rows for profile
void reached_limit(vectorized::Block* block, bool* eos);
+ /// Only use in vectorized exec engine try to do projections to trans
_row_desc -> _output_row_desc
+ Status do_projections(vectorized::Block* origin_block, vectorized::Block*
output_block);
+
/// Extends blocking queue for row batches. Row batches have a property
that
/// they must be processed in the order they were produced, even in
cancellation
/// paths. Preceding row batches can contain ptrs to memory in subsequent
row batches
@@ -276,6 +284,10 @@ protected:
std::vector<ExecNode*> _children;
RowDescriptor _row_descriptor;
+ vectorized::Block _origin_block;
+
+ std::unique_ptr<RowDescriptor> _output_row_descriptor;
+ std::vector<doris::vectorized::VExprContext*> _projections;
/// Resource information sent from the frontend.
const TBackendResourceProfile _resource_profile;
diff --git a/be/src/runtime/plan_fragment_executor.cpp
b/be/src/runtime/plan_fragment_executor.cpp
index 9346b947d3..6274b47174 100644
--- a/be/src/runtime/plan_fragment_executor.cpp
+++ b/be/src/runtime/plan_fragment_executor.cpp
@@ -339,8 +339,9 @@ Status
PlanFragmentExecutor::get_vectorized_internal(::doris::vectorized::Block*
while (!_done) {
_block->clear_column_data(_plan->row_desc().num_materialized_slots());
SCOPED_TIMER(profile()->total_time_counter());
- RETURN_IF_ERROR_AND_CHECK_SPAN(_plan->get_next(_runtime_state.get(),
_block.get(), &_done),
- _plan->get_next_span(), _done);
+ RETURN_IF_ERROR_AND_CHECK_SPAN(
+ _plan->get_next_after_projects(_runtime_state.get(),
_block.get(), &_done),
+ _plan->get_next_span(), _done);
if (_block->rows() > 0) {
COUNTER_UPDATE(_rows_produced_counter, _block->rows());
diff --git a/be/src/vec/exec/join/vhash_join_node.cpp
b/be/src/vec/exec/join/vhash_join_node.cpp
index 228e0020e8..05040fc2b6 100644
--- a/be/src/vec/exec/join/vhash_join_node.cpp
+++ b/be/src/vec/exec/join/vhash_join_node.cpp
@@ -892,6 +892,7 @@ Status HashJoinNode::prepare(RuntimeState* state) {
RETURN_IF_ERROR((*_vother_join_conjunct_ptr)->prepare(state,
_intermediate_row_desc));
}
RETURN_IF_ERROR(VExpr::prepare(_output_expr_ctxs, state,
_intermediate_row_desc));
+ RETURN_IF_ERROR(vectorized::VExpr::prepare(_projections, state,
_intermediate_row_desc));
// right table data types
_right_table_data_types =
VectorizedUtils::get_data_types(child(1)->row_desc());
@@ -937,8 +938,9 @@ Status HashJoinNode::get_next(RuntimeState* state, Block*
output_block, bool* eo
do {
SCOPED_TIMER(_probe_next_timer);
- RETURN_IF_ERROR_AND_CHECK_SPAN(child(0)->get_next(state,
&_probe_block, &_probe_eos),
- child(0)->get_next_span(),
_probe_eos);
+ RETURN_IF_ERROR_AND_CHECK_SPAN(
+ child(0)->get_next_after_projects(state, &_probe_block,
&_probe_eos),
+ child(0)->get_next_span(), _probe_eos);
} while (_probe_block.rows() == 0 && !_probe_eos);
probe_rows = _probe_block.rows();
@@ -1135,7 +1137,7 @@ Status HashJoinNode::_hash_table_build(RuntimeState*
state) {
block.clear_column_data();
RETURN_IF_CANCELLED(state);
- RETURN_IF_ERROR_AND_CHECK_SPAN(child(1)->get_next(state, &block, &eos),
+
RETURN_IF_ERROR_AND_CHECK_SPAN(child(1)->get_next_after_projects(state, &block,
&eos),
child(1)->get_next_span(), eos);
_mem_used += block.allocated_bytes();
diff --git a/be/src/vec/exec/scan/new_olap_scanner.cpp
b/be/src/vec/exec/scan/new_olap_scanner.cpp
index 00e9b2bc34..0b25890a50 100644
--- a/be/src/vec/exec/scan/new_olap_scanner.cpp
+++ b/be/src/vec/exec/scan/new_olap_scanner.cpp
@@ -175,9 +175,8 @@ Status NewOlapScanner::_init_tablet_reader_params(
_tablet_reader_params.delete_predicates.begin()));
// Merge the columns in delete predicate that not in latest schema in to
current tablet schema
- for (auto& del_pred_pb : _tablet_reader_params.delete_predicates) {
- _tablet_schema->merge_dropped_columns(
- _tablet->tablet_schema(Version(del_pred_pb.version(),
del_pred_pb.version())));
+ for (auto& del_pred_rs : _tablet_reader_params.delete_predicates) {
+
_tablet_schema->merge_dropped_columns(_tablet->tablet_schema(del_pred_rs->version()));
}
// Range
diff --git a/be/src/vec/exec/vaggregation_node.cpp
b/be/src/vec/exec/vaggregation_node.cpp
index a724c46afe..ae6421e7a6 100644
--- a/be/src/vec/exec/vaggregation_node.cpp
+++ b/be/src/vec/exec/vaggregation_node.cpp
@@ -433,7 +433,7 @@ Status AggregationNode::open(RuntimeState* state) {
while (!eos) {
RETURN_IF_CANCELLED(state);
release_block_memory(block);
- RETURN_IF_ERROR_AND_CHECK_SPAN(_children[0]->get_next(state, &block,
&eos),
+
RETURN_IF_ERROR_AND_CHECK_SPAN(_children[0]->get_next_after_projects(state,
&block, &eos),
_children[0]->get_next_span(), eos);
if (block.rows() == 0) {
continue;
@@ -461,7 +461,7 @@ Status AggregationNode::get_next(RuntimeState* state,
Block* block, bool* eos) {
do {
release_block_memory(_preagg_block);
RETURN_IF_ERROR_AND_CHECK_SPAN(
- _children[0]->get_next(state, &_preagg_block, &child_eos),
+ _children[0]->get_next_after_projects(state,
&_preagg_block, &child_eos),
_children[0]->get_next_span(), child_eos);
} while (_preagg_block.rows() == 0 && !child_eos);
diff --git a/be/src/vec/exec/vanalytic_eval_node.cpp
b/be/src/vec/exec/vanalytic_eval_node.cpp
index dde7cb8453..fc5b224253 100644
--- a/be/src/vec/exec/vanalytic_eval_node.cpp
+++ b/be/src/vec/exec/vanalytic_eval_node.cpp
@@ -467,8 +467,9 @@ Status
VAnalyticEvalNode::_fetch_next_block_data(RuntimeState* state) {
Block block;
RETURN_IF_CANCELLED(state);
do {
- RETURN_IF_ERROR_AND_CHECK_SPAN(_children[0]->get_next(state, &block,
&_input_eos),
- _children[0]->get_next_span(),
_input_eos);
+ RETURN_IF_ERROR_AND_CHECK_SPAN(
+ _children[0]->get_next_after_projects(state, &block,
&_input_eos),
+ _children[0]->get_next_span(), _input_eos);
} while (!_input_eos && block.rows() == 0);
if (_input_eos && block.rows() == 0) {
diff --git a/be/src/vec/exec/vassert_num_rows_node.cpp
b/be/src/vec/exec/vassert_num_rows_node.cpp
index 28e1389bb7..b9239a3c53 100644
--- a/be/src/vec/exec/vassert_num_rows_node.cpp
+++ b/be/src/vec/exec/vassert_num_rows_node.cpp
@@ -51,8 +51,8 @@ Status VAssertNumRowsNode::get_next(RuntimeState* state,
Block* block, bool* eos
INIT_AND_SCOPE_GET_NEXT_SPAN(state->get_tracer(), _get_next_span,
"VAssertNumRowsNode::get_next");
SCOPED_TIMER(_runtime_profile->total_time_counter());
- RETURN_IF_ERROR_AND_CHECK_SPAN(child(0)->get_next(state, block, eos),
child(0)->get_next_span(),
- *eos);
+ RETURN_IF_ERROR_AND_CHECK_SPAN(child(0)->get_next_after_projects(state,
block, eos),
+ child(0)->get_next_span(), *eos);
_num_rows_returned += block->rows();
bool assert_res = false;
switch (_assertion) {
diff --git a/be/src/vec/exec/vblocking_join_node.cpp
b/be/src/vec/exec/vblocking_join_node.cpp
index c6d548de91..809da414f4 100644
--- a/be/src/vec/exec/vblocking_join_node.cpp
+++ b/be/src/vec/exec/vblocking_join_node.cpp
@@ -119,8 +119,9 @@ Status VBlockingJoinNode::open(RuntimeState* state) {
// Seed left child in preparation for get_next().
while (true) {
release_block_memory(_left_block);
- RETURN_IF_ERROR_AND_CHECK_SPAN(child(0)->get_next(state, &_left_block,
&_left_side_eos),
- child(0)->get_next_span(),
_left_side_eos);
+ RETURN_IF_ERROR_AND_CHECK_SPAN(
+ child(0)->get_next_after_projects(state, &_left_block,
&_left_side_eos),
+ child(0)->get_next_span(), _left_side_eos);
COUNTER_UPDATE(_left_child_row_counter, _left_block.rows());
_left_block_pos = 0;
diff --git a/be/src/vec/exec/vcross_join_node.cpp
b/be/src/vec/exec/vcross_join_node.cpp
index 4a8c2f3b17..c3b94f7561 100644
--- a/be/src/vec/exec/vcross_join_node.cpp
+++ b/be/src/vec/exec/vcross_join_node.cpp
@@ -60,7 +60,7 @@ Status VCrossJoinNode::construct_build_side(RuntimeState*
state) {
RETURN_IF_CANCELLED(state);
Block block;
- RETURN_IF_ERROR_AND_CHECK_SPAN(child(1)->get_next(state, &block, &eos),
+
RETURN_IF_ERROR_AND_CHECK_SPAN(child(1)->get_next_after_projects(state, &block,
&eos),
child(1)->get_next_span(), eos);
auto rows = block.rows();
auto mem_usage = block.allocated_bytes();
@@ -117,7 +117,8 @@ Status VCrossJoinNode::get_next(RuntimeState* state, Block*
block, bool* eos) {
release_block_memory(_left_block);
timer.stop();
RETURN_IF_ERROR_AND_CHECK_SPAN(
- child(0)->get_next(state, &_left_block,
&_left_side_eos),
+ child(0)->get_next_after_projects(state,
&_left_block,
+
&_left_side_eos),
child(0)->get_next_span(), _left_side_eos);
timer.start();
} while (_left_block.rows() == 0 && !_left_side_eos);
diff --git a/be/src/vec/exec/vrepeat_node.cpp b/be/src/vec/exec/vrepeat_node.cpp
index 3c180d417d..a2001cff15 100644
--- a/be/src/vec/exec/vrepeat_node.cpp
+++ b/be/src/vec/exec/vrepeat_node.cpp
@@ -167,7 +167,7 @@ Status VRepeatNode::get_next(RuntimeState* state, Block*
block, bool* eos) {
if (_child_block->rows() == 0) {
while (_child_block->rows() == 0 && !_child_eos) {
RETURN_IF_ERROR_AND_CHECK_SPAN(
- child(0)->get_next(state, _child_block.get(), &_child_eos),
+ child(0)->get_next_after_projects(state,
_child_block.get(), &_child_eos),
child(0)->get_next_span(), _child_eos);
}
diff --git a/be/src/vec/exec/vselect_node.cpp b/be/src/vec/exec/vselect_node.cpp
index c69f997150..8f561f10a0 100644
--- a/be/src/vec/exec/vselect_node.cpp
+++ b/be/src/vec/exec/vselect_node.cpp
@@ -48,8 +48,9 @@ Status VSelectNode::get_next(RuntimeState* state,
vectorized::Block* block, bool
RETURN_IF_CANCELLED(state);
do {
RETURN_IF_CANCELLED(state);
- RETURN_IF_ERROR_AND_CHECK_SPAN(_children[0]->get_next(state, block,
&_child_eos),
- _children[0]->get_next_span(),
_child_eos);
+ RETURN_IF_ERROR_AND_CHECK_SPAN(
+ _children[0]->get_next_after_projects(state, block,
&_child_eos),
+ _children[0]->get_next_span(), _child_eos);
if (_child_eos) {
*eos = true;
break;
diff --git a/be/src/vec/exec/vset_operation_node.cpp
b/be/src/vec/exec/vset_operation_node.cpp
index b46d9988c5..b95714513c 100644
--- a/be/src/vec/exec/vset_operation_node.cpp
+++ b/be/src/vec/exec/vset_operation_node.cpp
@@ -242,7 +242,7 @@ Status VSetOperationNode::hash_table_build(RuntimeState*
state) {
block.clear_column_data();
SCOPED_TIMER(_build_timer);
RETURN_IF_CANCELLED(state);
- RETURN_IF_ERROR_AND_CHECK_SPAN(child(0)->get_next(state, &block, &eos),
+
RETURN_IF_ERROR_AND_CHECK_SPAN(child(0)->get_next_after_projects(state, &block,
&eos),
child(0)->get_next_span(), eos);
size_t allocated_bytes = block.allocated_bytes();
@@ -309,8 +309,9 @@ Status VSetOperationNode::process_probe_block(RuntimeState*
state, int child_id,
_probe_rows = 0;
RETURN_IF_CANCELLED(state);
- RETURN_IF_ERROR_AND_CHECK_SPAN(child(child_id)->get_next(state,
&_probe_block, eos),
- child(child_id)->get_next_span(), *eos);
+ RETURN_IF_ERROR_AND_CHECK_SPAN(
+ child(child_id)->get_next_after_projects(state, &_probe_block,
eos),
+ child(child_id)->get_next_span(), *eos);
_probe_rows = _probe_block.rows();
RETURN_IF_ERROR(extract_probe_column(_probe_block, _probe_columns,
child_id));
return Status::OK();
diff --git a/be/src/vec/exec/vsort_node.cpp b/be/src/vec/exec/vsort_node.cpp
index 73618b0b26..8749b2913e 100644
--- a/be/src/vec/exec/vsort_node.cpp
+++ b/be/src/vec/exec/vsort_node.cpp
@@ -125,7 +125,7 @@ Status VSortNode::sort_input(RuntimeState* state) {
bool eos = false;
do {
Block block;
- RETURN_IF_ERROR_AND_CHECK_SPAN(child(0)->get_next(state, &block, &eos),
+
RETURN_IF_ERROR_AND_CHECK_SPAN(child(0)->get_next_after_projects(state, &block,
&eos),
child(0)->get_next_span(), eos);
auto rows = block.rows();
diff --git a/be/src/vec/exec/vtable_function_node.cpp
b/be/src/vec/exec/vtable_function_node.cpp
index 0852d9d966..49ff5d5a69 100644
--- a/be/src/vec/exec/vtable_function_node.cpp
+++ b/be/src/vec/exec/vtable_function_node.cpp
@@ -117,7 +117,7 @@ Status VTableFunctionNode::get_expanded_block(RuntimeState*
state, Block* output
if (_child_block->rows() == 0) {
while (_child_block->rows() == 0 && !_child_eos) {
RETURN_IF_ERROR_AND_CHECK_SPAN(
- child(0)->get_next(state, _child_block.get(),
&_child_eos),
+ child(0)->get_next_after_projects(state,
_child_block.get(), &_child_eos),
child(0)->get_next_span(), _child_eos);
}
if (_child_eos && _child_block->rows() == 0) {
diff --git a/be/src/vec/exec/vunion_node.cpp b/be/src/vec/exec/vunion_node.cpp
index 5ef738ba5b..e9de9c5bcb 100644
--- a/be/src/vec/exec/vunion_node.cpp
+++ b/be/src/vec/exec/vunion_node.cpp
@@ -106,8 +106,9 @@ Status VUnionNode::get_next_pass_through(RuntimeState*
state, Block* block) {
_child_eos = false;
}
DCHECK_EQ(block->rows(), 0);
- RETURN_IF_ERROR_AND_CHECK_SPAN(child(_child_idx)->get_next(state, block,
&_child_eos),
- child(_child_idx)->get_next_span(),
_child_eos);
+ RETURN_IF_ERROR_AND_CHECK_SPAN(
+ child(_child_idx)->get_next_after_projects(state, block,
&_child_eos),
+ child(_child_idx)->get_next_span(), _child_eos);
if (_child_eos) {
// Even though the child is at eos, it's not OK to close() it here.
Once we close
// the child, the row batches that it produced are invalid. Marking
the batch as
@@ -148,7 +149,7 @@ Status VUnionNode::get_next_materialized(RuntimeState*
state, Block* block) {
child_block.clear();
// The first batch from each child is always fetched here.
RETURN_IF_ERROR_AND_CHECK_SPAN(
- child(_child_idx)->get_next(state, &child_block, &_child_eos),
+ child(_child_idx)->get_next_after_projects(state,
&child_block, &_child_eos),
child(_child_idx)->get_next_span(), _child_eos);
SCOPED_TIMER(_materialize_exprs_evaluate_timer);
if (child_block.rows() > 0) {
diff --git a/be/src/vec/exprs/vexpr.cpp b/be/src/vec/exprs/vexpr.cpp
index 6d0de17dac..60a4ad5ce3 100644
--- a/be/src/vec/exprs/vexpr.cpp
+++ b/be/src/vec/exprs/vexpr.cpp
@@ -227,15 +227,15 @@ Status VExpr::create_expr_trees(ObjectPool* pool, const
std::vector<doris::TExpr
Status VExpr::prepare(const std::vector<VExprContext*>& ctxs, RuntimeState*
state,
const RowDescriptor& row_desc) {
- for (int i = 0; i < ctxs.size(); ++i) {
- RETURN_IF_ERROR(ctxs[i]->prepare(state, row_desc));
+ for (auto ctx : ctxs) {
+ RETURN_IF_ERROR(ctx->prepare(state, row_desc));
}
return Status::OK();
}
void VExpr::close(const std::vector<VExprContext*>& ctxs, RuntimeState* state)
{
- for (int i = 0; i < ctxs.size(); ++i) {
- ctxs[i]->close(state);
+ for (auto ctx : ctxs) {
+ ctx->close(state);
}
}
diff --git a/be/src/vec/functions/like.cpp b/be/src/vec/functions/like.cpp
index 306aa19a7e..86db32c0dd 100644
--- a/be/src/vec/functions/like.cpp
+++ b/be/src/vec/functions/like.cpp
@@ -135,15 +135,14 @@ Status FunctionLikeBase::regexp_fn(LikeSearchState*
state, const StringValue& va
Status FunctionLikeBase::hs_prepare(FunctionContext* context, const char*
expression,
hs_database_t** database, hs_scratch_t**
scratch) {
hs_compile_error_t* compile_err;
-
- if (hs_compile(expression, HS_FLAG_DOTALL | HS_FLAG_ALLOWEMPTY,
HS_MODE_BLOCK, NULL, database,
- &compile_err) != HS_SUCCESS) {
+ auto res = hs_compile(expression, HS_FLAG_DOTALL | HS_FLAG_ALLOWEMPTY,
HS_MODE_BLOCK, NULL,
+ database, &compile_err);
+ if (res != HS_SUCCESS) {
*database = nullptr;
if (context) context->set_error("hs_compile regex pattern error");
- auto status = Status::RuntimeError("hs_compile regex pattern error:" +
- std::string(compile_err->message));
+ return Status::RuntimeError("hs_compile regex pattern error:" +
+ std::string(compile_err->message));
hs_free_compile_error(compile_err);
- return status;
}
hs_free_compile_error(compile_err);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/analysis/SlotDescriptor.java
b/fe/fe-core/src/main/java/org/apache/doris/analysis/SlotDescriptor.java
index 22eae97c1d..a3fb63f0e1 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/SlotDescriptor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/SlotDescriptor.java
@@ -338,6 +338,7 @@ public class SlotDescriptor {
builder.append(prefix).append("byteOffset=").append(byteOffset).append("\n");
builder.append(prefix).append("nullIndicatorByte=").append(nullIndicatorByte).append("\n");
builder.append(prefix).append("nullIndicatorBit=").append(nullIndicatorBit).append("\n");
+
builder.append(prefix).append("nullable=").append(isNullable).append("\n");
builder.append(prefix).append("slotIdx=").append(slotIdx).append("\n");
return builder.toString();
}
@@ -345,4 +346,9 @@ public class SlotDescriptor {
public boolean isScanSlot() {
return parent.getTable() instanceof OlapTable;
}
+
+ public void setMaterialized(boolean materialized) {
+ isMaterialized = materialized;
+ }
+
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/ExpressionTranslator.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/ExpressionTranslator.java
index 5a40a8cb0b..43d9975e43 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/ExpressionTranslator.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/ExpressionTranslator.java
@@ -258,9 +258,10 @@ public class ExpressionTranslator extends
DefaultExpressionVisitor<Expr, PlanTra
@Override
public Expr visitBinaryArithmetic(BinaryArithmetic binaryArithmetic,
PlanTranslatorContext context) {
- return new ArithmeticExpr(binaryArithmetic.getLegacyOperator(),
+ ArithmeticExpr arithmeticExpr = new
ArithmeticExpr(binaryArithmetic.getLegacyOperator(),
binaryArithmetic.child(0).accept(this, context),
binaryArithmetic.child(1).accept(this, context));
+ return arithmeticExpr;
}
@Override
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
index 4090a09e5b..890b2e2cbf 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
@@ -21,6 +21,7 @@ import org.apache.doris.analysis.AggregateInfo;
import org.apache.doris.analysis.BaseTableRef;
import org.apache.doris.analysis.Expr;
import org.apache.doris.analysis.FunctionCallExpr;
+import org.apache.doris.analysis.SlotId;
import org.apache.doris.analysis.SlotRef;
import org.apache.doris.analysis.SortInfo;
import org.apache.doris.analysis.TableName;
@@ -41,7 +42,6 @@ import
org.apache.doris.nereids.trees.expressions.functions.AggregateFunction;
import org.apache.doris.nereids.trees.plans.AggPhase;
import org.apache.doris.nereids.trees.plans.JoinType;
import org.apache.doris.nereids.trees.plans.Plan;
-import org.apache.doris.nereids.trees.plans.PlanType;
import org.apache.doris.nereids.trees.plans.logical.LogicalSort;
import org.apache.doris.nereids.trees.plans.physical.AbstractPhysicalSort;
import org.apache.doris.nereids.trees.plans.physical.PhysicalAggregate;
@@ -126,24 +126,16 @@ public class PhysicalPlanTranslator extends
DefaultPlanVisitor<PlanFragment, Pla
if (rootFragment.isPartitioned() &&
rootFragment.getPlanRoot().getNumInstances() > 1) {
rootFragment = exchangeToMergeFragment(rootFragment, context);
}
- // TODO: trick here, we need push project down
- if (physicalPlan.getType() == PlanType.PHYSICAL_PROJECT) {
- PhysicalProject<Plan> physicalProject = (PhysicalProject<Plan>)
physicalPlan;
- List<Expr> outputExprs = physicalProject.getProjects().stream()
- .map(e -> ExpressionTranslator.translate(e, context))
- .collect(Collectors.toList());
- rootFragment.setOutputExprs(outputExprs);
- } else {
- List<Expr> outputExprs = Lists.newArrayList();
- physicalPlan.getOutput().stream().map(Slot::getExprId)
- .forEach(exprId ->
outputExprs.add(context.findSlotRef(exprId)));
- rootFragment.setOutputExprs(outputExprs);
- }
+ List<Expr> outputExprs = Lists.newArrayList();
+ physicalPlan.getOutput().stream().map(Slot::getExprId)
+ .forEach(exprId ->
outputExprs.add(context.findSlotRef(exprId)));
+ rootFragment.setOutputExprs(outputExprs);
rootFragment.getPlanRoot().convertToVectoriezd();
for (PlanFragment fragment : context.getPlanFragments()) {
fragment.finalize(null);
}
Collections.reverse(context.getPlanFragments());
+ context.getDescTable().computeMemLayout();
return rootFragment;
}
@@ -240,10 +232,6 @@ public class PhysicalPlanTranslator extends
DefaultPlanVisitor<PlanFragment, Pla
// Create OlapScanNode
List<Slot> slotList = olapScan.getOutput();
OlapTable olapTable = olapScan.getTable();
- List<Expr> execConjunctsList = olapScan
- .getExpressions()
- .stream()
- .map(e -> ExpressionTranslator.translate(e,
context)).collect(Collectors.toList());
TupleDescriptor tupleDescriptor = generateTupleDesc(slotList,
olapTable, context);
tupleDescriptor.setTable(olapTable);
OlapScanNode olapScanNode = new OlapScanNode(context.nextPlanNodeId(),
tupleDescriptor, olapTable.getName());
@@ -259,12 +247,13 @@ public class PhysicalPlanTranslator extends
DefaultPlanVisitor<PlanFragment, Pla
throw new AnalysisException(e.getMessage());
}
Utils.execWithUncheckedException(olapScanNode::init);
- olapScanNode.addConjuncts(execConjunctsList);
context.addScanNode(olapScanNode);
// Create PlanFragment
// TODO: add data partition after we have physical properties
PlanFragment planFragment = new PlanFragment(context.nextFragmentId(),
olapScanNode, DataPartition.RANDOM);
context.addPlanFragment(planFragment);
+ // TODO: Nereids support duplicate table only for now, remove this
when support aggregate/unique table.
+ olapScanNode.setIsPreAggregation(true, "Nereids support duplicate
table only for now");
return planFragment;
}
@@ -437,9 +426,9 @@ public class PhysicalPlanTranslator extends
DefaultPlanVisitor<PlanFragment, Pla
}
}
- // TODO: generate expression mapping when be project could do in ExecNode
+ // TODO: generate expression mapping when be project could do in ExecNode.
@Override
- public PlanFragment visitPhysicalProject(PhysicalProject<Plan> project,
PlanTranslatorContext context) {
+ public PlanFragment visitPhysicalProject(PhysicalProject<? extends Plan>
project, PlanTranslatorContext context) {
PlanFragment inputFragment = project.child(0).accept(this, context);
// TODO: handle p.child(0) is not NamedExpression.
@@ -453,28 +442,66 @@ public class PhysicalPlanTranslator extends
DefaultPlanVisitor<PlanFragment, Pla
.map(e -> ExpressionTranslator.translate(e, context))
.collect(Collectors.toList());
// TODO: fix the project alias of an aliased relation.
+ List<Slot> slotList = project.getOutput();
+ TupleDescriptor tupleDescriptor = generateTupleDesc(slotList, null,
context);
PlanNode inputPlanNode = inputFragment.getPlanRoot();
+ // For hash join node, use vSrcToOutputSMap to describe the expression
calculation, use
+ // vIntermediateTupleDescList as input, and set vOutputTupleDesc as
the final output.
+ // TODO: HashJoinNode's be implementation is not support projection
yet, remove this after when supported.
+ if (inputPlanNode instanceof HashJoinNode) {
+ HashJoinNode hashJoinNode = (HashJoinNode) inputPlanNode;
+ hashJoinNode.setvOutputTupleDesc(tupleDescriptor);
+ hashJoinNode.setvSrcToOutputSMap(execExprList);
+ return inputFragment;
+ }
+ inputPlanNode.setProjectList(execExprList);
+ inputPlanNode.setOutputTupleDesc(tupleDescriptor);
+
List<Expr> predicateList = inputPlanNode.getConjuncts();
Set<Integer> requiredSlotIdList = new HashSet<>();
for (Expr expr : predicateList) {
extractExecSlot(expr, requiredSlotIdList);
}
for (Expr expr : execExprList) {
- if (expr instanceof SlotRef) {
- requiredSlotIdList.add(((SlotRef)
expr).getDesc().getId().asInt());
- }
+ extractExecSlot(expr, requiredSlotIdList);
+ }
+ if (inputPlanNode instanceof OlapScanNode) {
+ updateChildSlotsMaterialization(inputPlanNode, requiredSlotIdList,
context);
}
return inputFragment;
}
+ private void updateChildSlotsMaterialization(PlanNode execPlan,
+ Set<Integer> requiredSlotIdList,
+ PlanTranslatorContext context) {
+ Set<SlotRef> slotRefSet = new HashSet<>();
+ for (Expr expr : execPlan.getConjuncts()) {
+ expr.collect(SlotRef.class, slotRefSet);
+ }
+ Set<Integer> slotIdSet = slotRefSet.stream()
+
.map(SlotRef::getSlotId).map(SlotId::asInt).collect(Collectors.toSet());
+ slotIdSet.addAll(requiredSlotIdList);
+ execPlan.getTupleIds().stream()
+ .map(context::getTupleDesc)
+ .map(TupleDescriptor::getSlots)
+ .flatMap(List::stream)
+ .forEach(s ->
s.setIsMaterialized(slotIdSet.contains(s.getId().asInt())));
+ }
+
@Override
public PlanFragment visitPhysicalFilter(PhysicalFilter<Plan> filter,
PlanTranslatorContext context) {
PlanFragment inputFragment = filter.child(0).accept(this, context);
PlanNode planNode = inputFragment.getPlanRoot();
+ addConjunctsToPlanNode(filter, planNode, context);
+ return inputFragment;
+ }
+
+ private void addConjunctsToPlanNode(PhysicalFilter<Plan> filter,
+ PlanNode planNode,
+ PlanTranslatorContext context) {
Expression expression = filter.getPredicates();
List<Expression> expressionList =
ExpressionUtils.extractConjunction(expression);
expressionList.stream().map(e -> ExpressionTranslator.translate(e,
context)).forEach(planNode::addConjunct);
- return inputFragment;
}
@Override
@@ -600,4 +627,5 @@ public class PhysicalPlanTranslator extends
DefaultPlanVisitor<PlanFragment, Pla
context.addPlanFragment(fragment);
return fragment;
}
+
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PlanTranslatorContext.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PlanTranslatorContext.java
index 806bc1475c..c412bedabe 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PlanTranslatorContext.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PlanTranslatorContext.java
@@ -22,10 +22,10 @@ import org.apache.doris.analysis.SlotDescriptor;
import org.apache.doris.analysis.SlotId;
import org.apache.doris.analysis.SlotRef;
import org.apache.doris.analysis.TupleDescriptor;
+import org.apache.doris.analysis.TupleId;
import org.apache.doris.catalog.Column;
import org.apache.doris.common.IdGenerator;
import org.apache.doris.nereids.trees.expressions.ExprId;
-import org.apache.doris.nereids.trees.expressions.Expression;
import org.apache.doris.nereids.trees.expressions.SlotReference;
import org.apache.doris.planner.PlanFragment;
import org.apache.doris.planner.PlanFragmentId;
@@ -122,17 +122,10 @@ public class PlanTranslatorContext {
slotDescriptor.setType(slotReference.getDataType().toCatalogDataType());
slotDescriptor.setIsMaterialized(true);
this.addExprIdSlotRefPair(slotReference.getExprId(), new
SlotRef(slotDescriptor));
+ slotDescriptor.setIsNullable(slotReference.nullable());
return slotDescriptor;
}
- /**
- * Create slotDesc with Expression.
- */
- public void createSlotDesc(TupleDescriptor tupleDesc, Expression
expression) {
- SlotDescriptor slotDescriptor = this.addSlotDesc(tupleDesc);
- slotDescriptor.setType(expression.getDataType().toCatalogDataType());
- }
-
/**
* in Nereids, all node only has one TupleDescriptor, so we can use the
first one.
*
@@ -144,6 +137,10 @@ public class PlanTranslatorContext {
return descTable.getTupleDesc(planNode.getOutputTupleIds().get(0));
}
+ public TupleDescriptor getTupleDesc(TupleId tupleId) {
+ return descTable.getTupleDesc(tupleId);
+ }
+
public DescriptorTable getDescTable() {
return descTable;
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/JobType.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/JobType.java
index 856773028d..6531376177 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/JobType.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/JobType.java
@@ -29,6 +29,6 @@ public enum JobType {
APPLY_RULE,
DERIVE_STATS,
TOP_DOWN_REWRITE,
- BOTTOM_UP_REWRITE,
+ BOTTOM_UP_REWRITE
;
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/batch/RewriteJob.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/batch/RewriteJob.java
index 8a3e5b4c83..274929db75 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/batch/RewriteJob.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/batch/RewriteJob.java
@@ -21,6 +21,7 @@ import org.apache.doris.nereids.CascadesContext;
import org.apache.doris.nereids.jobs.Job;
import
org.apache.doris.nereids.rules.expression.rewrite.ExpressionNormalization;
import org.apache.doris.nereids.rules.rewrite.AggregateDisassemble;
+import org.apache.doris.nereids.rules.rewrite.logical.ColumnPruning;
import org.apache.doris.nereids.rules.rewrite.logical.FindHashConditionForJoin;
import org.apache.doris.nereids.rules.rewrite.logical.MergeConsecutiveFilters;
import org.apache.doris.nereids.rules.rewrite.logical.MergeConsecutiveLimits;
@@ -50,6 +51,7 @@ public class RewriteJob extends BatchRulesJob {
.add(topDownBatch(ImmutableList.of(new
FindHashConditionForJoin())))
.add(topDownBatch(ImmutableList.of(new
PushPredicateThroughJoin())))
.add(topDownBatch(ImmutableList.of(new
AggregateDisassemble())))
+ .add(topDownBatch(ImmutableList.of(new ColumnPruning())))
.add(topDownBatch(ImmutableList.of(new
SwapFilterAndProject())))
.add(bottomUpBatch(ImmutableList.of(new
MergeConsecutiveProjects())))
.add(topDownBatch(ImmutableList.of(new
MergeConsecutiveFilters())))
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/stats/StatsCalculator.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/stats/StatsCalculator.java
index 6b03b68182..286dc12eff 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/stats/StatsCalculator.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/stats/StatsCalculator.java
@@ -172,7 +172,7 @@ public class StatsCalculator extends
DefaultPlanVisitor<StatsDeriveResult, Void>
// TODO: We should subtract those pruned column, and consider the
expression transformations in the node.
@Override
- public StatsDeriveResult visitPhysicalProject(PhysicalProject<Plan>
project, Void context) {
+ public StatsDeriveResult visitPhysicalProject(PhysicalProject<? extends
Plan> project, Void context) {
return computeProject(project);
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/Expression.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/Expression.java
index 818ef3796e..173b363799 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/Expression.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/Expression.java
@@ -32,8 +32,6 @@ import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import org.apache.commons.lang3.StringUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.util.List;
import java.util.Objects;
@@ -45,8 +43,6 @@ public abstract class Expression extends
AbstractTreeNode<Expression> {
private static final String INPUT_CHECK_ERROR_MESSAGE = "argument %d
requires %s type, however '%s' is of %s type";
- private final Logger logger = LoggerFactory.getLogger(this.getClass());
-
public Expression(Expression... children) {
super(children);
}
@@ -147,23 +143,4 @@ public abstract class Expression extends
AbstractTreeNode<Expression> {
return 0;
}
- /**
- * Return true if all the SlotRef in the expr tree is bound to the same
column.
- */
- public boolean boundToColumn(String column) {
- for (Expression child : children) {
- if (!child.boundToColumn(column)) {
- return false;
- }
- }
- return true;
- }
-
- public Expression leftMostNode() {
- Expression leftChild = this;
- while (leftChild.children.size() > 0) {
- leftChild = leftChild.child(0);
- }
- return leftChild;
- }
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/SlotReference.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/SlotReference.java
index e8e04e901a..24a601a2e5 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/SlotReference.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/SlotReference.java
@@ -155,8 +155,4 @@ public class SlotReference extends Slot {
return new SlotReference(exprId, name, dataType, nullable, qualifiers);
}
- @Override
- public boolean boundToColumn(String name) {
- return this.name.equals(name);
- }
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/AbstractPlan.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/AbstractPlan.java
index 5c7564b50d..06527a1b78 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/AbstractPlan.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/AbstractPlan.java
@@ -20,6 +20,7 @@ package org.apache.doris.nereids.trees.plans;
import org.apache.doris.nereids.memo.GroupExpression;
import org.apache.doris.nereids.properties.LogicalProperties;
import org.apache.doris.nereids.trees.AbstractTreeNode;
+import org.apache.doris.nereids.trees.expressions.Slot;
import org.apache.doris.statistics.StatsDeriveResult;
import org.apache.commons.lang3.StringUtils;
@@ -119,4 +120,15 @@ public abstract class AbstractPlan extends
AbstractTreeNode<Plan> implements Pla
public int hashCode() {
return Objects.hash(statsDeriveResult, logicalProperties);
}
+
+ @Override
+ public List<Slot> getOutput() {
+ return logicalProperties.getOutput();
+ }
+
+ @Override
+ public Plan child(int index) {
+ return super.child(index);
+ }
+
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/GroupPlan.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/GroupPlan.java
index eaf5c56b44..cfc8d1daf2 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/GroupPlan.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/GroupPlan.java
@@ -98,4 +98,5 @@ public class GroupPlan extends LogicalLeaf {
public String toString() {
return "GroupPlan( " + group.getGroupId() + " )";
}
+
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/Plan.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/Plan.java
index a6440252de..68cea493dd 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/Plan.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/Plan.java
@@ -49,10 +49,6 @@ public interface Plan extends TreeNode<Plan> {
List<Slot> getOutput();
- default List<Slot> computeOutput(Plan... inputs) {
- throw new IllegalStateException("Not support compute output for " +
getClass().getName());
- }
-
String treeString();
default Plan withOutput(List<Slot> output) {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/AbstractPhysicalPlan.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/AbstractPhysicalPlan.java
index 4b2349fbca..a14797ec88 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/AbstractPhysicalPlan.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/AbstractPhysicalPlan.java
@@ -20,12 +20,10 @@ package org.apache.doris.nereids.trees.plans.physical;
import org.apache.doris.nereids.memo.GroupExpression;
import org.apache.doris.nereids.properties.LogicalProperties;
import org.apache.doris.nereids.properties.PhysicalProperties;
-import org.apache.doris.nereids.trees.expressions.Slot;
import org.apache.doris.nereids.trees.plans.AbstractPlan;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.PlanType;
-import java.util.List;
import java.util.Optional;
/**
@@ -59,11 +57,6 @@ public abstract class AbstractPhysicalPlan extends
AbstractPlan implements Physi
this.physicalProperties = PhysicalProperties.ANY;
}
- @Override
- public List<Slot> getOutput() {
- return logicalProperties.getOutput();
- }
-
@Override
public LogicalProperties getLogicalProperties() {
return logicalProperties;
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/visitor/PlanVisitor.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/visitor/PlanVisitor.java
index 6d75bf5819..cc5d914c1a 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/visitor/PlanVisitor.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/visitor/PlanVisitor.java
@@ -180,7 +180,7 @@ public abstract class PlanVisitor<R, C> {
return visit(nestedLoopJoin, context);
}
- public R visitPhysicalProject(PhysicalProject<Plan> project, C context) {
+ public R visitPhysicalProject(PhysicalProject<? extends Plan> project, C
context) {
return visit(project, context);
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/ExchangeNode.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/ExchangeNode.java
index ad2cc2f5fc..5cae77c483 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/ExchangeNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/ExchangeNode.java
@@ -23,6 +23,7 @@ package org.apache.doris.planner;
import org.apache.doris.analysis.Analyzer;
import org.apache.doris.analysis.Expr;
import org.apache.doris.analysis.SortInfo;
+import org.apache.doris.analysis.TupleDescriptor;
import org.apache.doris.analysis.TupleId;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.VectorizedUtil;
@@ -83,6 +84,7 @@ public class ExchangeNode extends PlanNode {
limit = inputNode.limit;
}
computeTupleIds();
+
}
public boolean isMergingExchange() {
@@ -94,8 +96,15 @@ public class ExchangeNode extends PlanNode {
@Override
public final void computeTupleIds() {
- clearTupleIds();
- tupleIds.addAll(getChild(0).getTupleIds());
+ PlanNode inputNode = getChild(0);
+ TupleDescriptor outputTupleDesc = inputNode.getOutputTupleDesc();
+ if (outputTupleDesc != null) {
+ tupleIds.clear();
+ tupleIds.add(outputTupleDesc.getId());
+ } else {
+ clearTupleIds();
+ tupleIds.addAll(getChild(0).getTupleIds());
+ }
tblRefIds.addAll(getChild(0).getTblRefIds());
nullableTupleIds.addAll(getChild(0).getNullableTupleIds());
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/HashJoinNode.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/HashJoinNode.java
index 9d0a13e24d..46e35775da 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/HashJoinNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/HashJoinNode.java
@@ -1080,13 +1080,17 @@ public class HashJoinNode extends PlanNode {
}
if (vSrcToOutputSMap != null) {
for (int i = 0; i < vSrcToOutputSMap.size(); i++) {
+ // TODO: Enable it after we support new optimizers
+ // if
(ConnectContext.get().getSessionVariable().isEnableNereidsPlanner()) {
+ //
msg.addToProjections(vSrcToOutputSMap.getLhs().get(i).treeToThrift());
+ // } else
msg.hash_join_node.addToSrcExprList(vSrcToOutputSMap.getLhs().get(i).treeToThrift());
-
msg.addToProjections(vSrcToOutputSMap.getLhs().get(i).treeToThrift());
}
}
if (vOutputTupleDesc != null) {
msg.hash_join_node.setVoutputTupleId(vOutputTupleDesc.getId().asInt());
- msg.setOutputTupleId(vOutputTupleDesc.getId().asInt());
+ // TODO Enable it after we support new optimizers
+ // msg.setOutputTupleId(vOutputTupleDesc.getId().asInt());
}
if (vIntermediateTupleDescList != null) {
for (TupleDescriptor tupleDescriptor : vIntermediateTupleDescList)
{
@@ -1257,4 +1261,25 @@ public class HashJoinNode extends PlanNode {
}
return true;
}
+
+ /**
+ * Used by nereids.
+ */
+ public void setvOutputTupleDesc(TupleDescriptor vOutputTupleDesc) {
+ this.vOutputTupleDesc = vOutputTupleDesc;
+ }
+
+ /**
+ * Used by nereids.
+ */
+ public void setvIntermediateTupleDescList(List<TupleDescriptor>
vIntermediateTupleDescList) {
+ this.vIntermediateTupleDescList = vIntermediateTupleDescList;
+ }
+
+ /**
+ * Used by nereids.
+ */
+ public void setvSrcToOutputSMap(List<Expr> lhs) {
+ this.vSrcToOutputSMap = new ExprSubstitutionMap(lhs,
Collections.emptyList());
+ }
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java
index 967895ee4e..3c82be05f6 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java
@@ -319,7 +319,6 @@ public class OlapScanNode extends ScanNode {
}
situation = "The key type of table is aggregated.";
update = false;
- break CHECK;
} // CHECKSTYLE IGNORE THIS LINE
if (update) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java
index c25282da83..596897c947 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java
@@ -49,8 +49,6 @@ import com.google.common.base.Preconditions;
import com.google.common.base.Predicates;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
import java.util.ArrayList;
import java.util.Collection;
@@ -74,7 +72,6 @@ import java.util.Set;
* its children (= are bound by tupleIds).
*/
public abstract class PlanNode extends TreeNode<PlanNode> implements PlanStats
{
- private static final Logger LOG = LogManager.getLogger(PlanNode.class);
protected String planNodeName;
@@ -142,6 +139,10 @@ public abstract class PlanNode extends TreeNode<PlanNode>
implements PlanStats {
protected StatisticalType statisticalType = StatisticalType.DEFAULT;
protected StatsDeriveResult statsDeriveResult;
+ protected TupleDescriptor outputTupleDesc;
+
+ protected List<Expr> projectList;
+
protected PlanNode(PlanNodeId id, ArrayList<TupleId> tupleIds, String
planNodeName,
StatisticalType statisticalType) {
this.id = id;
@@ -550,6 +551,14 @@ public abstract class PlanNode extends TreeNode<PlanNode>
implements PlanStats {
}
toThrift(msg);
container.addToNodes(msg);
+ if (projectList != null) {
+ for (Expr expr : projectList) {
+ msg.addToProjections(expr.treeToThrift());
+ }
+ }
+ if (outputTupleDesc != null) {
+ msg.setOutputTupleId(outputTupleDesc.getId().asInt());
+ }
if (this instanceof ExchangeNode) {
msg.num_children = 0;
return;
@@ -1009,4 +1018,20 @@ public abstract class PlanNode extends
TreeNode<PlanNode> implements PlanStats {
public void finalizeForNereids() {
}
+
+ public void setOutputTupleDesc(TupleDescriptor outputTupleDesc) {
+ this.outputTupleDesc = outputTupleDesc;
+ }
+
+ public TupleDescriptor getOutputTupleDesc() {
+ return outputTupleDesc;
+ }
+
+ public void setProjectList(List<Expr> projectList) {
+ this.projectList = projectList;
+ }
+
+ public List<Expr> getProjectList() {
+ return projectList;
+ }
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java
index b93f7f15c0..58e4a02435 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java
@@ -1144,7 +1144,7 @@ public class SingleNodePlanner {
MaterializedViewSelector.BestIndexInfo bestIndexInfo
= materializedViewSelector.selectBestMV(olapScanNode);
if (bestIndexInfo == null) {
- selectFailed |= true;
+ selectFailed = true;
TupleId tupleId = olapScanNode.getTupleId();
selectStmt.updateDisableTuplesMVRewriter(tupleId);
LOG.debug("MV rewriter of tuple [] will be disable",
tupleId);
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslatorTest.java
b/fe/fe-core/src/test/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslatorTest.java
new file mode 100644
index 0000000000..8762ce125b
--- /dev/null
+++
b/fe/fe-core/src/test/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslatorTest.java
@@ -0,0 +1,82 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.glue.translator;
+
+import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.nereids.properties.LogicalProperties;
+import org.apache.doris.nereids.trees.expressions.Expression;
+import org.apache.doris.nereids.trees.expressions.GreaterThan;
+import org.apache.doris.nereids.trees.expressions.NamedExpression;
+import org.apache.doris.nereids.trees.expressions.Slot;
+import org.apache.doris.nereids.trees.expressions.SlotReference;
+import org.apache.doris.nereids.trees.expressions.literal.IntegerLiteral;
+import org.apache.doris.nereids.trees.expressions.literal.Literal;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalFilter;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalOlapScan;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalProject;
+import org.apache.doris.nereids.types.IntegerType;
+import org.apache.doris.planner.OlapScanNode;
+import org.apache.doris.planner.PlanFragment;
+import org.apache.doris.planner.PlanNode;
+
+import mockit.Injectable;
+import mockit.Mocked;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+
+public class PhysicalPlanTranslatorTest {
+
+ @Test
+ public void testOlapPrune(@Mocked OlapTable t1, @Injectable
LogicalProperties placeHolder) throws Exception {
+ List<String> qualifierList = new ArrayList<>();
+ qualifierList.add("test");
+ qualifierList.add("t1");
+ List<Slot> t1Output = new ArrayList<>();
+ SlotReference col1 = new SlotReference("col1", IntegerType.INSTANCE);
+ SlotReference col2 = new SlotReference("col2", IntegerType.INSTANCE);
+ SlotReference col3 = new SlotReference("col2", IntegerType.INSTANCE);
+ t1Output.add(col1);
+ t1Output.add(col2);
+ t1Output.add(col3);
+ LogicalProperties t1Properties = new LogicalProperties(() -> t1Output);
+ PhysicalOlapScan scan = new PhysicalOlapScan(t1, qualifierList, 0L,
+ Collections.emptyList(), Collections.emptyList(), null,
+ Optional.empty(),
+ t1Properties);
+ Literal t1FilterRight = new IntegerLiteral(1);
+ Expression t1FilterExpr = new GreaterThan(col1, t1FilterRight);
+ PhysicalFilter<PhysicalOlapScan> filter =
+ new PhysicalFilter(t1FilterExpr, placeHolder, scan);
+ List<NamedExpression> projList = new ArrayList<>();
+ projList.add(col2);
+ PhysicalProject<PhysicalFilter> project = new PhysicalProject(projList,
+ placeHolder, filter);
+ PlanTranslatorContext planTranslatorContext = new
PlanTranslatorContext();
+ PhysicalPlanTranslator translator = new PhysicalPlanTranslator();
+ PlanFragment fragment = translator.visitPhysicalProject(project,
planTranslatorContext);
+ PlanNode planNode = fragment.getPlanRoot();
+ List<OlapScanNode> scanNodeList = new ArrayList<>();
+ planNode.collect(OlapScanNode.class::isInstance, scanNodeList);
+ Assertions.assertEquals(2,
scanNodeList.get(0).getTupleDesc().getMaterializedSlots().size());
+ }
+}
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/nereids/util/PlanRewriter.java
b/fe/fe-core/src/test/java/org/apache/doris/nereids/util/PlanRewriter.java
index 2a4a9343e6..67f6bc43ea 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/nereids/util/PlanRewriter.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/nereids/util/PlanRewriter.java
@@ -40,14 +40,14 @@ public class PlanRewriter {
public static Memo bottomUpRewriteMemo(Plan plan, ConnectContext
connectContext, RuleFactory... rules) {
return new Memo(plan)
.newCascadesContext(new StatementContext(connectContext, new
OriginStatement("", 0)))
- .topDownRewrite(rules)
+ .bottomUpRewrite(rules)
.getMemo();
}
public static Memo bottomUpRewriteMemo(Plan plan, ConnectContext
connectContext, Rule... rules) {
return new Memo(plan)
.newCascadesContext(new StatementContext(connectContext, new
OriginStatement("", 0)))
- .topDownRewrite(rules)
+ .bottomUpRewrite(rules)
.getMemo();
}
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/planner/DistributedPlannerTest.java
b/fe/fe-core/src/test/java/org/apache/doris/planner/DistributedPlannerTest.java
index bd3d1f3866..2fe0085883 100644
---
a/fe/fe-core/src/test/java/org/apache/doris/planner/DistributedPlannerTest.java
+++
b/fe/fe-core/src/test/java/org/apache/doris/planner/DistributedPlannerTest.java
@@ -90,6 +90,8 @@ public class DistributedPlannerTest {
Deencapsulation.setField(inputPlanRoot, "conjuncts",
Lists.newArrayList());
new Expectations() {
{
+ inputPlanRoot.getOutputTupleDesc();
+ result = null;
inputFragment.isPartitioned();
result = true;
plannerContext.getNextNodeId();
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]