This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 202ad5c659 [feature-wip](parquet-reader) bug fix, the number of rows 
are different among columns in a block (#12228)
202ad5c659 is described below

commit 202ad5c659c243e370b18a5ed8661be0068af7fc
Author: Ashin Gau <[email protected]>
AuthorDate: Fri Sep 2 09:50:25 2022 +0800

    [feature-wip](parquet-reader) bug fix, the number of rows are different 
among columns in a block (#12228)
    
    1. `ExprContext` is deleted in `ParquetReader::close()`, but it has not been 
closed,
    so the `DCHECK` in `~ExprContext()` fails. The lifetime of 
`ExprContext` is managed by the scan node,
    so we should not delete its pointer in `ParquetReader::close()`.
    2. `RowGroupReader::next_batch` will update `_read_rows` in every column 
loop,
    and does not ensure that the number of rows in every column is equal.
    3. The skipped row ranges are variables on the stack, which are released when 
calling `ArrayColumnReader::read_column_data`, so we should copy them out.
---
 .../exec/format/parquet/vparquet_column_reader.cpp |  8 +++----
 .../exec/format/parquet/vparquet_column_reader.h   |  2 +-
 .../exec/format/parquet/vparquet_group_reader.cpp  | 26 ++++++++++++++++++----
 be/src/vec/exec/format/parquet/vparquet_reader.cpp |  4 ----
 4 files changed, 27 insertions(+), 13 deletions(-)

diff --git a/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp 
b/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp
index cffdf00219..789a96d9a0 100644
--- a/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp
+++ b/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp
@@ -73,7 +73,7 @@ Status ScalarColumnReader::init(FileReader* file, 
FieldSchema* field, tparquet::
                                 std::vector<RowRange>& row_ranges) {
     _stream_reader =
             new BufferedFileStreamReader(file, _metadata->start_offset(), 
_metadata->size());
-    _row_ranges = &row_ranges;
+    _row_ranges = row_ranges;
     _chunk_reader.reset(new ColumnChunkReader(_stream_reader, chunk, field, 
_ctz));
     RETURN_IF_ERROR(_chunk_reader->init());
     if (_chunk_reader->max_def_level() > 1) {
@@ -94,7 +94,7 @@ Status ScalarColumnReader::read_column_data(ColumnPtr& 
doris_column, DataTypePtr
             return Status::OK();
         }
         RETURN_IF_ERROR(_chunk_reader->next_page());
-        if (_row_ranges->size() != 0) {
+        if (_row_ranges.size() != 0) {
             _skipped_pages();
         }
         RETURN_IF_ERROR(_chunk_reader->load_page_data());
@@ -160,7 +160,7 @@ Status ArrayColumnReader::init(FileReader* file, 
FieldSchema* field, tparquet::C
                                std::vector<RowRange>& row_ranges) {
     _stream_reader =
             new BufferedFileStreamReader(file, _metadata->start_offset(), 
_metadata->size());
-    _row_ranges = &row_ranges;
+    _row_ranges = row_ranges;
     _chunk_reader.reset(new ColumnChunkReader(_stream_reader, chunk, 
&field->children[0], _ctz));
     RETURN_IF_ERROR(_chunk_reader->init());
     if (_chunk_reader->max_def_level() > 4) {
@@ -191,7 +191,7 @@ Status ArrayColumnReader::read_column_data(ColumnPtr& 
doris_column, DataTypePtr&
             return Status::OK();
         }
         RETURN_IF_ERROR(_chunk_reader->next_page());
-        if (_row_ranges->size() != 0) {
+        if (_row_ranges.size() != 0) {
             _skipped_pages();
         }
         RETURN_IF_ERROR(_chunk_reader->load_page_data());
diff --git a/be/src/vec/exec/format/parquet/vparquet_column_reader.h 
b/be/src/vec/exec/format/parquet/vparquet_column_reader.h
index dec1c60df6..fc197748b8 100644
--- a/be/src/vec/exec/format/parquet/vparquet_column_reader.h
+++ b/be/src/vec/exec/format/parquet/vparquet_column_reader.h
@@ -72,7 +72,7 @@ protected:
     const ParquetReadColumn& _column;
     BufferedFileStreamReader* _stream_reader;
     std::unique_ptr<ParquetColumnMetadata> _metadata;
-    std::vector<RowRange>* _row_ranges;
+    std::vector<RowRange> _row_ranges;
     cctz::time_zone* _ctz;
     std::unique_ptr<ColumnChunkReader> _chunk_reader;
     std::unique_ptr<level_t[]> _def_levels_buf = nullptr;
diff --git a/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp 
b/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp
index 0f6b0d084b..91b6a5aa18 100644
--- a/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp
+++ b/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp
@@ -62,16 +62,34 @@ Status RowGroupReader::_init_column_readers(const 
FieldDescriptor& schema,
 }
 
 Status RowGroupReader::next_batch(Block* block, size_t batch_size, bool* 
_batch_eof) {
+    size_t batch_read_rows = 0;
+    bool has_eof = false;
+    int col_idx = 0;
     for (auto& read_col : _read_columns) {
         auto slot_desc = read_col._slot_desc;
         auto& column_with_type_and_name = 
block->get_by_name(slot_desc->col_name());
         auto& column_ptr = column_with_type_and_name.column;
         auto& column_type = column_with_type_and_name.type;
-        size_t batch_read_rows = 0;
-        RETURN_IF_ERROR(_column_readers[slot_desc->id()]->read_column_data(
-                column_ptr, column_type, batch_size, &batch_read_rows, 
_batch_eof));
-        _read_rows += batch_read_rows;
+        size_t col_read_rows = 0;
+        bool col_eof = false;
+        while (!col_eof && col_read_rows < batch_size) {
+            size_t loop_rows = 0;
+            RETURN_IF_ERROR(_column_readers[slot_desc->id()]->read_column_data(
+                    column_ptr, column_type, batch_size - col_read_rows, 
&loop_rows, &col_eof));
+            col_read_rows += loop_rows;
+        }
+        if (col_idx > 0 && (has_eof ^ col_eof)) {
+            return Status::Corruption("The number of rows are not equal among 
parquet columns");
+        }
+        if (batch_read_rows > 0 && batch_read_rows != col_read_rows) {
+            return Status::Corruption("Can't read the same number of rows 
among parquet columns");
+        }
+        batch_read_rows = col_read_rows;
+        has_eof = col_eof;
+        col_idx++;
     }
+    _read_rows += batch_read_rows;
+    *_batch_eof = has_eof;
     // use data fill utils read column data to column ptr
     return Status::OK();
 }
diff --git a/be/src/vec/exec/format/parquet/vparquet_reader.cpp 
b/be/src/vec/exec/format/parquet/vparquet_reader.cpp
index a58cbc825a..bbb6a169b4 100644
--- a/be/src/vec/exec/format/parquet/vparquet_reader.cpp
+++ b/be/src/vec/exec/format/parquet/vparquet_reader.cpp
@@ -41,10 +41,6 @@ ParquetReader::~ParquetReader() {
 
 void ParquetReader::close() {
     for (auto& conjuncts : _slot_conjuncts) {
-        for (auto expr : conjuncts.second) {
-            delete expr;
-            expr = nullptr;
-        }
         conjuncts.second.clear();
     }
     _row_group_readers.clear();


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to