This is an automated email from the ASF dual-hosted git repository. prozsa pushed a commit to branch branch-4.5.0 in repository https://gitbox.apache.org/repos/asf/impala.git
commit d4eb03b38ef25211f12b9d4b725e460dbaa28bb9
Author: Daniel Becker <daniel.bec...@cloudera.com>
AuthorDate: Thu Feb 20 09:21:05 2025 +0100

IMPALA-13770: Updating Iceberg tables with UDFs crashes Impala

When using a native UDF in the target value of an UPDATE statement or in a filter predicate or target value of a MERGE statement, Impala crashes with the following DCHECK:

  be/src/exprs/expr.cc:47
  47 DCHECK(cache_entry_ == nullptr);

This DCHECK is in the destructor of Expr, and it fires because Close() has not been called for the expression.

In the UPDATE case this is caused by MultiTableSinkConfig: it creates child DataSinkConfig objects but does not call Close() on them, and consequently these child sink configs do not call Close() on their output expressions.

In the MERGE case it is because various expressions are not closed in IcebergMergeCase and IcebergMergeNode.

This patch fixes the issue by overriding Close() in MultiTableSinkConfig, calling Close() on the child sinks as well as closing the expressions in IcebergMergeCase and IcebergMergeNode.

Testing: - Added EE regression tests for the UPDATE and MERGE cases in iceberg-update-basic.test and iceberg-merge.test Change-Id: Id86638c8d6d86062c68cc9d708ec9c7b0a4e95eb Reviewed-on: http://gerrit.cloudera.org:8080/22508 Reviewed-by: Impala Public Jenkins <impala-public-jenk...@cloudera.com> Tested-by: Impala Public Jenkins <impala-public-jenk...@cloudera.com> --- be/src/exec/iceberg-merge-node.cc | 7 +++++ be/src/exec/iceberg-merge-sink.cc | 8 +++++- be/src/exec/iceberg-merge-sink.h | 2 ++ be/src/exec/multi-table-sink.cc | 7 +++++ be/src/exec/multi-table-sink.h | 2 ++ .../queries/QueryTest/iceberg-merge.test | 31 +++++++++++++++++++++- .../queries/QueryTest/iceberg-update-basic.test | 25 +++++++++++++++++ tests/query_test/test_iceberg.py | 7 +++-- 8 files changed, 85 insertions(+), 4 deletions(-) diff --git a/be/src/exec/iceberg-merge-node.cc b/be/src/exec/iceberg-merge-node.cc index f591ead46..0c3409bc9 100644 --- a/be/src/exec/iceberg-merge-node.cc +++ b/be/src/exec/iceberg-merge-node.cc @@ -283,6 +283,11 @@ void IcebergMergeNode::Close(RuntimeState* state) { row_present_evaluator_->Close(state); ScalarExprEvaluator::Close(position_meta_evaluators_, state); ScalarExprEvaluator::Close(partition_meta_evaluators_, state); + + row_present_->Close(); + ScalarExpr::Close(position_meta_exprs_); + ScalarExpr::Close(partition_meta_exprs_); + ExecNode::Close(state); } @@ -326,6 +331,8 @@ Status IcebergMergeCase::Open(RuntimeState* state) { void IcebergMergeCase::Close(RuntimeState* state) { ScalarExprEvaluator::Close(filter_evaluators_, state); ScalarExprEvaluator::Close(output_evaluators_, state); + ScalarExpr::Close(output_exprs_); + ScalarExpr::Close(filter_conjuncts_); } } // namespace impala diff --git a/be/src/exec/iceberg-merge-sink.cc b/be/src/exec/iceberg-merge-sink.cc index 00ade9363..0cf6f224c 100644 --- a/be/src/exec/iceberg-merge-sink.cc +++ b/be/src/exec/iceberg-merge-sink.cc @@ -65,6 +65,12 @@ DataSink* IcebergMergeSinkConfig::CreateSink(RuntimeState* 
state) const { return state->obj_pool()->Add(new IcebergMergeSink(sink_id, *this, *tsink_, state)); } +void IcebergMergeSinkConfig::Close() { + delete_sink_config_->Close(); + insert_sink_config_->Close(); + DataSinkConfig::Close(); +} + IcebergMergeSink::IcebergMergeSink(TDataSinkId sink_id, const IcebergMergeSinkConfig& sink_config, const TDataSink& dsink, RuntimeState* state) @@ -137,4 +143,4 @@ void IcebergMergeSink::Close(RuntimeState* state) { DCHECK(closed_); } -} // namespace impala \ No newline at end of file +} // namespace impala diff --git a/be/src/exec/iceberg-merge-sink.h b/be/src/exec/iceberg-merge-sink.h index e5a3c2dd3..3863b3891 100644 --- a/be/src/exec/iceberg-merge-sink.h +++ b/be/src/exec/iceberg-merge-sink.h @@ -46,6 +46,8 @@ class IcebergMergeSinkConfig : public DataSinkConfig { ScalarExpr* merge_action() const { return merge_action_; } + virtual void Close() override; + protected: Status Init(const TDataSink& tsink, const RowDescriptor* input_row_desc, FragmentState* state) override; diff --git a/be/src/exec/multi-table-sink.cc b/be/src/exec/multi-table-sink.cc index c37e9cf2d..81f158c25 100644 --- a/be/src/exec/multi-table-sink.cc +++ b/be/src/exec/multi-table-sink.cc @@ -44,6 +44,13 @@ DataSink* MultiTableSinkConfig::CreateSink(RuntimeState* state) const { new MultiTableSink(sink_id, *this, *tsink_, state)); } +void MultiTableSinkConfig::Close() { + for (TableSinkBaseConfig* table_sink_config : table_sink_configs_) { + table_sink_config->Close(); + } + DataSinkConfig::Close(); +} + MultiTableSink::MultiTableSink(TDataSinkId sink_id, const MultiTableSinkConfig& sink_config, const TDataSink& dsink, RuntimeState* state) : DataSink(sink_id, sink_config, "MultiTableSink", state) { diff --git a/be/src/exec/multi-table-sink.h b/be/src/exec/multi-table-sink.h index 05ca124f4..125663818 100644 --- a/be/src/exec/multi-table-sink.h +++ b/be/src/exec/multi-table-sink.h @@ -39,6 +39,8 @@ class MultiTableSinkConfig : public DataSinkConfig { return 
table_sink_configs_; } + virtual void Close() override; + ~MultiTableSinkConfig() override {} protected: diff --git a/testdata/workloads/functional-query/queries/QueryTest/iceberg-merge.test b/testdata/workloads/functional-query/queries/QueryTest/iceberg-merge.test index 3e82301de..cd1a4f390 100644 --- a/testdata/workloads/functional-query/queries/QueryTest/iceberg-merge.test +++ b/testdata/workloads/functional-query/queries/QueryTest/iceberg-merge.test @@ -138,4 +138,33 @@ when not matched then insert * 6,true,6,6.599999904632568,60,2009-01-01,'6' ---- TYPES INT,BOOLEAN,INT,FLOAT,DECIMAL,DATE,STRING -==== \ No newline at end of file +==== + +---- QUERY +# Regression test for IMPALA-13770: using a native UDF as a filter predicate and target +# value of a MERGE statement. +# Note: even with the incorrect behaviour before IMPALA-13770 the test may pass and the +# values can also be updated successfully. This is because the crash happens after the +# query is closed, when releasing resources. This test is a valid regression test because +# even if it passes, there will be a crash: some later queries are likely to fail and +# there will be a minidump in the build artifacts, so the build will be marked FAILED. 
+create function if not exists identity(int) returns int location 'UDF_LOCATION' symbol='Identity'; +create table merge_with_udf(int_col INT) +stored by iceberg +tblproperties ('format-version'='2'); +insert into merge_with_udf values (1), (2); +==== +---- QUERY +merge into merge_with_udf t +using merge_with_udf s on s.int_col = t.int_col +when matched and s.int_col != identity(t.int_col) + 1 + then update set int_col = identity(s.int_col) +---- DML_RESULTS: merge_with_udf +1 +2 +---- TYPES +INT +---- RUNTIME_PROFILE +NumModifiedRows: 2 +NumDeletedRows: 2 +==== diff --git a/testdata/workloads/functional-query/queries/QueryTest/iceberg-update-basic.test b/testdata/workloads/functional-query/queries/QueryTest/iceberg-update-basic.test index f3e2c81c6..2bcf91cd8 100644 --- a/testdata/workloads/functional-query/queries/QueryTest/iceberg-update-basic.test +++ b/testdata/workloads/functional-query/queries/QueryTest/iceberg-update-basic.test @@ -404,3 +404,28 @@ row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/ice_part_transforms/data/ts_day ---- TYPES STRING, STRING, STRING, STRING ==== + +---- QUERY +# Regression test for IMPALA-13770: using a native UDF in the target value of an UPDATE +# Note: even with the incorrect behaviour before IMPALA-13770 the test may pass and the +# values can also be updated successfully. This is because the crash happens after the +# query is closed, when releasing resources. This test is a valid regression test because +# even if it passes, there will be a crash: some later queries are likely to fail and +# there will be a minidump in the build artifacts, so the build will be marked FAILED. 
+create function if not exists identity(int) returns int location 'UDF_LOCATION' symbol='Identity'; +create table update_with_udf(int_col INT) +stored by iceberg +tblproperties ('format-version'='2'); +insert into update_with_udf values (1), (2); +==== +---- QUERY +update update_with_udf set int_col = identity(int_col); +---- DML_RESULTS: update_with_udf +1 +2 +---- TYPES +INT +---- RUNTIME_PROFILE +NumModifiedRows: 2 +NumDeletedRows: 2 +==== diff --git a/tests/query_test/test_iceberg.py b/tests/query_test/test_iceberg.py index 0ba8214ba..46321ef0f 100644 --- a/tests/query_test/test_iceberg.py +++ b/tests/query_test/test_iceberg.py @@ -1694,8 +1694,9 @@ class TestIcebergV2Table(IcebergTestSuite): assert hive_output == "id\n4\n5\n6\n7\n8\n" def test_update_basic(self, vector, unique_database): + udf_location = get_fs_path('/test-warehouse/libTestUdfs.so') self.run_test_case('QueryTest/iceberg-update-basic', vector, - unique_database) + unique_database, test_file_vars={'UDF_LOCATION': udf_location}) self._test_update_basic_snapshots(unique_database) if IS_HDFS and self.should_run_for_hive(vector): self._update_basic_hive_tests(unique_database) @@ -1993,7 +1994,9 @@ class TestIcebergV2Table(IcebergTestSuite): self._check_file_filtering(tbl_name, 100, "REWRITE_ALL", True) def test_merge(self, vector, unique_database): - self.run_test_case('QueryTest/iceberg-merge', vector, unique_database) + udf_location = get_fs_path('/test-warehouse/libTestUdfs.so') + self.run_test_case('QueryTest/iceberg-merge', vector, unique_database, + test_file_vars={'UDF_LOCATION': udf_location}) def test_merge_partition(self, vector, unique_database): self.run_test_case('QueryTest/iceberg-merge-partition', vector, unique_database)