This is an automated email from the ASF dual-hosted git repository.

kxiao pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 473359772c229d2591b2059837e5dc31c6899fe0
Author: Ashin Gau <[email protected]>
AuthorDate: Tue Aug 22 13:35:29 2023 +0800

    [fix](parquet) parquet reader confuses logical/physical/slot id of columns 
(#23198)
    
    `ParquetReader` confuses the logical, physical, and slot ids of columns. 
When only scalar types are read, nothing goes wrong, but when complex types 
are read, `RowGroup` and `PageIndex` will get the wrong statistics. Therefore, 
if the query contains complex types and pushed-down predicates, the result set 
may be incorrect.
---
 be/src/vec/exec/format/parquet/parquet_common.h    |  8 --
 .../exec/format/parquet/vparquet_column_reader.h   |  2 -
 .../exec/format/parquet/vparquet_group_reader.cpp  | 11 +--
 .../exec/format/parquet/vparquet_group_reader.h    |  8 +-
 be/src/vec/exec/format/parquet/vparquet_reader.cpp | 97 +++++++++++-----------
 be/src/vec/exec/format/parquet/vparquet_reader.h   |  6 +-
 be/test/vec/exec/parquet/parquet_thrift_test.cpp   |  4 +-
 .../hive/test_hive_text_complex_type.out           |  3 +
 .../hive/test_hive_text_complex_type.groovy        |  6 +-
 9 files changed, 68 insertions(+), 77 deletions(-)

diff --git a/be/src/vec/exec/format/parquet/parquet_common.h 
b/be/src/vec/exec/format/parquet/parquet_common.h
index 7c35a2b111..0a4278ae67 100644
--- a/be/src/vec/exec/format/parquet/parquet_common.h
+++ b/be/src/vec/exec/format/parquet/parquet_common.h
@@ -46,14 +46,6 @@ struct RowRange {
     }
 };
 
-struct ParquetReadColumn {
-    ParquetReadColumn(int parquet_col_id, const std::string& file_slot_name)
-            : _parquet_col_id(parquet_col_id), _file_slot_name(file_slot_name) 
{};
-
-    int _parquet_col_id;
-    const std::string& _file_slot_name;
-};
-
 #pragma pack(1)
 struct ParquetInt96 {
     uint64_t lo; // time of nanoseconds in a day
diff --git a/be/src/vec/exec/format/parquet/vparquet_column_reader.h 
b/be/src/vec/exec/format/parquet/vparquet_column_reader.h
index 6d0e2e3f6e..df531bf29c 100644
--- a/be/src/vec/exec/format/parquet/vparquet_column_reader.h
+++ b/be/src/vec/exec/format/parquet/vparquet_column_reader.h
@@ -132,7 +132,6 @@ public:
                          const std::vector<RowRange>& row_ranges, 
cctz::time_zone* ctz,
                          io::IOContext* io_ctx, 
std::unique_ptr<ParquetColumnReader>& reader,
                          size_t max_buf_size);
-    void add_offset_index(tparquet::OffsetIndex* offset_index) { _offset_index 
= offset_index; }
     void set_nested_column() { _nested_column = true; }
     virtual const std::vector<level_t>& get_rep_level() const = 0;
     virtual const std::vector<level_t>& get_def_level() const = 0;
@@ -149,7 +148,6 @@ protected:
     const std::vector<RowRange>& _row_ranges;
     cctz::time_zone* _ctz;
     io::IOContext* _io_ctx;
-    tparquet::OffsetIndex* _offset_index;
     int64_t _current_row_index = 0;
     int _row_range_index = 0;
     int64_t _decode_null_map_time = 0;
diff --git a/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp 
b/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp
index 3ef3581e36..d3aa4c3cad 100644
--- a/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp
+++ b/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp
@@ -78,7 +78,7 @@ namespace doris::vectorized {
 const std::vector<int64_t> RowGroupReader::NO_DELETE = {};
 
 RowGroupReader::RowGroupReader(io::FileReaderSPtr file_reader,
-                               const std::vector<ParquetReadColumn>& 
read_columns,
+                               const std::vector<std::string>& read_columns,
                                const int32_t row_group_id, const 
tparquet::RowGroup& row_group,
                                cctz::time_zone* ctz, io::IOContext* io_ctx,
                                const PositionDeleteContext& 
position_delete_ctx,
@@ -126,21 +126,16 @@ Status RowGroupReader::init(
     const size_t MAX_COLUMN_BUF_SIZE = config::parquet_column_max_buffer_mb << 
20;
     size_t max_buf_size = std::min(MAX_COLUMN_BUF_SIZE, MAX_GROUP_BUF_SIZE / 
_read_columns.size());
     for (auto& read_col : _read_columns) {
-        auto field = 
const_cast<FieldSchema*>(schema.get_column(read_col._file_slot_name));
+        auto field = const_cast<FieldSchema*>(schema.get_column(read_col));
         std::unique_ptr<ParquetColumnReader> reader;
         RETURN_IF_ERROR(ParquetColumnReader::create(_file_reader, field, 
_row_group_meta,
                                                     _read_ranges, _ctz, 
_io_ctx, reader,
                                                     max_buf_size));
-        auto col_iter = col_offsets.find(read_col._parquet_col_id);
-        if (col_iter != col_offsets.end()) {
-            tparquet::OffsetIndex oi = col_iter->second;
-            reader->add_offset_index(&oi);
-        }
         if (reader == nullptr) {
             VLOG_DEBUG << "Init row group(" << _row_group_id << ") reader 
failed";
             return Status::Corruption("Init row group reader failed");
         }
-        _column_readers[read_col._file_slot_name] = std::move(reader);
+        _column_readers[read_col] = std::move(reader);
     }
     // Check if single slot can be filtered by dict.
     if (!_slot_id_to_filter_conjuncts) {
diff --git a/be/src/vec/exec/format/parquet/vparquet_group_reader.h 
b/be/src/vec/exec/format/parquet/vparquet_group_reader.h
index baa5912f99..c44899b583 100644
--- a/be/src/vec/exec/format/parquet/vparquet_group_reader.h
+++ b/be/src/vec/exec/format/parquet/vparquet_group_reader.h
@@ -141,9 +141,9 @@ public:
         PositionDeleteContext(const PositionDeleteContext& filter) = default;
     };
 
-    RowGroupReader(io::FileReaderSPtr file_reader,
-                   const std::vector<ParquetReadColumn>& read_columns, const 
int32_t row_group_id,
-                   const tparquet::RowGroup& row_group, cctz::time_zone* ctz, 
io::IOContext* io_ctx,
+    RowGroupReader(io::FileReaderSPtr file_reader, const 
std::vector<std::string>& read_columns,
+                   const int32_t row_group_id, const tparquet::RowGroup& 
row_group,
+                   cctz::time_zone* ctz, io::IOContext* io_ctx,
                    const PositionDeleteContext& position_delete_ctx,
                    const LazyReadContext& lazy_read_ctx, RuntimeState* state);
 
@@ -191,7 +191,7 @@ private:
 
     io::FileReaderSPtr _file_reader;
     std::unordered_map<std::string, std::unique_ptr<ParquetColumnReader>> 
_column_readers;
-    const std::vector<ParquetReadColumn>& _read_columns;
+    const std::vector<std::string>& _read_columns;
     const int32_t _row_group_id;
     const tparquet::RowGroup& _row_group_meta;
     int64_t _remaining_rows;
diff --git a/be/src/vec/exec/format/parquet/vparquet_reader.cpp 
b/be/src/vec/exec/format/parquet/vparquet_reader.cpp
index ba80992a04..f8dab141b7 100644
--- a/be/src/vec/exec/format/parquet/vparquet_reader.cpp
+++ b/be/src/vec/exec/format/parquet/vparquet_reader.cpp
@@ -322,6 +322,10 @@ Status ParquetReader::init_reader(
     // e.g. table added a column after this parquet file was written.
     _column_names = &all_column_names;
     auto schema_desc = _file_metadata->schema();
+    std::set<std::string> required_columns(all_column_names.begin(), 
all_column_names.end());
+    // Currently only used in iceberg, the columns are dropped but added back
+    std::set<std::string> dropped_columns(missing_column_names.begin(), 
missing_column_names.end());
+    // Make the order of read columns the same as physical order in parquet 
file
     for (int i = 0; i < schema_desc.size(); ++i) {
         auto name = schema_desc.get_column(i)->name;
         // If the column in parquet file is included in all_column_names and 
not in missing_column_names,
@@ -329,15 +333,17 @@ Status ParquetReader::init_reader(
         // Here to check against missing_column_names is for the 'Add a column 
back to the table
         // with the same column name' case. (drop column a then add column a).
         // Shouldn't read this column data in this case.
-        if (find(all_column_names.begin(), all_column_names.end(), name) !=
-                    all_column_names.end() &&
-            find(missing_column_names.begin(), missing_column_names.end(), 
name) ==
-                    missing_column_names.end()) {
-            _map_column.emplace(name, i);
+        if (required_columns.find(name) != required_columns.end() &&
+            dropped_columns.find(name) == dropped_columns.end()) {
+            required_columns.erase(name);
+            _read_columns.emplace_back(name);
         }
     }
+    for (const std::string& name : required_columns) {
+        _missing_cols.emplace_back(name);
+    }
+
     _colname_to_value_range = colname_to_value_range;
-    RETURN_IF_ERROR(_init_read_columns());
     // build column predicates for column lazy read
     _lazy_read_ctx.conjuncts = conjuncts;
     RETURN_IF_ERROR(_init_row_groups(filter_groups));
@@ -394,15 +400,15 @@ Status ParquetReader::set_fill_columns(
 
     const FieldDescriptor& schema = _file_metadata->schema();
     for (auto& read_col : _read_columns) {
-        _lazy_read_ctx.all_read_columns.emplace_back(read_col._file_slot_name);
-        PrimitiveType column_type = 
schema.get_column(read_col._file_slot_name)->type.type;
+        _lazy_read_ctx.all_read_columns.emplace_back(read_col);
+        PrimitiveType column_type = schema.get_column(read_col)->type.type;
         if (column_type == TYPE_ARRAY || column_type == TYPE_MAP || 
column_type == TYPE_STRUCT) {
             _has_complex_type = true;
         }
         if (predicate_columns.size() > 0) {
-            auto iter = predicate_columns.find(read_col._file_slot_name);
+            auto iter = predicate_columns.find(read_col);
             if (iter == predicate_columns.end()) {
-                
_lazy_read_ctx.lazy_read_columns.emplace_back(read_col._file_slot_name);
+                _lazy_read_ctx.lazy_read_columns.emplace_back(read_col);
             } else {
                 
_lazy_read_ctx.predicate_columns.first.emplace_back(iter->first);
                 
_lazy_read_ctx.predicate_columns.second.emplace_back(iter->second.second);
@@ -450,29 +456,6 @@ Status ParquetReader::set_fill_columns(
     return Status::OK();
 }
 
-Status ParquetReader::_init_read_columns() {
-    std::vector<int> include_column_ids;
-    for (auto& file_col_name : *_column_names) {
-        auto iter = _map_column.find(file_col_name);
-        if (iter != _map_column.end()) {
-            include_column_ids.emplace_back(iter->second);
-        } else {
-            _missing_cols.push_back(file_col_name);
-        }
-    }
-    // It is legal to get empty include_column_ids in query task.
-    if (include_column_ids.empty()) {
-        return Status::OK();
-    }
-    // The same order as physical columns
-    std::sort(include_column_ids.begin(), include_column_ids.end());
-    for (int& parquet_col_id : include_column_ids) {
-        _read_columns.emplace_back(parquet_col_id,
-                                   
_file_metadata->schema().get_column(parquet_col_id)->name);
-    }
-    return Status::OK();
-}
-
 std::unordered_map<std::string, TypeDescriptor> 
ParquetReader::get_name_to_type() {
     std::unordered_map<std::string, TypeDescriptor> map;
     const auto& schema_desc = _file_metadata->schema();
@@ -643,11 +626,24 @@ Status ParquetReader::_init_row_groups(const bool& 
is_filter_groups) {
             RETURN_IF_ERROR(_process_row_group_filter(row_group, 
&filter_group));
         }
         int64_t group_size = 0; // only calculate the needed columns
-        for (auto& read_col : _read_columns) {
-            auto& parquet_col_id = read_col._parquet_col_id;
-            if (row_group.columns[parquet_col_id].__isset.meta_data) {
-                group_size += 
row_group.columns[parquet_col_id].meta_data.total_compressed_size;
+        std::function<int64_t(const FieldSchema*)> column_compressed_size =
+                [&row_group, &column_compressed_size](const FieldSchema* 
field) -> int64_t {
+            if (field->physical_column_index >= 0) {
+                int parquet_col_id = field->physical_column_index;
+                if (row_group.columns[parquet_col_id].__isset.meta_data) {
+                    return 
row_group.columns[parquet_col_id].meta_data.total_compressed_size;
+                }
+                return 0;
+            }
+            int64_t size = 0;
+            for (const FieldSchema& child : field->children) {
+                size += column_compressed_size(&child);
             }
+            return size;
+        };
+        for (auto& read_col : _read_columns) {
+            const FieldSchema* field = 
_file_metadata->schema().get_column(read_col);
+            group_size += column_compressed_size(field);
         }
         if (!filter_group) {
             _read_row_groups.emplace_back(row_group_idx, row_index, row_index 
+ row_group.num_rows);
@@ -703,7 +699,7 @@ std::vector<io::PrefetchRange> 
ParquetReader::_generate_random_access_ranges(
             };
     const tparquet::RowGroup& row_group = 
_t_metadata->row_groups[group.row_group_id];
     for (const auto& read_col : _read_columns) {
-        const FieldSchema* field = 
_file_metadata->schema().get_column(read_col._file_slot_name);
+        const FieldSchema* field = 
_file_metadata->schema().get_column(read_col);
         scalar_range(field, row_group);
     }
     if (!result.empty()) {
@@ -766,11 +762,16 @@ Status ParquetReader::_process_page_index(const 
tparquet::RowGroup& row_group,
     // read twice: parse column index & parse offset index
     _column_statistics.meta_read_calls += 2;
     for (auto& read_col : _read_columns) {
-        auto conjunct_iter = 
_colname_to_value_range->find(read_col._file_slot_name);
+        auto conjunct_iter = _colname_to_value_range->find(read_col);
         if (_colname_to_value_range->end() == conjunct_iter) {
             continue;
         }
-        auto& chunk = row_group.columns[read_col._parquet_col_id];
+        int parquet_col_id = 
_file_metadata->schema().get_column(read_col)->physical_column_index;
+        if (parquet_col_id < 0) {
+            // complex type, not support page index yet.
+            continue;
+        }
+        auto& chunk = row_group.columns[parquet_col_id];
         if (chunk.column_index_offset == 0 && chunk.column_index_length == 0) {
             continue;
         }
@@ -782,7 +783,7 @@ Status ParquetReader::_process_page_index(const 
tparquet::RowGroup& row_group,
         }
         auto& conjuncts = conjunct_iter->second;
         std::vector<int> skipped_page_range;
-        const FieldSchema* col_schema = 
schema_desc.get_column(read_col._file_slot_name);
+        const FieldSchema* col_schema = schema_desc.get_column(read_col);
         page_index.collect_skipped_page_range(&column_index, conjuncts, 
col_schema,
                                               skipped_page_range, *_ctz);
         if (skipped_page_range.empty()) {
@@ -797,7 +798,7 @@ Status ParquetReader::_process_page_index(const 
tparquet::RowGroup& row_group,
             // use the union row range
             skipped_row_ranges.emplace_back(skipped_row_range);
         }
-        _col_offsets.emplace(read_col._parquet_col_id, offset_index);
+        _col_offsets.emplace(parquet_col_id, offset_index);
     }
     if (skipped_row_ranges.empty()) {
         read_whole_row_group();
@@ -849,16 +850,16 @@ Status ParquetReader::_process_column_stat_filter(const 
std::vector<tparquet::Co
         return Status::OK();
     }
     auto& schema_desc = _file_metadata->schema();
-    for (auto& col_name : *_column_names) {
-        auto col_iter = _map_column.find(col_name);
-        if (col_iter == _map_column.end()) {
-            continue;
-        }
+    for (auto& col_name : _read_columns) {
         auto slot_iter = _colname_to_value_range->find(col_name);
         if (slot_iter == _colname_to_value_range->end()) {
             continue;
         }
-        int parquet_col_id = col_iter->second;
+        int parquet_col_id = 
_file_metadata->schema().get_column(col_name)->physical_column_index;
+        if (parquet_col_id < 0) {
+            // complex type, not support filter yet.
+            continue;
+        }
         auto& meta_data = columns[parquet_col_id].meta_data;
         auto& statistic = meta_data.statistics;
         bool is_all_null =
diff --git a/be/src/vec/exec/format/parquet/vparquet_reader.h 
b/be/src/vec/exec/format/parquet/vparquet_reader.h
index 0f3996db40..b2f8a2bd19 100644
--- a/be/src/vec/exec/format/parquet/vparquet_reader.h
+++ b/be/src/vec/exec/format/parquet/vparquet_reader.h
@@ -187,7 +187,6 @@ private:
     RowGroupReader::PositionDeleteContext _get_position_delete_ctx(
             const tparquet::RowGroup& row_group,
             const RowGroupReader::RowGroupIndex& row_group_index);
-    Status _init_read_columns();
     Status _init_row_groups(const bool& is_filter_groups);
     void _init_system_properties();
     void _init_file_description();
@@ -226,12 +225,11 @@ private:
     std::unique_ptr<RowGroupReader> _current_group_reader = nullptr;
     // read to the end of current reader
     bool _row_group_eof = true;
-    int32_t _total_groups;                  // num of groups(stripes) of a 
parquet(orc) file
-    std::map<std::string, int> _map_column; // column-name <---> column-index
+    int32_t _total_groups; // num of groups(stripes) of a parquet(orc) file
     // table column name to file column name map. For iceberg schema evolution.
     std::unordered_map<std::string, std::string> _table_col_to_file_col;
     std::unordered_map<std::string, ColumnValueRangeType>* 
_colname_to_value_range;
-    std::vector<ParquetReadColumn> _read_columns;
+    std::vector<std::string> _read_columns;
     RowRange _whole_range = RowRange(0, 0);
     const std::vector<int64_t>* _delete_rows = nullptr;
     int64_t _delete_rows_index = 0;
diff --git a/be/test/vec/exec/parquet/parquet_thrift_test.cpp 
b/be/test/vec/exec/parquet/parquet_thrift_test.cpp
index bb77b32e58..08879e604a 100644
--- a/be/test/vec/exec/parquet/parquet_thrift_test.cpp
+++ b/be/test/vec/exec/parquet/parquet_thrift_test.cpp
@@ -499,11 +499,11 @@ TEST_F(ParquetThriftReaderTest, group_reader) {
     SlotDescriptor string_slot(tslot_desc);
     tuple_slots.emplace_back(&string_slot);
 
-    std::vector<ParquetReadColumn> read_columns;
+    std::vector<std::string> read_columns;
     RowGroupReader::LazyReadContext lazy_read_ctx;
     for (const auto& slot : tuple_slots) {
         lazy_read_ctx.all_read_columns.emplace_back(slot->col_name());
-        read_columns.emplace_back(ParquetReadColumn(7, slot->col_name()));
+        read_columns.emplace_back(slot->col_name());
     }
     io::FileSystemSPtr local_fs = io::LocalFileSystem::create("");
     io::FileReaderSPtr file_reader;
diff --git 
a/regression-test/data/external_table_p2/hive/test_hive_text_complex_type.out 
b/regression-test/data/external_table_p2/hive/test_hive_text_complex_type.out
index a04b9c1def..d1113f5228 100644
--- 
a/regression-test/data/external_table_p2/hive/test_hive_text_complex_type.out
+++ 
b/regression-test/data/external_table_p2/hive/test_hive_text_complex_type.out
@@ -13,3 +13,6 @@
 10     {101:1, 102:1, 103:1}   {102:10, 104:1, 105:2}  {"field1":100, 
"field0":100}    {"field2":3000000}      {"field3":300000000}    
{"field4":3.14, "hello world":0.111, "hell0":7.001}     {"field5":3.14159}      
{103:"Hello"}   {"field6":2023-07-28 12:34:56.000000, "field000006":2023-07-08 
12:34:57.000000, "field2432456":2023-07-28 12:34:50.000000}      
{"field7":2023-07-28}   {1, 1, 20, 3000000, 44444444444, 3.14, 3.14159, 
"Hello", 2023-07-28 12:34:56.000000, 2023-07-28}
 11     {101:1, 102:1, 13:1, 12:1}      {102:10, 14:1, 15:2, 12:10}     
{"field1":100, "fie88ld0":100, "fieweld0":100, "fieeeld1":100, "fieeeld0":100, 
"feeield0":100, "feeield1":100, "firreld0":100, "field0":100}    
{"field2":3000000, "abcd":4000000, "1231":3000000}      {"fi7eld3":300000000, 
"field30":300000000, "fielwwd3":300000000, "fi055":300000000, 
"field7":300000121323}      {"field4":3.14, "hello world":0.111, "hell0":7.001} 
    {"field5":3.14159}      {103:"Hello", 0:"hello"}        
{"field6":2023-07-28 12:34:56.000000, " [...]
 
+-- !filter_complex --
+50000  50000   50000
+
diff --git 
a/regression-test/suites/external_table_p2/hive/test_hive_text_complex_type.groovy
 
b/regression-test/suites/external_table_p2/hive/test_hive_text_complex_type.groovy
index 8ea9f74135..2b0098081c 100644
--- 
a/regression-test/suites/external_table_p2/hive/test_hive_text_complex_type.groovy
+++ 
b/regression-test/suites/external_table_p2/hive/test_hive_text_complex_type.groovy
@@ -40,7 +40,11 @@ suite("test_hive_text_complex_type", 
"p2,external,hive,external_remote,external_
 
         qt_sql2 """ select * from hive_text_complex_type_delimiter order by 
column1; """   
 
-
+        qt_filter_complex """select count(column_primitive_integer),
+            count(column1_struct),
+            count(column_primitive_bigint)
+            from parquet_predicate_table where column_primitive_bigint = 6;"""
+        sql """drop catalog ${catalog_name};"""
     }
 }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to