This is an automated email from the ASF dual-hosted git repository.

csringhofer pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 167ced78448d73cdb4b742a3ba43b0752302a167
Author: Peter Rozsa <[email protected]>
AuthorDate: Mon Jan 20 11:05:08 2025 +0100

    IMPALA-13674: Enable MERGE statement for Iceberg tables with equality 
deletes
    
    This change fixes the delete expression calculation for
    IcebergMergeImpl, when an Iceberg table contains equality deletes, the
    merge implementation now includes the data sequence number in the result
    expressions as the underlying tuple descriptor also includes it
    implicitly. Without including this field, the row evaluation fails
    because of the mismatching number of evaluators and slot descriptors.
    
    Tests:
     - manually validated on an Iceberg table that contains equality delete
     - e2e test added
    
    Change-Id: I60e48e2731a59520373dbb75104d75aae39a94c1
    Reviewed-on: http://gerrit.cloudera.org:8080/22423
    Reviewed-by: Impala Public Jenkins <[email protected]>
    Tested-by: Impala Public Jenkins <[email protected]>
---
 be/src/exec/iceberg-merge-node.cc                  | 22 ++++-----
 be/src/exec/iceberg-merge-node.h                   | 10 ++--
 common/thrift/PlanNodes.thrift                     |  2 +-
 .../apache/impala/analysis/IcebergMergeImpl.java   | 57 +++++++++++-----------
 .../apache/impala/planner/IcebergMergeNode.java    | 12 ++---
 .../impala/analysis/AnalyzeModifyStmtsTest.java    | 10 ++--
 .../QueryTest/iceberg-merge-equality-insert.test   | 23 +++++++++
 .../QueryTest/iceberg-merge-equality-update.test   | 16 ++++++
 tests/common/file_utils.py                         | 15 +++---
 tests/query_test/test_iceberg.py                   | 14 ++++++
 tests/util/iceberg_metadata_util.py                | 10 ++++
 11 files changed, 128 insertions(+), 63 deletions(-)

diff --git a/be/src/exec/iceberg-merge-node.cc 
b/be/src/exec/iceberg-merge-node.cc
index bdb1b45a4..93a2007d0 100644
--- a/be/src/exec/iceberg-merge-node.cc
+++ b/be/src/exec/iceberg-merge-node.cc
@@ -50,8 +50,8 @@ Status IcebergMergePlanNode::Init(const TPlanNode& tnode, 
FragmentState* state)
   RETURN_IF_ERROR(ScalarExpr::Create(
       tnode.merge_node.row_present, *row_descriptor_, state, pool, 
&row_present_));
 
-  RETURN_IF_ERROR(ScalarExpr::Create(tnode.merge_node.position_meta_exprs,
-      *row_descriptor_, state, pool, &position_meta_exprs_));
+  RETURN_IF_ERROR(ScalarExpr::Create(tnode.merge_node.delete_meta_exprs,
+      *row_descriptor_, state, pool, &delete_meta_exprs_));
 
   RETURN_IF_ERROR(ScalarExpr::Create(tnode.merge_node.partition_meta_exprs,
       *row_descriptor_, state, pool, &partition_meta_exprs_));
@@ -99,7 +99,7 @@ IcebergMergeNode::IcebergMergeNode(
     child_row_idx_(0),
     child_eos_(false),
     row_present_(pnode.row_present_),
-    position_meta_exprs_(pnode.position_meta_exprs_),
+    delete_meta_exprs_(pnode.delete_meta_exprs_),
     partition_meta_exprs_(pnode.partition_meta_exprs_) {
   DCHECK(pnode.merge_action_tuple_id_ != -1);
   DCHECK(pnode.target_tuple_id_ != -1);
@@ -132,8 +132,8 @@ Status IcebergMergeNode::Prepare(RuntimeState* state) {
       expr_perm_pool_.get(), expr_results_pool_.get(), 
&row_present_evaluator_));
 
   RETURN_IF_ERROR(
-      ScalarExprEvaluator::Create(position_meta_exprs_, state, 
state->obj_pool(),
-          expr_perm_pool_.get(), expr_results_pool_.get(), 
&position_meta_evaluators_));
+      ScalarExprEvaluator::Create(delete_meta_exprs_, state, state->obj_pool(),
+          expr_perm_pool_.get(), expr_results_pool_.get(), 
&delete_meta_evaluators_));
 
   RETURN_IF_ERROR(
       ScalarExprEvaluator::Create(partition_meta_exprs_, state, 
state->obj_pool(),
@@ -155,7 +155,7 @@ Status IcebergMergeNode::Open(RuntimeState* state) {
       new RowBatch(child(0)->row_desc(), state->batch_size(), mem_tracker()));
 
   RETURN_IF_ERROR(row_present_evaluator_->Open(state));
-  RETURN_IF_ERROR(ScalarExprEvaluator::Open(position_meta_evaluators_, state));
+  RETURN_IF_ERROR(ScalarExprEvaluator::Open(delete_meta_evaluators_, state));
   RETURN_IF_ERROR(ScalarExprEvaluator::Open(partition_meta_evaluators_, 
state));
 
   for (auto* merge_case : all_cases_) {
@@ -293,18 +293,18 @@ void IcebergMergeNode::Close(RuntimeState* state) {
     merge_case->Close(state);
   }
   row_present_evaluator_->Close(state);
-  ScalarExprEvaluator::Close(position_meta_evaluators_, state);
+  ScalarExprEvaluator::Close(delete_meta_evaluators_, state);
   ScalarExprEvaluator::Close(partition_meta_evaluators_, state);
 
   row_present_->Close();
-  ScalarExpr::Close(position_meta_exprs_);
+  ScalarExpr::Close(delete_meta_exprs_);
   ScalarExpr::Close(partition_meta_exprs_);
 
   ExecNode::Close(state);
 }
 
-const std::vector<ScalarExprEvaluator*>& IcebergMergeNode::PositionMetaEvals() 
{
-  return position_meta_evaluators_;
+const std::vector<ScalarExprEvaluator*>& IcebergMergeNode::DeleteMetaEvals() {
+  return delete_meta_evaluators_;
 }
 
 const std::vector<ScalarExprEvaluator*>& 
IcebergMergeNode::PartitionMetaEvals() {
@@ -328,7 +328,7 @@ Status IcebergMergeCase::Prepare(RuntimeState* state, 
IcebergMergeNode& parent)
   combined_evaluators_.insert(
       combined_evaluators_.end(), output_evaluators_.begin(), 
output_evaluators_.end());
   combined_evaluators_.insert(combined_evaluators_.end(),
-      parent.PositionMetaEvals().begin(), parent.PositionMetaEvals().end());
+      parent.DeleteMetaEvals().begin(), parent.DeleteMetaEvals().end());
   combined_evaluators_.insert(combined_evaluators_.end(),
       parent.PartitionMetaEvals().begin(), parent.PartitionMetaEvals().end());
   return Status::OK();
diff --git a/be/src/exec/iceberg-merge-node.h b/be/src/exec/iceberg-merge-node.h
index be08ac698..bc2af5d14 100644
--- a/be/src/exec/iceberg-merge-node.h
+++ b/be/src/exec/iceberg-merge-node.h
@@ -61,8 +61,8 @@ class IcebergMergePlanNode : public PlanNode {
   /// target tuple, the source tuple, or both.
   ScalarExpr* row_present_ = nullptr;
 
-  /// Exprs used to identify the position of each target record
-  std::vector<ScalarExpr*> position_meta_exprs_;
+  /// Exprs used to identify the position/delete information of each target 
record
+  std::vector<ScalarExpr*> delete_meta_exprs_;
 
   /// Exprs used to identify the partitioning properties of a record
   std::vector<ScalarExpr*> partition_meta_exprs_;
@@ -91,7 +91,7 @@ class IcebergMergeNode : public ExecNode {
   Status GetNext(RuntimeState* state, RowBatch* row_batch, bool* eos) override;
   Status Reset(RuntimeState* state, RowBatch* row_batch) override;
   void Close(RuntimeState* state) override;
-  const std::vector<ScalarExprEvaluator*>& PositionMetaEvals();
+  const std::vector<ScalarExprEvaluator*>& DeleteMetaEvals();
   const std::vector<ScalarExprEvaluator*>& PartitionMetaEvals();
 
  private:
@@ -119,9 +119,9 @@ class IcebergMergeNode : public ExecNode {
   int child_row_idx_;
   bool child_eos_;
   ScalarExpr* row_present_;
-  std::vector<ScalarExpr*> position_meta_exprs_;
+  std::vector<ScalarExpr*> delete_meta_exprs_;
   std::vector<ScalarExpr*> partition_meta_exprs_;
-  std::vector<ScalarExprEvaluator*> position_meta_evaluators_;
+  std::vector<ScalarExprEvaluator*> delete_meta_evaluators_;
   std::vector<ScalarExprEvaluator*> partition_meta_evaluators_;
   ScalarExprEvaluator* row_present_evaluator_;
 
diff --git a/common/thrift/PlanNodes.thrift b/common/thrift/PlanNodes.thrift
index 47872faf2..3760a6b9f 100644
--- a/common/thrift/PlanNodes.thrift
+++ b/common/thrift/PlanNodes.thrift
@@ -758,7 +758,7 @@ struct TIcebergMergeCase {
 struct TIcebergMergeNode {
   1: required list<TIcebergMergeCase> cases
   2: required Exprs.TExpr row_present
-  3: required list<Exprs.TExpr> position_meta_exprs
+  3: required list<Exprs.TExpr> delete_meta_exprs
   4: required list<Exprs.TExpr> partition_meta_exprs
   5: required Types.TTupleId merge_action_tuple_id
   6: required Types.TTupleId target_tuple_id
diff --git a/fe/src/main/java/org/apache/impala/analysis/IcebergMergeImpl.java 
b/fe/src/main/java/org/apache/impala/analysis/IcebergMergeImpl.java
index 6c6c00c45..536b57245 100644
--- a/fe/src/main/java/org/apache/impala/analysis/IcebergMergeImpl.java
+++ b/fe/src/main/java/org/apache/impala/analysis/IcebergMergeImpl.java
@@ -80,7 +80,7 @@ public class IcebergMergeImpl implements MergeImpl {
   private TupleId targetTupleId_;
 
   private List<Expr> targetExpressions_;
-  private List<Expr> targetPositionMetaExpressions_;
+  private List<Expr> targetDeleteMetaExpressions_;
   private List<Expr> targetPartitionMetaExpressions_;
   private List<Expr> targetPartitionExpressions_;
   private MergeSorting targetSorting_;
@@ -94,7 +94,7 @@ public class IcebergMergeImpl implements MergeImpl {
     on_ = on;
     table_ = targetTableRef_.getTable();
     targetExpressions_ = Lists.newArrayList();
-    targetPositionMetaExpressions_ = Lists.newArrayList();
+    targetDeleteMetaExpressions_ = Lists.newArrayList();
     targetPartitionMetaExpressions_ = Lists.newArrayList();
     targetPartitionExpressions_ = Lists.newArrayList();
   }
@@ -127,12 +127,6 @@ public class IcebergMergeImpl implements MergeImpl {
           "Unsupported '%s': '%s' for Iceberg table: %s",
           TableProperties.MERGE_MODE, modifyWriteMode, 
icebergTable_.getFullName()));
     }
-    if 
(!icebergTable_.getContentFileStore().getEqualityDeleteFiles().isEmpty()) {
-      throw new AnalysisException(
-          "MERGE statement is not supported for Iceberg tables "
-              + "containing equality deletes.");
-      //TODO: IMPALA-13674
-    }
     for (Column column : icebergTable_.getColumns()) {
       Path slotPath =
           new Path(targetTableRef_.desc_, 
Collections.singletonList(column.getName()));
@@ -205,8 +199,8 @@ public class IcebergMergeImpl implements MergeImpl {
     targetExpressions_ = Expr.substituteList(targetExpressions_, smap, 
analyzer, true);
     targetPartitionMetaExpressions_ =
         Expr.substituteList(targetPartitionMetaExpressions_, smap, analyzer, 
true);
-    targetPositionMetaExpressions_ =
-        Expr.substituteList(targetPositionMetaExpressions_, smap, analyzer, 
true);
+    targetDeleteMetaExpressions_ =
+        Expr.substituteList(targetDeleteMetaExpressions_, smap, analyzer, 
true);
     mergeActionExpression_ = mergeActionExpression_.substitute(smap, analyzer, 
true);
     targetPartitionExpressions_ =
         Expr.substituteList(targetPartitionExpressions_, smap, analyzer, true);
@@ -218,7 +212,7 @@ public class IcebergMergeImpl implements MergeImpl {
   @Override
   public List<Expr> getResultExprs() {
     List<Expr> result = Lists.newArrayList(targetExpressions_);
-    result.addAll(targetPositionMetaExpressions_);
+    result.addAll(targetDeleteMetaExpressions_);
     result.addAll(targetPartitionMetaExpressions_);
     result.add(mergeActionExpression_);
     return result;
@@ -243,10 +237,10 @@ public class IcebergMergeImpl implements MergeImpl {
     // the sink and the merge node differs.
     List<MergeCase> copyOfCases =
         
mergeStmt_.getCases().stream().map(MergeCase::clone).collect(Collectors.toList());
-    List<Expr> positionMetaExprs = 
Expr.cloneList(targetPositionMetaExpressions_);
+    List<Expr> deleteMetaExprs = Expr.cloneList(targetDeleteMetaExpressions_);
     List<Expr> partitionMetaExprs = 
Expr.cloneList(targetPartitionMetaExpressions_);
     IcebergMergeNode mergeNode = new IcebergMergeNode(ctx.getNextNodeId(), 
child,
-        copyOfCases, rowPresentExpression_.clone(), positionMetaExprs, 
partitionMetaExprs,
+        copyOfCases, rowPresentExpression_.clone(), deleteMetaExprs, 
partitionMetaExprs,
         mergeActionTuple_, targetTupleId_);
     mergeNode.init(analyzer);
     return mergeNode;
@@ -279,7 +273,7 @@ public class IcebergMergeImpl implements MergeImpl {
       deletePartitionKeys = targetPartitionMetaExpressions_;
     }
     return new IcebergBufferedDeleteSink(icebergPositionalDeleteTable_,
-        deletePartitionKeys, targetPositionMetaExpressions_, deleteTableId_);
+        deletePartitionKeys, targetDeleteMetaExpressions_, deleteTableId_);
   }
 
   public TableSink createInsertSink() {
@@ -335,27 +329,34 @@ public class IcebergMergeImpl implements MergeImpl {
               VirtualColumn.ICEBERG_PARTITION_SERIALIZED.getName())));
     }
 
-    List<Expr> positionMetaExpressions;
+    List<Expr> deleteMetaExpressions = Lists.newArrayList();
 
-    if (mergeStmt_.hasOnlyInsertCases() && icebergTable_.getContentFileStore()
-        .getDataFilesWithDeletes().isEmpty()) {
-      positionMetaExpressions = Collections.emptyList();
-    } else {
-      // DELETE/UPDATE cases require position information to write delete files
-      positionMetaExpressions = ImmutableList.of(
-          new SlotRef(
-              ImmutableList.of(targetTableRef_.getUniqueAlias(),
-                  VirtualColumn.INPUT_FILE_NAME.getName())),
-          new SlotRef(
+    boolean hasEqualityDeleteFiles = !icebergTable_.getContentFileStore()
+        .getEqualityDeleteFiles().isEmpty();
+    boolean hasPositionDeleteFiles = !icebergTable_.getContentFileStore()
+        .getPositionDeleteFiles().isEmpty();
+
+    if (!mergeStmt_.hasOnlyInsertCases() || hasPositionDeleteFiles) {
+      // DELETE/UPDATE cases require position information to write/read delete 
files
+      deleteMetaExpressions.add(new SlotRef(
               ImmutableList.of(targetTableRef_.getUniqueAlias(),
-                  VirtualColumn.FILE_POSITION.getName())));
+                  VirtualColumn.INPUT_FILE_NAME.getName())));
+      deleteMetaExpressions.add(new SlotRef(
+          ImmutableList.of(targetTableRef_.getUniqueAlias(),
+              VirtualColumn.FILE_POSITION.getName())));
+    }
+
+    if (hasEqualityDeleteFiles) {
+      deleteMetaExpressions.add(new SlotRef(
+          ImmutableList.of(targetTableRef_.getUniqueAlias(),
+              VirtualColumn.ICEBERG_DATA_SEQUENCE_NUMBER.getName())));
     }
 
     selectListItems.add(new SelectListItem(rowPresentExpression, ROW_PRESENT));
     selectListItems.addAll(
         targetSlotRefs.stream().map(expr -> new SelectListItem(expr, null))
             .collect(Collectors.toList()));
-    selectListItems.addAll(positionMetaExpressions.stream()
+    selectListItems.addAll(deleteMetaExpressions.stream()
         .map(expr -> new SelectListItem(expr, null))
         .collect(Collectors.toList()));
     selectListItems.addAll(partitionMetaExpressions.stream()
@@ -366,7 +367,7 @@ public class IcebergMergeImpl implements MergeImpl {
 
     rowPresentExpression_ = rowPresentExpression;
     targetPartitionMetaExpressions_ = partitionMetaExpressions;
-    targetPositionMetaExpressions_ = positionMetaExpressions;
+    targetDeleteMetaExpressions_ = deleteMetaExpressions;
     targetExpressions_ = targetSlotRefs;
 
     FromClause fromClause =
diff --git a/fe/src/main/java/org/apache/impala/planner/IcebergMergeNode.java 
b/fe/src/main/java/org/apache/impala/planner/IcebergMergeNode.java
index e608d131e..c8df63a20 100644
--- a/fe/src/main/java/org/apache/impala/planner/IcebergMergeNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/IcebergMergeNode.java
@@ -43,18 +43,18 @@ import org.apache.impala.thrift.TQueryOptions;
 public class IcebergMergeNode extends PlanNode {
   private final List<MergeCase> cases_;
   private final Expr rowPresent_;
-  private List<Expr> positionMetaExprs_;
+  private List<Expr> deleteMetaExprs_;
   private List<Expr> partitionMetaExprs_;
   private final TupleDescriptor mergeActionTuple_;
   private final TupleId targetTupleId_;
 
   public IcebergMergeNode(PlanNodeId id, PlanNode child, List<MergeCase> cases,
-      Expr rowPresent, List<Expr> positionMetaExprs, List<Expr> 
partitionMetaExprs,
+      Expr rowPresent, List<Expr> deleteMetaExprs, List<Expr> 
partitionMetaExprs,
       TupleDescriptor mergeActionTuple, TupleId targetTupleId) {
     super(id, "MERGE");
     this.cases_ = cases;
     this.rowPresent_ = rowPresent;
-    this.positionMetaExprs_ = positionMetaExprs;
+    this.deleteMetaExprs_ = deleteMetaExprs;
     this.partitionMetaExprs_ = partitionMetaExprs;
     this.mergeActionTuple_ = mergeActionTuple;
     this.targetTupleId_ = targetTupleId;
@@ -76,7 +76,7 @@ public class IcebergMergeNode extends PlanNode {
       mergeCases.add(tMergeCase);
     }
     TIcebergMergeNode mergeNode = new TIcebergMergeNode(mergeCases,
-        rowPresent_.treeToThrift(), Expr.treesToThrift(positionMetaExprs_),
+        rowPresent_.treeToThrift(), Expr.treesToThrift(deleteMetaExprs_),
         Expr.treesToThrift(partitionMetaExprs_), 
mergeActionTuple_.getId().asInt(),
         targetTupleId_.asInt());
     msg.setMerge_node(mergeNode);
@@ -92,8 +92,8 @@ public class IcebergMergeNode extends PlanNode {
     }
     partitionMetaExprs_ =
         Expr.substituteList(partitionMetaExprs_, getOutputSmap(), analyzer, 
true);
-    positionMetaExprs_ =
-        Expr.substituteList(positionMetaExprs_, getOutputSmap(), analyzer, 
true);
+    deleteMetaExprs_ =
+        Expr.substituteList(deleteMetaExprs_, getOutputSmap(), analyzer, true);
     rowPresent_.substitute(getOutputSmap(), analyzer, true);
   }
 
diff --git 
a/fe/src/test/java/org/apache/impala/analysis/AnalyzeModifyStmtsTest.java 
b/fe/src/test/java/org/apache/impala/analysis/AnalyzeModifyStmtsTest.java
index 5f6ac3af1..fff069a32 100644
--- a/fe/src/test/java/org/apache/impala/analysis/AnalyzeModifyStmtsTest.java
+++ b/fe/src/test/java/org/apache/impala/analysis/AnalyzeModifyStmtsTest.java
@@ -384,6 +384,10 @@ public class AnalyzeModifyStmtsTest extends AnalyzerTest {
         + "id in (select max(id) from 
functional_parquet.iceberg_non_partitioned)) s "
         + "on t.id = s.id "
         + "when matched and s.id > 2 then delete");
+    // Target table contains equality delete files
+    AnalyzesOk("merge into functional_parquet.iceberg_v2_delete_equality t "
+        + "using functional_parquet.iceberg_v2_delete_equality s "
+        + "on t.id = s.id when not matched then insert *");
 
     // Inline view as target
     AnalysisError("merge into "
@@ -527,11 +531,5 @@ public class AnalyzeModifyStmtsTest extends AnalyzerTest {
         "Target table 'functional_parquet.iceberg_partition_evolution' is 
incompatible"
             + " with source expressions.\nExpression 's.`month`' (type: INT) 
is not "
             + "compatible with column 'date_string_col' (type: STRING)");
-    // Target table contains equality delete files
-    AnalysisError("merge into functional_parquet.iceberg_v2_delete_equality t "
-            + "using functional_parquet.iceberg_v2_delete_equality s "
-            + "on t.id = s.id when not matched then insert *",
-        "MERGE statement is not supported for Iceberg tables "
-            + "containing equality deletes.");
   }
 }
diff --git 
a/testdata/workloads/functional-query/queries/QueryTest/iceberg-merge-equality-insert.test
 
b/testdata/workloads/functional-query/queries/QueryTest/iceberg-merge-equality-insert.test
new file mode 100644
index 000000000..049829ee5
--- /dev/null
+++ 
b/testdata/workloads/functional-query/queries/QueryTest/iceberg-merge-equality-insert.test
@@ -0,0 +1,23 @@
+====
+---- QUERY
+# Merge into partitioned target table with equality delete files from the 
source table
+# using when not matched insert * clause
+merge into iceberg_v2_delete_equality_partitioned target using (select cast(i+ 
1000000 as int) i, s, d from iceberg_v2_delete_equality_partitioned) source
+on target.i = source.i
+when not matched then insert *
+---- TYPES
+INT,STRING,DATE
+---- DML_RESULTS: iceberg_v2_delete_equality_partitioned
+1,'str1',2023-12-24
+1,'str1',2023-12-25
+2,'str2',2023-12-24
+4,'str4',2023-12-24
+222,'str2',2023-12-25
+333333,'str3',2023-12-24
+1000001,'str1',2023-12-24
+1000001,'str1',2023-12-25
+1000002,'str2',2023-12-24
+1000004,'str4',2023-12-24
+1000222,'str2',2023-12-25
+1333333,'str3',2023-12-24
+====
\ No newline at end of file
diff --git 
a/testdata/workloads/functional-query/queries/QueryTest/iceberg-merge-equality-update.test
 
b/testdata/workloads/functional-query/queries/QueryTest/iceberg-merge-equality-update.test
new file mode 100644
index 000000000..1fc2f5ea5
--- /dev/null
+++ 
b/testdata/workloads/functional-query/queries/QueryTest/iceberg-merge-equality-update.test
@@ -0,0 +1,16 @@
+====
+---- QUERY
+# Merge into partitioned target table with equality delete files from the 
source table
+# using when matched update clause
+merge into iceberg_v2_delete_equality_partitioned target using 
iceberg_v2_delete_equality_partitioned source
+on target.i = source.i and target.i > 10 when matched then update set i = 
cast(source.i + 100 as int)
+---- TYPES
+INT,STRING,DATE
+---- DML_RESULTS: iceberg_v2_delete_equality_partitioned
+1,'str1',2023-12-24
+1,'str1',2023-12-25
+2,'str2',2023-12-24
+4,'str4',2023-12-24
+322,'str2',2023-12-25
+333433,'str3',2023-12-24
+====
diff --git a/tests/common/file_utils.py b/tests/common/file_utils.py
index 66431dd16..45718c717 100644
--- a/tests/common/file_utils.py
+++ b/tests/common/file_utils.py
@@ -31,15 +31,18 @@ from tests.util.iceberg_metadata_util import 
rewrite_metadata
 
 
 def create_iceberg_table_from_directory(impala_client, unique_database, 
table_name,
-                                        file_format):
-  """Utility function to create an iceberg table from a directory. The 
directory must
-  exist in $IMPALA_HOME/testdata/data/iceberg_test with the name 
'table_name'"""
+      file_format, table_location="${IMPALA_HOME}/testdata/data/iceberg_test",
+      warehouse_prefix=os.getenv("FILESYSTEM_PREFIX")):
+  """Utility function to create an iceberg table from a directory."""
+
+  if not warehouse_prefix and unique_database:
+    warehouse_prefix = os.getenv("DEFAULT_FS", WAREHOUSE_PREFIX)
 
   # Only orc and parquet tested/supported
   assert file_format == "orc" or file_format == "parquet"
 
-  local_dir = os.path.join(
-    os.environ['IMPALA_HOME'], 'testdata', 'data', 'iceberg_test', table_name)
+  table_location = os.path.expandvars(table_location)
+  local_dir = os.path.join(table_location, table_name)
   assert os.path.isdir(local_dir)
 
   # Rewrite iceberg metadata to use the warehouse prefix and use 
unique_database
@@ -50,7 +53,7 @@ def create_iceberg_table_from_directory(impala_client, 
unique_database, table_na
   check_call(['mkdir', '-p', tmp_dir])
   check_call(['cp', '-r', local_dir, tmp_dir])
   local_dir = os.path.join(tmp_dir, table_name)
-  rewrite_metadata(WAREHOUSE_PREFIX, unique_database, os.path.join(local_dir, 
'metadata'))
+  rewrite_metadata(warehouse_prefix, unique_database, os.path.join(local_dir, 
'metadata'))
 
   # Put the directory in the database's directory (not the table directory)
   hdfs_parent_dir = os.path.join(get_fs_path("/test-warehouse"), 
unique_database)
diff --git a/tests/query_test/test_iceberg.py b/tests/query_test/test_iceberg.py
index de7f772ff..36b4909b8 100644
--- a/tests/query_test/test_iceberg.py
+++ b/tests/query_test/test_iceberg.py
@@ -2064,6 +2064,20 @@ class TestIcebergV2Table(IcebergTestSuite):
   def test_merge_star(self, vector, unique_database):
     self.run_test_case('QueryTest/iceberg-merge-star', vector, unique_database)
 
+  def test_merge_equality_update(self, vector, unique_database):
+    create_iceberg_table_from_directory(self.client, unique_database,
+        "iceberg_v2_delete_equality_partitioned", "parquet",
+        
table_location="${IMPALA_HOME}/testdata/data/iceberg_test/hadoop_catalog/ice",
+        warehouse_prefix=os.getenv("FILESYSTEM_PREFIX"))
+    self.run_test_case('QueryTest/iceberg-merge-equality-update', vector, 
unique_database)
+
+  def test_merge_equality_insert(self, vector, unique_database):
+    create_iceberg_table_from_directory(self.client, unique_database,
+        "iceberg_v2_delete_equality_partitioned", "parquet",
+        
table_location="${IMPALA_HOME}/testdata/data/iceberg_test/hadoop_catalog/ice",
+        warehouse_prefix=os.getenv("FILESYSTEM_PREFIX"))
+    self.run_test_case('QueryTest/iceberg-merge-equality-insert', vector, 
unique_database)
+
   def test_cleanup(self, unique_database):
       """Test that all uncommitted files written by Impala are removed from 
the file
       system when a DML commit to an Iceberg table fails, and that the effects 
of the
diff --git a/tests/util/iceberg_metadata_util.py 
b/tests/util/iceberg_metadata_util.py
index a97b14d90..0f8e9b830 100644
--- a/tests/util/iceberg_metadata_util.py
+++ b/tests/util/iceberg_metadata_util.py
@@ -111,6 +111,16 @@ def generate_new_path(table_params, file_path):
       start_directory, file_path))
 
   result = file_path[start:]
+
+  # Remove unneccessary parts if the table location differs from
+  # the default location, for example:
+  # /test-warehouse/iceberg_test/hadoop_catalog/ice/table translates to
+  # /test-warehouse/table
+  if unique_database:
+    table_name_start = file_path.find(table_name)
+    if table_name_start != start + len(start_directory) + 1:
+      result = start_directory + result[table_name_start - 1:]
+
   if prefix:
     result = prefix + result
 

Reply via email to