This is an automated email from the ASF dual-hosted git repository. stigahuang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/impala.git
commit 8638255e5074f1342dfc452bca39f649a76612d6 Author: Zoltan Borok-Nagy <[email protected]> AuthorDate: Tue Aug 1 14:57:34 2023 +0200 IMPALA-12327: Iceberg V2 operator wrong results in PARTITIONED mode The Iceberg delete node tries to do mini merge-joins between data records and delete records. This works in BROADCAST mode, and most of the time in PARTITIONED mode as well. However, the Iceberg delete node wrongly assumed that rows in a row batch that belong to the same file always come in ascending order; relying on this, it let the previous delete update IcebergDeleteState to the next deleted row id, and skipped the binary search whenever that id was greater than or equal to the current probe row id. When PARTITIONED mode is used, we cannot rely on ascending row order, not even inside row batches, not even when the previous file path is the same as the current one. This is because files with multiple blocks can be processed by multiple hosts in parallel, then the rows are getting hash-exchanged based on their file paths. Then the exchange-receiver at the LHS coalesces the row batches from multiple senders, hence the row IDs become unordered. This patch fixes the issue by ignoring such presumptions and doing a binary search whenever the position-based difference between the current row and the previous row is not one and we are in PARTITIONED mode. 
Tests: * added e2e tests Change-Id: Ib89a53e812af8c3b8ec5bc27bca0a50dcac5d924 Reviewed-on: http://gerrit.cloudera.org:8080/20295 Reviewed-by: Impala Public Jenkins <[email protected]> Tested-by: Impala Public Jenkins <[email protected]> --- be/src/exec/iceberg-delete-node.cc | 27 +++++++++++++++++----- .../functional/functional_schema_template.sql | 4 +--- .../iceberg-v2-read-position-deletes.test | 15 ++++++++++++ 3 files changed, 37 insertions(+), 9 deletions(-) diff --git a/be/src/exec/iceberg-delete-node.cc b/be/src/exec/iceberg-delete-node.cc index 04f024ca0..668181f13 100644 --- a/be/src/exec/iceberg-delete-node.cc +++ b/be/src/exec/iceberg-delete-node.cc @@ -324,15 +324,17 @@ void IcebergDeleteNode::IcebergDeleteState::UpdateImpl() { } void IcebergDeleteNode::IcebergDeleteState::Update( - impala::StringValue* file_path, int64_t* probe_pos) { + impala::StringValue* file_path, int64_t* next_probe_pos) { DCHECK(builder_ != nullptr); // Making sure the row ids are in ascending order inside a row batch in broadcast mode DCHECK(builder_->IsDistributedMode() || current_probe_pos_ == INVALID_ROW_ID - || current_probe_pos_ < *probe_pos); - DCHECK(!builder_->IsDistributedMode() || previous_file_path_ == nullptr - || *file_path != *previous_file_path_ || current_probe_pos_ == INVALID_ROW_ID - || current_probe_pos_ < *probe_pos); - current_probe_pos_ = *probe_pos; + || current_probe_pos_ < *next_probe_pos); + bool is_consecutive_pos = false; + if(current_probe_pos_ != INVALID_ROW_ID) { + const int64_t step = *next_probe_pos - current_probe_pos_; + is_consecutive_pos = step == 1; + } + current_probe_pos_ = *next_probe_pos; if (previous_file_path_ != nullptr && (!builder_->IsDistributedMode() || *file_path == *previous_file_path_)) { @@ -340,6 +342,19 @@ void IcebergDeleteNode::IcebergDeleteState::Update( if (current_deleted_pos_row_id_ != INVALID_ROW_ID && current_probe_pos_ > (*current_delete_row_)[current_deleted_pos_row_id_]) { UpdateImpl(); + } else if 
(builder_->IsDistributedMode() && !is_consecutive_pos) { + // In distributed mode (which means PARTITIONED JOIN distribution mode) we cannot + // rely on ascending row order, not even inside row batches, not even when the + // previous file path is the same as the current one. + // This is because files with multiple blocks can be processed by multiple hosts + // in parallel, then the rows are getting hash-exchanged based on their file paths. + // Then the exchange-receiver at the LHS coalesces the row batches from multiple + // senders, hence the row IDs getting unordered. So we are always doing a binary + // search here to find the proper delete row id. + // This won't be a problem with the DIRECTED distribution mode (see IMPALA-12308) + // which will behave similarly to the BROADCAST mode in this regard. + DCHECK_EQ(*file_path, *previous_file_path_); + UpdateImpl(); } } else { auto it = builder_->deleted_rows().find(*file_path); diff --git a/testdata/datasets/functional/functional_schema_template.sql b/testdata/datasets/functional/functional_schema_template.sql index 3a5e5b076..7a1e255a2 100644 --- a/testdata/datasets/functional/functional_schema_template.sql +++ b/testdata/datasets/functional/functional_schema_template.sql @@ -3694,7 +3694,6 @@ ALTER TABLE {db_name}{db_suffix}.{table_name} SET TBLPROPERTIES('write.format.de INSERT INTO TABLE {db_name}{db_suffix}.{table_name} values(2, 'orc', 1.5, false); ALTER TABLE {db_name}{db_suffix}.{table_name} SET TBLPROPERTIES('write.format.default'='parquet'); INSERT INTO TABLE {db_name}{db_suffix}.{table_name} values(3, 'parquet', 2.5, false); - ==== ---- DATASET functional @@ -3709,8 +3708,7 @@ TBLPROPERTIES('iceberg.catalog'='hadoop.catalog', 'format-version'='2'); ---- DEPENDENT_LOAD `hadoop fs -mkdir -p /test-warehouse/iceberg_test/hadoop_catalog/ice && \ -hadoop fs -put -f ${IMPALA_HOME}/testdata/data/iceberg_test/hadoop_catalog/ice/iceberg_lineitem_multiblock /test-warehouse/iceberg_test/hadoop_catalog/ice - 
+hadoop fs -Ddfs.block.size=524288 -put -f ${IMPALA_HOME}/testdata/data/iceberg_test/hadoop_catalog/ice/iceberg_lineitem_multiblock /test-warehouse/iceberg_test/hadoop_catalog/ice ==== ---- DATASET functional diff --git a/testdata/workloads/functional-query/queries/QueryTest/iceberg-v2-read-position-deletes.test b/testdata/workloads/functional-query/queries/QueryTest/iceberg-v2-read-position-deletes.test index c40a968ef..4742defee 100644 --- a/testdata/workloads/functional-query/queries/QueryTest/iceberg-v2-read-position-deletes.test +++ b/testdata/workloads/functional-query/queries/QueryTest/iceberg-v2-read-position-deletes.test @@ -672,6 +672,21 @@ SELECT count(*) from iceberg_lineitem_multiblock; bigint ==== ---- QUERY +select count(*) from iceberg_lineitem_multiblock where l_linenumber%5=0; +---- RESULTS +0 +---- TYPES +bigint +==== +---- QUERY +SET BATCH_SIZE=2; +select count(*) from iceberg_lineitem_multiblock where l_linenumber%5=0; +---- RESULTS +0 +---- TYPES +bigint +==== +---- QUERY SELECT * from iceberg_v2_partitioned_position_deletes; ---- RESULTS 6,'Alex','view',2020-01-01 09:00:00
