This is an automated email from the ASF dual-hosted git repository.

stigahuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 0334f837048301f5670808711e627384e38dab61
Author: Zoltan Borok-Nagy <[email protected]>
AuthorDate: Fri Mar 1 15:43:30 2024 +0100

    IMPALA-12810: Simplify IcebergDeleteNode and IcebergDeleteBuilder
    
    Now that we have the DIRECTED distribution mode, some parts of
    IcebergDeleteNode and IcebergDeleteBuilder became dead code. It is
    time to simplify the above classes.
    
    IcebergDeleteBuilder and KrpcDataStreamSender now tolerate NULL file
    paths: they log an error and skip the delete record instead of failing
    the query, since such records are not an error in hash join mode either.
    
    Change-Id: I3ba02b33433990950b49628f11e732e01ed8a34d
    Reviewed-on: http://gerrit.cloudera.org:8080/21258
    Reviewed-by: Impala Public Jenkins <[email protected]>
    Tested-by: Impala Public Jenkins <[email protected]>
---
 be/src/exec/iceberg-delete-builder.cc              |  48 +++--------
 be/src/exec/iceberg-delete-builder.h               |  19 ++---
 be/src/exec/iceberg-delete-node.cc                 |  62 ++++----------
 be/src/exec/iceberg-delete-node.h                  |   6 +-
 be/src/runtime/krpc-data-stream-sender.cc          |  13 ++-
 testdata/data/README                               |  44 ++++++++++
 ...d3705f6b-370ecfbb00000000_152551971_data.0.parq | Bin 0 -> 617 bytes
 .../data/delete_null_first.parq                    | Bin 0 -> 1578 bytes
 .../data/delete_null_first_and_last.parq           | Bin 0 -> 1587 bytes
 .../data/delete_null_last.parq                     | Bin 0 -> 1578 bytes
 .../data/delete_null_single.parq                   | Bin 0 -> 612 bytes
 .../data/delete_three_nulls.parq                   | Bin 0 -> 629 bytes
 .../data/same_data.0.parq                          | Bin 0 -> 617 bytes
 .../3a813d5e-fc0b-485f-bbba-010972a9f20a-m0.avro   | Bin 0 -> 4197 bytes
 .../e90d28aa-cd17-4655-ad04-aa3711792576-m0.avro   | Bin 0 -> 3879 bytes
 ...222-1-3a813d5e-fc0b-485f-bbba-010972a9f20a.avro | Bin 0 -> 2319 bytes
 .../metadata/v1.metadata.json                      |  89 +++++++++++++++++++++
 .../metadata/version-hint.text                     |   1 +
 .../functional/functional_schema_template.sql      |  15 ++++
 .../datasets/functional/schema_constraints.csv     |   1 +
 tests/query_test/test_iceberg.py                   |  25 ++++++
 21 files changed, 219 insertions(+), 104 deletions(-)

diff --git a/be/src/exec/iceberg-delete-builder.cc 
b/be/src/exec/iceberg-delete-builder.cc
index 307eff420..dd4229601 100644
--- a/be/src/exec/iceberg-delete-builder.cc
+++ b/be/src/exec/iceberg-delete-builder.cc
@@ -141,7 +141,6 @@ Status IcebergDeleteBuilder::CalculateDataFiles() {
   const vector<const PlanFragmentInstanceCtxPB*>& instance_ctx_pbs =
       fragment_it->second->instance_ctx_pbs();
   for (auto ctx : instance_ctx_pbs) {
-    ctx->per_node_scan_ranges().size();
     auto ranges = 
ctx->per_node_scan_ranges().find(delete_scan_node->tnode_->node_id);
     if (ranges == ctx->per_node_scan_ranges().end()) continue;
 
@@ -184,8 +183,6 @@ Status IcebergDeleteBuilder::CalculateDataFiles() {
     }
   }
 
-  is_distributed_mode_ = deleted_rows_.empty();
-
   return Status::OK();
 }
 
@@ -218,13 +215,13 @@ Status IcebergDeleteBuilder::Open(RuntimeState* state) {
 
 Status IcebergDeleteBuilder::Send(RuntimeState* state, RowBatch* batch) {
   SCOPED_TIMER(profile()->total_time_counter());
-  RETURN_IF_ERROR(AddBatch(batch));
+  RETURN_IF_ERROR(AddBatch(state, batch));
   COUNTER_ADD(num_build_rows_, batch->num_rows());
   return Status::OK();
 }
 
-Status IcebergDeleteBuilder::AddBatch(RowBatch* batch) {
-  RETURN_IF_ERROR(ProcessBuildBatch(batch));
+Status IcebergDeleteBuilder::AddBatch(RuntimeState* state, RowBatch* batch) {
+  RETURN_IF_ERROR(ProcessBuildBatch(state, batch));
   return Status::OK();
 }
 
@@ -272,7 +269,8 @@ string IcebergDeleteBuilder::DebugString() const {
   return ss.str();
 }
 
-Status IcebergDeleteBuilder::ProcessBuildBatch(RowBatch* build_batch) {
+Status IcebergDeleteBuilder::ProcessBuildBatch(RuntimeState* state,
+    RowBatch* build_batch) {
   FOREACH_ROW(build_batch, 0, build_batch_iter) {
     TupleRow* build_row = build_batch_iter.Get();
 
@@ -281,39 +279,15 @@ Status IcebergDeleteBuilder::ProcessBuildBatch(RowBatch* 
build_batch) {
 
     const int length = file_path->Len();
     if (UNLIKELY(length == 0)) {
-      return Status(Substitute("NULL found as file_path in delete file"));
+      state->LogError(
+          ErrorMsg(TErrorCode::GENERAL, "NULL found as file_path in delete 
file"));
+      continue;
     }
     int64_t* id = build_row->GetTuple(0)->GetBigIntSlot(pos_offset_);
-    if (is_distributed_mode_) {
-      // Distributed mode, deleted_rows_ is empty after init, only the 
relevant delete
-      // files are sent to this fragment, processing everything
-      auto it = deleted_rows_.find(*file_path);
-      if (it == deleted_rows_.end()) {
-        char* ptr_copy = 
reinterpret_cast<char*>(expr_results_pool_->Allocate(length));
-        if (ptr_copy == nullptr) {
-          return Status("Failed to allocate memory.");
-        }
-
-        memcpy(ptr_copy, file_path->Ptr(), length);
-
-        std::pair<DeleteRowHashTable::iterator, bool> retval =
-            deleted_rows_.emplace(std::piecewise_construct,
-                std::forward_as_tuple(ptr_copy, length), 
std::forward_as_tuple());
-        // emplace succeeded
-        DCHECK(retval.second == true);
-
-        it = retval.first;
-        it->second.reserve(INITIAL_DELETE_VECTOR_CAPACITY);
-      }
-
+    auto it = deleted_rows_.find(*file_path);
+    // deleted_rows_ filled with the relevant data file names, processing only 
those.
+    if (it != deleted_rows_.end()) {
       it->second.emplace_back(*id);
-    } else {
-      // Broadcast mode, deleted_rows_ filled with the relevant data file 
names,
-      // processing only those
-      auto it = deleted_rows_.find(*file_path);
-      if (it != deleted_rows_.end()) {
-        it->second.emplace_back(*id);
-      }
     }
   }
 
diff --git a/be/src/exec/iceberg-delete-builder.h 
b/be/src/exec/iceberg-delete-builder.h
index 3174b60b4..142b832e3 100644
--- a/be/src/exec/iceberg-delete-builder.h
+++ b/be/src/exec/iceberg-delete-builder.h
@@ -71,15 +71,10 @@ class IcebergDeleteBuilderConfig : public JoinBuilderConfig 
{
 /// files, and stores them in unordered_map<file_path, ordered vector of row 
ids> to allow
 /// fast probing.
 ///
-/// Similarly to PartitionedHashJoin, there are 2 modes:
+/// Unlike the PartitionedHashJoin, there is only one mode:
 ///
-///   Broadcast: every fragment receives all data from delete files, filters 
them and
-///   stores only the ones which will be needed to process the assigned data 
files.
-///
-///   Partitioned: Both data and delete files are hashed by the file path. 
This means
-///   there is no need to filter further the delete files, but it can cause 
minor data
-///   skew due to the imbalance in the number of deleted rows corresponding to 
different
-///   data files.
+///   Directed: each fragment will only receive delete records that apply to 
data files
+///   processed by this executor
 ///
 /// Shared Build
 /// ------------
@@ -125,15 +120,14 @@ class IcebergDeleteBuilder : public JoinBuilder {
       std::unordered_map<impala::StringValue, DeleteRowVector, 
StringValueHashWrapper>;
 
   DeleteRowHashTable& deleted_rows() { return deleted_rows_; }
-  bool IsDistributedMode() { return is_distributed_mode_; }
 
  private:
   /// Reads the rows in build_batch and collects them into deleted_rows_.
-  Status ProcessBuildBatch(RowBatch* build_batch);
+  Status ProcessBuildBatch(RuntimeState* state, RowBatch* build_batch);
 
   /// Helper method for Send() that does the actual work apart from updating 
the
   /// counters.
-  Status AddBatch(RowBatch* build_batch);
+  Status AddBatch(RuntimeState* state, RowBatch* build_batch);
 
   /// Helper method for FlushFinal() that does the actual work.
   Status FinalizeBuild(RuntimeState* state);
@@ -154,9 +148,6 @@ class IcebergDeleteBuilder : public JoinBuilder {
   int file_path_offset_;
   int pos_offset_;
 
-  // Distribution mode of the node
-  bool is_distributed_mode_;
-
   // Use the length of a cache line as initial capacity
   static constexpr size_t INITIAL_DELETE_VECTOR_CAPACITY = 8;
 
diff --git a/be/src/exec/iceberg-delete-node.cc 
b/be/src/exec/iceberg-delete-node.cc
index 668181f13..8f21bc563 100644
--- a/be/src/exec/iceberg-delete-node.cc
+++ b/be/src/exec/iceberg-delete-node.cc
@@ -187,17 +187,10 @@ Status IcebergDeleteNode::ProcessProbeBatch(RowBatch* 
out_batch) {
   DCHECK_ENUM_EQ(probe_state_, ProbeState::PROBING_IN_BATCH);
   DCHECK_NE(probe_batch_pos_, -1);
   int rows_added = 0;
-  Status status;
   TPrefetchMode::type prefetch_mode = 
runtime_state_->query_options().prefetch_mode;
   SCOPED_TIMER(probe_timer_);
 
-  rows_added = ProcessProbeBatch(prefetch_mode, out_batch, &status);
-
-  if (UNLIKELY(rows_added < 0)) {
-    DCHECK(!status.ok());
-    return status;
-  }
-  DCHECK(status.ok());
+  rows_added = ProcessProbeBatch(prefetch_mode, out_batch);
   out_batch->CommitRows(rows_added);
   return Status::OK();
 }
@@ -326,35 +319,15 @@ void IcebergDeleteNode::IcebergDeleteState::UpdateImpl() {
 void IcebergDeleteNode::IcebergDeleteState::Update(
     impala::StringValue* file_path, int64_t* next_probe_pos) {
   DCHECK(builder_ != nullptr);
-  // Making sure the row ids are in ascending order inside a row batch in 
broadcast mode
-  DCHECK(builder_->IsDistributedMode() || current_probe_pos_ == INVALID_ROW_ID
-      || current_probe_pos_ < *next_probe_pos);
-  bool is_consecutive_pos = false;
-  if(current_probe_pos_ != INVALID_ROW_ID) {
-    const int64_t step = *next_probe_pos - current_probe_pos_;
-    is_consecutive_pos = step == 1;
-  }
+  // Making sure the row ids are in ascending order inside a row batch in 
DIRECTED mode
+  DCHECK(current_probe_pos_ == INVALID_ROW_ID || current_probe_pos_ < 
*next_probe_pos);
   current_probe_pos_ = *next_probe_pos;
 
-  if (previous_file_path_ != nullptr
-      && (!builder_->IsDistributedMode() || *file_path == 
*previous_file_path_)) {
-    // Fast path if the file did not change, no need to hash again
+  if (previous_file_path_ != nullptr) {
+    // We already have a file path, no need to hash again
     if (current_deleted_pos_row_id_ != INVALID_ROW_ID
         && current_probe_pos_ > 
(*current_delete_row_)[current_deleted_pos_row_id_]) {
       UpdateImpl();
-    } else if (builder_->IsDistributedMode() && !is_consecutive_pos) {
-      // In distributed mode (which means PARTITIONED JOIN distribution mode) 
we cannot
-      // rely on ascending row order, not even inside row batches, not even 
when the
-      // previous file path is the same as the current one.
-      // This is because files with multiple blocks can be processed by 
multiple hosts
-      // in parallel, then the rows are getting hash-exchanged based on their 
file paths.
-      // Then the exchange-receiver at the LHS coalesces the row batches from 
multiple
-      // senders, hence the row IDs getting unordered. So we are always doing 
a binary
-      // search here to find the proper delete row id.
-      // This won't be a problem with the DIRECTED distribution mode (see 
IMPALA-12308)
-      // which will behave similarly to the BROADCAST mode in this regard.
-      DCHECK_EQ(*file_path, *previous_file_path_);
-      UpdateImpl();
     }
   } else {
     auto it = builder_->deleted_rows().find(*file_path);
@@ -389,7 +362,7 @@ void IcebergDeleteNode::IcebergDeleteState::Delete() {
 }
 
 bool IcebergDeleteNode::IcebergDeleteState::NeedCheck() const {
-  return builder_->IsDistributedMode() || current_deleted_pos_row_id_ != 
INVALID_ROW_ID
+  return current_deleted_pos_row_id_ != INVALID_ROW_ID
       || current_probe_pos_ == INVALID_ROW_ID;
 }
 
@@ -407,7 +380,7 @@ void IcebergDeleteNode::IcebergDeleteState::Reset() {
 }
 
 bool IR_ALWAYS_INLINE IcebergDeleteNode::ProcessProbeRow(
-    RowBatch::Iterator* out_batch_iterator, int* remaining_capacity, Status* 
status) {
+    RowBatch::Iterator* out_batch_iterator, int* remaining_capacity) {
   DCHECK(current_probe_row_ != nullptr);
   TupleRow* out_row = out_batch_iterator->Get();
   if (!iceberg_delete_state_.IsDeleted()) {
@@ -423,7 +396,7 @@ bool IR_ALWAYS_INLINE IcebergDeleteNode::ProcessProbeRow(
 }
 
 bool IR_ALWAYS_INLINE IcebergDeleteNode::ProcessProbeRowNoCheck(
-    RowBatch::Iterator* out_batch_iterator, int* remaining_capacity, Status* 
status) {
+    RowBatch::Iterator* out_batch_iterator, int* remaining_capacity) {
   DCHECK(current_probe_row_ != nullptr);
   TupleRow* out_row = out_batch_iterator->Get();
   out_batch_iterator->parent()->CopyRow(current_probe_row_, out_row);
@@ -435,7 +408,7 @@ bool IR_ALWAYS_INLINE 
IcebergDeleteNode::ProcessProbeRowNoCheck(
 }
 
 int IcebergDeleteNode::ProcessProbeBatch(
-    TPrefetchMode::type prefetch_mode, RowBatch* out_batch, Status* 
__restrict__ status) {
+    TPrefetchMode::type prefetch_mode, RowBatch* out_batch) {
   DCHECK(!out_batch->AtCapacity());
   DCHECK_GE(probe_batch_pos_, 0);
   RowBatch::Iterator out_batch_iterator(out_batch, out_batch->AddRow());
@@ -444,7 +417,7 @@ int IcebergDeleteNode::ProcessProbeBatch(
   RowBatch::Iterator probe_batch_iterator(probe_batch_.get(), 
probe_batch_pos_);
   int remaining_capacity = max_rows;
 
-  while (!probe_batch_iterator.AtEnd() && remaining_capacity > 0 && 
status->ok()) {
+  while (!probe_batch_iterator.AtEnd() && remaining_capacity > 0) {
     current_probe_row_ = probe_batch_iterator.Get();
     if (iceberg_delete_state_.NeedCheck()) {
       impala::StringValue* file_path =
@@ -453,12 +426,12 @@ int IcebergDeleteNode::ProcessProbeBatch(
           current_probe_row_->GetTuple(0)->GetBigIntSlot(pos_offset_);
 
       iceberg_delete_state_.Update(file_path, current_probe_pos);
-      if (!ProcessProbeRow(&out_batch_iterator, &remaining_capacity, status)) {
-        if (status->ok()) DCHECK_EQ(remaining_capacity, 0);
+      if (!ProcessProbeRow(&out_batch_iterator, &remaining_capacity)) {
+        DCHECK_EQ(remaining_capacity, 0);
       }
     } else {
-      if (!ProcessProbeRowNoCheck(&out_batch_iterator, &remaining_capacity, 
status)) {
-        if (status->ok()) DCHECK_EQ(remaining_capacity, 0);
+      if (!ProcessProbeRowNoCheck(&out_batch_iterator, &remaining_capacity)) {
+        DCHECK_EQ(remaining_capacity, 0);
       }
     }
 
@@ -468,12 +441,7 @@ int IcebergDeleteNode::ProcessProbeBatch(
     probe_batch_pos_ = (probe_batch_iterator.Get() - probe_batch_->GetRow(0));
   }
 
-  int num_rows_added;
-  if (LIKELY(status->ok())) {
-    num_rows_added = max_rows - remaining_capacity;
-  } else {
-    num_rows_added = -1;
-  }
+  int num_rows_added = max_rows - remaining_capacity;
 
   // Clear state as ascending order of row ids are not guaranteed between 
probe row
   // batches
diff --git a/be/src/exec/iceberg-delete-node.h 
b/be/src/exec/iceberg-delete-node.h
index 0775f1cad..a50d7d837 100644
--- a/be/src/exec/iceberg-delete-node.h
+++ b/be/src/exec/iceberg-delete-node.h
@@ -116,18 +116,18 @@ class IcebergDeleteNode : public BlockingJoinNode {
   /// Probes 'current_probe_row_' against the hash tables and append outputs
   /// to output batch.
   bool inline ProcessProbeRow(RowBatch::Iterator* out_batch_iterator,
-      int* remaining_capacity, Status* status) WARN_UNUSED_RESULT;
+      int* remaining_capacity) WARN_UNUSED_RESULT;
 
   /// Append outputs to output batch.
   bool inline ProcessProbeRowNoCheck(RowBatch::Iterator* out_batch_iterator,
-      int* remaining_capacity, Status* status) WARN_UNUSED_RESULT;
+      int* remaining_capacity) WARN_UNUSED_RESULT;
 
   /// Process probe rows from probe_batch_. Returns either if out_batch is 
full or
   /// probe_batch_ is entirely consumed.
   /// Returns the number of rows added to out_batch. This function doesn't
   /// commit rows to the output batch so it's the caller's responsibility to
   /// do so.
-  int ProcessProbeBatch(TPrefetchMode::type, RowBatch* out_batch, Status* 
status);
+  int ProcessProbeBatch(TPrefetchMode::type, RowBatch* out_batch);
 
   /// Wrapper that calls ProcessProbeBatch() and commits the rows to 'out_batch' on success.
   Status ProcessProbeBatch(RowBatch* out_batch);
diff --git a/be/src/runtime/krpc-data-stream-sender.cc 
b/be/src/runtime/krpc-data-stream-sender.cc
index 45d3714fc..6bacf7843 100644
--- a/be/src/runtime/krpc-data-stream-sender.cc
+++ b/be/src/runtime/krpc-data-stream-sender.cc
@@ -1107,7 +1107,8 @@ Status KrpcDataStreamSender::Send(RuntimeState* state, 
RowBatch* batch) {
       if (filename_value_ss.ptr == prev_filename_ptr) {
         // If the filename pointer is the same as the previous one then we can 
instantly
         // send the row to the same channels as the previous row.
-        DCHECK(skipped_prev_row || !prev_channels.empty());
+        DCHECK(skipped_prev_row || !prev_channels.empty() ||
+            (filename_value_ss.len == 0 && prev_channels.empty()));
         for (Channel* ch : prev_channels) 
RETURN_IF_ERROR(ch->AddRow(tuple_row));
         continue;
       }
@@ -1121,8 +1122,14 @@ Status KrpcDataStreamSender::Send(RuntimeState* state, 
RowBatch* batch) {
         // files that remains in the new snapshot.
         // Another use-case is table sampling where we read only a subset of 
the data
         // files.
-        VLOG(3) << "Row from delete file refers to a non-existing data file: " 
<<
-            filename;
+        // A third case is when the delete record is invalid.
+        if (UNLIKELY(filename_value_ss.len == 0)) {
+          state->LogError(
+            ErrorMsg(TErrorCode::GENERAL, "NULL found as file_path in delete 
file"));
+        } else {
+          VLOG(3) << "Row from delete file refers to a non-existing data file: 
" <<
+              filename;
+        }
         skipped_prev_row = true;
         continue;
       }
diff --git a/testdata/data/README b/testdata/data/README
index 0eea4842f..2e1970cf5 100644
--- a/testdata/data/README
+++ b/testdata/data/README
@@ -1130,6 +1130,50 @@ iceberg_v2_equality_delete_schema_evolution:
 4: Update a row with Nifi where i=4 to the following:
   (44, 2024-03-21, "str4", 4444)
 
+iceberg_v2_null_delete_record:
+1) Created the table via Impala and added some records to it.
+  CREATE TABLE iceberg_v2_null_delete_record(i INT, j INT)
+  STORED BY ICEBERG;
+  INSERT INTO iceberg_v2_null_delete_record VALUES (1,1), (2,2), (3,3), (4,4);
+  INSERT INTO iceberg_v2_null_delete_record VALUES (1,1), (2,2), (3,3), (4,4);
+
+  (We need at least 2 data files to use DIRECTED mode in KrpcDataStreamSender)
+
+2) Created the following temporary table:
+  CREATE TABLE iceberg_v2_null_delete_record_pos_delete (file_path STRING, pos 
BIGINT)
+  STORED BY ICEBERG;
+
+  Manually rewrote the metadata JSON file of this table, so the schema 
elements have the
+  following field ids (there are two places where I had to modify the schemas):
+
+    file_path : 2147483546
+    pos       : 2147483545
+
+3) Inserted data files into iceberg_v2_null_delete_record_pos_delete:
+
+  INSERT INTO iceberg_v2_null_delete_record_pos_delete VALUES
+  (NULL, 0);
+
+  INSERT INTO iceberg_v2_null_delete_record_pos_delete VALUES
+  ('<data file path of iceberg_v2_null_delete_record>', 0), (NULL, 3);
+
+  INSERT INTO iceberg_v2_null_delete_record_pos_delete VALUES
+  (NULL, 2), ('<data file path of iceberg_v2_null_delete_record>', 0);
+
+  INSERT INTO iceberg_v2_null_delete_record_pos_delete VALUES
+  (NULL, 0), (NULL, 1), (NULL, 2);
+
+  The written Parquet files have the schema of position delete files (with the
+  correct Iceberg field ids)
+
+4) Copied iceberg_v2_null_delete_record to the local filesystem and applied
+  the following modifications:
+
+  * added the Parquet files from iceberg_v2_null_delete_record_pos_delete to
+    the /data directory
+  * manually edited the metadata JSON, and the manifest and manifest list 
files to
+    register the delete files in the table
+
 arrays_big.parq:
 Generated with RandomNestedDataGenerator.java from the following schema:
 {
diff --git 
a/testdata/data/iceberg_test/hadoop_catalog/ice/iceberg_v2_null_delete_record/data/6348b186d3705f6b-370ecfbb00000000_152551971_data.0.parq
 
b/testdata/data/iceberg_test/hadoop_catalog/ice/iceberg_v2_null_delete_record/data/6348b186d3705f6b-370ecfbb00000000_152551971_data.0.parq
new file mode 100644
index 000000000..1e20490af
Binary files /dev/null and 
b/testdata/data/iceberg_test/hadoop_catalog/ice/iceberg_v2_null_delete_record/data/6348b186d3705f6b-370ecfbb00000000_152551971_data.0.parq
 differ
diff --git 
a/testdata/data/iceberg_test/hadoop_catalog/ice/iceberg_v2_null_delete_record/data/delete_null_first.parq
 
b/testdata/data/iceberg_test/hadoop_catalog/ice/iceberg_v2_null_delete_record/data/delete_null_first.parq
new file mode 100644
index 000000000..6928177e1
Binary files /dev/null and 
b/testdata/data/iceberg_test/hadoop_catalog/ice/iceberg_v2_null_delete_record/data/delete_null_first.parq
 differ
diff --git 
a/testdata/data/iceberg_test/hadoop_catalog/ice/iceberg_v2_null_delete_record/data/delete_null_first_and_last.parq
 
b/testdata/data/iceberg_test/hadoop_catalog/ice/iceberg_v2_null_delete_record/data/delete_null_first_and_last.parq
new file mode 100644
index 000000000..d72986c28
Binary files /dev/null and 
b/testdata/data/iceberg_test/hadoop_catalog/ice/iceberg_v2_null_delete_record/data/delete_null_first_and_last.parq
 differ
diff --git 
a/testdata/data/iceberg_test/hadoop_catalog/ice/iceberg_v2_null_delete_record/data/delete_null_last.parq
 
b/testdata/data/iceberg_test/hadoop_catalog/ice/iceberg_v2_null_delete_record/data/delete_null_last.parq
new file mode 100644
index 000000000..4af229fdb
Binary files /dev/null and 
b/testdata/data/iceberg_test/hadoop_catalog/ice/iceberg_v2_null_delete_record/data/delete_null_last.parq
 differ
diff --git 
a/testdata/data/iceberg_test/hadoop_catalog/ice/iceberg_v2_null_delete_record/data/delete_null_single.parq
 
b/testdata/data/iceberg_test/hadoop_catalog/ice/iceberg_v2_null_delete_record/data/delete_null_single.parq
new file mode 100644
index 000000000..927d2fa79
Binary files /dev/null and 
b/testdata/data/iceberg_test/hadoop_catalog/ice/iceberg_v2_null_delete_record/data/delete_null_single.parq
 differ
diff --git 
a/testdata/data/iceberg_test/hadoop_catalog/ice/iceberg_v2_null_delete_record/data/delete_three_nulls.parq
 
b/testdata/data/iceberg_test/hadoop_catalog/ice/iceberg_v2_null_delete_record/data/delete_three_nulls.parq
new file mode 100644
index 000000000..fbc984f47
Binary files /dev/null and 
b/testdata/data/iceberg_test/hadoop_catalog/ice/iceberg_v2_null_delete_record/data/delete_three_nulls.parq
 differ
diff --git 
a/testdata/data/iceberg_test/hadoop_catalog/ice/iceberg_v2_null_delete_record/data/same_data.0.parq
 
b/testdata/data/iceberg_test/hadoop_catalog/ice/iceberg_v2_null_delete_record/data/same_data.0.parq
new file mode 100644
index 000000000..1e20490af
Binary files /dev/null and 
b/testdata/data/iceberg_test/hadoop_catalog/ice/iceberg_v2_null_delete_record/data/same_data.0.parq
 differ
diff --git 
a/testdata/data/iceberg_test/hadoop_catalog/ice/iceberg_v2_null_delete_record/metadata/3a813d5e-fc0b-485f-bbba-010972a9f20a-m0.avro
 
b/testdata/data/iceberg_test/hadoop_catalog/ice/iceberg_v2_null_delete_record/metadata/3a813d5e-fc0b-485f-bbba-010972a9f20a-m0.avro
new file mode 100644
index 000000000..54159a43a
Binary files /dev/null and 
b/testdata/data/iceberg_test/hadoop_catalog/ice/iceberg_v2_null_delete_record/metadata/3a813d5e-fc0b-485f-bbba-010972a9f20a-m0.avro
 differ
diff --git 
a/testdata/data/iceberg_test/hadoop_catalog/ice/iceberg_v2_null_delete_record/metadata/e90d28aa-cd17-4655-ad04-aa3711792576-m0.avro
 
b/testdata/data/iceberg_test/hadoop_catalog/ice/iceberg_v2_null_delete_record/metadata/e90d28aa-cd17-4655-ad04-aa3711792576-m0.avro
new file mode 100644
index 000000000..d0b55bf7c
Binary files /dev/null and 
b/testdata/data/iceberg_test/hadoop_catalog/ice/iceberg_v2_null_delete_record/metadata/e90d28aa-cd17-4655-ad04-aa3711792576-m0.avro
 differ
diff --git 
a/testdata/data/iceberg_test/hadoop_catalog/ice/iceberg_v2_null_delete_record/metadata/snap-5852039568708655222-1-3a813d5e-fc0b-485f-bbba-010972a9f20a.avro
 
b/testdata/data/iceberg_test/hadoop_catalog/ice/iceberg_v2_null_delete_record/metadata/snap-5852039568708655222-1-3a813d5e-fc0b-485f-bbba-010972a9f20a.avro
new file mode 100644
index 000000000..a2b5cce21
Binary files /dev/null and 
b/testdata/data/iceberg_test/hadoop_catalog/ice/iceberg_v2_null_delete_record/metadata/snap-5852039568708655222-1-3a813d5e-fc0b-485f-bbba-010972a9f20a.avro
 differ
diff --git 
a/testdata/data/iceberg_test/hadoop_catalog/ice/iceberg_v2_null_delete_record/metadata/v1.metadata.json
 
b/testdata/data/iceberg_test/hadoop_catalog/ice/iceberg_v2_null_delete_record/metadata/v1.metadata.json
new file mode 100644
index 000000000..0e27f8dba
--- /dev/null
+++ 
b/testdata/data/iceberg_test/hadoop_catalog/ice/iceberg_v2_null_delete_record/metadata/v1.metadata.json
@@ -0,0 +1,89 @@
+{
+  "format-version" : 2,
+  "table-uuid" : "a4e56638-534e-41f1-848a-921f8a3364e2",
+  "location" : 
"/test-warehouse/iceberg_test/hadoop_catalog/ice/iceberg_v2_null_delete_record",
+  "last-sequence-number" : 2,
+  "last-updated-ms" : 1712662161442,
+  "last-column-id" : 2,
+  "current-schema-id" : 0,
+  "schemas" : [ {
+    "type" : "struct",
+    "schema-id" : 0,
+    "fields" : [ {
+      "id" : 1,
+      "name" : "i",
+      "required" : false,
+      "type" : "int"
+    }, {
+      "id" : 2,
+      "name" : "j",
+      "required" : false,
+      "type" : "int"
+    } ]
+  } ],
+  "default-spec-id" : 0,
+  "partition-specs" : [ {
+    "spec-id" : 0,
+    "fields" : [ ]
+  } ],
+  "last-partition-id" : 999,
+  "default-sort-order-id" : 0,
+  "sort-orders" : [ {
+    "order-id" : 0,
+    "fields" : [ ]
+  } ],
+  "properties" : {
+    "engine.hive.enabled" : "true",
+    "write.merge.mode" : "merge-on-read",
+    "write.format.default" : "parquet",
+    "write.delete.mode" : "merge-on-read",
+    "iceberg.catalog_location" : "/test-warehouse/iceberg_test/hadoop_catalog",
+    "OBJCAPABILITIES" : "EXTREAD,EXTWRITE",
+    "write.update.mode" : "merge-on-read",
+    "storage_handler" : "org.apache.iceberg.mr.hive.HiveIcebergStorageHandler",
+    "iceberg.catalog" : "hadoop.catalog",
+    "iceberg.table_identifier" : "ice.iceberg_v2_null_delete_record"
+  },
+  "current-snapshot-id" : 5852039568708655222,
+  "refs" : {
+    "main" : {
+      "snapshot-id" : 5852039568708655222,
+      "type" : "branch"
+    }
+  },
+  "snapshots" : [ {
+    "sequence-number" : 1,
+    "snapshot-id" : 5852039568708655222,
+    "parent-snapshot-id" : 1905028041526857588,
+    "timestamp-ms" : 1712662161442,
+    "summary" : {
+      "operation" : "overwrite",
+      "added-position-delete-files" : "1",
+      "added-delete-files" : "1",
+      "added-files-size" : "1578",
+      "added-position-deletes" : "2",
+      "changed-partition-count" : "1",
+      "total-records" : "4",
+      "total-files-size" : "2195",
+      "total-data-files" : "1",
+      "total-delete-files" : "1",
+      "total-position-deletes" : "2",
+      "total-equality-deletes" : "0"
+    },
+    "manifest-list" : 
"/test-warehouse/iceberg_test/hadoop_catalog/ice/iceberg_v2_null_delete_record/metadata/snap-5852039568708655222-1-3a813d5e-fc0b-485f-bbba-010972a9f20a.avro",
+    "schema-id" : 0
+  } ],
+  "statistics" : [ ],
+  "snapshot-log" : [ {
+    "timestamp-ms" : 1712662151105,
+    "snapshot-id" : 1905028041526857588
+  }, {
+    "timestamp-ms" : 1712662161442,
+    "snapshot-id" : 5852039568708655222
+  } ],
+  "metadata-log" : [ {
+    "timestamp-ms" : 1712662124268,
+    "metadata-file" : 
"/test-warehouse/iceberg_test/hadoop_catalog/ice/iceberg_v2_null_delete_record/metadata/v1.metadata.json"
+  }
+  ]
+}
diff --git 
a/testdata/data/iceberg_test/hadoop_catalog/ice/iceberg_v2_null_delete_record/metadata/version-hint.text
 
b/testdata/data/iceberg_test/hadoop_catalog/ice/iceberg_v2_null_delete_record/metadata/version-hint.text
new file mode 100644
index 000000000..56a6051ca
--- /dev/null
+++ 
b/testdata/data/iceberg_test/hadoop_catalog/ice/iceberg_v2_null_delete_record/metadata/version-hint.text
@@ -0,0 +1 @@
+1
\ No newline at end of file
diff --git a/testdata/datasets/functional/functional_schema_template.sql 
b/testdata/datasets/functional/functional_schema_template.sql
index 8567bb94f..0c5fab271 100644
--- a/testdata/datasets/functional/functional_schema_template.sql
+++ b/testdata/datasets/functional/functional_schema_template.sql
@@ -3954,6 +3954,21 @@ hadoop fs -put -f 
${IMPALA_HOME}/testdata/data/iceberg_test/hadoop_catalog/ice/i
 ---- DATASET
 functional
 ---- BASE_TABLE_NAME
+iceberg_v2_null_delete_record
+---- CREATE
+CREATE EXTERNAL TABLE IF NOT EXISTS {db_name}{db_suffix}.{table_name}
+STORED AS ICEBERG
+TBLPROPERTIES('iceberg.catalog'='hadoop.catalog',
+              
'iceberg.catalog_location'='/test-warehouse/iceberg_test/hadoop_catalog',
+              'iceberg.table_identifier'='ice.iceberg_v2_null_delete_record',
+              'format-version'='2');
+---- DEPENDENT_LOAD
+`hadoop fs -mkdir -p /test-warehouse/iceberg_test/hadoop_catalog/ice && \
+hadoop fs -put -f 
${IMPALA_HOME}/testdata/data/iceberg_test/hadoop_catalog/ice/iceberg_v2_null_delete_record
 /test-warehouse/iceberg_test/hadoop_catalog/ice
+====
+---- DATASET
+functional
+---- BASE_TABLE_NAME
 mv1_alltypes_jointbl
 ---- HIVE_MAJOR_VERSION
 3
diff --git a/testdata/datasets/functional/schema_constraints.csv 
b/testdata/datasets/functional/schema_constraints.csv
index da430313a..f364a8ead 100644
--- a/testdata/datasets/functional/schema_constraints.csv
+++ b/testdata/datasets/functional/schema_constraints.csv
@@ -92,6 +92,7 @@ table_name:iceberg_v2_delete_equality_multi_eq_ids, 
constraint:restrict_to, tabl
 table_name:iceberg_v2_delete_pos_and_multi_eq_ids, constraint:restrict_to, 
table_format:parquet/none/none
 table_name:iceberg_v2_no_deletes, constraint:restrict_to, 
table_format:parquet/none/none
 table_name:iceberg_v2_no_deletes_orc, constraint:restrict_to, 
table_format:parquet/none/none
+table_name:iceberg_v2_null_delete_record, constraint:restrict_to, 
table_format:parquet/none/none
 table_name:iceberg_v2_positional_update_all_rows, constraint:restrict_to, 
table_format:parquet/none/none
 table_name:iceberg_v2_positional_delete_all_rows, constraint:restrict_to, 
table_format:parquet/none/none
 table_name:iceberg_v2_positional_delete_all_rows_orc, constraint:restrict_to, 
table_format:parquet/none/none
diff --git a/tests/query_test/test_iceberg.py b/tests/query_test/test_iceberg.py
index 7a23cc477..6e4acd0c8 100644
--- a/tests/query_test/test_iceberg.py
+++ b/tests/query_test/test_iceberg.py
@@ -1449,6 +1449,31 @@ class TestIcebergV2Table(IcebergTestSuite):
   def test_read_position_deletes(self, vector):
     self.run_test_case('QueryTest/iceberg-v2-read-position-deletes', vector)
 
+  @SkipIfDockerizedCluster.internal_hostname
+  @SkipIf.hardcoded_uris
+  def test_read_null_delete_records(self, vector):
+    expected_error = 'NULL found as file_path in delete file'
+    query_options = vector.get_value('exec_option')
+    v2_op_disabled = query_options['disable_optimized_iceberg_v2_read'] == 1
+    result = self.execute_query(
+        'select * from functional_parquet.iceberg_v2_null_delete_record', 
query_options)
+    assert len(result.data) == 6
+    errors = result.log
+    print(errors)
+    assert expected_error in errors or v2_op_disabled
+    result = self.execute_query(
+        'select count(*) from 
functional_parquet.iceberg_v2_null_delete_record',
+        query_options)
+    assert result.data == ['6']
+    errors = result.log
+    assert expected_error in errors or v2_op_disabled
+    result = self.execute_query(
+        """select * from functional_parquet.iceberg_v2_null_delete_record
+           where j < 3""", query_options)
+    assert sorted(result.data) == ['1\t1', '2\t2']
+    errors = result.log
+    assert expected_error in errors or v2_op_disabled
+
   @SkipIfDockerizedCluster.internal_hostname
   @SkipIf.hardcoded_uris
   def test_read_equality_deletes(self, vector):

Reply via email to