This is an automated email from the ASF dual-hosted git repository.

prozsa pushed a commit to branch branch-4.5.0
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 8058e16ed7cfe6092847be906ac635effce58e4d
Author: Daniel Becker <daniel.bec...@cloudera.com>
AuthorDate: Sat Feb 22 19:58:54 2025 +0100

    IMPALA-13770 (Addendum): Close expressions in IcebergMergeCasePlan
    
    IMPALA-13770 added code to call Close() on
    IcebergMergeCase::{output_exprs_,filter_conjuncts_}. However, these
    expressions are created by IcebergMergeCasePlan, with pointers to the
    expressions copied to possibly multiple IcebergMergeCase objects.
    Therefore, although it does not cause errors in practice, it is better
    to close the expressions in IcebergMergeCasePlan.
    
    This change adds a Close() method to IcebergMergeCasePlan that closes
    these expressions.
    
    This patch also calls Close() on IcebergMergeSinkConfig::merge_action_
    and IcebergMergeSink::merge_action_evaluator_, which were not closed
    previously.
    
    Change-Id: Iefa998dea173051702ef08c03b489178a17a653f
    Reviewed-on: http://gerrit.cloudera.org:8080/22522
    Reviewed-by: Impala Public Jenkins <impala-public-jenk...@cloudera.com>
    Tested-by: Impala Public Jenkins <impala-public-jenk...@cloudera.com>
---
 be/src/exec/iceberg-merge-node.cc | 14 ++++++++++++--
 be/src/exec/iceberg-merge-node.h  |  2 ++
 be/src/exec/iceberg-merge-sink.cc |  2 ++
 3 files changed, 16 insertions(+), 2 deletions(-)

diff --git a/be/src/exec/iceberg-merge-node.cc b/be/src/exec/iceberg-merge-node.cc
index 0c3409bc9..bdb1b45a4 100644
--- a/be/src/exec/iceberg-merge-node.cc
+++ b/be/src/exec/iceberg-merge-node.cc
@@ -74,12 +74,24 @@ Status IcebergMergeCasePlan::Init(const TIcebergMergeCase& tmerge_case,
   return Status::OK();
 }
 
+void IcebergMergeCasePlan::Close() {
+  ScalarExpr::Close(output_exprs_);
+  ScalarExpr::Close(filter_conjuncts_);
+}
+
 Status IcebergMergePlanNode::CreateExecNode(RuntimeState* state, ExecNode** node) const {
   ObjectPool* pool = state->obj_pool();
   *node = pool->Add(new IcebergMergeNode(pool, *this, state->desc_tbl()));
   return Status::OK();
 }
 
+void IcebergMergePlanNode::Close() {
+  for (IcebergMergeCasePlan* merge_case_plan : merge_case_plans_) {
+    merge_case_plan->Close();
+  }
+  PlanNode::Close();
+}
+
 IcebergMergeNode::IcebergMergeNode(
     ObjectPool* pool, const IcebergMergePlanNode& pnode, const DescriptorTbl& descs)
   : ExecNode(pool, pnode, descs),
@@ -331,8 +343,6 @@ Status IcebergMergeCase::Open(RuntimeState* state) {
 void IcebergMergeCase::Close(RuntimeState* state) {
   ScalarExprEvaluator::Close(filter_evaluators_, state);
   ScalarExprEvaluator::Close(output_evaluators_, state);
-  ScalarExpr::Close(output_exprs_);
-  ScalarExpr::Close(filter_conjuncts_);
 }
 
 } // namespace impala
diff --git a/be/src/exec/iceberg-merge-node.h b/be/src/exec/iceberg-merge-node.h
index 59c3ffbdd..be08ac698 100644
--- a/be/src/exec/iceberg-merge-node.h
+++ b/be/src/exec/iceberg-merge-node.h
@@ -44,6 +44,7 @@ class IcebergMergePlanNode : public PlanNode {
  public:
   Status Init(const TPlanNode& tnode, FragmentState* state) override;
   Status CreateExecNode(RuntimeState* state, ExecNode** node) const override;
+  virtual void Close() override;
 
   IcebergMergePlanNode() = default;
   ~IcebergMergePlanNode() override = default;
@@ -143,6 +144,7 @@ class IcebergMergeCasePlan {
   ~IcebergMergeCasePlan() = default;
   Status Init(const TIcebergMergeCase& tmerge_case, FragmentState* state,
       const RowDescriptor* row_desc);
+  void Close();
 
   IcebergMergeCasePlan(const IcebergMergeCasePlan& other) = delete;
   IcebergMergeCasePlan(IcebergMergeCasePlan&& other) = delete;
diff --git a/be/src/exec/iceberg-merge-sink.cc b/be/src/exec/iceberg-merge-sink.cc
index 0cf6f224c..38275f01c 100644
--- a/be/src/exec/iceberg-merge-sink.cc
+++ b/be/src/exec/iceberg-merge-sink.cc
@@ -68,6 +68,7 @@ DataSink* IcebergMergeSinkConfig::CreateSink(RuntimeState* state) const {
 void IcebergMergeSinkConfig::Close() {
   delete_sink_config_->Close();
   insert_sink_config_->Close();
+  merge_action_->Close();
   DataSinkConfig::Close();
 }
 
@@ -139,6 +140,7 @@ Status IcebergMergeSink::FlushFinal(RuntimeState* state) {
 void IcebergMergeSink::Close(RuntimeState* state) {
   insert_sink_->Close(state);
   delete_sink_->Close(state);
+  merge_action_evaluator_->Close(state);
   DataSink::Close(state);
   DCHECK(closed_);
 }

Reply via email to