This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch tpch500
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/tpch500 by this push:
new 7c2a912ce34 [Fix](parquet-reader) Fix partition column issue for
#29181 (Opt late materialization of parquet reader). (#29271)
7c2a912ce34 is described below
commit 7c2a912ce343f0d96587dc3805aa245f098b2e07
Author: Qi Chen <[email protected]>
AuthorDate: Fri Dec 29 16:25:31 2023 +0800
[Fix](parquet-reader) Fix partition column issue for #29181 (Opt late
materialization of parquet reader). (#29271)
---
be/src/vec/columns/column_nullable.cpp | 13 -------
.../format/parquet/fix_length_plain_decoder.cpp | 5 ---
be/src/vec/exec/format/parquet/parquet_common.cpp | 9 -----
.../exec/format/parquet/vparquet_column_reader.cpp | 40 ----------------------
.../exec/format/parquet/vparquet_group_reader.cpp | 25 +++-----------
5 files changed, 4 insertions(+), 88 deletions(-)
diff --git a/be/src/vec/columns/column_nullable.cpp
b/be/src/vec/columns/column_nullable.cpp
index b44482781e8..426de2d4f70 100644
--- a/be/src/vec/columns/column_nullable.cpp
+++ b/be/src/vec/columns/column_nullable.cpp
@@ -370,19 +370,6 @@ size_t ColumnNullable::filter(const Filter& filter) {
return data_result_size;
}
-//size_t ColumnNullable::filter(const Filter& filter) {
-// const auto data_result_size = get_nested_column().filter(filter);
-//
-// get_null_map_column().resize(data_result_size);
-// /*if (!_has_null) {
-// get_null_map_column().resize(data_result_size);
-// } else {
-// const auto map_result_size = get_null_map_column().filter(filter);
-// CHECK_EQ(data_result_size, map_result_size);
-// }*/
-// return data_result_size;
-//}
-
Status ColumnNullable::filter_by_selector(const uint16_t* sel, size_t
sel_size, IColumn* col_ptr) {
const auto* nullable_col_ptr = reinterpret_cast<const
ColumnNullable*>(col_ptr);
ColumnPtr nest_col_ptr = nullable_col_ptr->nested_column;
diff --git a/be/src/vec/exec/format/parquet/fix_length_plain_decoder.cpp
b/be/src/vec/exec/format/parquet/fix_length_plain_decoder.cpp
index 3160f5f5f1d..ea1f63a3e2d 100644
--- a/be/src/vec/exec/format/parquet/fix_length_plain_decoder.cpp
+++ b/be/src/vec/exec/format/parquet/fix_length_plain_decoder.cpp
@@ -233,11 +233,6 @@ Status
FixLengthPlainDecoder::_decode_numeric(MutableColumnPtr& doris_column,
while (size_t run_length =
select_vector.get_next_run<has_filter>(&read_type)) {
switch (read_type) {
case ColumnSelectVector::CONTENT: {
- /*for (size_t i = 0; i < run_length; ++i) {
- char* buf_start = _data->data + _offset;
- column_data[data_index++] = *(PhysicalType*)buf_start;
- _offset += _type_length;
- }*/
memcpy(column_data.data() + data_index, _data->data + _offset,
run_length * _type_length);
data_index += run_length;
diff --git a/be/src/vec/exec/format/parquet/parquet_common.cpp
b/be/src/vec/exec/format/parquet/parquet_common.cpp
index 241135eef61..095b34eeb4e 100644
--- a/be/src/vec/exec/format/parquet/parquet_common.cpp
+++ b/be/src/vec/exec/format/parquet/parquet_common.cpp
@@ -75,25 +75,16 @@ void ColumnSelectVector::set_run_length_null_map(const
std::vector<uint16_t>& ru
if (is_null) {
_num_nulls += run_length;
for (int i = 0; i < run_length; ++i) {
- //_data_map[map_index++] = FILTERED_NULL;
_data_map[map_index++] = NULL_DATA;
}
} else {
for (int i = 0; i < run_length; ++i) {
- //_data_map[map_index++] = FILTERED_CONTENT;
_data_map[map_index++] = CONTENT;
}
}
is_null = !is_null;
}
size_t num_read = 0;
- /*DCHECK_LE(_filter_map_index + num_values, _filter_map_size);
- for (size_t i = 0; i < num_values; ++i) {
- if (_filter_map[_filter_map_index++]) {
- _data_map[i] = _data_map[i] == FILTERED_NULL ? NULL_DATA :
CONTENT;
- num_read++;
- }
- }*/
num_read = num_values;
_num_filtered = num_values - num_read;
if (null_map != nullptr && num_read > 0) {
diff --git a/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp
b/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp
index f0120f49701..4fe3ae7b597 100644
--- a/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp
+++ b/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp
@@ -477,7 +477,6 @@ Status ScalarColumnReader::read_column_data(ColumnPtr&
doris_column, DataTypePtr
ColumnSelectVector& select_vector,
size_t batch_size,
size_t* read_rows, bool* eof, bool
is_dict_filter,
size_t skip_nums, size_t*
skipped_nums) {
- //fprintf(stderr, "batch_size: %ld, skip_nums: %ld\n", batch_size,
skip_nums);
*skipped_nums = 0;
if (_chunk_reader->remaining_num_values() == 0) {
if (!_chunk_reader->has_next_page()) {
@@ -503,35 +502,6 @@ Status ScalarColumnReader::read_column_data(ColumnPtr&
doris_column, DataTypePtr
RETURN_IF_ERROR(_chunk_reader->skip_page());
*read_rows = 0;
} else {
- //bool skip_whole_batch = false;
- // Determining whether to skip page or batch will increase the
calculation time.
- // When the filtering effect is greater than 60%, it is possible to
skip the page or batch.
- /*if (select_vector.has_filter() && select_vector.filter_ratio() >
0.6) {
- // lazy read
- size_t remaining_num_values = 0;
- for (auto& range : read_ranges) {
- remaining_num_values += range.last_row - range.first_row;
- }
- if (batch_size >= remaining_num_values &&
- select_vector.can_filter_all(remaining_num_values)) {
- // We can skip the whole page if the remaining values is
filtered by predicate columns
- select_vector.skip(remaining_num_values);
- fprintf(stderr, "select_vector.skip1(%ld)\n",
remaining_num_values);
- _current_row_index += _chunk_reader->remaining_num_values();
- RETURN_IF_ERROR(_chunk_reader->skip_page());
- *read_rows = remaining_num_values;
- if (!_chunk_reader->has_next_page()) {
- *eof = true;
- }
- return Status::OK();
- }
- skip_whole_batch =
- batch_size <= remaining_num_values &&
select_vector.can_filter_all(batch_size);
- if (skip_whole_batch) {
- select_vector.skip(batch_size);
- fprintf(stderr, "select_vector.skip2(%ld)\n", batch_size);
- }
- }*/
// load page data to decode or skip values
RETURN_IF_ERROR(_chunk_reader->load_page_data_idempotent());
@@ -557,15 +527,6 @@ Status ScalarColumnReader::read_column_data(ColumnPtr&
doris_column, DataTypePtr
// generate the read values
size_t read_values =
std::min((size_t)(range.last_row - _current_row_index),
batch_size - has_read);
- /*if (skip_whole_batch) {
- RETURN_IF_ERROR(_skip_values(read_values));
- fprintf(stderr, "_skip_values(%ld)\n", read_values);
- } else {
- RETURN_IF_ERROR(_read_values(read_values, doris_column, type,
select_vector,
- is_dict_filter));
- fprintf(stderr, "_read_values(%ld)\n", read_values);
- }*/
- //fprintf(stderr, "read_values: %ld\n", read_values);
RETURN_IF_ERROR(
_read_values(read_values, doris_column, type,
select_vector, is_dict_filter));
has_read += read_values;
@@ -581,7 +542,6 @@ Status ScalarColumnReader::read_column_data(ColumnPtr&
doris_column, DataTypePtr
if (_chunk_reader->remaining_num_values() == 0 &&
!_chunk_reader->has_next_page()) {
*eof = true;
}
- //fprintf(stderr, "*read_rows: %ld\n", *read_rows);
return Status::OK();
}
diff --git a/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp
b/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp
index 0c05dcf8f1c..5757de40294 100644
--- a/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp
+++ b/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp
@@ -412,7 +412,6 @@ Status RowGroupReader::_read_column_data(Block* block,
const std::vector<std::st
RETURN_IF_ERROR(_column_readers[read_col_name]->read_column_data(
column_ptr, column_type, select_vector, batch_size -
col_read_rows, &loop_rows,
&col_eof, is_dict_filter, col_skip_nums, &skipped_nums));
- //fprintf(stderr, "batch_size: %ld, col_read_rows: %ld, loop_rows:
%ld\n", batch_size, col_read_rows, loop_rows);
col_skip_nums -= skipped_nums;
col_read_rows += loop_rows;
}
@@ -446,12 +445,8 @@ Status RowGroupReader::_do_lazy_read(Block* block, size_t
batch_size, size_t* re
pre_read_rows = 0;
pre_eof = false;
ColumnSelectVector run_length_vector;
- //fprintf(stderr, "pre column batch_size: %ld, pre_read_rows: %ld\n",
batch_size,
- // pre_read_rows);
RETURN_IF_ERROR(_read_column_data(block,
_lazy_read_ctx.predicate_columns.first, batch_size,
&pre_read_rows, &pre_eof,
run_length_vector, 0));
- //fprintf(stderr, "pre column batch_size: %ld, pre_read_rows %ld
finished\n", batch_size,
- // pre_read_rows);
if (pre_read_rows == 0) {
DCHECK_EQ(pre_eof, true);
break;
@@ -526,41 +521,31 @@ Status RowGroupReader::_do_lazy_read(Block* block, size_t
batch_size, size_t* re
}
ColumnSelectVector& select_vector = *select_vector_ptr;
- /*std::unique_ptr<uint8_t[]> rebuild_filter_map = nullptr;
- if (_cached_filtered_rows != 0) {
- _rebuild_select_vector(select_vector, rebuild_filter_map,
pre_read_rows);
- pre_read_rows += _cached_filtered_rows;
- _cached_filtered_rows = 0;
- }*/
// lazy read columns
size_t lazy_read_rows;
bool lazy_eof;
- //fprintf(stderr, "lazy column pre_read_rows: %ld\n", pre_read_rows);
RETURN_IF_ERROR(_read_column_data(block, _lazy_read_ctx.lazy_read_columns,
pre_read_rows,
&lazy_read_rows, &lazy_eof,
select_vector,
_cached_filtered_rows));
- //fprintf(stderr, "lazy column pre_read_rows: %ld finished\n",
pre_read_rows);
if (pre_read_rows != lazy_read_rows) {
return Status::Corruption("Can't read the same number of rows when
doing lazy read");
}
if (_cached_filtered_rows != 0) {
_cached_filtered_rows = 0;
}
+
+ RETURN_IF_ERROR(
+ _fill_partition_columns(block, lazy_read_rows,
_lazy_read_ctx.partition_columns));
+ RETURN_IF_ERROR(_fill_missing_columns(block, lazy_read_rows,
_lazy_read_ctx.missing_columns));
// pre_eof ^ lazy_eof
// we set pre_read_rows as batch_size for lazy read columns, so pre_eof !=
lazy_eof
// filter data in predicate columns, and remove filter column
if (select_vector.has_filter()) {
- //if (block->columns() == origin_column_num) {
- // the whole row group has been filtered by
_lazy_read_ctx.vconjunct_ctx, and batch_eof is
- // generated from next batch, so the filter column is removed ahead.
- // DCHECK_EQ(block->rows(), 0);
- //} else {
RETURN_IF_CATCH_EXCEPTION(
Block::filter_block_internal(block, columns_to_filter,
result_filter));
Block::erase_useless_column(block, origin_column_num);
- //}
} else {
Block::erase_useless_column(block, origin_column_num);
}
@@ -582,8 +567,6 @@ Status RowGroupReader::_do_lazy_read(Block* block, size_t
batch_size, size_t* re
*read_rows = column_size;
*batch_eof = pre_eof;
- RETURN_IF_ERROR(_fill_partition_columns(block, column_size,
_lazy_read_ctx.partition_columns));
- RETURN_IF_ERROR(_fill_missing_columns(block, column_size,
_lazy_read_ctx.missing_columns));
if (!_not_single_slot_filter_conjuncts.empty()) {
RETURN_IF_CATCH_EXCEPTION(RETURN_IF_ERROR(VExprContext::execute_conjuncts_and_filter_block(
_not_single_slot_filter_conjuncts, nullptr, block,
columns_to_filter,
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]