This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 4a55b504c0 [feature-wip](parquet-reader) bug fix, get the correct
group reader (#12294)
4a55b504c0 is described below
commit 4a55b504c0ba8bb548d54c4011e6326435e75043
Author: slothever <[email protected]>
AuthorDate: Tue Sep 6 13:59:35 2022 +0800
[feature-wip](parquet-reader) bug fix, get the correct group reader (#12294)
Fix the problem that the lineitem table of TPC-H cannot be read, and the
memory allocation error
Co-authored-by: jinzhe <[email protected]>
---
be/src/vec/exec/format/parquet/vparquet_reader.cpp | 51 ++++++++++------------
be/src/vec/exec/format/parquet/vparquet_reader.h | 11 ++---
2 files changed, 30 insertions(+), 32 deletions(-)
diff --git a/be/src/vec/exec/format/parquet/vparquet_reader.cpp
b/be/src/vec/exec/format/parquet/vparquet_reader.cpp
index bbb6a169b4..e29cca08e4 100644
--- a/be/src/vec/exec/format/parquet/vparquet_reader.cpp
+++ b/be/src/vec/exec/format/parquet/vparquet_reader.cpp
@@ -44,6 +44,8 @@ void ParquetReader::close() {
conjuncts.second.clear();
}
_row_group_readers.clear();
+ _read_row_groups.clear();
+ _skipped_row_ranges.clear();
_slot_conjuncts.clear();
_file_reader->close();
delete _file_reader;
@@ -98,31 +100,31 @@ Status ParquetReader::_init_read_columns(const
std::vector<SlotDescriptor*>& tup
}
Status ParquetReader::read_next_batch(Block* block, bool* eof) {
- if (_row_group_readers.empty()) {
+ int32_t num_of_readers = _row_group_readers.size();
+ DCHECK(num_of_readers <= _read_row_groups.size());
+ if (_read_row_groups.empty()) {
*eof = true;
return Status::OK();
}
- int32_t num_of_readers = _row_group_readers.size();
- DCHECK(num_of_readers <= _total_groups);
bool _batch_eof = false;
- auto row_group_reader = _row_group_readers[_current_row_group_id];
- RETURN_IF_ERROR(row_group_reader->next_batch(block, _batch_size,
&_batch_eof));
+ RETURN_IF_ERROR(_current_group_reader->next_batch(block, _batch_size,
&_batch_eof));
if (_batch_eof) {
- _current_row_group_id = _next_row_group_id();
- if (_current_row_group_id == -1 || _current_row_group_id >=
num_of_readers) {
+ if (!_next_row_group_reader()) {
*eof = true;
+ } else {
+ _read_row_groups.pop_front();
}
}
return Status::OK();
}
-int32_t ParquetReader::_next_row_group_id() {
- if (_read_row_groups.empty()) {
- return -1;
+bool ParquetReader::_next_row_group_reader() {
+ if (_row_group_readers.empty()) {
+ return false;
}
- auto group_id = _read_row_groups.front();
- _read_row_groups.pop_front();
- return group_id;
+ _current_group_reader = _row_group_readers.front();
+ _row_group_readers.pop_front();
+ return true;
}
Status ParquetReader::_init_row_group_readers(const std::vector<ExprContext*>&
conjunct_ctxs) {
@@ -131,18 +133,19 @@ Status ParquetReader::_init_row_group_readers(const
std::vector<ExprContext*>& c
for (auto row_group_id : _read_row_groups) {
auto row_group = _t_metadata->row_groups[row_group_id];
auto column_chunks = row_group.columns;
- std::vector<RowRange> skipped_row_ranges;
if (_has_page_index(column_chunks)) {
- RETURN_IF_ERROR(_process_page_index(row_group,
skipped_row_ranges));
+ RETURN_IF_ERROR(_process_page_index(row_group));
}
std::shared_ptr<RowGroupReader> row_group_reader;
row_group_reader.reset(
new RowGroupReader(_file_reader, _read_columns, row_group_id,
row_group, _ctz));
// todo: can filter row with candidate ranges rather than skipped
ranges
- RETURN_IF_ERROR(row_group_reader->init(_file_metadata->schema(),
skipped_row_ranges));
+ RETURN_IF_ERROR(row_group_reader->init(_file_metadata->schema(),
_skipped_row_ranges));
_row_group_readers.emplace_back(row_group_reader);
}
- _current_row_group_id = _next_row_group_id();
+ if (!_next_row_group_reader()) {
+ return Status::EndOfFile("No next reader");
+ }
return Status::OK();
}
@@ -191,21 +194,16 @@ Status ParquetReader::_filter_row_groups() {
if (_total_groups == 0 || _file_metadata->num_rows() == 0 || _range_size <
0) {
return Status::EndOfFile("No row group need read");
}
- int32_t row_group_idx = 0;
- while (row_group_idx < _total_groups) {
+ for (int32_t row_group_idx = 0; row_group_idx < _total_groups;
row_group_idx++) {
const tparquet::RowGroup& row_group =
_t_metadata->row_groups[row_group_idx];
if (_is_misaligned_range_group(row_group)) {
- row_group_idx++;
continue;
}
bool filter_group = false;
RETURN_IF_ERROR(_process_row_group_filter(row_group, &filter_group));
if (!filter_group) {
- _read_row_groups.push_back(row_group_idx);
- row_group_idx++;
- break;
+ _read_row_groups.emplace_back(row_group_idx);
}
- row_group_idx++;
}
return Status::OK();
}
@@ -229,8 +227,7 @@ bool
ParquetReader::_has_page_index(std::vector<tparquet::ColumnChunk>& columns)
return _page_index->check_and_get_page_index_ranges(columns);
}
-Status ParquetReader::_process_page_index(tparquet::RowGroup& row_group,
- std::vector<RowRange>&
skipped_row_ranges) {
+Status ParquetReader::_process_page_index(tparquet::RowGroup& row_group) {
int64_t buffer_size = _page_index->_column_index_size +
_page_index->_offset_index_size;
for (auto col_id : _include_column_ids) {
uint8_t buff[buffer_size];
@@ -257,7 +254,7 @@ Status
ParquetReader::_process_page_index(tparquet::RowGroup& row_group,
RowRange skipped_row_range;
_page_index->create_skipped_row_range(offset_index,
row_group.num_rows, page_id,
&skipped_row_range);
- skipped_row_ranges.emplace_back(skipped_row_range);
+ _skipped_row_ranges.emplace_back(skipped_row_range);
}
}
return Status::OK();
diff --git a/be/src/vec/exec/format/parquet/vparquet_reader.h
b/be/src/vec/exec/format/parquet/vparquet_reader.h
index d98825b6f0..9facffa623 100644
--- a/be/src/vec/exec/format/parquet/vparquet_reader.h
+++ b/be/src/vec/exec/format/parquet/vparquet_reader.h
@@ -82,14 +82,13 @@ public:
int64_t size() const { return _file_reader->size(); }
private:
- int32_t _next_row_group_id();
+ bool _next_row_group_reader();
Status _init_read_columns(const std::vector<SlotDescriptor*>&
tuple_slot_descs);
Status _init_row_group_readers(const std::vector<ExprContext*>&
conjunct_ctxs);
void _init_conjuncts(const std::vector<ExprContext*>& conjunct_ctxs);
// Page Index Filter
bool _has_page_index(std::vector<tparquet::ColumnChunk>& columns);
- Status _process_page_index(tparquet::RowGroup& row_group,
- std::vector<RowRange>& skipped_row_ranges);
+ Status _process_page_index(tparquet::RowGroup& row_group);
// Row Group Filter
bool _is_misaligned_range_group(const tparquet::RowGroup& row_group);
@@ -113,8 +112,9 @@ private:
FileReader* _file_reader;
std::shared_ptr<FileMetaData> _file_metadata;
tparquet::FileMetaData* _t_metadata;
- std::shared_ptr<PageIndex> _page_index;
- std::vector<std::shared_ptr<RowGroupReader>> _row_group_readers;
+ std::unique_ptr<PageIndex> _page_index;
+ std::list<std::shared_ptr<RowGroupReader>> _row_group_readers;
+ std::shared_ptr<RowGroupReader> _current_group_reader;
int32_t _total_groups; // num of groups(stripes) of a parquet(orc) file
int32_t _current_row_group_id;
// std::shared_ptr<Statistics> _statistics;
@@ -129,6 +129,7 @@ private:
int64_t _range_start_offset;
int64_t _range_size;
cctz::time_zone* _ctz;
+ std::vector<RowRange> _skipped_row_ranges;
const TupleDescriptor* _tuple_desc; // get all slot info
};
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]