This is an automated email from the ASF dual-hosted git repository. prozsa pushed a commit to branch branch-4.5.0 in repository https://gitbox.apache.org/repos/asf/impala.git
commit 8058e16ed7cfe6092847be906ac635effce58e4d
Author: Daniel Becker <daniel.bec...@cloudera.com>
AuthorDate: Sat Feb 22 19:58:54 2025 +0100

    IMPALA-13770 (Addendum): Close expressions in IcebergMergeCasePlan

    IMPALA-13770 added code to call Close() on
    IcebergMergeCase::{output_exprs_,filter_conjuncts_}. However, these
    expressions are created by IcebergMergeCasePlan, with pointers to the
    expressions copied to possibly multiple IcebergMergeCase objects.
    Therefore, although it does not cause errors in practice, it is better
    to close the expressions in IcebergMergeCasePlan.

    This change adds a Close() method to IcebergMergeCasePlan that closes
    these expressions.

    This patch also calls Close() on IcebergMergeSinkConfig::merge_action_
    and IcebergMergeSink::merge_action_evaluator_, which were not closed
    previously.

    Change-Id: Iefa998dea173051702ef08c03b489178a17a653f
    Reviewed-on: http://gerrit.cloudera.org:8080/22522
    Reviewed-by: Impala Public Jenkins <impala-public-jenk...@cloudera.com>
    Tested-by: Impala Public Jenkins <impala-public-jenk...@cloudera.com>
---
 be/src/exec/iceberg-merge-node.cc | 14 ++++++++++++--
 be/src/exec/iceberg-merge-node.h  |  2 ++
 be/src/exec/iceberg-merge-sink.cc |  2 ++
 3 files changed, 16 insertions(+), 2 deletions(-)

diff --git a/be/src/exec/iceberg-merge-node.cc b/be/src/exec/iceberg-merge-node.cc
index 0c3409bc9..bdb1b45a4 100644
--- a/be/src/exec/iceberg-merge-node.cc
+++ b/be/src/exec/iceberg-merge-node.cc
@@ -74,12 +74,24 @@ Status IcebergMergeCasePlan::Init(const TIcebergMergeCase& tmerge_case,
   return Status::OK();
 }

+void IcebergMergeCasePlan::Close() {
+  ScalarExpr::Close(output_exprs_);
+  ScalarExpr::Close(filter_conjuncts_);
+}
+
 Status IcebergMergePlanNode::CreateExecNode(RuntimeState* state, ExecNode** node) const {
   ObjectPool* pool = state->obj_pool();
   *node = pool->Add(new IcebergMergeNode(pool, *this, state->desc_tbl()));
   return Status::OK();
 }

+void IcebergMergePlanNode::Close() {
+  for (IcebergMergeCasePlan* merge_case_plan : merge_case_plans_) {
+    merge_case_plan->Close();
+  }
+  PlanNode::Close();
+}
+
 IcebergMergeNode::IcebergMergeNode(
     ObjectPool* pool, const IcebergMergePlanNode& pnode, const DescriptorTbl& descs)
   : ExecNode(pool, pnode, descs),
@@ -331,8 +343,6 @@ Status IcebergMergeCase::Open(RuntimeState* state) {
 void IcebergMergeCase::Close(RuntimeState* state) {
   ScalarExprEvaluator::Close(filter_evaluators_, state);
   ScalarExprEvaluator::Close(output_evaluators_, state);
-  ScalarExpr::Close(output_exprs_);
-  ScalarExpr::Close(filter_conjuncts_);
 }

 } // namespace impala
diff --git a/be/src/exec/iceberg-merge-node.h b/be/src/exec/iceberg-merge-node.h
index 59c3ffbdd..be08ac698 100644
--- a/be/src/exec/iceberg-merge-node.h
+++ b/be/src/exec/iceberg-merge-node.h
@@ -44,6 +44,7 @@ class IcebergMergePlanNode : public PlanNode {
  public:
   Status Init(const TPlanNode& tnode, FragmentState* state) override;
   Status CreateExecNode(RuntimeState* state, ExecNode** node) const override;
+  virtual void Close() override;

   IcebergMergePlanNode() = default;
   ~IcebergMergePlanNode() override = default;
@@ -143,6 +144,7 @@ class IcebergMergeCasePlan {
   ~IcebergMergeCasePlan() = default;
   Status Init(const TIcebergMergeCase& tmerge_case, FragmentState* state,
       const RowDescriptor* row_desc);
+  void Close();

   IcebergMergeCasePlan(const IcebergMergeCasePlan& other) = delete;
   IcebergMergeCasePlan(IcebergMergeCasePlan&& other) = delete;
diff --git a/be/src/exec/iceberg-merge-sink.cc b/be/src/exec/iceberg-merge-sink.cc
index 0cf6f224c..38275f01c 100644
--- a/be/src/exec/iceberg-merge-sink.cc
+++ b/be/src/exec/iceberg-merge-sink.cc
@@ -68,6 +68,7 @@ DataSink* IcebergMergeSinkConfig::CreateSink(RuntimeState* state) const {
 void IcebergMergeSinkConfig::Close() {
   delete_sink_config_->Close();
   insert_sink_config_->Close();
+  merge_action_->Close();
   DataSinkConfig::Close();
 }

@@ -139,6 +140,7 @@ Status IcebergMergeSink::FlushFinal(RuntimeState* state) {
 void IcebergMergeSink::Close(RuntimeState* state) {
   insert_sink_->Close(state);
   delete_sink_->Close(state);
+  merge_action_evaluator_->Close(state);
   DataSink::Close(state);
   DCHECK(closed_);
 }