HappenLee commented on code in PR #64563:
URL: https://github.com/apache/doris/pull/64563#discussion_r3480952670
##########
be/src/exprs/lambda_function/varray_map_function.cpp:
##########
@@ -309,70 +345,124 @@ class ArrayMapFunction : public LambdaFunction {
result_column = ColumnNullable::create(
ColumnArray::create(ColumnNullable::create(std::move(result_col),
std::move(nested_null_map)),
- array_column_offset),
+ std::move(array_column_offset)),
std::move(outside_null_map));
}
} else {
if (res_type->is_nullable()) {
- result_column = ColumnArray::create(std::move(result_col),
array_column_offset);
+ result_column =
+ ColumnArray::create(std::move(result_col),
std::move(array_column_offset));
} else {
auto nested_null_map = ColumnUInt8::create(result_col->size(),
0);
result_column = ColumnArray::create(
ColumnNullable::create(std::move(result_col),
std::move(nested_null_map)),
- array_column_offset);
+ std::move(array_column_offset));
}
}
return Status::OK();
}
private:
- bool _contains_column_id(const std::vector<int>& output_slot_ref_indexs,
int id) const {
- const auto it = std::find(output_slot_ref_indexs.begin(),
output_slot_ref_indexs.end(), id);
- return it != output_slot_ref_indexs.end();
+ struct LambdaArgumentBinding {
+ bool bind_by_name = true;
+ size_t argument_size = 0;
+ std::vector<std::string> names;
+ };
+
+ Status _prepare_lambda_argument_binding(const VExprSPtr& expr, size_t
expected_argument_size,
+ LambdaArgumentBinding&
argument_binding) const {
+ DORIS_CHECK_EQ(expr->node_type(), TExprNodeType::LAMBDA_FUNCTION_EXPR);
+ const auto* lambda_expr = assert_cast<const
VLambdaFunctionExpr*>(expr.get());
+
+ argument_binding.argument_size = 0;
+ argument_binding.names.clear();
+ argument_binding.bind_by_name = lambda_expr->has_argument_names();
+
+ if (!argument_binding.bind_by_name) {
+ if (_contains_nested_lambda_call(expr->get_child(0))) {
+ return Status::InternalError(
+ "Cannot resolve nested lambda argument without lambda
metadata");
+ }
+ argument_binding.argument_size = expected_argument_size;
+ argument_binding.names.resize(expected_argument_size);
+ return Status::OK();
+ }
+
+ argument_binding.names = lambda_expr->argument_names();
+ if (argument_binding.names.size() > expected_argument_size) {
+ return Status::InternalError(
+ "lambda argument metadata size exceeds parameter size,
maximum={}, actual={}",
+ expected_argument_size, argument_binding.names.size());
+ }
+ argument_binding.argument_size = argument_binding.names.size();
+ if (std::ranges::any_of(argument_binding.names,
+ [](const auto& argument_name) { return
argument_name.empty(); })) {
+ return Status::InternalError("lambda argument metadata contains
empty name");
+ }
+ return Status::OK();
}
- void _set_column_ref_column_id(VExprSPtr expr, int gap) const {
- for (const auto& child : expr->children()) {
- if (child->is_column_ref()) {
- auto* ref = static_cast<VColumnRef*>(child.get());
- ref->set_gap(gap);
- } else {
- _set_column_ref_column_id(child, gap);
+ Status _set_legacy_lambda_argument_gap(const VExprSPtr& expr, int
lambda_argument_base,
+ size_t argument_size) const {
+ if (expr->is_column_ref()) {
+ auto* ref = static_cast<VColumnRef*>(expr.get());
+ if (ref->column_id() >= 0 && static_cast<size_t>(ref->column_id())
< argument_size) {
+ const int argument_index = ref->column_id();
+ ref->set_gap(lambda_argument_base + argument_index -
ref->column_id());
}
+ return Status::OK();
}
- }
- void _collect_slot_ref_column_id(VExprSPtr expr,
- std::vector<int>& output_slot_ref_indexs)
const {
for (const auto& child : expr->children()) {
- if (child->is_slot_ref()) {
- const auto* ref = static_cast<VSlotRef*>(child.get());
- output_slot_ref_indexs.push_back(ref->column_id());
- } else {
- _collect_slot_ref_column_id(child, output_slot_ref_indexs);
- }
+ RETURN_IF_ERROR(
+ _set_legacy_lambda_argument_gap(child,
lambda_argument_base, argument_size));
}
+ return Status::OK();
}
- void _extend_data(std::vector<MutableColumnPtr>& columns, const Block*
block,
- int current_repeat_times, int size, int64_t
current_row_idx,
- const std::vector<int>& output_slot_ref_indexs) const {
- if (!current_repeat_times || !size) {
+ bool _is_lambda_call_with_lambda_expr(const VExprSPtr& expr) const {
+ return expr->node_type() == TExprNodeType::LAMBDA_FUNCTION_CALL_EXPR &&
+ !expr->children().empty() &&
+ expr->children()[0]->node_type() ==
TExprNodeType::LAMBDA_FUNCTION_EXPR;
+ }
+
+ bool _contains_nested_lambda_call(const VExprSPtr& expr) const {
+ if (_is_lambda_call_with_lambda_expr(expr)) {
+ return true;
+ }
+ return std::ranges::any_of(expr->children(), [this](const auto& child)
{
+ return _contains_nested_lambda_call(child);
+ });
+ }
+
+ void _repeat_input_columns(std::vector<MutableColumnPtr>& columns, const
Block* block,
+ int repeat_times,
+ const std::vector<bool>&
materialized_input_columns,
+ int64_t row_idx) const {
+ if (!repeat_times || materialized_input_columns.empty()) {
return;
}
- for (int i = 0; i < size; i++) {
- if (_contains_column_id(output_slot_ref_indexs, i)) {
- auto src_column =
-
block->get_by_position(i).column->convert_to_full_column_if_const();
- columns[i]->insert_many_from(*src_column, current_row_idx,
current_repeat_times);
- } else {
- // must be column const
- DCHECK(is_column_const(*columns[i]));
- columns[i]->resize(columns[i]->size() + current_repeat_times);
+ for (size_t i = 0; i < materialized_input_columns.size(); i++) {
+ if (!materialized_input_columns[i]) {
+ columns[i]->resize(columns[i]->size() + repeat_times);
+ continue;
}
+ DORIS_CHECK(block != nullptr);
+ auto src_column =
block->get_by_position(i).column->convert_to_full_column_if_const();
+ if (check_and_get_column<ColumnNothing>(src_column.get())) {
+ // A ColumnNothing in the outer block is a placeholder for an
unmaterialized
+ // virtual column. Keep it as a placeholder in the lambda
block as well, so
+ // VirtualSlotRef can still materialize it lazily if the
lambda body reads it.
+ if (!check_and_get_column<ColumnNothing>(columns[i].get())) {
+ columns[i] = ColumnNothing::create(columns[i]->size());
+ }
+ }
+ columns[i]->insert_many_from(*src_column, row_idx, repeat_times);
}
}
+
+ LambdaArgumentBinding _lambda_argument_binding;
Review Comment:
这种运行时状态不要放在function的成员变量里,应该放在function_context里面,否则容易有多线程的问题
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]