This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 938f768aba [fix](parquet) resolve offset check failed in parquet map type (#22510)
938f768aba is described below

commit 938f768aba5873b7e967fef9d754086cac5444a2
Author: Ashin Gau <[email protected]>
AuthorDate: Wed Aug 2 22:33:10 2023 +0800

    [fix](parquet) resolve offset check failed in parquet map type (#22510)
    
    Fix an error when reading empty map values in parquet, where `offsets.back()` did not equal the number of elements in the map's key column.
    
    ### How does this happen
    A map in parquet is stored as a repeated group, and `repeated_parent_def_level` was set incorrectly when parsing the map node in the parquet schema.
    ```
    the map definition in parquet:
     optional group <name> (MAP) {
       repeated group map (MAP_KEY_VALUE) {
         required <type> key;
         optional <type> value;
       }
     }
    ```
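
    To make the level bookkeeping concrete, here is a small illustrative sketch (not part of the patch) that tabulates the definition/repetition levels of each node above, assuming the map field sits directly under the message root:
    ```
    #include <cstdint>
    #include <cstdio>

    // Illustrative only: optional nodes add 1 to the definition level,
    // repeated nodes add 1 to both levels, required nodes add nothing.
    int main() {
        struct Node { const char* name; int16_t def_level; int16_t rep_level; };
        const Node nodes[] = {
                {"optional group <name> (MAP)", 1, 0},
                {"repeated group map (MAP_KEY_VALUE)", 2, 1}, // first REPEATED ancestor
                {"required <type> key", 2, 1},
                {"optional <type> value", 3, 1},
        };
        for (const Node& n : nodes) {
            std::printf("%-36s def=%d rep=%d\n", n.name, n.def_level, n.rep_level);
        }
        return 0;
    }
    ```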
    
    ### How to fix
    Set the `repeated_parent_def_level` of the key/value nodes to the definition level of the map node.
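
    A minimal runnable sketch of the fix (the `FieldSchema` here is reduced to the fields that matter; the real struct and call site live in `schema_desc.cpp` below):
    ```
    #include <cstdint>
    #include <vector>

    // Reduced FieldSchema: the real one in schema_desc.cpp carries more state.
    struct FieldSchema {
        int16_t repetition_level = 0;
        int16_t definition_level = 0;
        int16_t repeated_parent_def_level = 0;
        std::vector<FieldSchema> children;
    };

    // Children inherit the parent's levels; repeated_parent_def_level is passed
    // explicitly so the caller decides what the children should see.
    static void set_child_node_level(FieldSchema* parent, int16_t repeated_parent_def_level) {
        for (auto& child : parent->children) {
            child.repetition_level = parent->repetition_level;
            child.definition_level = parent->definition_level;
            child.repeated_parent_def_level = repeated_parent_def_level;
        }
    }

    int main() {
        FieldSchema map_field;
        map_field.definition_level = 2;          // optional map group + repeated kv group
        map_field.repeated_parent_def_level = 0; // no REPEATED ancestor above the map
        map_field.children.resize(1);
        // Buggy call: children saw the def level of an ancestor above the map (0).
        //   set_child_node_level(&map_field, map_field.repeated_parent_def_level);
        // Fixed call: the map group itself is REPEATED, so its children must see
        // its own definition level (2).
        set_child_node_level(&map_field, map_field.definition_level);
        return map_field.children[0].repeated_parent_def_level == 2 ? 0 : 1;
    }
    ```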
    
    `repeated_parent_def_level` is the definition level of the first ancestor node whose `repetition_type` equals `REPEATED`. Empty array/map values are not stored in the Doris column, so we have to use `repeated_parent_def_level` to skip the empty or null values in the ancestor node.
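
    In decoder terms (standard Dremel semantics; `produces_element` is an illustrative helper, not a Doris function), a slot only contributes an element when its definition level reaches that ancestor's level:
    ```
    #include <cstdint>

    // Illustrative helper: a key/value slot adds an element to the flattened
    // data column only if its definition level reaches the definition level of
    // its nearest REPEATED ancestor; lower levels mean the enclosing map/array
    // was null or empty, and nothing is stored for it.
    bool produces_element(int16_t def_level, int16_t repeated_parent_def_level) {
        return def_level >= repeated_parent_def_level;
    }

    int main() {
        const int16_t map_def_level = 2;                      // the repeated map group
        bool null_map = produces_element(0, map_def_level);   // false
        bool empty_map = produces_element(1, map_def_level);  // false
        bool one_entry = produces_element(2, map_def_level);  // true
        return (!null_map && !empty_map && one_entry) ? 0 : 1;
    }
    ```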
    
    For instance, consider an array of strings with 3 rows like the following:
    `null, [], [a, b, c]`
    We can store four elements in the data column: `null, a, b, c`
    and the offsets column is: `1, 1, 4`
    and the null map is: `1, 0, 0`
    For the `i-th` row in the array column, the range from `offsets[i - 1]` to `offsets[i]` represents the elements in this row, so we can't store empty array/map values in the Doris data column. As a comparison, Spark does not require `repeated_parent_def_level`, because the Spark column stores empty array/map values and uses another length column to indicate empty values. Please reference:
    https://github.com/apache/spark/blob/master/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetColumnVector.java
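
    A self-contained sketch of that layout (plain std containers stand in for Doris's data/offsets/null-map columns):
    ```
    #include <cstdint>
    #include <cstdio>
    #include <string>
    #include <vector>

    // Rows: null, [], [a, b, c] -- the null row keeps a placeholder slot in data.
    int main() {
        std::vector<std::string> data = {"<null>", "a", "b", "c"};
        std::vector<int64_t> offsets = {1, 1, 4};  // offsets[i] = end of row i
        std::vector<uint8_t> null_map = {1, 0, 0}; // 1 marks a NULL row

        for (size_t i = 0; i < offsets.size(); ++i) {
            if (null_map[i]) {
                std::printf("row %zu: NULL\n", i);
                continue;
            }
            int64_t begin = (i == 0) ? 0 : offsets[i - 1];
            int64_t end = offsets[i]; // begin == end means an empty row
            std::printf("row %zu: [", i);
            for (int64_t j = begin; j < end; ++j) {
                std::printf("%s%s", data[j].c_str(), j + 1 < end ? ", " : "");
            }
            std::printf("]\n");
        }
        return 0;
    }
    ```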
    
    Furthermore, we could also avoid storing null array/map values in the Doris data column. For the same three rows as above, we would only store three elements in the data column: `a, b, c`
    and the offsets column is: `0, 0, 3`
    and the null map is: `1, 0, 0`
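
    The decode loop sketched above works unchanged for this compact layout, because a null or empty row simply yields `offsets[i - 1] == offsets[i]`; it also satisfies the `offsets.back() == data.size()` invariant that the new `DCHECK_EQ` in this patch enforces:
    ```
    #include <cstdint>
    #include <string>
    #include <vector>

    // Compact layout for the same three rows: the null row occupies no slot.
    int main() {
        std::vector<std::string> data = {"a", "b", "c"};
        std::vector<int64_t> offsets = {0, 0, 3};  // rows: [0,0), [0,0), [0,3)
        std::vector<uint8_t> null_map = {1, 0, 0}; // NULL, empty, present
        // The invariant checked by the patch: offsets.back() == data size.
        return offsets.back() == static_cast<int64_t>(data.size()) ? 0 : 1;
    }
    ```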
---
 be/src/vec/exec/format/parquet/schema_desc.cpp     | 29 +++++++++++++++++++++-
 .../exec/format/parquet/vparquet_column_reader.cpp | 19 ++++++++------
 .../hive/test_complex_types.out                    |  6 +++++
 .../hive/test_complex_types.groovy                 |  2 ++
 4 files changed, 47 insertions(+), 9 deletions(-)

diff --git a/be/src/vec/exec/format/parquet/schema_desc.cpp b/be/src/vec/exec/format/parquet/schema_desc.cpp
index f52e3921ad..dcc5140e89 100644
--- a/be/src/vec/exec/format/parquet/schema_desc.cpp
+++ b/be/src/vec/exec/format/parquet/schema_desc.cpp
@@ -62,6 +62,32 @@ static int num_children_node(const tparquet::SchemaElement& schema) {
     return schema.__isset.num_children ? schema.num_children : 0;
 }
 
+/**
+ * `repeated_parent_def_level` is the definition level of the first ancestor node whose repetition_type equals REPEATED.
+ * Empty array/map values are not stored in Doris columns, so we have to use `repeated_parent_def_level` to skip the
+ * empty or null values in the ancestor node.
+ *
+ * For instance, consider an array of strings with 3 rows like the following:
+ * null, [], [a, b, c]
+ * We can store four elements in the data column: null, a, b, c
+ * and the offsets column is: 1, 1, 4
+ * and the null map is: 1, 0, 0
+ * For the i-th row in the array column, the range from `offsets[i - 1]` to `offsets[i]` represents the elements in
+ * this row, so we can't store empty array/map values in the Doris data column.
+ * As a comparison, Spark does not require `repeated_parent_def_level`, because the Spark column stores empty
+ * array/map values and uses another length column to indicate empty values.
+ * Please reference: https://github.com/apache/spark/blob/master/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetColumnVector.java
+ *
+ * Furthermore, we could also avoid storing null array/map values in the Doris data column. For the same three rows
+ * as above, we would only store three elements in the data column: a, b, c
+ * and the offsets column is: 0, 0, 3
+ * and the null map is: 1, 0, 0
+ *
+ * Children inherit the repetition and definition level from the parent node. If the parent node is repeated, callers
+ * should pass the parent's definition_level as `repeated_parent_def_level`; otherwise, pass down the parent's own
+ * repeated_parent_def_level.
+ * @param parent parent node
+ * @param repeated_parent_def_level the definition level of the first ancestor node whose repetition_type equals REPEATED
+ */
 static void set_child_node_level(FieldSchema* parent, int16_t repeated_parent_def_level) {
     for (auto& child : parent->children) {
         child.repetition_level = parent->repetition_level;
@@ -475,7 +501,8 @@ Status FieldDescriptor::parse_map_field(const std::vector<tparquet::SchemaElemen
     map_field->definition_level++;
 
     map_field->children.resize(1);
-    set_child_node_level(map_field, map_field->repeated_parent_def_level);
+    // map is a repeated node, so set the `repeated_parent_def_level` of its children to its `definition_level`
+    set_child_node_level(map_field, map_field->definition_level);
     auto map_kv_field = &map_field->children[0];
     // produce MAP<STRUCT<KEY, VALUE>>
     RETURN_IF_ERROR(parse_struct_field(t_schemas, curr_pos + 1, map_kv_field));
diff --git a/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp
index de8afb0923..7499a63675 100644
--- a/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp
+++ b/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp
@@ -574,21 +574,22 @@ Status ArrayColumnReader::read_column_data(ColumnPtr& doris_column, DataTypePtr&
         data_column = doris_column->assume_mutable();
     }
 
+    ColumnPtr& element_column = static_cast<ColumnArray&>(*data_column).get_data_ptr();
+    DataTypePtr& element_type = const_cast<DataTypePtr&>(
+            (reinterpret_cast<const DataTypeArray*>(remove_nullable(type).get()))
+                    ->get_nested_type());
     // read nested column
-    RETURN_IF_ERROR(_element_reader->read_column_data(
-            static_cast<ColumnArray&>(*data_column).get_data_ptr(),
-            const_cast<DataTypePtr&>(
-                    (reinterpret_cast<const DataTypeArray*>(remove_nullable(type).get()))
-                            ->get_nested_type()),
-            select_vector, batch_size, read_rows, eof, is_dict_filter));
+    RETURN_IF_ERROR(_element_reader->read_column_data(element_column, element_type, select_vector,
+                                                      batch_size, read_rows, eof, is_dict_filter));
     if (*read_rows == 0) {
         return Status::OK();
     }
 
+    ColumnArray::Offsets64& offsets_data = static_cast<ColumnArray&>(*data_column).get_offsets();
     // fill offset and null map
-    fill_array_offset(_field_schema, static_cast<ColumnArray&>(*data_column).get_offsets(),
-                      null_map_ptr, _element_reader->get_rep_level(),
+    fill_array_offset(_field_schema, offsets_data, null_map_ptr, _element_reader->get_rep_level(),
                       _element_reader->get_def_level());
+    DCHECK_EQ(element_column->size(), offsets_data.back());
 
     return Status::OK();
 }
@@ -650,9 +651,11 @@ Status MapColumnReader::read_column_data(ColumnPtr& doris_column, DataTypePtr& t
         return Status::OK();
     }
 
+    DCHECK_EQ(key_column->size(), value_column->size());
     // fill offset and null map
     fill_array_offset(_field_schema, map.get_offsets(), null_map_ptr, _key_reader->get_rep_level(),
                       _key_reader->get_def_level());
+    DCHECK_EQ(key_column->size(), map.get_offsets().back());
 
     return Status::OK();
 }
diff --git a/regression-test/data/external_table_emr_p2/hive/test_complex_types.out b/regression-test/data/external_table_emr_p2/hive/test_complex_types.out
index 88d62b3841..8c4f9d04f4 100644
--- a/regression-test/data/external_table_emr_p2/hive/test_complex_types.out
+++ b/regression-test/data/external_table_emr_p2/hive/test_complex_types.out
@@ -23,3 +23,9 @@
 -- !array_last --
 0.9899828598260161
 
+-- !offsets_check --
+0      [1, 2]  [[], [3], NULL] {"a":1, "b":2}  {"e", NULL}
+1      []      []      {}      \N
+2      \N      \N      \N      {"h", 10}
+3      [5, NULL]       [[6, 7], [8, NULL], NULL]       {"f":1, "g":NULL}       {NULL, 9}
+
diff --git a/regression-test/suites/external_table_emr_p2/hive/test_complex_types.groovy b/regression-test/suites/external_table_emr_p2/hive/test_complex_types.groovy
index 2aaaa815fd..c86c8c2562 100644
--- a/regression-test/suites/external_table_emr_p2/hive/test_complex_types.groovy
+++ b/regression-test/suites/external_table_emr_p2/hive/test_complex_types.groovy
@@ -50,5 +50,7 @@ suite("test_complex_types", "p2") {
         qt_array_filter """select count(array_size(array_filter(i -> (i > 0.99), capacity))) from byd where array_size(array_filter(i -> (i > 0.99), capacity))"""
 
         qt_array_last """select max(array_last(i -> i > 0, capacity)) from byd where array_last(i -> i > 0, capacity) < 0.99"""
+
+        qt_offsets_check """select * from complex_offsets_check order by id"""
     }
 }

