This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-4.0 by this push:
new 8deb0a436b1 branch-4.0: [refine](expr)Use the new execute function in the expr context. #58241 (#58488)
8deb0a436b1 is described below
commit 8deb0a436b1f452099b4b33de963bda532a535e4
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Fri Nov 28 21:51:15 2025 +0800
branch-4.0: [refine](expr)Use the new execute function in the expr context. #58241 (#58488)
Cherry-picked from #58241
Co-authored-by: Mryange <[email protected]>
---
be/src/olap/schema_change.cpp | 3 +-
.../exec/format/parquet/vparquet_group_reader.cpp | 13 ++---
be/src/vec/exprs/vexpr_context.cpp | 64 ++++++++++++----------
be/src/vec/exprs/vexpr_context.h | 13 +++--
4 files changed, 47 insertions(+), 46 deletions(-)
diff --git a/be/src/olap/schema_change.cpp b/be/src/olap/schema_change.cpp
index c463e92c0c4..96552711266 100644
--- a/be/src/olap/schema_change.cpp
+++ b/be/src/olap/schema_change.cpp
@@ -327,8 +327,7 @@ Status BlockChanger::change_block(vectorized::Block*
ref_block,
RETURN_IF_ERROR(ctx->prepare(state.get(), row_desc));
RETURN_IF_ERROR(ctx->open(state.get()));
- RETURN_IF_ERROR(
- vectorized::VExprContext::filter_block(ctx.get(), ref_block,
ref_block->columns()));
+ RETURN_IF_ERROR(vectorized::VExprContext::filter_block(ctx.get(),
ref_block));
}
const int row_num = cast_set<int>(ref_block->rows());
diff --git a/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp
b/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp
index f74fe7d2fec..37b6b898b5c 100644
--- a/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp
+++ b/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp
@@ -602,15 +602,10 @@ Status RowGroupReader::_do_lazy_read(Block* block, size_t
batch_size, size_t* re
{
SCOPED_RAW_TIMER(&_predicate_filter_time);
if (filter_map.has_filter()) {
- if (block->columns() == origin_column_num) {
- // the whole row group has been filtered by
_lazy_read_ctx.vconjunct_ctx, and batch_eof is
- // generated from next batch, so the filter column is removed
ahead.
- DCHECK_EQ(block->rows(), 0);
- } else {
- RETURN_IF_CATCH_EXCEPTION(Block::filter_block_internal(
- block, _lazy_read_ctx.all_predicate_col_ids,
result_filter));
- Block::erase_useless_column(block, origin_column_num);
- }
+ RETURN_IF_CATCH_EXCEPTION(Block::filter_block_internal(
+ block, _lazy_read_ctx.all_predicate_col_ids,
result_filter));
+ Block::erase_useless_column(block, origin_column_num);
+
} else {
Block::erase_useless_column(block, origin_column_num);
}
diff --git a/be/src/vec/exprs/vexpr_context.cpp
b/be/src/vec/exprs/vexpr_context.cpp
index 38b83a1467f..2b1bf6909b1 100644
--- a/be/src/vec/exprs/vexpr_context.cpp
+++ b/be/src/vec/exprs/vexpr_context.cpp
@@ -31,6 +31,7 @@
#include "runtime/thread_context.h"
#include "udf/udf.h"
#include "util/simd/bits.h"
+#include "vec/columns/column.h"
#include "vec/columns/column_const.h"
#include "vec/core/column_numbers.h"
#include "vec/core/column_with_type_and_name.h"
@@ -73,12 +74,20 @@ Status VExprContext::execute(vectorized::Block* block, int*
result_column_id) {
return st;
}
-Status VExprContext::execute(Block* block, ColumnPtr& result_column) {
+Status VExprContext::execute(const Block* block, ColumnPtr& result_column) {
Status st;
RETURN_IF_CATCH_EXCEPTION({ st = _root->execute_column(this, block,
result_column); });
return st;
}
+DataTypePtr VExprContext::execute_type(const Block* block) {
+ return _root->execute_type(block);
+}
+
+[[nodiscard]] const std::string& VExprContext::expr_name() const {
+ return _root->expr_name();
+}
+
bool VExprContext::is_blockable() const {
return _root->is_blockable();
}
@@ -159,15 +168,16 @@ bool VExprContext::all_expr_inverted_index_evaluated() {
return _index_context->has_index_result_for_expr(_root.get());
}
-Status VExprContext::filter_block(VExprContext* vexpr_ctx, Block* block,
size_t column_to_keep) {
+Status VExprContext::filter_block(VExprContext* vexpr_ctx, Block* block) {
if (vexpr_ctx == nullptr || block->rows() == 0) {
return Status::OK();
}
- int result_column_id = -1;
- size_t origin_size = block->allocated_bytes();
- RETURN_IF_ERROR(vexpr_ctx->execute(block, &result_column_id));
- vexpr_ctx->_memory_usage = (block->allocated_bytes() - origin_size);
- return Block::filter_block(block, result_column_id, column_to_keep);
+ ColumnPtr filter_column;
+ RETURN_IF_ERROR(vexpr_ctx->execute(block, filter_column));
+ size_t filter_column_id = block->columns();
+ block->insert({filter_column, vexpr_ctx->execute_type(block),
"filter_column"});
+ vexpr_ctx->_memory_usage = filter_column->allocated_bytes();
+ return Block::filter_block(block, filter_column_id, filter_column_id);
}
Status VExprContext::filter_block(const VExprContextSPtrs& expr_contexts,
Block* block,
@@ -192,7 +202,7 @@ Status VExprContext::execute_conjuncts(const
VExprContextSPtrs& ctxs,
// TODO: Performance Optimization
Status VExprContext::execute_conjuncts(const VExprContextSPtrs& ctxs,
const std::vector<IColumn::Filter*>*
filters,
- bool accept_null, Block* block,
+ bool accept_null, const Block* block,
IColumn::Filter* result_filter, bool*
can_filter_all) {
size_t rows = block->rows();
DCHECK_EQ(result_filter->size(), rows);
@@ -201,9 +211,8 @@ Status VExprContext::execute_conjuncts(const
VExprContextSPtrs& ctxs,
for (const auto& ctx : ctxs) {
// Statistics are only required when an rf wrapper exists in the expr.
bool is_rf_wrapper = ctx->root()->is_rf_wrapper();
- int result_column_id = -1;
- RETURN_IF_ERROR(ctx->execute(block, &result_column_id));
- ColumnPtr& filter_column =
block->get_by_position(result_column_id).column;
+ ColumnPtr filter_column;
+ RETURN_IF_ERROR(ctx->execute(block, filter_column));
if (const auto* nullable_column =
check_and_get_column<ColumnNullable>(*filter_column)) {
size_t column_size = nullable_column->size();
if (column_size == 0) {
@@ -297,7 +306,7 @@ Status VExprContext::execute_conjuncts(const
VExprContextSPtrs& ctxs,
return Status::OK();
}
-Status VExprContext::execute_conjuncts(const VExprContextSPtrs& conjuncts,
Block* block,
+Status VExprContext::execute_conjuncts(const VExprContextSPtrs& conjuncts,
const Block* block,
ColumnUInt8& null_map, IColumn::Filter&
filter) {
const auto& rows = block->rows();
if (rows == 0) {
@@ -312,10 +321,9 @@ Status VExprContext::execute_conjuncts(const
VExprContextSPtrs& conjuncts, Block
auto* final_filter_ptr = filter.data();
for (const auto& conjunct : conjuncts) {
- int result_column_id = -1;
- RETURN_IF_ERROR(conjunct->execute(block, &result_column_id));
- auto [filter_column, is_const] =
-
unpack_if_const(block->get_by_position(result_column_id).column);
+ ColumnPtr result_column;
+ RETURN_IF_ERROR(conjunct->execute(block, result_column));
+ auto [filter_column, is_const] = unpack_if_const(result_column);
const auto* nullable_column = assert_cast<const
ColumnNullable*>(filter_column.get());
if (!is_const) {
const ColumnPtr& nested_column =
nullable_column->get_nested_column_ptr();
@@ -426,24 +434,22 @@ Status VExprContext::get_output_block_after_execute_exprs(
const VExprContextSPtrs& output_vexpr_ctxs, const Block& input_block,
Block* output_block,
bool do_projection) {
auto rows = input_block.rows();
- vectorized::Block tmp_block(input_block.get_columns_with_type_and_name());
vectorized::ColumnsWithTypeAndName result_columns;
_reset_memory_usage(output_vexpr_ctxs);
for (const auto& vexpr_ctx : output_vexpr_ctxs) {
- int result_column_id = -1;
- int origin_columns = tmp_block.columns();
- size_t origin_usage = tmp_block.allocated_bytes();
- RETURN_IF_ERROR(vexpr_ctx->execute(&tmp_block, &result_column_id));
- DCHECK(result_column_id != -1);
-
- vexpr_ctx->_memory_usage = tmp_block.allocated_bytes() - origin_usage;
- const auto& col = tmp_block.get_by_position(result_column_id);
- if (do_projection && origin_columns <= result_column_id) {
- result_columns.emplace_back(col.column->clone_resized(rows),
col.type, col.name);
- vexpr_ctx->_memory_usage +=
result_columns.back().column->allocated_bytes();
+ ColumnPtr result_column;
+ RETURN_IF_ERROR(vexpr_ctx->execute(&input_block, result_column));
+
+ auto type = vexpr_ctx->execute_type(&input_block);
+ const auto& name = vexpr_ctx->expr_name();
+
+ vexpr_ctx->_memory_usage += result_column->allocated_bytes();
+ if (do_projection) {
+ result_columns.emplace_back(result_column->clone_resized(rows),
type, name);
+
} else {
-
result_columns.emplace_back(tmp_block.get_by_position(result_column_id));
+ result_columns.emplace_back(result_column, type, name);
}
}
*output_block = {result_columns};
diff --git a/be/src/vec/exprs/vexpr_context.h b/be/src/vec/exprs/vexpr_context.h
index bc57be586fc..5176e451c26 100644
--- a/be/src/vec/exprs/vexpr_context.h
+++ b/be/src/vec/exprs/vexpr_context.h
@@ -172,7 +172,9 @@ public:
[[nodiscard]] Status open(RuntimeState* state);
[[nodiscard]] Status clone(RuntimeState* state, VExprContextSPtr& new_ctx);
[[nodiscard]] Status execute(Block* block, int* result_column_id);
- [[nodiscard]] Status execute(Block* block, ColumnPtr& result_column);
+ [[nodiscard]] Status execute(const Block* block, ColumnPtr& result_column);
+ [[nodiscard]] DataTypePtr execute_type(const Block* block);
+ [[nodiscard]] const std::string& expr_name() const;
[[nodiscard]] bool is_blockable() const;
VExprSPtr root() { return _root; }
@@ -208,20 +210,19 @@ public:
bool all_expr_inverted_index_evaluated();
- [[nodiscard]] static Status filter_block(VExprContext* vexpr_ctx, Block*
block,
- size_t column_to_keep);
+ [[nodiscard]] static Status filter_block(VExprContext* vexpr_ctx, Block*
block);
[[nodiscard]] static Status filter_block(const VExprContextSPtrs&
expr_contexts, Block* block,
size_t column_to_keep);
[[nodiscard]] static Status execute_conjuncts(const VExprContextSPtrs&
ctxs,
const
std::vector<IColumn::Filter*>* filters,
- bool accept_null, Block*
block,
+ bool accept_null, const
Block* block,
IColumn::Filter*
result_filter,
bool* can_filter_all);
- [[nodiscard]] static Status execute_conjuncts(const VExprContextSPtrs&
conjuncts, Block* block,
- ColumnUInt8& null_map,
+ [[nodiscard]] static Status execute_conjuncts(const VExprContextSPtrs&
conjuncts,
+ const Block* block,
ColumnUInt8& null_map,
IColumn::Filter&
result_filter);
static Status execute_conjuncts(const VExprContextSPtrs& ctxs,
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]