This is an automated email from the ASF dual-hosted git repository.

prozsa pushed a commit to branch branch-4.5.0
in repository https://gitbox.apache.org/repos/asf/impala.git
commit 7eeb89cb33e18308a322060b923fc75fe175984d
Author: Zoltan Borok-Nagy <borokna...@cloudera.com>
AuthorDate: Wed Feb 19 14:11:19 2025 +0100

    IMPALA-13768: Redundant Iceberg delete records are shuffled around,
    causing the error "Invalid file path arrived at builder"

    IcebergDeleteBuilder assumes that it only receives delete records for
    paths of data files that are scheduled for its corresponding SCAN
    operator. This assumption does not hold when any of the following
    happens:
     * the number of output channels in the sender is 1 (currently no
       DIRECTED mode, hence no filtering)
     * the bug in DIRECTED mode is hit, see below
     * a single-node plan is being used (no DIRECTED mode, no filtering)

    With this patch, KrpcDataStreamSender::Send() uses DIRECTED mode even
    if the number of output channels is 1. The patch also fixes the bug in
    DIRECTED mode (which was due to the unused variable 'skipped_prev_row')
    and simplifies the logic a bit.

    The patch also relaxes the assumption in IcebergDeleteBuilder: an error
    is only returned for dangling delete records when we are in a
    distributed plan, where DIRECTED distribution of position delete
    records can be assumed.

    Testing:
     * added e2e tests

    Change-Id: I695c919c9a74edec768e413a02b2ef7dbfa0d6a5
    Reviewed-on: http://gerrit.cloudera.org:8080/22500
    Reviewed-by: Impala Public Jenkins <impala-public-jenk...@cloudera.com>
    Tested-by: Impala Public Jenkins <impala-public-jenk...@cloudera.com>
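[Editor's note] To make the sender-side bug concrete, here is a minimal
standalone sketch of the filename-to-channel caching pattern described above.
The names and types (filepath_to_channels, prev_ptr, integer "channels") are
simplified stand-ins, not the actual Impala code. Before the fix, a record
whose data file was not found left 'prev_channels' holding the channels of an
earlier file, so later records for the same skipped file were routed to the
wrong builders. Clearing the cache eagerly, as the patch does, removes that
possibility:

#include <iostream>
#include <string>
#include <unordered_map>
#include <vector>

int main() {
  // Which "channels" (just ints here) scan each data file.
  std::unordered_map<std::string, std::vector<int>> filepath_to_channels = {
      {"f1.parq", {0}}, {"f3.parq", {1}}};

  // Incoming delete records. Consecutive records for the same data file share
  // one string buffer, which is what the pointer-based cache exploits.
  std::string f1 = "f1.parq", f2 = "f2.parq", f3 = "f3.parq";
  std::vector<const std::string*> rows = {&f1, &f2, &f2, &f3};

  const std::string* prev_ptr = nullptr;
  std::vector<int> prev_channels;
  for (const std::string* row : rows) {
    if (row == prev_ptr) {
      // Fast path: same file as the previous record, reuse the cached
      // channels. Before the fix, a skipped record left 'prev_channels'
      // holding the channels of an *earlier* file, so records for a skipped
      // file could be routed to builders that never scheduled that file.
      for (int ch : prev_channels) std::cout << *row << " -> " << ch << "\n";
      continue;
    }
    prev_channels.clear();  // The fix: invalidate the cache eagerly.
    prev_ptr = row;
    auto it = filepath_to_channels.find(*row);
    if (it == filepath_to_channels.end()) {
      // Dangling delete record: its data file is not scheduled anywhere.
      // With the eager clear above, repeated records for this file hit the
      // fast path with an empty cache and correctly send nothing.
      std::cout << *row << " -> skipped\n";
      continue;
    }
    for (int ch : it->second) {
      prev_channels.push_back(ch);
      std::cout << *row << " -> " << ch << "\n";
    }
  }
  return 0;
}

Running the sketch routes the f1.parq record to channel 0, skips both f2.parq
records (the second one hits the fast path with an empty cache), and routes
f3.parq to channel 1.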
---
 be/src/exec/iceberg-delete-builder.cc             | 21 ++++++++-------
 be/src/runtime/krpc-data-stream-sender.cc         | 22 +++++++---------
 .../QueryTest/iceberg-delete-partitioned.test     | 30 ++++++++++++++++++++++
 3 files changed, 51 insertions(+), 22 deletions(-)

diff --git a/be/src/exec/iceberg-delete-builder.cc b/be/src/exec/iceberg-delete-builder.cc
index 4692aca0e..fc4833ace 100644
--- a/be/src/exec/iceberg-delete-builder.cc
+++ b/be/src/exec/iceberg-delete-builder.cc
@@ -261,16 +261,19 @@ Status IcebergDeleteBuilder::AddToDeletedRows(const StringValue& path,
   } else if (path.Len() == 0) {
     return Status("NULL found as file_path in delete file");
   } else {
-    // 'deleted_rows_' should contain all data file paths that this join
-    // processes.
-    stringstream ss;
-    ss << "Invalid file path arrived at builder: " << path << " Paths expected: [";
-    for (auto& [vec_path, unused] : deleted_rows_) {
-      ss << vec_path << ", ";
+    // We received a path of a data file that is not scheduled for this fragment.
+    if (is_separate_build_) {
+      // When the build is in its own fragment, we assume that delete records are
+      // filtered in the sender by DIRECTED distribution mode. Therefore we should
+      // never see dangling delete records.
+      stringstream ss;
+      ss << "Invalid file path arrived at builder: " << path << " Paths expected: [";
+      for (auto& [vec_path, unused] : deleted_rows_) {
+        ss << vec_path << ", ";
+      }
+      ss << "]";
+      return Status(ss.str());
     }
-    ss << "]";
-    DCHECK(false) << ss.str();
-    return Status(ss.str());
   }
   return Status::OK();
 }
diff --git a/be/src/runtime/krpc-data-stream-sender.cc b/be/src/runtime/krpc-data-stream-sender.cc
index 6a41018c2..127ced2a1 100644
--- a/be/src/runtime/krpc-data-stream-sender.cc
+++ b/be/src/runtime/krpc-data-stream-sender.cc
@@ -1133,7 +1133,8 @@ Status KrpcDataStreamSender::Send(RuntimeState* state, RowBatch* batch) {
     }
     // At this point no RPCs can still refer to the old in_flight_batch_.
     in_flight_batch_.swap(serialization_batch_);
-  } else if (partition_type_ == TPartitionType::RANDOM || channels_.size() == 1) {
+  } else if (partition_type_ == TPartitionType::RANDOM ||
+      (channels_.size() == 1 && partition_type_ != TPartitionType::DIRECTED)) {
     // Round-robin batches among channels. Wait for the current channel to finish its
     // rpc before overwriting its batch.
     Channel* current_channel = channels_[current_channel_idx_].get();
@@ -1171,7 +1172,6 @@ Status KrpcDataStreamSender::Send(RuntimeState* state, RowBatch* batch) {
     const int num_rows = batch->num_rows();
     char* prev_filename_ptr = nullptr;
     vector<IcebergPositionDeleteChannel*> prev_channels;
-    bool skipped_prev_row = false;
     for (int row_idx = 0; row_idx < num_rows; ++row_idx) {
       DCHECK_EQ(batch->num_tuples_per_row(), 1);
       TupleRow* tuple_row = batch->GetRow(row_idx);
@@ -1182,24 +1182,22 @@ Status KrpcDataStreamSender::Send(RuntimeState* state, RowBatch* batch) {
       if (filename_value_ss.ptr == prev_filename_ptr) {
         // If the filename pointer is the same as the previous one then we can instantly
         // send the row to the same channels as the previous row.
-        DCHECK(skipped_prev_row || !prev_channels.empty() ||
-            (filename_value_ss.len == 0 && prev_channels.empty()));
         for (IcebergPositionDeleteChannel* ch : prev_channels) {
           RETURN_IF_ERROR(ch->AddRow(tuple_row));
         }
         continue;
       }

+      prev_channels.clear();
       prev_filename_ptr = filename_value_ss.ptr;
       string filename(filename_value_ss.ptr, filename_value_ss.len);
       const auto filepath_to_hosts_it = filepath_to_hosts_.find(filename);
       if (filepath_to_hosts_it == filepath_to_hosts_.end()) {
-        // This can happen when e.g. compaction removed some data files from a snapshot
-        // but a delete file referencing them remained because it references other data
-        // files that remains in the new snapshot.
-        // Another use-case is table sampling where we read only a subset of the data
-        // files.
-        // A third case is when the delete record is invalid.
+        // This can happen due to file pruning, or when compaction removed some data
+        // files from a snapshot but a delete file referencing them remained because
+        // it references other data files that are still present in the new snapshot.
+        // Another case is table sampling where we read only a subset of the data files.
+        // Or simply when the delete record is invalid.
         if (UNLIKELY(filename_value_ss.len == 0)) {
           state->LogError(
               ErrorMsg(TErrorCode::GENERAL, "NULL found as file_path in delete file"));
@@ -1207,12 +1205,10 @@ Status KrpcDataStreamSender::Send(RuntimeState* state, RowBatch* batch) {
           VLOG(3) << "Row from delete file refers to a non-existing data file: "
                   << filename;
         }
-        skipped_prev_row = true;
+        DCHECK(prev_channels.empty());
         continue;
       }
-      skipped_prev_row = false;
-      prev_channels.clear();

       for (const NetworkAddressPB& host_addr : filepath_to_hosts_it->second) {
         const auto channel_map_it = host_to_channel_.find(host_addr);
         if (channel_map_it == host_to_channel_.end()) {
diff --git a/testdata/workloads/functional-query/queries/QueryTest/iceberg-delete-partitioned.test b/testdata/workloads/functional-query/queries/QueryTest/iceberg-delete-partitioned.test
index 0e8d8e973..b81fa89c8 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/iceberg-delete-partitioned.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/iceberg-delete-partitioned.test
@@ -803,3 +803,33 @@ delete from ice_alltypes_part_v2 where i=1;
 ---- TYPES
 INT, BOOLEAN, INT, BIGINT, FLOAT, DOUBLE, DECIMAL, DATE, STRING
 ====
+---- QUERY
+# Create table to test IMPALA-13768. It will have multiple partitions, with
+# multiple data files in each partition, where the min/max ranges of column
+# 'bi' are disjoint across data files (in the same partition).
+create table ice_invalid_deletes (bi bigint, year int)
+partitioned by spec (year)
+stored as iceberg tblproperties ('format-version'='2');
+insert into ice_invalid_deletes
+  select bigint_col, year from functional.alltypes where month = 10;
+with v as (select max(bi) as max_bi from ice_invalid_deletes)
+insert into ice_invalid_deletes select bi + v.max_bi, year from v, ice_invalid_deletes;
+delete from ice_invalid_deletes where bi % 11 = 0;
+====
+---- QUERY
+# Scan a single data file (single channel in the EXCHANGE sender) with a
+# delete file that references other data files.
+select count(*) from ice_invalid_deletes where year=2010 and bi = 180;
+---- RESULTS
+31
+---- TYPES
+BIGINT
+====
+---- QUERY
+# Scan a single data file per partition with delete files that reference other data files.
+select count(*) from ice_invalid_deletes where year>2000 and bi = 180;
+---- RESULTS
+62
+---- TYPES
+BIGINT
+====
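[Editor's note] And a similarly simplified sketch of the relaxed builder-side
check (again with hypothetical names such as AcceptDeleteRecord and
scheduled_files; the real logic lives in IcebergDeleteBuilder::AddToDeletedRows):
a dangling delete record is rejected only when the build runs in its own
fragment, where the DIRECTED exchange is assumed to have filtered such records
already:

#include <iostream>
#include <set>
#include <string>

// Accepts or rejects a delete record whose data file path is 'path', given
// the set of data file paths scheduled for this fragment.
bool AcceptDeleteRecord(const std::set<std::string>& scheduled_files,
    const std::string& path, bool is_separate_build) {
  if (path.empty()) return false;  // NULL/empty file_path is always an error.
  if (scheduled_files.count(path) > 0) return true;  // Expected data file.
  // Dangling record: an error only when the build is in a separate fragment,
  // i.e. when the DIRECTED exchange should have filtered it out already.
  return !is_separate_build;
}

int main() {
  const std::set<std::string> scheduled = {"f1.parq"};
  // Single-node or single-channel plan: dangling deletes are tolerated.
  std::cout << AcceptDeleteRecord(scheduled, "f2.parq", false) << "\n";  // 1
  // Separate build fragment fed by a DIRECTED exchange: dangling is an error.
  std::cout << AcceptDeleteRecord(scheduled, "f2.parq", true) << "\n";   // 0
  return 0;
}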