This is an automated email from the ASF dual-hosted git repository. boroknagyz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/impala.git
commit 5c14877b05149ed9ddb6b8734cc77946774d4c0c Author: Peter Rozsa <[email protected]> AuthorDate: Tue Apr 8 16:34:06 2025 +0200 IMPALA-13932: Add file path and position-based duplicate check for IcebergMergeNode IcebergMergeNode's duplicate checking mechanism was based on comparing pointers of the target table's rows. This mechanism produced false positives when a new row batch reused the memory of the previous row batch provided to the merge node. This change additionally compares the file path and file position of the target row, so a matching pointer alone is no longer reported as a duplicate. Change-Id: I71b47414321675958c05438ef3aeeb5df0128033 Reviewed-on: http://gerrit.cloudera.org:8080/22761 Tested-by: Impala Public Jenkins <[email protected]> Reviewed-by: Zoltan Borok-Nagy <[email protected]> --- be/src/exec/iceberg-merge-node.cc | 43 +++++++++++++++++++--- be/src/exec/iceberg-merge-node.h | 17 +++++++-- .../apache/impala/analysis/IcebergMergeImpl.java | 18 ++++----- .../PlannerTest/iceberg-merge-insert-only.test | 24 +++++++----- 4 files changed, 72 insertions(+), 30 deletions(-) diff --git a/be/src/exec/iceberg-merge-node.cc b/be/src/exec/iceberg-merge-node.cc index 93a2007d0..1a85fda83 100644 --- a/be/src/exec/iceberg-merge-node.cc +++ b/be/src/exec/iceberg-merge-node.cc @@ -17,6 +17,8 @@ #include "exec/iceberg-merge-node.h" +#include <string> +#include <string_view> #include <vector> #include "common/status.h" @@ -181,6 +183,11 @@ Status IcebergMergeNode::GetNext(RuntimeState* state, RowBatch* row_batch, bool* ReachedLimit() || (child_row_idx_ == child_row_batch_->num_rows() && child_eos_); if (*eos || child_row_idx_ == child_row_batch_->num_rows()) { child_row_idx_ = 0; + if (previous_row_ != nullptr) { + // Materialize it as a string + previous_row_file_path_materialized_ = std::string(previous_row_file_path_); + previous_row_file_path_ = std::string_view(previous_row_file_path_materialized_); + } child_row_batch_->Reset(); } } while (!*eos && !row_batch->AtCapacity()); @@ -192,11 +199,11 @@ Status 
IcebergMergeNode::EvaluateCases(RowBatch* output_batch) { ++child_row_idx_; auto row = iter.Get(); - if (IsDuplicateRow(row)) { + if (IsDuplicateTargetTuplePtr(row) && IsDuplicateTargetRowIdent(row)) { return Status( "Duplicate row found: one target table row matched more than one source row"); } - previous_row_target_tuple_ = row->GetTuple(target_tuple_idx_); + SavePreviousRowPtrAndIdent(row); auto row_present = row_present_evaluator_->GetTinyIntVal(row).val; IcebergMergeCase* selected_case = nullptr; @@ -272,17 +279,41 @@ void IcebergMergeNode::AddRow( IncrementNumRowsReturned(1); } -bool IcebergMergeNode::IsDuplicateRow(TupleRow* actual_row) { - if (previous_row_target_tuple_ == nullptr) { return false; } +bool IcebergMergeNode::IsDuplicateTargetTuplePtr(TupleRow* actual_row) { + if (previous_row_ == nullptr) { return false; } + auto previous_row_target_tuple = previous_row_->GetTuple(target_tuple_idx_); + if (previous_row_target_tuple == nullptr) { return false; } auto actual_row_target_tuple = actual_row->GetTuple(target_tuple_idx_); - return previous_row_target_tuple_ == actual_row_target_tuple; + return previous_row_target_tuple == actual_row_target_tuple; +} + +bool IcebergMergeNode::IsDuplicateTargetRowIdent(TupleRow* actual_row) { + auto file_path_sv = delete_meta_evaluators_[0]->GetStringVal(actual_row); + auto file_pos = delete_meta_evaluators_[1]->GetBigIntVal(actual_row); + if (previous_row_file_pos_ != file_pos.val) { return false; } + auto file_path = + std::string_view(reinterpret_cast<const char*>(file_path_sv.ptr), file_path_sv.len); + return file_path == previous_row_file_path_; +} + +void IcebergMergeNode::SavePreviousRowPtrAndIdent(TupleRow* actual_row) { + impala_udf::StringVal file_path_sv = + delete_meta_evaluators_[0]->GetStringVal(actual_row); + auto file_pos = delete_meta_evaluators_[1]->GetBigIntVal(actual_row); + previous_row_file_pos_ = file_pos.val; + previous_row_file_path_ = + 
std::string_view(reinterpret_cast<char*>(file_path_sv.ptr), file_path_sv.len); + previous_row_ = actual_row; } Status IcebergMergeNode::Reset(RuntimeState* state, RowBatch* row_batch) { child_row_batch_->TransferResourceOwnership(row_batch); child_row_idx_ = 0; child_eos_ = false; - previous_row_target_tuple_ = nullptr; + previous_row_ = nullptr; + previous_row_file_path_ = {}; + previous_row_file_path_materialized_ = {}; + previous_row_file_pos_ = -1; return ExecNode::Reset(state, row_batch); } diff --git a/be/src/exec/iceberg-merge-node.h b/be/src/exec/iceberg-merge-node.h index bc2af5d14..df625e965 100644 --- a/be/src/exec/iceberg-merge-node.h +++ b/be/src/exec/iceberg-merge-node.h @@ -17,6 +17,8 @@ #pragma once +#include <string> +#include <string_view> #include <vector> #include "exec/exec-node.h" @@ -109,7 +111,9 @@ class IcebergMergeNode : public ExecNode { Status EvaluateCases(RowBatch* output_batch); void AddRow(RowBatch* output_batch, IcebergMergeCase* merge_case, TupleRow* row); bool CheckCase(const IcebergMergeCase * merge_case, TupleRow* row); - bool IsDuplicateRow(TupleRow* actual_row); + bool IsDuplicateTargetTuplePtr(TupleRow* actual_row); + bool IsDuplicateTargetRowIdent(TupleRow* actual_row); + void SavePreviousRowPtrAndIdent(TupleRow* actual_row); std::vector<IcebergMergeCase*> matched_cases_; std::vector<IcebergMergeCase*> not_matched_by_target_cases_; @@ -125,8 +129,15 @@ class IcebergMergeNode : public ExecNode { std::vector<ScalarExprEvaluator*> partition_meta_evaluators_; ScalarExprEvaluator* row_present_evaluator_; - /// Pointer to the last processed tuple from target table, used for duplicate filtering - Tuple* previous_row_target_tuple_ = nullptr; + /// Pointer to the last processed tuple row from target table, used for duplicate + /// filtering + TupleRow* previous_row_ = nullptr; + /// Previous target row's file position + int64_t previous_row_file_pos_ = -1; + /// Previous target row's file path + std::string_view 
previous_row_file_path_; + /// Previous target row's file path materialized as a string + std::string previous_row_file_path_materialized_; /// Index of the merge action tuple in the row descriptor int merge_action_tuple_idx_ = -1; diff --git a/fe/src/main/java/org/apache/impala/analysis/IcebergMergeImpl.java b/fe/src/main/java/org/apache/impala/analysis/IcebergMergeImpl.java index 536b57245..2a0100a72 100644 --- a/fe/src/main/java/org/apache/impala/analysis/IcebergMergeImpl.java +++ b/fe/src/main/java/org/apache/impala/analysis/IcebergMergeImpl.java @@ -333,18 +333,14 @@ public class IcebergMergeImpl implements MergeImpl { boolean hasEqualityDeleteFiles = !icebergTable_.getContentFileStore() .getEqualityDeleteFiles().isEmpty(); - boolean hasPositionDeleteFiles = !icebergTable_.getContentFileStore() - .getPositionDeleteFiles().isEmpty(); - if (!mergeStmt_.hasOnlyInsertCases() || hasPositionDeleteFiles) { - // DELETE/UPDATE cases require position information to write/read delete files - deleteMetaExpressions.add(new SlotRef( - ImmutableList.of(targetTableRef_.getUniqueAlias(), - VirtualColumn.INPUT_FILE_NAME.getName()))); - deleteMetaExpressions.add(new SlotRef( - ImmutableList.of(targetTableRef_.getUniqueAlias(), - VirtualColumn.FILE_POSITION.getName()))); - } + // Required for duplicate checks and for UPDATE and DELETE clauses + deleteMetaExpressions.add(new SlotRef( + ImmutableList.of(targetTableRef_.getUniqueAlias(), + VirtualColumn.INPUT_FILE_NAME.getName()))); + deleteMetaExpressions.add(new SlotRef( + ImmutableList.of(targetTableRef_.getUniqueAlias(), + VirtualColumn.FILE_POSITION.getName()))); if (hasEqualityDeleteFiles) { deleteMetaExpressions.add(new SlotRef( diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/iceberg-merge-insert-only.test b/testdata/workloads/functional-planner/queries/PlannerTest/iceberg-merge-insert-only.test index f36bc7952..06f528f52 100644 --- 
a/testdata/workloads/functional-planner/queries/PlannerTest/iceberg-merge-insert-only.test +++ b/testdata/workloads/functional-planner/queries/PlannerTest/iceberg-merge-insert-only.test @@ -11,7 +11,8 @@ functional_parquet.iceberg_partitioned, functional_parquet.iceberg_v2_no_deletes Analyzed query: SELECT /* +straight_join */ CAST(CAST(CAST(TupleIsNull(0) AS TINYINT) AS INT) + CAST(CAST(CAST(TupleIsNull(1) AS TINYINT) AS SMALLINT) * CAST(2 AS SMALLINT) AS INT) AS TINYINT) row_present, target.i, target.s, -source.* FROM functional_parquet.iceberg_v2_no_deletes target FULL OUTER JOIN +target.input__file__name, target.file__position, source.* FROM +functional_parquet.iceberg_v2_no_deletes target FULL OUTER JOIN functional_parquet.iceberg_partitioned source ON target.i = source.id F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1 @@ -51,7 +52,7 @@ WRITE TO HDFS [functional_parquet.iceberg_v2_no_deletes, OVERWRITE=false] Iceberg snapshot id: 728158873687794725 stored statistics: table: rows=3 size=625B - columns: unavailable + columns missing stats: i, s extrapolated-rows=disabled max-scan-range-rows=3 mem-estimate=32.00MB mem-reservation=16.00KB thread-reservation=1 tuple-ids=0 row-size=16B cardinality=3 @@ -64,7 +65,8 @@ functional_parquet.iceberg_partitioned, functional_parquet.iceberg_v2_no_deletes Analyzed query: SELECT /* +straight_join */ CAST(CAST(CAST(TupleIsNull(0) AS TINYINT) AS INT) + CAST(CAST(CAST(TupleIsNull(1) AS TINYINT) AS SMALLINT) * CAST(2 AS SMALLINT) AS INT) AS TINYINT) row_present, target.i, target.s, -source.* FROM functional_parquet.iceberg_v2_no_deletes target FULL OUTER JOIN +target.input__file__name, target.file__position, source.* FROM +functional_parquet.iceberg_v2_no_deletes target FULL OUTER JOIN functional_parquet.iceberg_partitioned source ON target.i = source.id F02:PLAN FRAGMENT [RANDOM] hosts=1 instances=1 @@ -118,7 +120,7 @@ Per-Host Resources: mem-estimate=32.08MB mem-reservation=16.00KB thread-reservat Iceberg snapshot id: 
728158873687794725 stored statistics: table: rows=3 size=625B - columns: unavailable + columns missing stats: i, s extrapolated-rows=disabled max-scan-range-rows=3 mem-estimate=32.00MB mem-reservation=16.00KB thread-reservation=1 tuple-ids=0 row-size=16B cardinality=3 @@ -139,7 +141,8 @@ functional_parquet.iceberg_partitioned, functional_parquet.iceberg_v2_no_deletes Analyzed query: SELECT /* +straight_join */ CAST(CAST(CAST(TupleIsNull(0) AS TINYINT) AS INT) + CAST(CAST(CAST(TupleIsNull(1) AS TINYINT) AS SMALLINT) * CAST(2 AS SMALLINT) AS INT) AS TINYINT) row_present, target.i, target.s, -source.* FROM functional_parquet.iceberg_v2_no_deletes target FULL OUTER JOIN +target.input__file__name, target.file__position, source.* FROM +functional_parquet.iceberg_v2_no_deletes target FULL OUTER JOIN functional_parquet.iceberg_partitioned source ON target.i = source.id F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1 @@ -187,7 +190,7 @@ WRITE TO HDFS [functional_parquet.iceberg_v2_no_deletes, OVERWRITE=false] Iceberg snapshot id: 728158873687794725 stored statistics: table: rows=3 size=625B - columns: unavailable + columns missing stats: i, s extrapolated-rows=disabled max-scan-range-rows=3 mem-estimate=32.00MB mem-reservation=16.00KB thread-reservation=1 tuple-ids=0 row-size=16B cardinality=3 @@ -200,7 +203,8 @@ functional_parquet.iceberg_partitioned, functional_parquet.iceberg_v2_no_deletes Analyzed query: SELECT /* +straight_join */ CAST(CAST(CAST(TupleIsNull(0) AS TINYINT) AS INT) + CAST(CAST(CAST(TupleIsNull(1) AS TINYINT) AS SMALLINT) * CAST(2 AS SMALLINT) AS INT) AS TINYINT) row_present, target.i, target.s, -source.* FROM functional_parquet.iceberg_v2_no_deletes target FULL OUTER JOIN +target.input__file__name, target.file__position, source.* FROM +functional_parquet.iceberg_v2_no_deletes target FULL OUTER JOIN functional_parquet.iceberg_partitioned source ON target.i = source.id F02:PLAN FRAGMENT [RANDOM] hosts=1 instances=1 @@ -262,7 +266,7 @@ Per-Host 
Resources: mem-estimate=32.08MB mem-reservation=16.00KB thread-reservat Iceberg snapshot id: 728158873687794725 stored statistics: table: rows=3 size=625B - columns: unavailable + columns missing stats: i, s extrapolated-rows=disabled max-scan-range-rows=3 mem-estimate=32.00MB mem-reservation=16.00KB thread-reservation=1 tuple-ids=0 row-size=16B cardinality=3 @@ -282,8 +286,8 @@ functional_parquet.iceberg_partition_transforms_zorder Analyzed query: SELECT /* +straight_join */ CAST(CAST(CAST(TupleIsNull(0) AS TINYINT) AS INT) + CAST(CAST(CAST(TupleIsNull(1) AS TINYINT) AS SMALLINT) * CAST(2 AS SMALLINT) AS INT) AS TINYINT) row_present, target.ts, target.s, -target.i, target.j, target.partition__spec__id, -target.iceberg__partition__serialized, source.* FROM +target.i, target.j, target.input__file__name, target.file__position, +target.partition__spec__id, target.iceberg__partition__serialized, source.* FROM functional_parquet.iceberg_partition_transforms_zorder target FULL OUTER JOIN (SELECT ts, s, CAST(CAST(i AS BIGINT) + CAST(10 AS BIGINT) AS INT) i, j FROM functional_parquet.iceberg_partition_transforms_zorder) source ON target.i =
