This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new a6573ebb410 [fix](iceberg) read the primary key columns if having 
equality delete (#34835)
a6573ebb410 is described below

commit a6573ebb41081d8e81110d8a3de0933ec9acb428
Author: Ashin Gau <[email protected]>
AuthorDate: Wed May 15 11:36:42 2024 +0800

    [fix](iceberg) read the primary key columns if having equality delete 
(#34835)
---
 be/src/vec/exec/format/table/iceberg_reader.cpp    | 40 ++++++++++++++++++++--
 be/src/vec/exec/format/table/iceberg_reader.h      |  7 ++++
 be/src/vec/exec/scan/vfile_scanner.cpp             |  4 ---
 .../iceberg/iceberg_equality_delete.out            | 24 +++++++++++++
 .../iceberg/iceberg_equality_delete.groovy         |  8 +++++
 5 files changed, 77 insertions(+), 6 deletions(-)

diff --git a/be/src/vec/exec/format/table/iceberg_reader.cpp 
b/be/src/vec/exec/format/table/iceberg_reader.cpp
index f9de386e595..8e4cdbd311a 100644
--- a/be/src/vec/exec/format/table/iceberg_reader.cpp
+++ b/be/src/vec/exec/format/table/iceberg_reader.cpp
@@ -115,6 +115,7 @@ Status IcebergTableReader::get_next_block(Block* block, 
size_t* read_rows, bool*
 
         return Status::OK();
     }
+    RETURN_IF_ERROR(_expand_block_if_need(block));
 
     // To support iceberg schema evolution. We change the column name in block 
to
     // make it match with the column name in parquet file before reading data. 
and
@@ -130,7 +131,7 @@ Status IcebergTableReader::get_next_block(Block* block, 
size_t* read_rows, bool*
         block->initialize_index_by_name();
     }
 
-    auto res = _file_format_reader->get_next_block(block, read_rows, eof);
+    RETURN_IF_ERROR(_file_format_reader->get_next_block(block, read_rows, 
eof));
     // Set the name back to table column name before return this block.
     if (_has_schema_change) {
         for (int i = 0; i < block->columns(); i++) {
@@ -147,7 +148,7 @@ Status IcebergTableReader::get_next_block(Block* block, 
size_t* read_rows, bool*
         RETURN_IF_ERROR(_equality_delete_impl->filter_data_block(block));
         *read_rows = block->rows();
     }
-    return res;
+    return _shrink_block_if_need(block);
 }
 
 Status IcebergTableReader::set_fill_columns(
@@ -253,6 +254,21 @@ Status IcebergTableReader::_equality_delete_base(
             }
         }
     }
+    for (int i = 0; i < equality_delete_col_names.size(); ++i) {
+        const std::string& delete_col = equality_delete_col_names[i];
+        if (std::find(_all_required_col_names.begin(), 
_all_required_col_names.end(), delete_col) ==
+            _all_required_col_names.end()) {
+            _expand_col_names.emplace_back(delete_col);
+            DataTypePtr data_type = 
DataTypeFactory::instance().create_data_type(
+                    equality_delete_col_types[i], true);
+            MutableColumnPtr data_column = data_type->create_column();
+            _expand_columns.emplace_back(
+                    ColumnWithTypeAndName(std::move(data_column), data_type, 
delete_col));
+        }
+    }
+    for (const std::string& delete_col : _expand_col_names) {
+        _all_required_col_names.emplace_back(delete_col);
+    }
     _equality_delete_impl = 
EqualityDeleteBase::get_delete_impl(&_equality_delete_block);
     return _equality_delete_impl->init(_profile);
 }
@@ -269,6 +285,24 @@ void IcebergTableReader::_generate_equality_delete_block(
     }
 }
 
+Status IcebergTableReader::_expand_block_if_need(Block* block) {
+    for (auto& col : _expand_columns) {
+        col.column->assume_mutable()->clear();
+        if (block->try_get_by_name(col.name)) {
+            return Status::InternalError("Wrong expand column '{}'", col.name);
+        }
+        block->insert(col);
+    }
+    return Status::OK();
+}
+
+Status IcebergTableReader::_shrink_block_if_need(Block* block) {
+    for (const std::string& expand_col : _expand_col_names) {
+        block->erase(expand_col);
+    }
+    return Status::OK();
+}
+
 Status IcebergTableReader::_position_delete_base(
         const std::vector<TIcebergDeleteFileDesc>& delete_files) {
     std::string data_file_path = _range.path;
@@ -534,6 +568,7 @@ Status IcebergParquetReader::init_reader(
     _gen_new_colname_to_value_range();
     parquet_reader->set_table_to_file_col_map(_table_col_to_file_col);
     parquet_reader->iceberg_sanitize(_all_required_col_names);
+    RETURN_IF_ERROR(init_row_filters(_range));
     return parquet_reader->init_reader(
             _all_required_col_names, _not_in_file_col_names, 
&_new_colname_to_value_range,
             conjuncts, tuple_descriptor, row_descriptor, colname_to_slot_id,
@@ -606,6 +641,7 @@ Status IcebergOrcReader::init_reader(
     _gen_file_col_names();
     _gen_new_colname_to_value_range();
     orc_reader->set_table_col_to_file_col(_table_col_to_file_col);
+    RETURN_IF_ERROR(init_row_filters(_range));
     return orc_reader->init_reader(&_all_required_col_names, 
&_new_colname_to_value_range,
                                    conjuncts, false, tuple_descriptor, 
row_descriptor,
                                    not_single_slot_filter_conjuncts, 
slot_id_to_filter_conjuncts);
diff --git a/be/src/vec/exec/format/table/iceberg_reader.h 
b/be/src/vec/exec/format/table/iceberg_reader.h
index 81c5613d681..cda50015911 100644
--- a/be/src/vec/exec/format/table/iceberg_reader.h
+++ b/be/src/vec/exec/format/table/iceberg_reader.h
@@ -137,6 +137,10 @@ protected:
     void _generate_equality_delete_block(
             Block* block, const std::vector<std::string>& 
equality_delete_col_names,
             const std::vector<TypeDescriptor>& equality_delete_col_types);
+    // Equality delete should read the primary columns. Add the missing columns
+    Status _expand_block_if_need(Block* block);
+    // Remove the added delete columns
+    Status _shrink_block_if_need(Block* block);
 
     RuntimeProfile* _profile;
     RuntimeState* _state;
@@ -161,6 +165,9 @@ protected:
     std::vector<std::string> _all_required_col_names;
     // col names in table but not in parquet,orc file
     std::vector<std::string> _not_in_file_col_names;
+    // equality delete should read the primary columns
+    std::vector<std::string> _expand_col_names;
+    std::vector<ColumnWithTypeAndName> _expand_columns;
 
     io::IOContext* _io_ctx;
     bool _has_schema_change = false;
diff --git a/be/src/vec/exec/scan/vfile_scanner.cpp 
b/be/src/vec/exec/scan/vfile_scanner.cpp
index 15e250b3871..e518e6068c7 100644
--- a/be/src/vec/exec/scan/vfile_scanner.cpp
+++ b/be/src/vec/exec/scan/vfile_scanner.cpp
@@ -850,8 +850,6 @@ Status VFileScanner::_get_next_reader() {
                         _push_down_conjuncts, _real_tuple_desc, 
_default_val_row_desc.get(),
                         _col_name_to_slot_id, 
&_not_single_slot_filter_conjuncts,
                         &_slot_id_to_filter_conjuncts);
-
-                RETURN_IF_ERROR(iceberg_reader->init_row_filters(range));
                 _cur_reader = std::move(iceberg_reader);
             } else {
                 std::vector<std::string> place_holder;
@@ -903,8 +901,6 @@ Status VFileScanner::_get_next_reader() {
                         _push_down_conjuncts, _real_tuple_desc, 
_default_val_row_desc.get(),
                         _col_name_to_slot_id, 
&_not_single_slot_filter_conjuncts,
                         &_slot_id_to_filter_conjuncts);
-
-                RETURN_IF_ERROR(iceberg_reader->init_row_filters(range));
                 _cur_reader = std::move(iceberg_reader);
             } else {
                 init_status = orc_reader->init_reader(
diff --git 
a/regression-test/data/external_table_p2/iceberg/iceberg_equality_delete.out 
b/regression-test/data/external_table_p2/iceberg/iceberg_equality_delete.out
index 2f7f599929b..44cf1b7b2bf 100644
--- a/regression-test/data/external_table_p2/iceberg/iceberg_equality_delete.out
+++ b/regression-test/data/external_table_p2/iceberg/iceberg_equality_delete.out
@@ -41,6 +41,18 @@
 19     Customer#000000019      uc,3bHIx84H,wdrmLOjVsiqXCq2tr   18      
28-396-526-5053 8914.71 HOUSEHOLD        nag. furiously careful packages are 
slyly at the accounts. furiously regular in
 20     Customer#000000020      JrPk8Pqplj4Ne   22      32-957-234-8742 7603.40 
FURNITURE       g alongside of the special excuses-- fluffily enticing packages 
wake 
 
+-- !count1 --
+19
+
+-- !count1_orc --
+19
+
+-- !max1 --
+update-comment-2
+
+-- !max1_orc --
+update-comment-2
+
 -- !one_delete_column --
 1      Customer#000000001      IVhzIApeRb ot,c,E       151     update-phone-1  
711.56  BUILDING        update-comment-1
 2      Customer#000000002      XSTf4,NCwDVaWNe6tEgvwfmRchLXak  13      
23-768-687-3665 121.65  AUTOMOBILE      l accounts. blithely ironic theodolites 
integrate boldly: caref
@@ -83,3 +95,15 @@
 19     Customer#000000019      uc,3bHIx84H,wdrmLOjVsiqXCq2tr   18      
28-396-526-5053 8914.71 HOUSEHOLD        nag. furiously careful packages are 
slyly at the accounts. furiously regular in
 20     Customer#000000020      JrPk8Pqplj4Ne   22      32-957-234-8742 7603.40 
FURNITURE       g alongside of the special excuses-- fluffily enticing packages 
wake 
 
+-- !count3 --
+19
+
+-- !count3_orc --
+19
+
+-- !max3 --
+update-comment-2
+
+-- !max3_orc --
+update-comment-2
+
diff --git 
a/regression-test/suites/external_table_p2/iceberg/iceberg_equality_delete.groovy
 
b/regression-test/suites/external_table_p2/iceberg/iceberg_equality_delete.groovy
index a5a21b3da6a..984181ac2f3 100644
--- 
a/regression-test/suites/external_table_p2/iceberg/iceberg_equality_delete.groovy
+++ 
b/regression-test/suites/external_table_p2/iceberg/iceberg_equality_delete.groovy
@@ -40,9 +40,17 @@ suite("iceberg_equality_delete", 
"p2,external,iceberg,external_remote,external_r
         // one delete column
         qt_one_delete_column """select * from customer_flink_one order by 
c_custkey"""
         qt_one_delete_column_orc """select * from customer_flink_one_orc order 
by c_custkey"""
+        qt_count1 """select count(*) from  customer_flink_one"""
+        qt_count1_orc """select count(*) from  customer_flink_one_orc"""
+        qt_max1 """select max(c_comment) from customer_flink_one"""
+        qt_max1_orc """select max(c_comment) from customer_flink_one_orc"""
         // three delete columns
         qt_one_delete_column """select * from customer_flink_three order by 
c_custkey"""
         qt_one_delete_column_orc """select * from customer_flink_three_orc 
order by c_custkey"""
+        qt_count3 """select count(*) from  customer_flink_three"""
+        qt_count3_orc """select count(*) from  customer_flink_three_orc"""
+        qt_max3 """select max(c_comment) from customer_flink_three"""
+        qt_max3_orc """select max(c_comment) from customer_flink_three_orc"""
 
         sql """drop catalog ${catalog_name}"""
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to