This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 25de068a05 [fix](parquet-reader) the value of null map will overflow 
when LazyRead merges too many empty batches (#14558)
25de068a05 is described below

commit 25de068a050ce5bff08467464a6e810efd202315
Author: Ashin Gau <[email protected]>
AuthorDate: Fri Nov 25 12:22:18 2022 +0800

    [fix](parquet-reader) the value of null map will overflow when LazyRead 
merges too many empty batches (#14558)
    
    The run length of null map is saved as `uint16_t`. Previously, the run 
length of null map was
    limited by `batch_size` in the `ParquetReader`, by setting `batch_size = 
std::min(batch_size, (size_t)USHRT_MAX)`.
    It works well when the batch size is less than `USHRT_MAX`.
    However, [Lazy read](https://github.com/apache/doris/pull/13917) will merge 
empty batches until reading
    a non-empty batch or reaching the EOF of a row group, so the `batch_size` 
may be greater than `USHRT_MAX`
    in non-predicate columns.
    In addition, even if the `batch_size` does not exceed `USHRT_MAX`, the 
adjacent batches may also make
    the run  length exceed the `USHRT_MAX` in 
`ColumnSelectVector::get_next_run`.
---
 be/src/vec/exec/format/parquet/parquet_common.cpp  | 24 ++++++++---------
 be/src/vec/exec/format/parquet/parquet_common.h    | 30 +++++++++++-----------
 .../exec/format/parquet/vparquet_column_reader.cpp | 16 ++++++++++--
 be/src/vec/exec/format/parquet/vparquet_reader.cpp |  3 +--
 4 files changed, 42 insertions(+), 31 deletions(-)

diff --git a/be/src/vec/exec/format/parquet/parquet_common.cpp 
b/be/src/vec/exec/format/parquet/parquet_common.cpp
index 1f0d8327db..98f0a7de1a 100644
--- a/be/src/vec/exec/format/parquet/parquet_common.cpp
+++ b/be/src/vec/exec/format/parquet/parquet_common.cpp
@@ -90,17 +90,17 @@ void ColumnSelectVector::set_run_length_null_map(const 
std::vector<uint16_t>& ru
         for (auto& run_length : run_length_null_map) {
             if (is_null) {
                 _num_nulls += run_length;
-                for (uint16_t i = 0; i < run_length; ++i) {
+                for (int i = 0; i < run_length; ++i) {
                     _data_map[map_index++] = FILTERED_NULL;
                 }
             } else {
-                for (uint16_t i = 0; i < run_length; ++i) {
+                for (int i = 0; i < run_length; ++i) {
                     _data_map[map_index++] = FILTERED_CONTENT;
                 }
             }
             is_null = !is_null;
         }
-        uint16_t num_read = 0;
+        size_t num_read = 0;
         DCHECK_LE(_filter_map_index + num_values, _filter_map_size);
         for (size_t i = 0; i < num_values; ++i) {
             if (_filter_map[_filter_map_index++]) {
@@ -174,13 +174,13 @@ void ColumnSelectVector::skip(size_t num_values) {
     _filter_map_index += num_values;
 }
 
-uint16_t ColumnSelectVector::get_next_run(DataReadType* data_read_type) {
+size_t ColumnSelectVector::get_next_run(DataReadType* data_read_type) {
     if (_has_filter) {
         if (_read_index == _num_values) {
             return 0;
         }
         const DataReadType& type = _data_map[_read_index++];
-        uint16_t run_length = 1;
+        size_t run_length = 1;
         while (_read_index < _num_values) {
             if (_data_map[_read_index] == type) {
                 run_length++;
@@ -192,7 +192,7 @@ uint16_t ColumnSelectVector::get_next_run(DataReadType* 
data_read_type) {
         *data_read_type = type;
         return run_length;
     } else {
-        uint16_t run_length = 0;
+        size_t run_length = 0;
         while (run_length == 0) {
             if (_read_index == (*_run_length_null_map).size()) {
                 return 0;
@@ -415,12 +415,12 @@ Status FixLengthDecoder::_decode_string(MutableColumnPtr& 
doris_column,
                                         ColumnSelectVector& select_vector) {
     size_t dict_index = 0;
     ColumnSelectVector::DataReadType read_type;
-    while (uint16_t run_length = select_vector.get_next_run(&read_type)) {
+    while (size_t run_length = select_vector.get_next_run(&read_type)) {
         switch (read_type) {
         case ColumnSelectVector::CONTENT: {
             std::vector<StringRef> string_values;
             string_values.reserve(run_length);
-            for (int i = 0; i < run_length; ++i) {
+            for (size_t i = 0; i < run_length; ++i) {
                 char* buf_start = _FIXED_GET_DATA_OFFSET(dict_index++);
                 string_values.emplace_back(buf_start, _type_length);
                 _FIXED_SHIFT_DATA_OFFSET();
@@ -517,12 +517,12 @@ Status ByteArrayDecoder::decode_values(MutableColumnPtr& 
doris_column, DataTypeP
         size_t dict_index = 0;
 
         ColumnSelectVector::DataReadType read_type;
-        while (uint16_t run_length = select_vector.get_next_run(&read_type)) {
+        while (size_t run_length = select_vector.get_next_run(&read_type)) {
             switch (read_type) {
             case ColumnSelectVector::CONTENT: {
                 std::vector<StringRef> string_values;
                 string_values.reserve(run_length);
-                for (int i = 0; i < run_length; ++i) {
+                for (size_t i = 0; i < run_length; ++i) {
                     if (_has_dict) {
                         
string_values.emplace_back(_dict_items[_indexes[dict_index++]]);
                     } else {
@@ -622,11 +622,11 @@ Status BoolPlainDecoder::decode_values(MutableColumnPtr& 
doris_column, DataTypeP
     column_data.resize(data_index + select_vector.num_values() - 
select_vector.num_filtered());
 
     ColumnSelectVector::DataReadType read_type;
-    while (uint16_t run_length = select_vector.get_next_run(&read_type)) {
+    while (size_t run_length = select_vector.get_next_run(&read_type)) {
         switch (read_type) {
         case ColumnSelectVector::CONTENT: {
             bool value;
-            for (int i = 0; i < run_length; ++i) {
+            for (size_t i = 0; i < run_length; ++i) {
                 if (UNLIKELY(!_decode_value(&value))) {
                     return Status::IOError("Can't read enough booleans in 
plain decoder");
                 }
diff --git a/be/src/vec/exec/format/parquet/parquet_common.h 
b/be/src/vec/exec/format/parquet/parquet_common.h
index 1b64c129f5..d124c2fb9a 100644
--- a/be/src/vec/exec/format/parquet/parquet_common.h
+++ b/be/src/vec/exec/format/parquet/parquet_common.h
@@ -113,7 +113,7 @@ public:
         }
     }
 
-    uint16_t get_next_run(DataReadType* data_read_type);
+    size_t get_next_run(DataReadType* data_read_type);
 
     void set_run_length_null_map(const std::vector<uint16_t>& 
run_length_null_map,
                                  size_t num_values, NullMap* null_map = 
nullptr);
@@ -261,10 +261,10 @@ Status 
FixLengthDecoder::_decode_numeric(MutableColumnPtr& doris_column,
     column_data.resize(data_index + select_vector.num_values() - 
select_vector.num_filtered());
     size_t dict_index = 0;
     ColumnSelectVector::DataReadType read_type;
-    while (uint16_t run_length = select_vector.get_next_run(&read_type)) {
+    while (size_t run_length = select_vector.get_next_run(&read_type)) {
         switch (read_type) {
         case ColumnSelectVector::CONTENT: {
-            for (int i = 0; i < run_length; ++i) {
+            for (size_t i = 0; i < run_length; ++i) {
                 char* buf_start = _FIXED_GET_DATA_OFFSET(dict_index++);
                 column_data[data_index++] = *(Numeric*)buf_start;
                 _FIXED_SHIFT_DATA_OFFSET();
@@ -300,10 +300,10 @@ Status FixLengthDecoder::_decode_date(MutableColumnPtr& 
doris_column,
     column_data.resize(data_index + select_vector.num_values() - 
select_vector.num_filtered());
     size_t dict_index = 0;
     ColumnSelectVector::DataReadType read_type;
-    while (uint16_t run_length = select_vector.get_next_run(&read_type)) {
+    while (size_t run_length = select_vector.get_next_run(&read_type)) {
         switch (read_type) {
         case ColumnSelectVector::CONTENT: {
-            for (int i = 0; i < run_length; ++i) {
+            for (size_t i = 0; i < run_length; ++i) {
                 char* buf_start = _FIXED_GET_DATA_OFFSET(dict_index++);
                 int64_t date_value = 
static_cast<int64_t>(*reinterpret_cast<int32_t*>(buf_start));
                 auto& v = 
reinterpret_cast<CppType&>(column_data[data_index++]);
@@ -346,10 +346,10 @@ Status 
FixLengthDecoder::_decode_datetime64(MutableColumnPtr& doris_column,
     size_t dict_index = 0;
     int64_t scale_to_micro = _decode_params->scale_to_nano_factor / 1000;
     ColumnSelectVector::DataReadType read_type;
-    while (uint16_t run_length = select_vector.get_next_run(&read_type)) {
+    while (size_t run_length = select_vector.get_next_run(&read_type)) {
         switch (read_type) {
         case ColumnSelectVector::CONTENT: {
-            for (int i = 0; i < run_length; ++i) {
+            for (size_t i = 0; i < run_length; ++i) {
                 char* buf_start = _FIXED_GET_DATA_OFFSET(dict_index++);
                 int64_t& date_value = *reinterpret_cast<int64_t*>(buf_start);
                 auto& v = 
reinterpret_cast<CppType&>(column_data[data_index++]);
@@ -392,10 +392,10 @@ Status 
FixLengthDecoder::_decode_datetime96(MutableColumnPtr& doris_column,
     column_data.resize(data_index + select_vector.num_values() - 
select_vector.num_filtered());
     size_t dict_index = 0;
     ColumnSelectVector::DataReadType read_type;
-    while (uint16_t run_length = select_vector.get_next_run(&read_type)) {
+    while (size_t run_length = select_vector.get_next_run(&read_type)) {
         switch (read_type) {
         case ColumnSelectVector::CONTENT: {
-            for (int i = 0; i < run_length; ++i) {
+            for (size_t i = 0; i < run_length; ++i) {
                 char* buf_start = _FIXED_GET_DATA_OFFSET(dict_index++);
                 ParquetInt96& datetime96 = 
*reinterpret_cast<ParquetInt96*>(buf_start);
                 auto& v = 
reinterpret_cast<CppType&>(column_data[data_index++]);
@@ -444,10 +444,10 @@ Status 
FixLengthDecoder::_decode_binary_decimal(MutableColumnPtr& doris_column,
     DecimalScaleParams& scale_params = _decode_params->decimal_scale;
 
     ColumnSelectVector::DataReadType read_type;
-    while (uint16_t run_length = select_vector.get_next_run(&read_type)) {
+    while (size_t run_length = select_vector.get_next_run(&read_type)) {
         switch (read_type) {
         case ColumnSelectVector::CONTENT: {
-            for (int i = 0; i < run_length; ++i) {
+            for (size_t i = 0; i < run_length; ++i) {
                 char* buf_start = _FIXED_GET_DATA_OFFSET(dict_index++);
                 // When Decimal in parquet is stored in byte arrays, binary 
and fixed,
                 // the unscaled number must be encoded as two's complement 
using big-endian byte order.
@@ -500,10 +500,10 @@ Status 
FixLengthDecoder::_decode_primitive_decimal(MutableColumnPtr& doris_colum
     DecimalScaleParams& scale_params = _decode_params->decimal_scale;
 
     ColumnSelectVector::DataReadType read_type;
-    while (uint16_t run_length = select_vector.get_next_run(&read_type)) {
+    while (size_t run_length = select_vector.get_next_run(&read_type)) {
         switch (read_type) {
         case ColumnSelectVector::CONTENT: {
-            for (int i = 0; i < run_length; ++i) {
+            for (size_t i = 0; i < run_length; ++i) {
                 char* buf_start = _FIXED_GET_DATA_OFFSET(dict_index++);
                 // we should use decimal128 to scale up/down
                 Int128 value = 
*reinterpret_cast<DecimalPhysicalType*>(buf_start);
@@ -578,10 +578,10 @@ Status 
ByteArrayDecoder::_decode_binary_decimal(MutableColumnPtr& doris_column,
     size_t dict_index = 0;
     DecimalScaleParams& scale_params = _decode_params->decimal_scale;
     ColumnSelectVector::DataReadType read_type;
-    while (uint16_t run_length = select_vector.get_next_run(&read_type)) {
+    while (size_t run_length = select_vector.get_next_run(&read_type)) {
         switch (read_type) {
         case ColumnSelectVector::CONTENT: {
-            for (int i = 0; i < run_length; ++i) {
+            for (size_t i = 0; i < run_length; ++i) {
                 char* buf_start;
                 uint32_t length;
                 if (_has_dict) {
diff --git a/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp 
b/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp
index 5ac70e1bff..235a5f3e3b 100644
--- a/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp
+++ b/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp
@@ -154,7 +154,13 @@ Status ScalarColumnReader::_read_values(size_t num_values, 
ColumnPtr& doris_colu
                 if (!(prev_is_null ^ is_null)) {
                     null_map.emplace_back(0);
                 }
-                null_map.emplace_back((uint16_t)loop_read);
+                size_t remaining = loop_read;
+                while (remaining > USHRT_MAX) {
+                    null_map.emplace_back(USHRT_MAX);
+                    null_map.emplace_back(0);
+                    remaining -= USHRT_MAX;
+                }
+                null_map.emplace_back((u_short)remaining);
                 prev_is_null = is_null;
                 has_read += loop_read;
             }
@@ -166,7 +172,13 @@ Status ScalarColumnReader::_read_values(size_t num_values, 
ColumnPtr& doris_colu
         data_column = doris_column->assume_mutable();
     }
     if (null_map.size() == 0) {
-        null_map.emplace_back((uint16_t)num_values);
+        size_t remaining = num_values;
+        while (remaining > USHRT_MAX) {
+            null_map.emplace_back(USHRT_MAX);
+            null_map.emplace_back(0);
+            remaining -= USHRT_MAX;
+        }
+        null_map.emplace_back((u_short)remaining);
     }
     {
         SCOPED_RAW_TIMER(&_decode_null_map_time);
diff --git a/be/src/vec/exec/format/parquet/vparquet_reader.cpp 
b/be/src/vec/exec/format/parquet/vparquet_reader.cpp
index a525b531a6..cb22f9d36e 100644
--- a/be/src/vec/exec/format/parquet/vparquet_reader.cpp
+++ b/be/src/vec/exec/format/parquet/vparquet_reader.cpp
@@ -34,11 +34,10 @@ ParquetReader::ParquetReader(RuntimeProfile* profile, const 
TFileScanRangeParams
         : _profile(profile),
           _scan_params(params),
           _scan_range(range),
+          _batch_size(batch_size),
           _range_start_offset(range.start_offset),
           _range_size(range.size),
           _ctz(ctz) {
-    // ColumnSelectVector use uint16_t to save row index
-    _batch_size = std::min(batch_size, (size_t)USHRT_MAX);
     _init_profile();
 }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to