This is an automated email from the ASF dual-hosted git repository. csringhofer pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/impala.git
commit 167ced78448d73cdb4b742a3ba43b0752302a167 Author: Peter Rozsa <[email protected]> AuthorDate: Mon Jan 20 11:05:08 2025 +0100 IMPALA-13674: Enable MERGE statement for Iceberg tables with equality deletes This change fixes the delete expression calculation in IcebergMergeImpl. When an Iceberg table contains equality deletes, the merge implementation now includes the data sequence number in the result expressions, because the underlying tuple descriptor also includes it implicitly. Without this field, row evaluation fails because the number of evaluators does not match the number of slot descriptors. Tests: - manually validated on an Iceberg table that contains equality deletes - e2e tests added Change-Id: I60e48e2731a59520373dbb75104d75aae39a94c1 Reviewed-on: http://gerrit.cloudera.org:8080/22423 Reviewed-by: Impala Public Jenkins <[email protected]> Tested-by: Impala Public Jenkins <[email protected]> --- be/src/exec/iceberg-merge-node.cc | 22 ++++----- be/src/exec/iceberg-merge-node.h | 10 ++-- common/thrift/PlanNodes.thrift | 2 +- .../apache/impala/analysis/IcebergMergeImpl.java | 57 +++++++++++----------- .../apache/impala/planner/IcebergMergeNode.java | 12 ++--- .../impala/analysis/AnalyzeModifyStmtsTest.java | 10 ++-- .../QueryTest/iceberg-merge-equality-insert.test | 23 +++++++++ .../QueryTest/iceberg-merge-equality-update.test | 16 ++++++ tests/common/file_utils.py | 15 +++--- tests/query_test/test_iceberg.py | 14 ++++++ tests/util/iceberg_metadata_util.py | 10 ++++ 11 files changed, 128 insertions(+), 63 deletions(-) diff --git a/be/src/exec/iceberg-merge-node.cc b/be/src/exec/iceberg-merge-node.cc index bdb1b45a4..93a2007d0 100644 --- a/be/src/exec/iceberg-merge-node.cc +++ b/be/src/exec/iceberg-merge-node.cc @@ -50,8 +50,8 @@ Status IcebergMergePlanNode::Init(const TPlanNode& tnode, FragmentState* state) RETURN_IF_ERROR(ScalarExpr::Create( tnode.merge_node.row_present, *row_descriptor_, state, pool, &row_present_)); - 
RETURN_IF_ERROR(ScalarExpr::Create(tnode.merge_node.position_meta_exprs, - *row_descriptor_, state, pool, &position_meta_exprs_)); + RETURN_IF_ERROR(ScalarExpr::Create(tnode.merge_node.delete_meta_exprs, + *row_descriptor_, state, pool, &delete_meta_exprs_)); RETURN_IF_ERROR(ScalarExpr::Create(tnode.merge_node.partition_meta_exprs, *row_descriptor_, state, pool, &partition_meta_exprs_)); @@ -99,7 +99,7 @@ IcebergMergeNode::IcebergMergeNode( child_row_idx_(0), child_eos_(false), row_present_(pnode.row_present_), - position_meta_exprs_(pnode.position_meta_exprs_), + delete_meta_exprs_(pnode.delete_meta_exprs_), partition_meta_exprs_(pnode.partition_meta_exprs_) { DCHECK(pnode.merge_action_tuple_id_ != -1); DCHECK(pnode.target_tuple_id_ != -1); @@ -132,8 +132,8 @@ Status IcebergMergeNode::Prepare(RuntimeState* state) { expr_perm_pool_.get(), expr_results_pool_.get(), &row_present_evaluator_)); RETURN_IF_ERROR( - ScalarExprEvaluator::Create(position_meta_exprs_, state, state->obj_pool(), - expr_perm_pool_.get(), expr_results_pool_.get(), &position_meta_evaluators_)); + ScalarExprEvaluator::Create(delete_meta_exprs_, state, state->obj_pool(), + expr_perm_pool_.get(), expr_results_pool_.get(), &delete_meta_evaluators_)); RETURN_IF_ERROR( ScalarExprEvaluator::Create(partition_meta_exprs_, state, state->obj_pool(), @@ -155,7 +155,7 @@ Status IcebergMergeNode::Open(RuntimeState* state) { new RowBatch(child(0)->row_desc(), state->batch_size(), mem_tracker())); RETURN_IF_ERROR(row_present_evaluator_->Open(state)); - RETURN_IF_ERROR(ScalarExprEvaluator::Open(position_meta_evaluators_, state)); + RETURN_IF_ERROR(ScalarExprEvaluator::Open(delete_meta_evaluators_, state)); RETURN_IF_ERROR(ScalarExprEvaluator::Open(partition_meta_evaluators_, state)); for (auto* merge_case : all_cases_) { @@ -293,18 +293,18 @@ void IcebergMergeNode::Close(RuntimeState* state) { merge_case->Close(state); } row_present_evaluator_->Close(state); - ScalarExprEvaluator::Close(position_meta_evaluators_, 
state); + ScalarExprEvaluator::Close(delete_meta_evaluators_, state); ScalarExprEvaluator::Close(partition_meta_evaluators_, state); row_present_->Close(); - ScalarExpr::Close(position_meta_exprs_); + ScalarExpr::Close(delete_meta_exprs_); ScalarExpr::Close(partition_meta_exprs_); ExecNode::Close(state); } -const std::vector<ScalarExprEvaluator*>& IcebergMergeNode::PositionMetaEvals() { - return position_meta_evaluators_; +const std::vector<ScalarExprEvaluator*>& IcebergMergeNode::DeleteMetaEvals() { + return delete_meta_evaluators_; } const std::vector<ScalarExprEvaluator*>& IcebergMergeNode::PartitionMetaEvals() { @@ -328,7 +328,7 @@ Status IcebergMergeCase::Prepare(RuntimeState* state, IcebergMergeNode& parent) combined_evaluators_.insert( combined_evaluators_.end(), output_evaluators_.begin(), output_evaluators_.end()); combined_evaluators_.insert(combined_evaluators_.end(), - parent.PositionMetaEvals().begin(), parent.PositionMetaEvals().end()); + parent.DeleteMetaEvals().begin(), parent.DeleteMetaEvals().end()); combined_evaluators_.insert(combined_evaluators_.end(), parent.PartitionMetaEvals().begin(), parent.PartitionMetaEvals().end()); return Status::OK(); diff --git a/be/src/exec/iceberg-merge-node.h b/be/src/exec/iceberg-merge-node.h index be08ac698..bc2af5d14 100644 --- a/be/src/exec/iceberg-merge-node.h +++ b/be/src/exec/iceberg-merge-node.h @@ -61,8 +61,8 @@ class IcebergMergePlanNode : public PlanNode { /// target tuple, the source tuple, or both. 
ScalarExpr* row_present_ = nullptr; - /// Exprs used to identify the position of each target record - std::vector<ScalarExpr*> position_meta_exprs_; + /// Exprs used to identify the position/delete information of each target record + std::vector<ScalarExpr*> delete_meta_exprs_; /// Exprs used to identify the partitioning properties of a record std::vector<ScalarExpr*> partition_meta_exprs_; @@ -91,7 +91,7 @@ class IcebergMergeNode : public ExecNode { Status GetNext(RuntimeState* state, RowBatch* row_batch, bool* eos) override; Status Reset(RuntimeState* state, RowBatch* row_batch) override; void Close(RuntimeState* state) override; - const std::vector<ScalarExprEvaluator*>& PositionMetaEvals(); + const std::vector<ScalarExprEvaluator*>& DeleteMetaEvals(); const std::vector<ScalarExprEvaluator*>& PartitionMetaEvals(); private: @@ -119,9 +119,9 @@ class IcebergMergeNode : public ExecNode { int child_row_idx_; bool child_eos_; ScalarExpr* row_present_; - std::vector<ScalarExpr*> position_meta_exprs_; + std::vector<ScalarExpr*> delete_meta_exprs_; std::vector<ScalarExpr*> partition_meta_exprs_; - std::vector<ScalarExprEvaluator*> position_meta_evaluators_; + std::vector<ScalarExprEvaluator*> delete_meta_evaluators_; std::vector<ScalarExprEvaluator*> partition_meta_evaluators_; ScalarExprEvaluator* row_present_evaluator_; diff --git a/common/thrift/PlanNodes.thrift b/common/thrift/PlanNodes.thrift index 47872faf2..3760a6b9f 100644 --- a/common/thrift/PlanNodes.thrift +++ b/common/thrift/PlanNodes.thrift @@ -758,7 +758,7 @@ struct TIcebergMergeCase { struct TIcebergMergeNode { 1: required list<TIcebergMergeCase> cases 2: required Exprs.TExpr row_present - 3: required list<Exprs.TExpr> position_meta_exprs + 3: required list<Exprs.TExpr> delete_meta_exprs 4: required list<Exprs.TExpr> partition_meta_exprs 5: required Types.TTupleId merge_action_tuple_id 6: required Types.TTupleId target_tuple_id diff --git a/fe/src/main/java/org/apache/impala/analysis/IcebergMergeImpl.java 
b/fe/src/main/java/org/apache/impala/analysis/IcebergMergeImpl.java index 6c6c00c45..536b57245 100644 --- a/fe/src/main/java/org/apache/impala/analysis/IcebergMergeImpl.java +++ b/fe/src/main/java/org/apache/impala/analysis/IcebergMergeImpl.java @@ -80,7 +80,7 @@ public class IcebergMergeImpl implements MergeImpl { private TupleId targetTupleId_; private List<Expr> targetExpressions_; - private List<Expr> targetPositionMetaExpressions_; + private List<Expr> targetDeleteMetaExpressions_; private List<Expr> targetPartitionMetaExpressions_; private List<Expr> targetPartitionExpressions_; private MergeSorting targetSorting_; @@ -94,7 +94,7 @@ public class IcebergMergeImpl implements MergeImpl { on_ = on; table_ = targetTableRef_.getTable(); targetExpressions_ = Lists.newArrayList(); - targetPositionMetaExpressions_ = Lists.newArrayList(); + targetDeleteMetaExpressions_ = Lists.newArrayList(); targetPartitionMetaExpressions_ = Lists.newArrayList(); targetPartitionExpressions_ = Lists.newArrayList(); } @@ -127,12 +127,6 @@ public class IcebergMergeImpl implements MergeImpl { "Unsupported '%s': '%s' for Iceberg table: %s", TableProperties.MERGE_MODE, modifyWriteMode, icebergTable_.getFullName())); } - if (!icebergTable_.getContentFileStore().getEqualityDeleteFiles().isEmpty()) { - throw new AnalysisException( - "MERGE statement is not supported for Iceberg tables " - + "containing equality deletes."); - //TODO: IMPALA-13674 - } for (Column column : icebergTable_.getColumns()) { Path slotPath = new Path(targetTableRef_.desc_, Collections.singletonList(column.getName())); @@ -205,8 +199,8 @@ public class IcebergMergeImpl implements MergeImpl { targetExpressions_ = Expr.substituteList(targetExpressions_, smap, analyzer, true); targetPartitionMetaExpressions_ = Expr.substituteList(targetPartitionMetaExpressions_, smap, analyzer, true); - targetPositionMetaExpressions_ = - Expr.substituteList(targetPositionMetaExpressions_, smap, analyzer, true); + targetDeleteMetaExpressions_ 
= + Expr.substituteList(targetDeleteMetaExpressions_, smap, analyzer, true); mergeActionExpression_ = mergeActionExpression_.substitute(smap, analyzer, true); targetPartitionExpressions_ = Expr.substituteList(targetPartitionExpressions_, smap, analyzer, true); @@ -218,7 +212,7 @@ public class IcebergMergeImpl implements MergeImpl { @Override public List<Expr> getResultExprs() { List<Expr> result = Lists.newArrayList(targetExpressions_); - result.addAll(targetPositionMetaExpressions_); + result.addAll(targetDeleteMetaExpressions_); result.addAll(targetPartitionMetaExpressions_); result.add(mergeActionExpression_); return result; @@ -243,10 +237,10 @@ public class IcebergMergeImpl implements MergeImpl { // the sink and the merge node differs. List<MergeCase> copyOfCases = mergeStmt_.getCases().stream().map(MergeCase::clone).collect(Collectors.toList()); - List<Expr> positionMetaExprs = Expr.cloneList(targetPositionMetaExpressions_); + List<Expr> deleteMetaExprs = Expr.cloneList(targetDeleteMetaExpressions_); List<Expr> partitionMetaExprs = Expr.cloneList(targetPartitionMetaExpressions_); IcebergMergeNode mergeNode = new IcebergMergeNode(ctx.getNextNodeId(), child, - copyOfCases, rowPresentExpression_.clone(), positionMetaExprs, partitionMetaExprs, + copyOfCases, rowPresentExpression_.clone(), deleteMetaExprs, partitionMetaExprs, mergeActionTuple_, targetTupleId_); mergeNode.init(analyzer); return mergeNode; @@ -279,7 +273,7 @@ public class IcebergMergeImpl implements MergeImpl { deletePartitionKeys = targetPartitionMetaExpressions_; } return new IcebergBufferedDeleteSink(icebergPositionalDeleteTable_, - deletePartitionKeys, targetPositionMetaExpressions_, deleteTableId_); + deletePartitionKeys, targetDeleteMetaExpressions_, deleteTableId_); } public TableSink createInsertSink() { @@ -335,27 +329,34 @@ public class IcebergMergeImpl implements MergeImpl { VirtualColumn.ICEBERG_PARTITION_SERIALIZED.getName()))); } - List<Expr> positionMetaExpressions; + List<Expr> 
deleteMetaExpressions = Lists.newArrayList(); - if (mergeStmt_.hasOnlyInsertCases() && icebergTable_.getContentFileStore() - .getDataFilesWithDeletes().isEmpty()) { - positionMetaExpressions = Collections.emptyList(); - } else { - // DELETE/UPDATE cases require position information to write delete files - positionMetaExpressions = ImmutableList.of( - new SlotRef( - ImmutableList.of(targetTableRef_.getUniqueAlias(), - VirtualColumn.INPUT_FILE_NAME.getName())), - new SlotRef( + boolean hasEqualityDeleteFiles = !icebergTable_.getContentFileStore() + .getEqualityDeleteFiles().isEmpty(); + boolean hasPositionDeleteFiles = !icebergTable_.getContentFileStore() + .getPositionDeleteFiles().isEmpty(); + + if (!mergeStmt_.hasOnlyInsertCases() || hasPositionDeleteFiles) { + // DELETE/UPDATE cases require position information to write/read delete files + deleteMetaExpressions.add(new SlotRef( ImmutableList.of(targetTableRef_.getUniqueAlias(), - VirtualColumn.FILE_POSITION.getName()))); + VirtualColumn.INPUT_FILE_NAME.getName()))); + deleteMetaExpressions.add(new SlotRef( + ImmutableList.of(targetTableRef_.getUniqueAlias(), + VirtualColumn.FILE_POSITION.getName()))); + } + + if (hasEqualityDeleteFiles) { + deleteMetaExpressions.add(new SlotRef( + ImmutableList.of(targetTableRef_.getUniqueAlias(), + VirtualColumn.ICEBERG_DATA_SEQUENCE_NUMBER.getName()))); } selectListItems.add(new SelectListItem(rowPresentExpression, ROW_PRESENT)); selectListItems.addAll( targetSlotRefs.stream().map(expr -> new SelectListItem(expr, null)) .collect(Collectors.toList())); - selectListItems.addAll(positionMetaExpressions.stream() + selectListItems.addAll(deleteMetaExpressions.stream() .map(expr -> new SelectListItem(expr, null)) .collect(Collectors.toList())); selectListItems.addAll(partitionMetaExpressions.stream() @@ -366,7 +367,7 @@ public class IcebergMergeImpl implements MergeImpl { rowPresentExpression_ = rowPresentExpression; targetPartitionMetaExpressions_ = partitionMetaExpressions; - 
targetPositionMetaExpressions_ = positionMetaExpressions; + targetDeleteMetaExpressions_ = deleteMetaExpressions; targetExpressions_ = targetSlotRefs; FromClause fromClause = diff --git a/fe/src/main/java/org/apache/impala/planner/IcebergMergeNode.java b/fe/src/main/java/org/apache/impala/planner/IcebergMergeNode.java index e608d131e..c8df63a20 100644 --- a/fe/src/main/java/org/apache/impala/planner/IcebergMergeNode.java +++ b/fe/src/main/java/org/apache/impala/planner/IcebergMergeNode.java @@ -43,18 +43,18 @@ import org.apache.impala.thrift.TQueryOptions; public class IcebergMergeNode extends PlanNode { private final List<MergeCase> cases_; private final Expr rowPresent_; - private List<Expr> positionMetaExprs_; + private List<Expr> deleteMetaExprs_; private List<Expr> partitionMetaExprs_; private final TupleDescriptor mergeActionTuple_; private final TupleId targetTupleId_; public IcebergMergeNode(PlanNodeId id, PlanNode child, List<MergeCase> cases, - Expr rowPresent, List<Expr> positionMetaExprs, List<Expr> partitionMetaExprs, + Expr rowPresent, List<Expr> deleteMetaExprs, List<Expr> partitionMetaExprs, TupleDescriptor mergeActionTuple, TupleId targetTupleId) { super(id, "MERGE"); this.cases_ = cases; this.rowPresent_ = rowPresent; - this.positionMetaExprs_ = positionMetaExprs; + this.deleteMetaExprs_ = deleteMetaExprs; this.partitionMetaExprs_ = partitionMetaExprs; this.mergeActionTuple_ = mergeActionTuple; this.targetTupleId_ = targetTupleId; @@ -76,7 +76,7 @@ public class IcebergMergeNode extends PlanNode { mergeCases.add(tMergeCase); } TIcebergMergeNode mergeNode = new TIcebergMergeNode(mergeCases, - rowPresent_.treeToThrift(), Expr.treesToThrift(positionMetaExprs_), + rowPresent_.treeToThrift(), Expr.treesToThrift(deleteMetaExprs_), Expr.treesToThrift(partitionMetaExprs_), mergeActionTuple_.getId().asInt(), targetTupleId_.asInt()); msg.setMerge_node(mergeNode); @@ -92,8 +92,8 @@ public class IcebergMergeNode extends PlanNode { } partitionMetaExprs_ = 
Expr.substituteList(partitionMetaExprs_, getOutputSmap(), analyzer, true); - positionMetaExprs_ = - Expr.substituteList(positionMetaExprs_, getOutputSmap(), analyzer, true); + deleteMetaExprs_ = + Expr.substituteList(deleteMetaExprs_, getOutputSmap(), analyzer, true); rowPresent_.substitute(getOutputSmap(), analyzer, true); } diff --git a/fe/src/test/java/org/apache/impala/analysis/AnalyzeModifyStmtsTest.java b/fe/src/test/java/org/apache/impala/analysis/AnalyzeModifyStmtsTest.java index 5f6ac3af1..fff069a32 100644 --- a/fe/src/test/java/org/apache/impala/analysis/AnalyzeModifyStmtsTest.java +++ b/fe/src/test/java/org/apache/impala/analysis/AnalyzeModifyStmtsTest.java @@ -384,6 +384,10 @@ public class AnalyzeModifyStmtsTest extends AnalyzerTest { + "id in (select max(id) from functional_parquet.iceberg_non_partitioned)) s " + "on t.id = s.id " + "when matched and s.id > 2 then delete"); + // Target table contains equality delete files + AnalyzesOk("merge into functional_parquet.iceberg_v2_delete_equality t " + + "using functional_parquet.iceberg_v2_delete_equality s " + + "on t.id = s.id when not matched then insert *"); // Inline view as target AnalysisError("merge into " @@ -527,11 +531,5 @@ public class AnalyzeModifyStmtsTest extends AnalyzerTest { "Target table 'functional_parquet.iceberg_partition_evolution' is incompatible" + " with source expressions.\nExpression 's.`month`' (type: INT) is not " + "compatible with column 'date_string_col' (type: STRING)"); - // Target table contains equality delete files - AnalysisError("merge into functional_parquet.iceberg_v2_delete_equality t " - + "using functional_parquet.iceberg_v2_delete_equality s " - + "on t.id = s.id when not matched then insert *", - "MERGE statement is not supported for Iceberg tables " - + "containing equality deletes."); } } diff --git a/testdata/workloads/functional-query/queries/QueryTest/iceberg-merge-equality-insert.test 
b/testdata/workloads/functional-query/queries/QueryTest/iceberg-merge-equality-insert.test new file mode 100644 index 000000000..049829ee5 --- /dev/null +++ b/testdata/workloads/functional-query/queries/QueryTest/iceberg-merge-equality-insert.test @@ -0,0 +1,23 @@ +==== +---- QUERY +# Merge into partitioned target table with equality delete files from the source table +# using when not matched insert * clause +merge into iceberg_v2_delete_equality_partitioned target using (select cast(i+ 1000000 as int) i, s, d from iceberg_v2_delete_equality_partitioned) source +on target.i = source.i +when not matched then insert * +---- TYPES +INT,STRING,DATE +---- DML_RESULTS: iceberg_v2_delete_equality_partitioned +1,'str1',2023-12-24 +1,'str1',2023-12-25 +2,'str2',2023-12-24 +4,'str4',2023-12-24 +222,'str2',2023-12-25 +333333,'str3',2023-12-24 +1000001,'str1',2023-12-24 +1000001,'str1',2023-12-25 +1000002,'str2',2023-12-24 +1000004,'str4',2023-12-24 +1000222,'str2',2023-12-25 +1333333,'str3',2023-12-24 +==== \ No newline at end of file diff --git a/testdata/workloads/functional-query/queries/QueryTest/iceberg-merge-equality-update.test b/testdata/workloads/functional-query/queries/QueryTest/iceberg-merge-equality-update.test new file mode 100644 index 000000000..1fc2f5ea5 --- /dev/null +++ b/testdata/workloads/functional-query/queries/QueryTest/iceberg-merge-equality-update.test @@ -0,0 +1,16 @@ +==== +---- QUERY +# Merge into partitioned target table with equality delete files from the source table +# using when matched update clause +merge into iceberg_v2_delete_equality_partitioned target using iceberg_v2_delete_equality_partitioned source +on target.i = source.i and target.i > 10 when matched then update set i = cast(source.i + 100 as int) +---- TYPES +INT,STRING,DATE +---- DML_RESULTS: iceberg_v2_delete_equality_partitioned +1,'str1',2023-12-24 +1,'str1',2023-12-25 +2,'str2',2023-12-24 +4,'str4',2023-12-24 +322,'str2',2023-12-25 +333433,'str3',2023-12-24 +==== diff --git 
a/tests/common/file_utils.py b/tests/common/file_utils.py index 66431dd16..45718c717 100644 --- a/tests/common/file_utils.py +++ b/tests/common/file_utils.py @@ -31,15 +31,18 @@ from tests.util.iceberg_metadata_util import rewrite_metadata def create_iceberg_table_from_directory(impala_client, unique_database, table_name, - file_format): - """Utility function to create an iceberg table from a directory. The directory must - exist in $IMPALA_HOME/testdata/data/iceberg_test with the name 'table_name'""" + file_format, table_location="${IMPALA_HOME}/testdata/data/iceberg_test", + warehouse_prefix=os.getenv("FILESYSTEM_PREFIX")): + """Utility function to create an iceberg table from a directory.""" + + if not warehouse_prefix and unique_database: + warehouse_prefix = os.getenv("DEFAULT_FS", WAREHOUSE_PREFIX) # Only orc and parquet tested/supported assert file_format == "orc" or file_format == "parquet" - local_dir = os.path.join( - os.environ['IMPALA_HOME'], 'testdata', 'data', 'iceberg_test', table_name) + table_location = os.path.expandvars(table_location) + local_dir = os.path.join(table_location, table_name) assert os.path.isdir(local_dir) # Rewrite iceberg metadata to use the warehouse prefix and use unique_database @@ -50,7 +53,7 @@ def create_iceberg_table_from_directory(impala_client, unique_database, table_na check_call(['mkdir', '-p', tmp_dir]) check_call(['cp', '-r', local_dir, tmp_dir]) local_dir = os.path.join(tmp_dir, table_name) - rewrite_metadata(WAREHOUSE_PREFIX, unique_database, os.path.join(local_dir, 'metadata')) + rewrite_metadata(warehouse_prefix, unique_database, os.path.join(local_dir, 'metadata')) # Put the directory in the database's directory (not the table directory) hdfs_parent_dir = os.path.join(get_fs_path("/test-warehouse"), unique_database) diff --git a/tests/query_test/test_iceberg.py b/tests/query_test/test_iceberg.py index de7f772ff..36b4909b8 100644 --- a/tests/query_test/test_iceberg.py +++ b/tests/query_test/test_iceberg.py @@ 
-2064,6 +2064,20 @@ class TestIcebergV2Table(IcebergTestSuite): def test_merge_star(self, vector, unique_database): self.run_test_case('QueryTest/iceberg-merge-star', vector, unique_database) + def test_merge_equality_update(self, vector, unique_database): + create_iceberg_table_from_directory(self.client, unique_database, + "iceberg_v2_delete_equality_partitioned", "parquet", + table_location="${IMPALA_HOME}/testdata/data/iceberg_test/hadoop_catalog/ice", + warehouse_prefix=os.getenv("FILESYSTEM_PREFIX")) + self.run_test_case('QueryTest/iceberg-merge-equality-update', vector, unique_database) + + def test_merge_equality_insert(self, vector, unique_database): + create_iceberg_table_from_directory(self.client, unique_database, + "iceberg_v2_delete_equality_partitioned", "parquet", + table_location="${IMPALA_HOME}/testdata/data/iceberg_test/hadoop_catalog/ice", + warehouse_prefix=os.getenv("FILESYSTEM_PREFIX")) + self.run_test_case('QueryTest/iceberg-merge-equality-insert', vector, unique_database) + def test_cleanup(self, unique_database): """Test that all uncommitted files written by Impala are removed from the file system when a DML commit to an Iceberg table fails, and that the effects of the diff --git a/tests/util/iceberg_metadata_util.py b/tests/util/iceberg_metadata_util.py index a97b14d90..0f8e9b830 100644 --- a/tests/util/iceberg_metadata_util.py +++ b/tests/util/iceberg_metadata_util.py @@ -111,6 +111,16 @@ def generate_new_path(table_params, file_path): start_directory, file_path)) result = file_path[start:] + + # Remove unnecessary parts if the table location differs from + # the default location, for example: + # /test-warehouse/iceberg_test/hadoop_catalog/ice/table translates to + # /test-warehouse/table + if unique_database: + table_name_start = file_path.find(table_name) + if table_name_start != start + len(start_directory) + 1: + result = start_directory + result[table_name_start - 1:] + if prefix: result = prefix + result
