This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 202ad5c659 [feature-wip](parquet-reader) bug fix, the number of rows
are different among columns in a block (#12228)
202ad5c659 is described below
commit 202ad5c659c243e370b18a5ed8661be0068af7fc
Author: Ashin Gau <[email protected]>
AuthorDate: Fri Sep 2 09:50:25 2022 +0800
[feature-wip](parquet-reader) bug fix, the number of rows are different
among columns in a block (#12228)
1. `ExprContext` is deleted in `ParquetReader::close()`, but it has not been
closed,
so the `DCHECK` in `~ExprContext()` fails. The lifetime of
`ExprContext` is managed by the scan node,
so we should not delete its pointer in `ParquetReader::close()`.
2. `RowGroupReader::next_batch` updates `_read_rows` in every column
loop iteration,
and does not ensure that the number of rows in every column is equal.
3. The skipped row ranges are variables on the stack, which are released when
calling `ArrayColumnReader::read_column_data`, so we should copy them out.
---
.../exec/format/parquet/vparquet_column_reader.cpp | 8 +++----
.../exec/format/parquet/vparquet_column_reader.h | 2 +-
.../exec/format/parquet/vparquet_group_reader.cpp | 26 ++++++++++++++++++----
be/src/vec/exec/format/parquet/vparquet_reader.cpp | 4 ----
4 files changed, 27 insertions(+), 13 deletions(-)
diff --git a/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp
b/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp
index cffdf00219..789a96d9a0 100644
--- a/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp
+++ b/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp
@@ -73,7 +73,7 @@ Status ScalarColumnReader::init(FileReader* file,
FieldSchema* field, tparquet::
std::vector<RowRange>& row_ranges) {
_stream_reader =
new BufferedFileStreamReader(file, _metadata->start_offset(),
_metadata->size());
- _row_ranges = &row_ranges;
+ _row_ranges = row_ranges;
_chunk_reader.reset(new ColumnChunkReader(_stream_reader, chunk, field,
_ctz));
RETURN_IF_ERROR(_chunk_reader->init());
if (_chunk_reader->max_def_level() > 1) {
@@ -94,7 +94,7 @@ Status ScalarColumnReader::read_column_data(ColumnPtr&
doris_column, DataTypePtr
return Status::OK();
}
RETURN_IF_ERROR(_chunk_reader->next_page());
- if (_row_ranges->size() != 0) {
+ if (_row_ranges.size() != 0) {
_skipped_pages();
}
RETURN_IF_ERROR(_chunk_reader->load_page_data());
@@ -160,7 +160,7 @@ Status ArrayColumnReader::init(FileReader* file,
FieldSchema* field, tparquet::C
std::vector<RowRange>& row_ranges) {
_stream_reader =
new BufferedFileStreamReader(file, _metadata->start_offset(),
_metadata->size());
- _row_ranges = &row_ranges;
+ _row_ranges = row_ranges;
_chunk_reader.reset(new ColumnChunkReader(_stream_reader, chunk,
&field->children[0], _ctz));
RETURN_IF_ERROR(_chunk_reader->init());
if (_chunk_reader->max_def_level() > 4) {
@@ -191,7 +191,7 @@ Status ArrayColumnReader::read_column_data(ColumnPtr&
doris_column, DataTypePtr&
return Status::OK();
}
RETURN_IF_ERROR(_chunk_reader->next_page());
- if (_row_ranges->size() != 0) {
+ if (_row_ranges.size() != 0) {
_skipped_pages();
}
RETURN_IF_ERROR(_chunk_reader->load_page_data());
diff --git a/be/src/vec/exec/format/parquet/vparquet_column_reader.h
b/be/src/vec/exec/format/parquet/vparquet_column_reader.h
index dec1c60df6..fc197748b8 100644
--- a/be/src/vec/exec/format/parquet/vparquet_column_reader.h
+++ b/be/src/vec/exec/format/parquet/vparquet_column_reader.h
@@ -72,7 +72,7 @@ protected:
const ParquetReadColumn& _column;
BufferedFileStreamReader* _stream_reader;
std::unique_ptr<ParquetColumnMetadata> _metadata;
- std::vector<RowRange>* _row_ranges;
+ std::vector<RowRange> _row_ranges;
cctz::time_zone* _ctz;
std::unique_ptr<ColumnChunkReader> _chunk_reader;
std::unique_ptr<level_t[]> _def_levels_buf = nullptr;
diff --git a/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp
b/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp
index 0f6b0d084b..91b6a5aa18 100644
--- a/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp
+++ b/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp
@@ -62,16 +62,34 @@ Status RowGroupReader::_init_column_readers(const
FieldDescriptor& schema,
}
Status RowGroupReader::next_batch(Block* block, size_t batch_size, bool*
_batch_eof) {
+ size_t batch_read_rows = 0;
+ bool has_eof = false;
+ int col_idx = 0;
for (auto& read_col : _read_columns) {
auto slot_desc = read_col._slot_desc;
auto& column_with_type_and_name =
block->get_by_name(slot_desc->col_name());
auto& column_ptr = column_with_type_and_name.column;
auto& column_type = column_with_type_and_name.type;
- size_t batch_read_rows = 0;
- RETURN_IF_ERROR(_column_readers[slot_desc->id()]->read_column_data(
- column_ptr, column_type, batch_size, &batch_read_rows,
_batch_eof));
- _read_rows += batch_read_rows;
+ size_t col_read_rows = 0;
+ bool col_eof = false;
+ while (!col_eof && col_read_rows < batch_size) {
+ size_t loop_rows = 0;
+ RETURN_IF_ERROR(_column_readers[slot_desc->id()]->read_column_data(
+ column_ptr, column_type, batch_size - col_read_rows,
&loop_rows, &col_eof));
+ col_read_rows += loop_rows;
+ }
+ if (col_idx > 0 && (has_eof ^ col_eof)) {
+ return Status::Corruption("The number of rows are not equal among
parquet columns");
+ }
+ if (batch_read_rows > 0 && batch_read_rows != col_read_rows) {
+ return Status::Corruption("Can't read the same number of rows
among parquet columns");
+ }
+ batch_read_rows = col_read_rows;
+ has_eof = col_eof;
+ col_idx++;
}
+ _read_rows += batch_read_rows;
+ *_batch_eof = has_eof;
// use data fill utils read column data to column ptr
return Status::OK();
}
diff --git a/be/src/vec/exec/format/parquet/vparquet_reader.cpp
b/be/src/vec/exec/format/parquet/vparquet_reader.cpp
index a58cbc825a..bbb6a169b4 100644
--- a/be/src/vec/exec/format/parquet/vparquet_reader.cpp
+++ b/be/src/vec/exec/format/parquet/vparquet_reader.cpp
@@ -41,10 +41,6 @@ ParquetReader::~ParquetReader() {
void ParquetReader::close() {
for (auto& conjuncts : _slot_conjuncts) {
- for (auto expr : conjuncts.second) {
- delete expr;
- expr = nullptr;
- }
conjuncts.second.clear();
}
_row_group_readers.clear();
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]