This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 25de068a05 [fix](parquet-reader) the value of null map will overflow
when LazyRead merges too many empty batches (#14558)
25de068a05 is described below
commit 25de068a050ce5bff08467464a6e810efd202315
Author: Ashin Gau <[email protected]>
AuthorDate: Fri Nov 25 12:22:18 2022 +0800
[fix](parquet-reader) the value of null map will overflow when LazyRead
merges too many empty batches (#14558)
The run lengths of the null map are stored as `uint16_t`. Previously, each run
length was bounded by capping `batch_size` in `ParquetReader` with
`batch_size = std::min(batch_size, (size_t)USHRT_MAX)`.
This works as long as no batch contains more than `USHRT_MAX` rows.
However, [lazy read](https://github.com/apache/doris/pull/13917) merges
consecutive empty batches until it reads a non-empty batch or reaches the end
of a row group. As a result, the effective `batch_size` of non-predicate
columns may exceed `USHRT_MAX`.
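In addition, even when `batch_size` does not exceed `USHRT_MAX`, adjacent
batches can still produce a run length greater than `USHRT_MAX` in
`ColumnSelectVector::get_next_run`.
This patch makes three changes:
- it changes the return type of `get_next_run` to `size_t`;
- it splits any null-map run longer than `USHRT_MAX` into `USHRT_MAX`-sized
  chunks separated by zero-length runs;
- it removes the `USHRT_MAX` cap on `batch_size` in `ParquetReader`.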
---
be/src/vec/exec/format/parquet/parquet_common.cpp | 24 ++++++++---------
be/src/vec/exec/format/parquet/parquet_common.h | 30 +++++++++++-----------
.../exec/format/parquet/vparquet_column_reader.cpp | 16 ++++++++++--
be/src/vec/exec/format/parquet/vparquet_reader.cpp | 3 +--
4 files changed, 42 insertions(+), 31 deletions(-)
diff --git a/be/src/vec/exec/format/parquet/parquet_common.cpp
b/be/src/vec/exec/format/parquet/parquet_common.cpp
index 1f0d8327db..98f0a7de1a 100644
--- a/be/src/vec/exec/format/parquet/parquet_common.cpp
+++ b/be/src/vec/exec/format/parquet/parquet_common.cpp
@@ -90,17 +90,17 @@ void ColumnSelectVector::set_run_length_null_map(const
std::vector<uint16_t>& ru
for (auto& run_length : run_length_null_map) {
if (is_null) {
_num_nulls += run_length;
- for (uint16_t i = 0; i < run_length; ++i) {
+ for (int i = 0; i < run_length; ++i) {
_data_map[map_index++] = FILTERED_NULL;
}
} else {
- for (uint16_t i = 0; i < run_length; ++i) {
+ for (int i = 0; i < run_length; ++i) {
_data_map[map_index++] = FILTERED_CONTENT;
}
}
is_null = !is_null;
}
- uint16_t num_read = 0;
+ size_t num_read = 0;
DCHECK_LE(_filter_map_index + num_values, _filter_map_size);
for (size_t i = 0; i < num_values; ++i) {
if (_filter_map[_filter_map_index++]) {
@@ -174,13 +174,13 @@ void ColumnSelectVector::skip(size_t num_values) {
_filter_map_index += num_values;
}
-uint16_t ColumnSelectVector::get_next_run(DataReadType* data_read_type) {
+size_t ColumnSelectVector::get_next_run(DataReadType* data_read_type) {
if (_has_filter) {
if (_read_index == _num_values) {
return 0;
}
const DataReadType& type = _data_map[_read_index++];
- uint16_t run_length = 1;
+ size_t run_length = 1;
while (_read_index < _num_values) {
if (_data_map[_read_index] == type) {
run_length++;
@@ -192,7 +192,7 @@ uint16_t ColumnSelectVector::get_next_run(DataReadType*
data_read_type) {
*data_read_type = type;
return run_length;
} else {
- uint16_t run_length = 0;
+ size_t run_length = 0;
while (run_length == 0) {
if (_read_index == (*_run_length_null_map).size()) {
return 0;
@@ -415,12 +415,12 @@ Status FixLengthDecoder::_decode_string(MutableColumnPtr&
doris_column,
ColumnSelectVector& select_vector) {
size_t dict_index = 0;
ColumnSelectVector::DataReadType read_type;
- while (uint16_t run_length = select_vector.get_next_run(&read_type)) {
+ while (size_t run_length = select_vector.get_next_run(&read_type)) {
switch (read_type) {
case ColumnSelectVector::CONTENT: {
std::vector<StringRef> string_values;
string_values.reserve(run_length);
- for (int i = 0; i < run_length; ++i) {
+ for (size_t i = 0; i < run_length; ++i) {
char* buf_start = _FIXED_GET_DATA_OFFSET(dict_index++);
string_values.emplace_back(buf_start, _type_length);
_FIXED_SHIFT_DATA_OFFSET();
@@ -517,12 +517,12 @@ Status ByteArrayDecoder::decode_values(MutableColumnPtr&
doris_column, DataTypeP
size_t dict_index = 0;
ColumnSelectVector::DataReadType read_type;
- while (uint16_t run_length = select_vector.get_next_run(&read_type)) {
+ while (size_t run_length = select_vector.get_next_run(&read_type)) {
switch (read_type) {
case ColumnSelectVector::CONTENT: {
std::vector<StringRef> string_values;
string_values.reserve(run_length);
- for (int i = 0; i < run_length; ++i) {
+ for (size_t i = 0; i < run_length; ++i) {
if (_has_dict) {
string_values.emplace_back(_dict_items[_indexes[dict_index++]]);
} else {
@@ -622,11 +622,11 @@ Status BoolPlainDecoder::decode_values(MutableColumnPtr&
doris_column, DataTypeP
column_data.resize(data_index + select_vector.num_values() -
select_vector.num_filtered());
ColumnSelectVector::DataReadType read_type;
- while (uint16_t run_length = select_vector.get_next_run(&read_type)) {
+ while (size_t run_length = select_vector.get_next_run(&read_type)) {
switch (read_type) {
case ColumnSelectVector::CONTENT: {
bool value;
- for (int i = 0; i < run_length; ++i) {
+ for (size_t i = 0; i < run_length; ++i) {
if (UNLIKELY(!_decode_value(&value))) {
return Status::IOError("Can't read enough booleans in
plain decoder");
}
diff --git a/be/src/vec/exec/format/parquet/parquet_common.h
b/be/src/vec/exec/format/parquet/parquet_common.h
index 1b64c129f5..d124c2fb9a 100644
--- a/be/src/vec/exec/format/parquet/parquet_common.h
+++ b/be/src/vec/exec/format/parquet/parquet_common.h
@@ -113,7 +113,7 @@ public:
}
}
- uint16_t get_next_run(DataReadType* data_read_type);
+ size_t get_next_run(DataReadType* data_read_type);
void set_run_length_null_map(const std::vector<uint16_t>&
run_length_null_map,
size_t num_values, NullMap* null_map =
nullptr);
@@ -261,10 +261,10 @@ Status
FixLengthDecoder::_decode_numeric(MutableColumnPtr& doris_column,
column_data.resize(data_index + select_vector.num_values() -
select_vector.num_filtered());
size_t dict_index = 0;
ColumnSelectVector::DataReadType read_type;
- while (uint16_t run_length = select_vector.get_next_run(&read_type)) {
+ while (size_t run_length = select_vector.get_next_run(&read_type)) {
switch (read_type) {
case ColumnSelectVector::CONTENT: {
- for (int i = 0; i < run_length; ++i) {
+ for (size_t i = 0; i < run_length; ++i) {
char* buf_start = _FIXED_GET_DATA_OFFSET(dict_index++);
column_data[data_index++] = *(Numeric*)buf_start;
_FIXED_SHIFT_DATA_OFFSET();
@@ -300,10 +300,10 @@ Status FixLengthDecoder::_decode_date(MutableColumnPtr&
doris_column,
column_data.resize(data_index + select_vector.num_values() -
select_vector.num_filtered());
size_t dict_index = 0;
ColumnSelectVector::DataReadType read_type;
- while (uint16_t run_length = select_vector.get_next_run(&read_type)) {
+ while (size_t run_length = select_vector.get_next_run(&read_type)) {
switch (read_type) {
case ColumnSelectVector::CONTENT: {
- for (int i = 0; i < run_length; ++i) {
+ for (size_t i = 0; i < run_length; ++i) {
char* buf_start = _FIXED_GET_DATA_OFFSET(dict_index++);
int64_t date_value =
static_cast<int64_t>(*reinterpret_cast<int32_t*>(buf_start));
auto& v =
reinterpret_cast<CppType&>(column_data[data_index++]);
@@ -346,10 +346,10 @@ Status
FixLengthDecoder::_decode_datetime64(MutableColumnPtr& doris_column,
size_t dict_index = 0;
int64_t scale_to_micro = _decode_params->scale_to_nano_factor / 1000;
ColumnSelectVector::DataReadType read_type;
- while (uint16_t run_length = select_vector.get_next_run(&read_type)) {
+ while (size_t run_length = select_vector.get_next_run(&read_type)) {
switch (read_type) {
case ColumnSelectVector::CONTENT: {
- for (int i = 0; i < run_length; ++i) {
+ for (size_t i = 0; i < run_length; ++i) {
char* buf_start = _FIXED_GET_DATA_OFFSET(dict_index++);
int64_t& date_value = *reinterpret_cast<int64_t*>(buf_start);
auto& v =
reinterpret_cast<CppType&>(column_data[data_index++]);
@@ -392,10 +392,10 @@ Status
FixLengthDecoder::_decode_datetime96(MutableColumnPtr& doris_column,
column_data.resize(data_index + select_vector.num_values() -
select_vector.num_filtered());
size_t dict_index = 0;
ColumnSelectVector::DataReadType read_type;
- while (uint16_t run_length = select_vector.get_next_run(&read_type)) {
+ while (size_t run_length = select_vector.get_next_run(&read_type)) {
switch (read_type) {
case ColumnSelectVector::CONTENT: {
- for (int i = 0; i < run_length; ++i) {
+ for (size_t i = 0; i < run_length; ++i) {
char* buf_start = _FIXED_GET_DATA_OFFSET(dict_index++);
ParquetInt96& datetime96 =
*reinterpret_cast<ParquetInt96*>(buf_start);
auto& v =
reinterpret_cast<CppType&>(column_data[data_index++]);
@@ -444,10 +444,10 @@ Status
FixLengthDecoder::_decode_binary_decimal(MutableColumnPtr& doris_column,
DecimalScaleParams& scale_params = _decode_params->decimal_scale;
ColumnSelectVector::DataReadType read_type;
- while (uint16_t run_length = select_vector.get_next_run(&read_type)) {
+ while (size_t run_length = select_vector.get_next_run(&read_type)) {
switch (read_type) {
case ColumnSelectVector::CONTENT: {
- for (int i = 0; i < run_length; ++i) {
+ for (size_t i = 0; i < run_length; ++i) {
char* buf_start = _FIXED_GET_DATA_OFFSET(dict_index++);
// When Decimal in parquet is stored in byte arrays, binary
and fixed,
// the unscaled number must be encoded as two's complement
using big-endian byte order.
@@ -500,10 +500,10 @@ Status
FixLengthDecoder::_decode_primitive_decimal(MutableColumnPtr& doris_colum
DecimalScaleParams& scale_params = _decode_params->decimal_scale;
ColumnSelectVector::DataReadType read_type;
- while (uint16_t run_length = select_vector.get_next_run(&read_type)) {
+ while (size_t run_length = select_vector.get_next_run(&read_type)) {
switch (read_type) {
case ColumnSelectVector::CONTENT: {
- for (int i = 0; i < run_length; ++i) {
+ for (size_t i = 0; i < run_length; ++i) {
char* buf_start = _FIXED_GET_DATA_OFFSET(dict_index++);
// we should use decimal128 to scale up/down
Int128 value =
*reinterpret_cast<DecimalPhysicalType*>(buf_start);
@@ -578,10 +578,10 @@ Status
ByteArrayDecoder::_decode_binary_decimal(MutableColumnPtr& doris_column,
size_t dict_index = 0;
DecimalScaleParams& scale_params = _decode_params->decimal_scale;
ColumnSelectVector::DataReadType read_type;
- while (uint16_t run_length = select_vector.get_next_run(&read_type)) {
+ while (size_t run_length = select_vector.get_next_run(&read_type)) {
switch (read_type) {
case ColumnSelectVector::CONTENT: {
- for (int i = 0; i < run_length; ++i) {
+ for (size_t i = 0; i < run_length; ++i) {
char* buf_start;
uint32_t length;
if (_has_dict) {
diff --git a/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp
b/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp
index 5ac70e1bff..235a5f3e3b 100644
--- a/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp
+++ b/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp
@@ -154,7 +154,13 @@ Status ScalarColumnReader::_read_values(size_t num_values,
ColumnPtr& doris_colu
if (!(prev_is_null ^ is_null)) {
null_map.emplace_back(0);
}
- null_map.emplace_back((uint16_t)loop_read);
+ size_t remaining = loop_read;
+ while (remaining > USHRT_MAX) {
+ null_map.emplace_back(USHRT_MAX);
+ null_map.emplace_back(0);
+ remaining -= USHRT_MAX;
+ }
+ null_map.emplace_back((u_short)remaining);
prev_is_null = is_null;
has_read += loop_read;
}
@@ -166,7 +172,13 @@ Status ScalarColumnReader::_read_values(size_t num_values,
ColumnPtr& doris_colu
data_column = doris_column->assume_mutable();
}
if (null_map.size() == 0) {
- null_map.emplace_back((uint16_t)num_values);
+ size_t remaining = num_values;
+ while (remaining > USHRT_MAX) {
+ null_map.emplace_back(USHRT_MAX);
+ null_map.emplace_back(0);
+ remaining -= USHRT_MAX;
+ }
+ null_map.emplace_back((u_short)remaining);
}
{
SCOPED_RAW_TIMER(&_decode_null_map_time);
diff --git a/be/src/vec/exec/format/parquet/vparquet_reader.cpp
b/be/src/vec/exec/format/parquet/vparquet_reader.cpp
index a525b531a6..cb22f9d36e 100644
--- a/be/src/vec/exec/format/parquet/vparquet_reader.cpp
+++ b/be/src/vec/exec/format/parquet/vparquet_reader.cpp
@@ -34,11 +34,10 @@ ParquetReader::ParquetReader(RuntimeProfile* profile, const
TFileScanRangeParams
: _profile(profile),
_scan_params(params),
_scan_range(range),
+ _batch_size(batch_size),
_range_start_offset(range.start_offset),
_range_size(range.size),
_ctz(ctz) {
- // ColumnSelectVector use uint16_t to save row index
- _batch_size = std::min(batch_size, (size_t)USHRT_MAX);
_init_profile();
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]