This is an automated email from the ASF dual-hosted git repository.

prozsa pushed a commit to branch branch-4.5.0
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 7eeb89cb33e18308a322060b923fc75fe175984d
Author: Zoltan Borok-Nagy <borokna...@cloudera.com>
AuthorDate: Wed Feb 19 14:11:19 2025 +0100

    IMPALA-13768: Redundant Iceberg delete records are shuffled around, causing the error "Invalid file path arrived at builder"
    
    IcebergDeleteBuilder assumes that it should only receive delete
    records for paths of data files that are scheduled for its
    corresponding SCAN operator.
    
    This assumption does not hold when any of the following happens:
    * number of output channels in sender is 1
      (currently no DIRECTED mode, no filtering)
    * hit bug in DIRECTED mode, see below
    * single node plan is being used (no DIRECTED mode, no filtering)
    
    With this patch, KrpcDataStreamSender::Send() will use DIRECTED mode
    even if the number of output channels is 1. It also fixes the bug in
    DIRECTED mode (which was due to an unused variable 'skipped_prev_row')
    and simplifies the logic a bit.
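
    For illustration, here is a minimal standalone sketch of the corrected
    dispatch condition. This is not Impala code: 'PartitionType' and
    'UsesRoundRobin' are hypothetical stand-ins for TPartitionType and the
    branch in KrpcDataStreamSender::Send() shown in the diff below.

      #include <cstdio>

      enum class PartitionType { UNPARTITIONED, RANDOM, DIRECTED, HASH_PARTITIONED };

      // Before the fix, num_channels == 1 alone selected the round-robin
      // path, bypassing DIRECTED per-row filtering of delete records.
      bool UsesRoundRobin(PartitionType type, int num_channels) {
        return type == PartitionType::RANDOM ||
            (num_channels == 1 && type != PartitionType::DIRECTED);
      }

      int main() {
        // A DIRECTED sender with one channel now keeps per-row filtering.
        std::printf("%d\n", UsesRoundRobin(PartitionType::DIRECTED, 1));  // 0
        std::printf("%d\n", UsesRoundRobin(PartitionType::RANDOM, 4));    // 1
      }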
    
    The patch also relaxes the assumption in IcebergDeleteBuilder: it now
    returns an error for dangling delete records only when we are in a
    distributed plan, where we can assume DIRECTED distribution mode for
    position delete records.
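
    The relaxed builder check can be pictured with this standalone sketch
    (not Impala code; 'Status', 'scheduled_paths' and the free function are
    hypothetical simplifications of IcebergDeleteBuilder::AddToDeletedRows()
    and its 'is_separate_build_' flag from the diff below):

      #include <cstdio>
      #include <string>
      #include <unordered_set>

      struct Status {
        std::string msg;  // empty means OK
        bool ok() const { return msg.empty(); }
      };

      Status AddToDeletedRows(const std::string& path, bool is_separate_build,
          const std::unordered_set<std::string>& scheduled_paths) {
        if (scheduled_paths.count(path) > 0) return {};  // expected data file
        if (path.empty()) return {"NULL found as file_path in delete file"};
        if (is_separate_build) {
          // DIRECTED mode filtered the rows, so an unknown path is an error.
          return {"Invalid file path arrived at builder: " + path};
        }
        return {};  // unfiltered plan: ignore the dangling delete record
      }

      int main() {
        std::unordered_set<std::string> scheduled = {"data1.parq"};
        std::printf("%d\n", AddToDeletedRows("gone.parq", true, scheduled).ok());   // 0
        std::printf("%d\n", AddToDeletedRows("gone.parq", false, scheduled).ok());  // 1
      }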
    
    Testing
     * added e2e tests
    
    Change-Id: I695c919c9a74edec768e413a02b2ef7dbfa0d6a5
    Reviewed-on: http://gerrit.cloudera.org:8080/22500
    Reviewed-by: Impala Public Jenkins <impala-public-jenk...@cloudera.com>
    Tested-by: Impala Public Jenkins <impala-public-jenk...@cloudera.com>
---
 be/src/exec/iceberg-delete-builder.cc              | 21 ++++++++-------
 be/src/runtime/krpc-data-stream-sender.cc          | 22 +++++++---------
 .../QueryTest/iceberg-delete-partitioned.test      | 30 ++++++++++++++++++++++
 3 files changed, 51 insertions(+), 22 deletions(-)

diff --git a/be/src/exec/iceberg-delete-builder.cc b/be/src/exec/iceberg-delete-builder.cc
index 4692aca0e..fc4833ace 100644
--- a/be/src/exec/iceberg-delete-builder.cc
+++ b/be/src/exec/iceberg-delete-builder.cc
@@ -261,16 +261,19 @@ Status IcebergDeleteBuilder::AddToDeletedRows(const StringValue& path,
   } else if (path.Len() == 0) {
     return Status("NULL found as file_path in delete file");
   } else {
-    // 'deleted_rows_' should contain all data file paths that this join
-    // processes.
-    stringstream ss;
-    ss << "Invalid file path arrived at builder: " << path << " Paths 
expected: [";
-    for (auto& [vec_path, unused] : deleted_rows_) {
-      ss << vec_path << ", ";
+    // We received a path of a data file that is not scheduled for this fragment.
+    if (is_separate_build_) {
+      // When the build is in its own fragment we assume that delete records are filtered
+      // in the sender by DIRECTED distribution mode. Therefore we should never see
+      // dangling delete records.
+      stringstream ss;
+      ss << "Invalid file path arrived at builder: " << path << " Paths expected: [";
+      for (auto& [vec_path, unused] : deleted_rows_) {
+        ss << vec_path << ", ";
+      }
+      ss << "]";
+      return Status(ss.str());
     }
-    ss << "]";
-    DCHECK(false) << ss.str();
-    return Status(ss.str());
   }
   return Status::OK();
 }
diff --git a/be/src/runtime/krpc-data-stream-sender.cc b/be/src/runtime/krpc-data-stream-sender.cc
index 6a41018c2..127ced2a1 100644
--- a/be/src/runtime/krpc-data-stream-sender.cc
+++ b/be/src/runtime/krpc-data-stream-sender.cc
@@ -1133,7 +1133,8 @@ Status KrpcDataStreamSender::Send(RuntimeState* state, RowBatch* batch) {
     }
     // At this point no RPCs can still refer to the old in_flight_batch_.
     in_flight_batch_.swap(serialization_batch_);
-  } else if (partition_type_ == TPartitionType::RANDOM || channels_.size() == 1) {
+  } else if (partition_type_ == TPartitionType::RANDOM ||
+      (channels_.size() == 1 && partition_type_ != TPartitionType::DIRECTED)) {
     // Round-robin batches among channels. Wait for the current channel to finish its
     // rpc before overwriting its batch.
     Channel* current_channel = channels_[current_channel_idx_].get();
@@ -1171,7 +1172,6 @@ Status KrpcDataStreamSender::Send(RuntimeState* state, RowBatch* batch) {
     const int num_rows = batch->num_rows();
     char* prev_filename_ptr = nullptr;
     vector<IcebergPositionDeleteChannel*> prev_channels;
-    bool skipped_prev_row = false;
     for (int row_idx = 0; row_idx < num_rows; ++row_idx) {
       DCHECK_EQ(batch->num_tuples_per_row(), 1);
       TupleRow* tuple_row = batch->GetRow(row_idx);
@@ -1182,24 +1182,22 @@ Status KrpcDataStreamSender::Send(RuntimeState* state, RowBatch* batch) {
       if (filename_value_ss.ptr == prev_filename_ptr) {
        // If the filename pointer is the same as the previous one then we can instantly
         // send the row to the same channels as the previous row.
-        DCHECK(skipped_prev_row || !prev_channels.empty() ||
-            (filename_value_ss.len == 0 && prev_channels.empty()));
         for (IcebergPositionDeleteChannel* ch : prev_channels) {
           RETURN_IF_ERROR(ch->AddRow(tuple_row));
         }
         continue;
       }
+      prev_channels.clear();
       prev_filename_ptr = filename_value_ss.ptr;
       string filename(filename_value_ss.ptr, filename_value_ss.len);
 
       const auto filepath_to_hosts_it = filepath_to_hosts_.find(filename);
       if (filepath_to_hosts_it == filepath_to_hosts_.end()) {
-        // This can happen when e.g. compaction removed some data files from a snapshot
-        // but a delete file referencing them remained because it references other data
-        // files that remains in the new snapshot.
-        // Another use-case is table sampling where we read only a subset of the data
-        // files.
-        // A third case is when the delete record is invalid.
+        // This can happen due to file pruning, or when compaction removed some data
+        // files from a snapshot but a delete file referencing them remained because
+        // it references other data files that are still present in the new snapshot.
+        // Another case is table sampling where we read only a subset of the data files.
+        // Or simply when the delete record is invalid.
         if (UNLIKELY(filename_value_ss.len == 0)) {
           state->LogError(
            ErrorMsg(TErrorCode::GENERAL, "NULL found as file_path in delete file"));
@@ -1207,12 +1205,10 @@ Status KrpcDataStreamSender::Send(RuntimeState* state, RowBatch* batch) {
          VLOG(3) << "Row from delete file refers to a non-existing data file: " <<
               filename;
         }
-        skipped_prev_row = true;
+        DCHECK(prev_channels.empty());
         continue;
       }
-      skipped_prev_row = false;
 
-      prev_channels.clear();
       for (const NetworkAddressPB& host_addr : filepath_to_hosts_it->second) {
         const auto channel_map_it = host_to_channel_.find(host_addr);
         if (channel_map_it == host_to_channel_.end()) {
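
For context, the net effect of the sender hunks above can be condensed into
this standalone sketch (not Impala code; the map, channel ids and row list
are hypothetical stand-ins for 'filepath_to_hosts_' and the channel
machinery). The old code left 'prev_channels' holding the previous data
file's channels when a lookup missed, so a repeated unknown filename was
replayed to the wrong channels; clearing the cache for every new filename
removes the need for the unused 'skipped_prev_row' flag:

  #include <cstdio>
  #include <map>
  #include <string>
  #include <vector>

  int main() {
    // filename -> channels whose scan nodes read that data file.
    const std::map<std::string, std::vector<int>> filepath_to_hosts = {
        {"data1.parq", {0}}, {"data2.parq", {1, 2}}};
    // Delete records arrive with equal filenames adjacent to each other.
    const std::vector<std::string> rows = {
        "data1.parq", "dropped.parq", "dropped.parq", "data2.parq"};

    const std::string* prev_filename = nullptr;
    std::vector<int> prev_channels;
    for (const std::string& filename : rows) {
      if (prev_filename != nullptr && filename == *prev_filename) {
        // Cache hit: an empty cache correctly drops the row.
        for (int ch : prev_channels) std::printf("%s -> %d\n", filename.c_str(), ch);
        continue;
      }
      prev_channels.clear();  // the fix: reset the cache on every new filename
      prev_filename = &filename;
      const auto it = filepath_to_hosts.find(filename);
      if (it == filepath_to_hosts.end()) continue;  // dangling delete record
      for (int ch : it->second) {
        prev_channels.push_back(ch);
        std::printf("%s -> %d\n", filename.c_str(), ch);
      }
    }
    return 0;
  }
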
diff --git a/testdata/workloads/functional-query/queries/QueryTest/iceberg-delete-partitioned.test b/testdata/workloads/functional-query/queries/QueryTest/iceberg-delete-partitioned.test
index 0e8d8e973..b81fa89c8 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/iceberg-delete-partitioned.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/iceberg-delete-partitioned.test
@@ -803,3 +803,33 @@ delete from ice_alltypes_part_v2 where i=1;
 ---- TYPES
 INT, BOOLEAN, INT, BIGINT, FLOAT, DOUBLE, DECIMAL, DATE, STRING
 ====
+---- QUERY
+# Create table to test IMPALA-13768. It will have multiple partitions, with
+# multiple data files in each partition, where the min/max ranges of column
+# 'bi' are disjoint across data files (in the same partition).
+create table ice_invalid_deletes (bi bigint, year int)
+partitioned by spec (year)
+stored as iceberg tblproperties ('format-version'='2');
+insert into ice_invalid_deletes
+    select bigint_col, year from functional.alltypes where month = 10;
+with v as (select max(bi) as max_bi from ice_invalid_deletes)
+insert into ice_invalid_deletes select bi + v.max_bi, year from v, ice_invalid_deletes;
+delete from ice_invalid_deletes where bi % 11 = 0;
+====
+---- QUERY
+# Scan a single data file (single channel in EXCHANGE sender) with a delete
+# file that has references to other data files.
+select count(*) from ice_invalid_deletes where year=2010 and bi = 180;
+---- RESULTS
+31
+---- TYPES
+BIGINT
+====
+---- QUERY
+# Scan a single data file per partition with delete files that reference
+# other data files.
+select count(*) from ice_invalid_deletes where year>2000 and bi = 180;
+---- RESULTS
+62
+---- TYPES
+BIGINT
+====
