This is an automated email from the ASF dual-hosted git repository.

boroknagyz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 5c14877b05149ed9ddb6b8734cc77946774d4c0c
Author: Peter Rozsa <[email protected]>
AuthorDate: Tue Apr 8 16:34:06 2025 +0200

    IMPALA-13932: Add file path and position-based duplicate check for 
IcebergMergeNode
    
    IcebergMergeNode's duplicate checking mechanism was based on comparing
    pointers of the target table's rows. This mechanism can result in
    false positives if a new row batch reuses the memory of the previous row
    batch provided for the merge node. This change adds an additional check
    that validates the file position and file path as well.
    
    Change-Id: I71b47414321675958c05438ef3aeeb5df0128033
    Reviewed-on: http://gerrit.cloudera.org:8080/22761
    Tested-by: Impala Public Jenkins <[email protected]>
    Reviewed-by: Zoltan Borok-Nagy <[email protected]>
---
 be/src/exec/iceberg-merge-node.cc                  | 43 +++++++++++++++++++---
 be/src/exec/iceberg-merge-node.h                   | 17 +++++++--
 .../apache/impala/analysis/IcebergMergeImpl.java   | 18 ++++-----
 .../PlannerTest/iceberg-merge-insert-only.test     | 24 +++++++-----
 4 files changed, 72 insertions(+), 30 deletions(-)

diff --git a/be/src/exec/iceberg-merge-node.cc 
b/be/src/exec/iceberg-merge-node.cc
index 93a2007d0..1a85fda83 100644
--- a/be/src/exec/iceberg-merge-node.cc
+++ b/be/src/exec/iceberg-merge-node.cc
@@ -17,6 +17,8 @@
 
 #include "exec/iceberg-merge-node.h"
 
+#include <string>
+#include <string_view>
 #include <vector>
 
 #include "common/status.h"
@@ -181,6 +183,11 @@ Status IcebergMergeNode::GetNext(RuntimeState* state, 
RowBatch* row_batch, bool*
         ReachedLimit() || (child_row_idx_ == child_row_batch_->num_rows() && 
child_eos_);
     if (*eos || child_row_idx_ == child_row_batch_->num_rows()) {
       child_row_idx_ = 0;
+      if (previous_row_ != nullptr) {
+        // Materialize it as a string
+        previous_row_file_path_materialized_ = 
std::string(previous_row_file_path_);
+        previous_row_file_path_ = 
std::string_view(previous_row_file_path_materialized_);
+      }
       child_row_batch_->Reset();
     }
   } while (!*eos && !row_batch->AtCapacity());
@@ -192,11 +199,11 @@ Status IcebergMergeNode::EvaluateCases(RowBatch* 
output_batch) {
     ++child_row_idx_;
     auto row = iter.Get();
 
-    if (IsDuplicateRow(row)) {
+    if (IsDuplicateTargetTuplePtr(row) && IsDuplicateTargetRowIdent(row)) {
       return Status(
           "Duplicate row found: one target table row matched more than one 
source row");
     }
-    previous_row_target_tuple_ = row->GetTuple(target_tuple_idx_);
+    SavePreviousRowPtrAndIdent(row);
 
     auto row_present = row_present_evaluator_->GetTinyIntVal(row).val;
     IcebergMergeCase* selected_case = nullptr;
@@ -272,17 +279,41 @@ void IcebergMergeNode::AddRow(
   IncrementNumRowsReturned(1);
 }
 
-bool IcebergMergeNode::IsDuplicateRow(TupleRow* actual_row) {
-  if (previous_row_target_tuple_ == nullptr) { return false; }
+bool IcebergMergeNode::IsDuplicateTargetTuplePtr(TupleRow* actual_row) {
+  if (previous_row_ == nullptr) { return false; }
+  auto previous_row_target_tuple = previous_row_->GetTuple(target_tuple_idx_);
+  if (previous_row_target_tuple == nullptr) { return false; }
   auto actual_row_target_tuple = actual_row->GetTuple(target_tuple_idx_);
-  return previous_row_target_tuple_ == actual_row_target_tuple;
+  return previous_row_target_tuple == actual_row_target_tuple;
+}
+
+bool IcebergMergeNode::IsDuplicateTargetRowIdent(TupleRow* actual_row) {
+  auto file_path_sv = delete_meta_evaluators_[0]->GetStringVal(actual_row);
+  auto file_pos = delete_meta_evaluators_[1]->GetBigIntVal(actual_row);
+  if (previous_row_file_pos_ != file_pos.val) { return false; }
+  auto file_path =
+      std::string_view(reinterpret_cast<const char*>(file_path_sv.ptr), 
file_path_sv.len);
+  return file_path == previous_row_file_path_;
+}
+
+void IcebergMergeNode::SavePreviousRowPtrAndIdent(TupleRow* actual_row) {
+  impala_udf::StringVal file_path_sv =
+      delete_meta_evaluators_[0]->GetStringVal(actual_row);
+  auto file_pos = delete_meta_evaluators_[1]->GetBigIntVal(actual_row);
+  previous_row_file_pos_ = file_pos.val;
+  previous_row_file_path_ =
+      std::string_view(reinterpret_cast<char*>(file_path_sv.ptr), 
file_path_sv.len);
+  previous_row_ = actual_row;
 }
 
 Status IcebergMergeNode::Reset(RuntimeState* state, RowBatch* row_batch) {
   child_row_batch_->TransferResourceOwnership(row_batch);
   child_row_idx_ = 0;
   child_eos_ = false;
-  previous_row_target_tuple_ = nullptr;
+  previous_row_ = nullptr;
+  previous_row_file_path_ = {};
+  previous_row_file_path_materialized_ = {};
+  previous_row_file_pos_ = -1;
   return ExecNode::Reset(state, row_batch);
 }
 
diff --git a/be/src/exec/iceberg-merge-node.h b/be/src/exec/iceberg-merge-node.h
index bc2af5d14..df625e965 100644
--- a/be/src/exec/iceberg-merge-node.h
+++ b/be/src/exec/iceberg-merge-node.h
@@ -17,6 +17,8 @@
 
 #pragma once
 
+#include <string>
+#include <string_view>
 #include <vector>
 
 #include "exec/exec-node.h"
@@ -109,7 +111,9 @@ class IcebergMergeNode : public ExecNode {
   Status EvaluateCases(RowBatch* output_batch);
   void AddRow(RowBatch* output_batch, IcebergMergeCase* merge_case, TupleRow* 
row);
   bool CheckCase(const IcebergMergeCase * merge_case, TupleRow* row);
-  bool IsDuplicateRow(TupleRow* actual_row);
+  bool IsDuplicateTargetTuplePtr(TupleRow* actual_row);
+  bool IsDuplicateTargetRowIdent(TupleRow* actual_row);
+  void SavePreviousRowPtrAndIdent(TupleRow* actual_row);
 
   std::vector<IcebergMergeCase*> matched_cases_;
   std::vector<IcebergMergeCase*> not_matched_by_target_cases_;
@@ -125,8 +129,15 @@ class IcebergMergeNode : public ExecNode {
   std::vector<ScalarExprEvaluator*> partition_meta_evaluators_;
   ScalarExprEvaluator* row_present_evaluator_;
 
-  /// Pointer to the last processed tuple from target table, used for 
duplicate filtering
-  Tuple* previous_row_target_tuple_ = nullptr;
+  /// Pointer to the last processed tuple row from target table, used for 
duplicate
+  /// filtering
+  TupleRow* previous_row_ = nullptr;
+  /// Previous target row's file position
+  int64_t previous_row_file_pos_ = -1;
+  /// Previous target row's file path
+  std::string_view previous_row_file_path_;
+  /// Previous target row's file path materialized as a string
+  std::string previous_row_file_path_materialized_;
 
   /// Index of the merge action tuple in the row descriptor
   int merge_action_tuple_idx_ = -1;
diff --git a/fe/src/main/java/org/apache/impala/analysis/IcebergMergeImpl.java 
b/fe/src/main/java/org/apache/impala/analysis/IcebergMergeImpl.java
index 536b57245..2a0100a72 100644
--- a/fe/src/main/java/org/apache/impala/analysis/IcebergMergeImpl.java
+++ b/fe/src/main/java/org/apache/impala/analysis/IcebergMergeImpl.java
@@ -333,18 +333,14 @@ public class IcebergMergeImpl implements MergeImpl {
 
     boolean hasEqualityDeleteFiles = !icebergTable_.getContentFileStore()
         .getEqualityDeleteFiles().isEmpty();
-    boolean hasPositionDeleteFiles = !icebergTable_.getContentFileStore()
-        .getPositionDeleteFiles().isEmpty();
 
-    if (!mergeStmt_.hasOnlyInsertCases() || hasPositionDeleteFiles) {
-      // DELETE/UPDATE cases require position information to write/read delete 
files
-      deleteMetaExpressions.add(new SlotRef(
-              ImmutableList.of(targetTableRef_.getUniqueAlias(),
-                  VirtualColumn.INPUT_FILE_NAME.getName())));
-      deleteMetaExpressions.add(new SlotRef(
-          ImmutableList.of(targetTableRef_.getUniqueAlias(),
-              VirtualColumn.FILE_POSITION.getName())));
-    }
+    // Required for duplicate checks and for UPDATE and DELETE clauses
+    deleteMetaExpressions.add(new SlotRef(
+            ImmutableList.of(targetTableRef_.getUniqueAlias(),
+                VirtualColumn.INPUT_FILE_NAME.getName())));
+    deleteMetaExpressions.add(new SlotRef(
+        ImmutableList.of(targetTableRef_.getUniqueAlias(),
+            VirtualColumn.FILE_POSITION.getName())));
 
     if (hasEqualityDeleteFiles) {
       deleteMetaExpressions.add(new SlotRef(
diff --git 
a/testdata/workloads/functional-planner/queries/PlannerTest/iceberg-merge-insert-only.test
 
b/testdata/workloads/functional-planner/queries/PlannerTest/iceberg-merge-insert-only.test
index f36bc7952..06f528f52 100644
--- 
a/testdata/workloads/functional-planner/queries/PlannerTest/iceberg-merge-insert-only.test
+++ 
b/testdata/workloads/functional-planner/queries/PlannerTest/iceberg-merge-insert-only.test
@@ -11,7 +11,8 @@ functional_parquet.iceberg_partitioned, 
functional_parquet.iceberg_v2_no_deletes
 Analyzed query: SELECT /* +straight_join */ CAST(CAST(CAST(TupleIsNull(0) AS
 TINYINT) AS INT) + CAST(CAST(CAST(TupleIsNull(1) AS TINYINT) AS SMALLINT) *
 CAST(2 AS SMALLINT) AS INT) AS TINYINT) row_present, target.i, target.s,
-source.* FROM functional_parquet.iceberg_v2_no_deletes target FULL OUTER JOIN
+target.input__file__name, target.file__position, source.* FROM
+functional_parquet.iceberg_v2_no_deletes target FULL OUTER JOIN
 functional_parquet.iceberg_partitioned source ON target.i = source.id
 
 F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
@@ -51,7 +52,7 @@ WRITE TO HDFS [functional_parquet.iceberg_v2_no_deletes, 
OVERWRITE=false]
    Iceberg snapshot id: 728158873687794725
    stored statistics:
      table: rows=3 size=625B
-     columns: unavailable
+     columns missing stats: i, s
    extrapolated-rows=disabled max-scan-range-rows=3
    mem-estimate=32.00MB mem-reservation=16.00KB thread-reservation=1
    tuple-ids=0 row-size=16B cardinality=3
@@ -64,7 +65,8 @@ functional_parquet.iceberg_partitioned, 
functional_parquet.iceberg_v2_no_deletes
 Analyzed query: SELECT /* +straight_join */ CAST(CAST(CAST(TupleIsNull(0) AS
 TINYINT) AS INT) + CAST(CAST(CAST(TupleIsNull(1) AS TINYINT) AS SMALLINT) *
 CAST(2 AS SMALLINT) AS INT) AS TINYINT) row_present, target.i, target.s,
-source.* FROM functional_parquet.iceberg_v2_no_deletes target FULL OUTER JOIN
+target.input__file__name, target.file__position, source.* FROM
+functional_parquet.iceberg_v2_no_deletes target FULL OUTER JOIN
 functional_parquet.iceberg_partitioned source ON target.i = source.id
 
 F02:PLAN FRAGMENT [RANDOM] hosts=1 instances=1
@@ -118,7 +120,7 @@ Per-Host Resources: mem-estimate=32.08MB 
mem-reservation=16.00KB thread-reservat
    Iceberg snapshot id: 728158873687794725
    stored statistics:
      table: rows=3 size=625B
-     columns: unavailable
+     columns missing stats: i, s
    extrapolated-rows=disabled max-scan-range-rows=3
    mem-estimate=32.00MB mem-reservation=16.00KB thread-reservation=1
    tuple-ids=0 row-size=16B cardinality=3
@@ -139,7 +141,8 @@ functional_parquet.iceberg_partitioned, 
functional_parquet.iceberg_v2_no_deletes
 Analyzed query: SELECT /* +straight_join */ CAST(CAST(CAST(TupleIsNull(0) AS
 TINYINT) AS INT) + CAST(CAST(CAST(TupleIsNull(1) AS TINYINT) AS SMALLINT) *
 CAST(2 AS SMALLINT) AS INT) AS TINYINT) row_present, target.i, target.s,
-source.* FROM functional_parquet.iceberg_v2_no_deletes target FULL OUTER JOIN
+target.input__file__name, target.file__position, source.* FROM
+functional_parquet.iceberg_v2_no_deletes target FULL OUTER JOIN
 functional_parquet.iceberg_partitioned source ON target.i = source.id
 
 F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
@@ -187,7 +190,7 @@ WRITE TO HDFS [functional_parquet.iceberg_v2_no_deletes, 
OVERWRITE=false]
    Iceberg snapshot id: 728158873687794725
    stored statistics:
      table: rows=3 size=625B
-     columns: unavailable
+     columns missing stats: i, s
    extrapolated-rows=disabled max-scan-range-rows=3
    mem-estimate=32.00MB mem-reservation=16.00KB thread-reservation=1
    tuple-ids=0 row-size=16B cardinality=3
@@ -200,7 +203,8 @@ functional_parquet.iceberg_partitioned, 
functional_parquet.iceberg_v2_no_deletes
 Analyzed query: SELECT /* +straight_join */ CAST(CAST(CAST(TupleIsNull(0) AS
 TINYINT) AS INT) + CAST(CAST(CAST(TupleIsNull(1) AS TINYINT) AS SMALLINT) *
 CAST(2 AS SMALLINT) AS INT) AS TINYINT) row_present, target.i, target.s,
-source.* FROM functional_parquet.iceberg_v2_no_deletes target FULL OUTER JOIN
+target.input__file__name, target.file__position, source.* FROM
+functional_parquet.iceberg_v2_no_deletes target FULL OUTER JOIN
 functional_parquet.iceberg_partitioned source ON target.i = source.id
 
 F02:PLAN FRAGMENT [RANDOM] hosts=1 instances=1
@@ -262,7 +266,7 @@ Per-Host Resources: mem-estimate=32.08MB 
mem-reservation=16.00KB thread-reservat
    Iceberg snapshot id: 728158873687794725
    stored statistics:
      table: rows=3 size=625B
-     columns: unavailable
+     columns missing stats: i, s
    extrapolated-rows=disabled max-scan-range-rows=3
    mem-estimate=32.00MB mem-reservation=16.00KB thread-reservation=1
    tuple-ids=0 row-size=16B cardinality=3
@@ -282,8 +286,8 @@ functional_parquet.iceberg_partition_transforms_zorder
 Analyzed query: SELECT /* +straight_join */ CAST(CAST(CAST(TupleIsNull(0) AS
 TINYINT) AS INT) + CAST(CAST(CAST(TupleIsNull(1) AS TINYINT) AS SMALLINT) *
 CAST(2 AS SMALLINT) AS INT) AS TINYINT) row_present, target.ts, target.s,
-target.i, target.j, target.partition__spec__id,
-target.iceberg__partition__serialized, source.* FROM
+target.i, target.j, target.input__file__name, target.file__position,
+target.partition__spec__id, target.iceberg__partition__serialized, source.* 
FROM
 functional_parquet.iceberg_partition_transforms_zorder target FULL OUTER JOIN
 (SELECT ts, s, CAST(CAST(i AS BIGINT) + CAST(10 AS BIGINT) AS INT) i, j FROM
 functional_parquet.iceberg_partition_transforms_zorder) source ON target.i =

Reply via email to