This is an automated email from the ASF dual-hosted git repository.

kxiao pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 3dd909f7511c8186f7319355ce976c7979f02964
Author: Ashin Gau <[email protected]>
AuthorDate: Tue Aug 22 14:47:10 2023 +0800

    [fix](parquet) A row of complex type may be stored across multiple pages (#23277)
    
    A row of complex type may be stored across two (or more) pages, and the
    parameter `align_rows` indicates whether the reader should read the
    remaining values of the last row in the previous page.
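
For context: in Parquet's Dremel encoding, a repetition level of 0 marks the
start of a new row, so when a page's first repetition level is non-zero, that
page begins mid-row and its leading values belong to the previous page's last
row. A minimal C++ illustration of that invariant, using made-up level data:

    #include <iostream>
    #include <vector>

    int main() {
        // Repetition levels for two consecutive pages (made-up data):
        // rep level 0 starts a new row; non-zero continues the current row.
        std::vector<int> page1 = {0, 1, 1, 0, 1}; // row 2 is unfinished here
        std::vector<int> page2 = {1, 1, 0, 1};    // begins mid-row

        // A non-zero leading rep level means the page continues the
        // previous page's last row instead of starting a new one.
        std::cout << "page2 continues previous row: " << std::boolalpha
                  << (page2.front() != 0) << "\n";
    }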
---
 .../exec/format/parquet/vparquet_column_reader.cpp | 63 +++++++++++++++-------
 .../exec/format/parquet/vparquet_column_reader.h   |  2 +-
 .../data/external_table_p2/tvf/test_tvf_p2.out     |  3 ++
 .../external_table_p2/tvf/test_tvf_p2.groovy       |  7 +++
 4 files changed, 55 insertions(+), 20 deletions(-)

diff --git a/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp
index 0a688f6ed3..1da9a4f3c8 100644
--- a/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp
+++ b/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp
@@ -303,11 +303,25 @@ Status ScalarColumnReader::_read_values(size_t num_values, ColumnPtr& doris_colu
     return _chunk_reader->decode_values(data_column, type, select_vector, is_dict_filter);
 }
 
+/**
+ * Load the nested column data of complex type.
+ * A row of complex type may be stored across two (or more) pages, and the parameter `align_rows`
+ * indicates whether the reader should read the remaining values of the last row in the previous page.
+ */
 Status ScalarColumnReader::_read_nested_column(ColumnPtr& doris_column, DataTypePtr& type,
                                                ColumnSelectVector& select_vector, size_t batch_size,
-                                               size_t* read_rows, bool* eof, bool is_dict_filter) {
-    _rep_levels.resize(0);
-    _def_levels.resize(0);
+                                               size_t* read_rows, bool* eof, bool is_dict_filter,
+                                               bool align_rows = false) {
+    size_t origin_size = 0;
+    if (align_rows) {
+        origin_size = _rep_levels.size();
+        // just read the remaining values of the last row in the previous page,
+        // so no new row should be read.
+        batch_size = 0;
+    } else {
+        _rep_levels.resize(0);
+        _def_levels.resize(0);
+    }
     size_t parsed_rows = 0;
     size_t remaining_values = _chunk_reader->remaining_num_values();
     bool has_rep_level = _chunk_reader->max_rep_level() > 0;
@@ -327,22 +341,18 @@ Status ScalarColumnReader::_read_nested_column(ColumnPtr& doris_column, DataType
             _rep_levels.emplace_back(rep_level);
             remaining_values--;
         }
-    } else {
+    } else if (!align_rows) {
+        // case: required child columns in struct type
         parsed_rows = std::min(remaining_values, batch_size);
         remaining_values -= parsed_rows;
-        _rep_levels.resize(parsed_rows);
-        for (size_t i = 0; i < parsed_rows; ++i) {
-            _rep_levels[i] = 0;
-        }
+        _rep_levels.resize(parsed_rows, 0);
     }
     size_t parsed_values = _chunk_reader->remaining_num_values() - remaining_values;
-    _def_levels.resize(parsed_values);
+    _def_levels.resize(origin_size + parsed_values);
     if (has_def_level) {
-        _chunk_reader->def_level_decoder().get_levels(&_def_levels[0], parsed_values);
+        _chunk_reader->def_level_decoder().get_levels(&_def_levels[origin_size], parsed_values);
     } else {
-        for (size_t i = 0; i < parsed_values; ++i) {
-            _def_levels[i] = 0;
-        }
+        std::fill(_def_levels.begin() + origin_size, _def_levels.end(), 0);
     }
 
     MutableColumnPtr data_column;
@@ -360,14 +370,14 @@ Status ScalarColumnReader::_read_nested_column(ColumnPtr& doris_column, DataType
         }
         data_column = doris_column->assume_mutable();
     }
-    size_t has_read = 0;
+    size_t has_read = origin_size;
     size_t ancestor_nulls = 0;
     null_map.emplace_back(0);
     bool prev_is_null = false;
-    while (has_read < parsed_values) {
+    while (has_read < origin_size + parsed_values) {
         level_t def_level = _def_levels[has_read++];
         size_t loop_read = 1;
-        while (has_read < parsed_values && _def_levels[has_read] == def_level) {
+        while (has_read < origin_size + parsed_values && _def_levels[has_read] == def_level) {
             has_read++;
             loop_read++;
         }
@@ -407,9 +417,24 @@ Status ScalarColumnReader::_read_nested_column(ColumnPtr& doris_column, DataType
         _chunk_reader->skip_values(ancestor_nulls, false);
     }
 
-    *read_rows = parsed_rows;
-    if (_chunk_reader->remaining_num_values() == 0 && !_chunk_reader->has_next_page()) {
-        *eof = true;
+    if (!align_rows) {
+        *read_rows = parsed_rows;
+    }
+    if (_chunk_reader->remaining_num_values() == 0) {
+        if (_chunk_reader->has_next_page()) {
+            RETURN_IF_ERROR(_chunk_reader->next_page());
+            RETURN_IF_ERROR(_chunk_reader->load_page_data());
+            select_vector.reset();
+            return _read_nested_column(doris_column, type, select_vector, 0, read_rows, eof,
+                                       is_dict_filter, true);
+        } else {
+            *eof = true;
+        }
+    }
+    if (_rep_levels.size() > 0) {
+        // make sure the rows of complex type are aligned correctly,
+        // so the repetition level of the first element should be 0, meaning a new row starts.
+        DCHECK_EQ(_rep_levels[0], 0);
     }
     return Status::OK();
 }
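
The control flow added above can be condensed into a self-contained toy
(hypothetical PageStream type and made-up level data, not the actual Doris
reader): when a page is exhausted mid-row, load the next page and re-enter
the read with batch_size = 0 so that only the tail of the split row is
consumed.

    #include <cstddef>
    #include <cstdint>
    #include <iostream>
    #include <vector>

    using level_t = int16_t;

    // Toy page stream holding only repetition levels (real pages also
    // carry definition levels and values).
    struct PageStream {
        std::vector<std::vector<level_t>> pages;
        size_t page = 0, pos = 0;
        bool has_next_page() const { return page + 1 < pages.size(); }
        size_t remaining() const { return pages[page].size() - pos; }
        level_t peek() const { return pages[page][pos]; }
        level_t next() { return pages[page][pos++]; }
        void load_next_page() { ++page; pos = 0; }
    };

    // Read up to batch_size rows; a rep level of 0 starts a new row.
    void read_nested(PageStream& s, std::vector<level_t>& levels,
                     size_t batch_size, bool align_rows = false) {
        if (align_rows) {
            // Only drain the tail of the row split by the page boundary;
            // no new row may start in this call.
            batch_size = 0;
        }
        size_t parsed_rows = 0;
        while (s.remaining() > 0) {
            if (s.peek() == 0) { // next value would start a new row
                if (parsed_rows == batch_size) break;
                ++parsed_rows;
            }
            levels.push_back(s.next());
        }
        if (s.remaining() == 0 && s.has_next_page()) {
            s.load_next_page();
            // Recurse to finish the row the page boundary interrupted.
            read_nested(s, levels, /*batch_size=*/0, /*align_rows=*/true);
        }
    }

    int main() {
        // Row 2 spans the page boundary: page 2 opens with rep level 1.
        PageStream s{{{0, 1, 1, 0, 1}, {1, 1, 0}}};
        std::vector<level_t> levels;
        read_nested(s, levels, /*batch_size=*/2);
        std::cout << levels.size() << " values read\n"; // 7: rows 1-2 complete
    }

As in the patch, the first repetition level held after an aligned read is
always 0, which is exactly what the added DCHECK_EQ(_rep_levels[0], 0)
asserts.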
diff --git a/be/src/vec/exec/format/parquet/vparquet_column_reader.h b/be/src/vec/exec/format/parquet/vparquet_column_reader.h
index df531bf29c..d83e3ef9b5 100644
--- a/be/src/vec/exec/format/parquet/vparquet_column_reader.h
+++ b/be/src/vec/exec/format/parquet/vparquet_column_reader.h
@@ -189,7 +189,7 @@ private:
                         ColumnSelectVector& select_vector, bool is_dict_filter);
     Status _read_nested_column(ColumnPtr& doris_column, DataTypePtr& type,
                                ColumnSelectVector& select_vector, size_t batch_size,
-                               size_t* read_rows, bool* eof, bool is_dict_filter);
+                               size_t* read_rows, bool* eof, bool is_dict_filter, bool align_rows);
     Status _try_load_dict_page(bool* loaded, bool* has_dict);
 };
 
diff --git a/regression-test/data/external_table_p2/tvf/test_tvf_p2.out b/regression-test/data/external_table_p2/tvf/test_tvf_p2.out
index 3e34f50764..cb7239f2ff 100644
--- a/regression-test/data/external_table_p2/tvf/test_tvf_p2.out
+++ b/regression-test/data/external_table_p2/tvf/test_tvf_p2.out
@@ -39,3 +39,6 @@
 -- !nested_types_orc --
 20926  20928   20978   23258   20962   23258   23258
 
+-- !row_cross_pages --
+25001  25001   25001
+
diff --git a/regression-test/suites/external_table_p2/tvf/test_tvf_p2.groovy b/regression-test/suites/external_table_p2/tvf/test_tvf_p2.groovy
index 030e66b372..d9ff3f6044 100644
--- a/regression-test/suites/external_table_p2/tvf/test_tvf_p2.groovy
+++ b/regression-test/suites/external_table_p2/tvf/test_tvf_p2.groovy
@@ -46,5 +46,12 @@ suite("test_tvf_p2", "p2") {
             "uri" = 
"hdfs://${nameNodeHost}:${hdfsPort}/catalog/tvf/orc/all_nested_types.orc",
             "format" = "orc",
             "fs.defaultFS" = "hdfs://${nameNodeHost}:${hdfsPort}")"""
+
+        // a row of complex type may be stored across multiple pages
+        qt_row_cross_pages """select count(id), count(m1), count(m2)
+            from hdfs(
+            "uri" = 
"hdfs://${nameNodeHost}:${hdfsPort}/catalog/tvf/parquet/row_cross_pages.parquet",
+            "format" = "parquet",
+            "fs.defaultFS" = "hdfs://${nameNodeHost}:${hdfsPort}")"""
     }
 }
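
For reference, a fixture like row_cross_pages.parquet can be produced by
writing long list rows with a tiny data-page size, so that page boundaries
fall inside rows. A hypothetical generator using Apache Arrow's C++ Parquet
writer, simplified to a single list column (the actual file's id/m1/m2 schema
and contents are not part of this commit):

    #include <arrow/api.h>
    #include <arrow/io/file.h>
    #include <parquet/arrow/writer.h>
    #include <parquet/exception.h>
    #include <parquet/properties.h>

    int main() {
        auto* pool = arrow::default_memory_pool();
        auto values = std::make_shared<arrow::Int32Builder>(pool);
        arrow::ListBuilder lists(pool, values);

        // 25001 rows of list<int32>, matching the expected count in the
        // regression output; 8 elements per row is an arbitrary choice.
        for (int row = 0; row < 25001; ++row) {
            PARQUET_THROW_NOT_OK(lists.Append());
            for (int j = 0; j < 8; ++j) {
                PARQUET_THROW_NOT_OK(values->Append(row * 8 + j));
            }
        }
        std::shared_ptr<arrow::Array> array;
        PARQUET_THROW_NOT_OK(lists.Finish(&array));

        auto schema = arrow::schema({arrow::field("m1", arrow::list(arrow::int32()))});
        auto table = arrow::Table::Make(schema, {array});

        auto sink = arrow::io::FileOutputStream::Open("row_cross_pages.parquet").ValueOrDie();
        // A 1 KB page size forces many small pages, so some rows end up
        // split across data pages.
        auto props = parquet::WriterProperties::Builder().data_pagesize(1024)->build();
        PARQUET_THROW_NOT_OK(parquet::arrow::WriteTable(*table, pool, sink,
                                                        /*chunk_size=*/25001, props));
        return 0;
    }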

