This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch tpch500
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/tpch500 by this push:
     new 3794605d5a2 [Opt](parquet-reader) Opt late materialization of parquet reader. (#29181)
3794605d5a2 is described below

commit 3794605d5a2ebd862b7eb9d03da266e14e27f417
Author: Qi Chen <[email protected]>
AuthorDate: Thu Dec 28 17:21:54 2023 +0800

    [Opt](parquet-reader) Opt late materialization of parquet reader. (#29181)
---
 be/src/vec/columns/column_nullable.cpp             | 13 +++++
 .../format/parquet/fix_length_plain_decoder.cpp    |  9 ++-
 be/src/vec/exec/format/parquet/parquet_common.cpp  | 11 ++--
 .../exec/format/parquet/vparquet_column_reader.cpp | 65 ++++++++++++++++------
 .../exec/format/parquet/vparquet_column_reader.h   | 15 +++--
 .../exec/format/parquet/vparquet_group_reader.cpp  | 46 +++++++++------
 .../exec/format/parquet/vparquet_group_reader.h    |  2 +-
 7 files changed, 116 insertions(+), 45 deletions(-)

diff --git a/be/src/vec/columns/column_nullable.cpp 
b/be/src/vec/columns/column_nullable.cpp
index 426de2d4f70..b44482781e8 100644
--- a/be/src/vec/columns/column_nullable.cpp
+++ b/be/src/vec/columns/column_nullable.cpp
@@ -370,6 +370,19 @@ size_t ColumnNullable::filter(const Filter& filter) {
     return data_result_size;
 }
 
+//size_t ColumnNullable::filter(const Filter& filter) {
+//    const auto data_result_size = get_nested_column().filter(filter);
+//
+//    get_null_map_column().resize(data_result_size);
+//    /*if (!_has_null) {
+//        get_null_map_column().resize(data_result_size);
+//    } else {
+//        const auto map_result_size = get_null_map_column().filter(filter);
+//        CHECK_EQ(data_result_size, map_result_size);
+//    }*/
+//    return data_result_size;
+//}
+
 Status ColumnNullable::filter_by_selector(const uint16_t* sel, size_t 
sel_size, IColumn* col_ptr) {
     const auto* nullable_col_ptr = reinterpret_cast<const 
ColumnNullable*>(col_ptr);
     ColumnPtr nest_col_ptr = nullable_col_ptr->nested_column;
diff --git a/be/src/vec/exec/format/parquet/fix_length_plain_decoder.cpp 
b/be/src/vec/exec/format/parquet/fix_length_plain_decoder.cpp
index 8e6f6ebb67f..3160f5f5f1d 100644
--- a/be/src/vec/exec/format/parquet/fix_length_plain_decoder.cpp
+++ b/be/src/vec/exec/format/parquet/fix_length_plain_decoder.cpp
@@ -228,15 +228,20 @@ Status 
FixLengthPlainDecoder::_decode_numeric(MutableColumnPtr& doris_column,
     auto& column_data = 
static_cast<ColumnVector<Numeric>&>(*doris_column).get_data();
     size_t data_index = column_data.size();
     column_data.resize(data_index + select_vector.num_values() - 
select_vector.num_filtered());
+    //fprintf(stderr, "column_data.size(): %ld\n", column_data.size());
     ColumnSelectVector::DataReadType read_type;
     while (size_t run_length = 
select_vector.get_next_run<has_filter>(&read_type)) {
         switch (read_type) {
         case ColumnSelectVector::CONTENT: {
-            for (size_t i = 0; i < run_length; ++i) {
+            /*for (size_t i = 0; i < run_length; ++i) {
                 char* buf_start = _data->data + _offset;
                 column_data[data_index++] = *(PhysicalType*)buf_start;
                 _offset += _type_length;
-            }
+            }*/
+            memcpy(column_data.data() + data_index, _data->data + _offset,
+                   run_length * _type_length);
+            data_index += run_length;
+            _offset += _type_length * run_length;
             break;
         }
         case ColumnSelectVector::NULL_DATA: {
diff --git a/be/src/vec/exec/format/parquet/parquet_common.cpp 
b/be/src/vec/exec/format/parquet/parquet_common.cpp
index cbef2a0f286..241135eef61 100644
--- a/be/src/vec/exec/format/parquet/parquet_common.cpp
+++ b/be/src/vec/exec/format/parquet/parquet_common.cpp
@@ -75,23 +75,26 @@ void ColumnSelectVector::set_run_length_null_map(const 
std::vector<uint16_t>& ru
             if (is_null) {
                 _num_nulls += run_length;
                 for (int i = 0; i < run_length; ++i) {
-                    _data_map[map_index++] = FILTERED_NULL;
+                    //_data_map[map_index++] = FILTERED_NULL;
+                    _data_map[map_index++] = NULL_DATA;
                 }
             } else {
                 for (int i = 0; i < run_length; ++i) {
-                    _data_map[map_index++] = FILTERED_CONTENT;
+                    //_data_map[map_index++] = FILTERED_CONTENT;
+                    _data_map[map_index++] = CONTENT;
                 }
             }
             is_null = !is_null;
         }
         size_t num_read = 0;
-        DCHECK_LE(_filter_map_index + num_values, _filter_map_size);
+        /*DCHECK_LE(_filter_map_index + num_values, _filter_map_size);
         for (size_t i = 0; i < num_values; ++i) {
             if (_filter_map[_filter_map_index++]) {
                 _data_map[i] = _data_map[i] == FILTERED_NULL ? NULL_DATA : 
CONTENT;
                 num_read++;
             }
-        }
+        }*/
+        num_read = num_values;
         _num_filtered = num_values - num_read;
         if (null_map != nullptr && num_read > 0) {
             NullMap& map_data_column = *null_map;
diff --git a/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp 
b/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp
index be159f4a384..f0120f49701 100644
--- a/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp
+++ b/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp
@@ -475,7 +475,10 @@ Status ScalarColumnReader::_try_load_dict_page(bool* 
loaded, bool* has_dict) {
 
 Status ScalarColumnReader::read_column_data(ColumnPtr& doris_column, 
DataTypePtr& type,
                                             ColumnSelectVector& select_vector, 
size_t batch_size,
-                                            size_t* read_rows, bool* eof, bool 
is_dict_filter) {
+                                            size_t* read_rows, bool* eof, bool 
is_dict_filter,
+                                            size_t skip_nums, size_t* 
skipped_nums) {
+    //fprintf(stderr, "batch_size: %ld, skip_nums: %ld\n", batch_size, 
skip_nums);
+    *skipped_nums = 0;
     if (_chunk_reader->remaining_num_values() == 0) {
         if (!_chunk_reader->has_next_page()) {
             *eof = true;
@@ -500,10 +503,10 @@ Status ScalarColumnReader::read_column_data(ColumnPtr& 
doris_column, DataTypePtr
         RETURN_IF_ERROR(_chunk_reader->skip_page());
         *read_rows = 0;
     } else {
-        bool skip_whole_batch = false;
+        //bool skip_whole_batch = false;
         // Determining whether to skip page or batch will increase the 
calculation time.
         // When the filtering effect is greater than 60%, it is possible to 
skip the page or batch.
-        if (select_vector.has_filter() && select_vector.filter_ratio() > 0.6) {
+        /*if (select_vector.has_filter() && select_vector.filter_ratio() > 
0.6) {
             // lazy read
             size_t remaining_num_values = 0;
             for (auto& range : read_ranges) {
@@ -513,6 +516,7 @@ Status ScalarColumnReader::read_column_data(ColumnPtr& 
doris_column, DataTypePtr
                 select_vector.can_filter_all(remaining_num_values)) {
                 // We can skip the whole page if the remaining values is 
filtered by predicate columns
                 select_vector.skip(remaining_num_values);
+               fprintf(stderr, "select_vector.skip1(%ld)\n", 
remaining_num_values);
                 _current_row_index += _chunk_reader->remaining_num_values();
                 RETURN_IF_ERROR(_chunk_reader->skip_page());
                 *read_rows = remaining_num_values;
@@ -525,25 +529,45 @@ Status ScalarColumnReader::read_column_data(ColumnPtr& 
doris_column, DataTypePtr
                     batch_size <= remaining_num_values && 
select_vector.can_filter_all(batch_size);
             if (skip_whole_batch) {
                 select_vector.skip(batch_size);
+               fprintf(stderr, "select_vector.skip2(%ld)\n", batch_size);
             }
-        }
+        }*/
         // load page data to decode or skip values
         RETURN_IF_ERROR(_chunk_reader->load_page_data_idempotent());
+
+        size_t has_skipped = 0;
         size_t has_read = 0;
         for (auto& range : read_ranges) {
             // generate the skipped values
             size_t skip_values = range.first_row - _current_row_index;
             RETURN_IF_ERROR(_skip_values(skip_values));
             _current_row_index += skip_values;
+
+            if (skip_nums > 0) {
+                size_t skip_values = std::min((size_t)(range.last_row - 
_current_row_index),
+                                              skip_nums - has_skipped);
+                RETURN_IF_ERROR(_skip_values(skip_values));
+                _current_row_index += skip_values;
+                has_skipped += skip_values;
+                if (has_skipped < skip_nums) {
+                    continue;
+                }
+            }
+
             // generate the read values
             size_t read_values =
-                    std::min((size_t)(range.last_row - range.first_row), 
batch_size - has_read);
-            if (skip_whole_batch) {
+                    std::min((size_t)(range.last_row - _current_row_index), 
batch_size - has_read);
+            /*if (skip_whole_batch) {
                 RETURN_IF_ERROR(_skip_values(read_values));
+               fprintf(stderr, "_skip_values(%ld)\n", read_values);
             } else {
                 RETURN_IF_ERROR(_read_values(read_values, doris_column, type, 
select_vector,
                                              is_dict_filter));
-            }
+               fprintf(stderr, "_read_values(%ld)\n", read_values);
+            }*/
+            //fprintf(stderr, "read_values: %ld\n", read_values);
+            RETURN_IF_ERROR(
+                    _read_values(read_values, doris_column, type, 
select_vector, is_dict_filter));
             has_read += read_values;
             _current_row_index += read_values;
             if (has_read == batch_size) {
@@ -551,11 +575,13 @@ Status ScalarColumnReader::read_column_data(ColumnPtr& 
doris_column, DataTypePtr
             }
         }
         *read_rows = has_read;
+        *skipped_nums = has_skipped;
     }
 
     if (_chunk_reader->remaining_num_values() == 0 && 
!_chunk_reader->has_next_page()) {
         *eof = true;
     }
+    //fprintf(stderr, "*read_rows: %ld\n", *read_rows);
     return Status::OK();
 }
 
@@ -568,7 +594,8 @@ Status 
ArrayColumnReader::init(std::unique_ptr<ParquetColumnReader> element_read
 
 Status ArrayColumnReader::read_column_data(ColumnPtr& doris_column, 
DataTypePtr& type,
                                            ColumnSelectVector& select_vector, 
size_t batch_size,
-                                           size_t* read_rows, bool* eof, bool 
is_dict_filter) {
+                                           size_t* read_rows, bool* eof, bool 
is_dict_filter,
+                                           size_t skip_nums, size_t* 
skipped_nums) {
     MutableColumnPtr data_column;
     NullMap* null_map_ptr = nullptr;
     if (doris_column->is_nullable()) {
@@ -589,7 +616,8 @@ Status ArrayColumnReader::read_column_data(ColumnPtr& 
doris_column, DataTypePtr&
                     ->get_nested_type());
     // read nested column
     RETURN_IF_ERROR(_element_reader->read_column_data(element_column, 
element_type, select_vector,
-                                                      batch_size, read_rows, 
eof, is_dict_filter));
+                                                      batch_size, read_rows, 
eof, is_dict_filter,
+                                                      skip_nums, 
skipped_nums));
     if (*read_rows == 0) {
         return Status::OK();
     }
@@ -614,7 +642,8 @@ Status 
MapColumnReader::init(std::unique_ptr<ParquetColumnReader> key_reader,
 
 Status MapColumnReader::read_column_data(ColumnPtr& doris_column, DataTypePtr& 
type,
                                          ColumnSelectVector& select_vector, 
size_t batch_size,
-                                         size_t* read_rows, bool* eof, bool 
is_dict_filter) {
+                                         size_t* read_rows, bool* eof, bool 
is_dict_filter,
+                                         size_t skip_nums, size_t* 
skipped_nums) {
     MutableColumnPtr data_column;
     NullMap* null_map_ptr = nullptr;
     if (doris_column->is_nullable()) {
@@ -642,13 +671,14 @@ Status MapColumnReader::read_column_data(ColumnPtr& 
doris_column, DataTypePtr& t
     bool key_eof = false;
     bool value_eof = false;
     RETURN_IF_ERROR(_key_reader->read_column_data(key_column, key_type, 
select_vector, batch_size,
-                                                  &key_rows, &key_eof, 
is_dict_filter));
+                                                  &key_rows, &key_eof, 
is_dict_filter, skip_nums,
+                                                  skipped_nums));
     while (value_rows < key_rows && !value_eof) {
         size_t loop_rows = 0;
         select_vector.reset();
-        RETURN_IF_ERROR(_value_reader->read_column_data(value_column, 
value_type, select_vector,
-                                                        key_rows - value_rows, 
&loop_rows,
-                                                        &value_eof, 
is_dict_filter));
+        RETURN_IF_ERROR(_value_reader->read_column_data(
+                value_column, value_type, select_vector, key_rows - 
value_rows, &loop_rows,
+                &value_eof, is_dict_filter, skip_nums, skipped_nums));
         value_rows += loop_rows;
     }
     DCHECK_EQ(key_rows, value_rows);
@@ -677,7 +707,8 @@ Status 
StructColumnReader::init(std::vector<std::unique_ptr<ParquetColumnReader>
 }
 Status StructColumnReader::read_column_data(ColumnPtr& doris_column, 
DataTypePtr& type,
                                             ColumnSelectVector& select_vector, 
size_t batch_size,
-                                            size_t* read_rows, bool* eof, bool 
is_dict_filter) {
+                                            size_t* read_rows, bool* eof, bool 
is_dict_filter,
+                                            size_t skip_nums, size_t* 
skipped_nums) {
     MutableColumnPtr data_column;
     NullMap* null_map_ptr = nullptr;
     if (doris_column->is_nullable()) {
@@ -707,7 +738,7 @@ Status StructColumnReader::read_column_data(ColumnPtr& 
doris_column, DataTypePtr
         if (i == 0) {
             static_cast<void>(_child_readers[i]->read_column_data(
                     doris_field, doris_type, select_vector, batch_size, 
&field_rows, &field_eof,
-                    is_dict_filter));
+                    is_dict_filter, skip_nums, skipped_nums));
             *read_rows = field_rows;
             *eof = field_eof;
         } else {
@@ -716,7 +747,7 @@ Status StructColumnReader::read_column_data(ColumnPtr& 
doris_column, DataTypePtr
                 select_vector.reset();
                 static_cast<void>(_child_readers[i]->read_column_data(
                         doris_field, doris_type, select_vector, *read_rows - 
field_rows, &loop_rows,
-                        &field_eof, is_dict_filter));
+                        &field_eof, is_dict_filter, skip_nums, skipped_nums));
                 field_rows += loop_rows;
             }
             DCHECK_EQ(*read_rows, field_rows);
diff --git a/be/src/vec/exec/format/parquet/vparquet_column_reader.h 
b/be/src/vec/exec/format/parquet/vparquet_column_reader.h
index efe758898d5..2ae06b7dbbf 100644
--- a/be/src/vec/exec/format/parquet/vparquet_column_reader.h
+++ b/be/src/vec/exec/format/parquet/vparquet_column_reader.h
@@ -112,7 +112,8 @@ public:
     virtual ~ParquetColumnReader() = default;
     virtual Status read_column_data(ColumnPtr& doris_column, DataTypePtr& type,
                                     ColumnSelectVector& select_vector, size_t 
batch_size,
-                                    size_t* read_rows, bool* eof, bool 
is_dict_filter) = 0;
+                                    size_t* read_rows, bool* eof, bool 
is_dict_filter,
+                                    size_t skip_nums = 0, size_t* skipped_nums 
= nullptr) = 0;
 
     virtual Status read_dict_values_to_column(MutableColumnPtr& doris_column, 
bool* has_dict) {
         return Status::NotSupported("read_dict_values_to_column is not 
supported");
@@ -164,7 +165,8 @@ public:
     Status init(io::FileReaderSPtr file, FieldSchema* field, size_t 
max_buf_size);
     Status read_column_data(ColumnPtr& doris_column, DataTypePtr& type,
                             ColumnSelectVector& select_vector, size_t 
batch_size, size_t* read_rows,
-                            bool* eof, bool is_dict_filter) override;
+                            bool* eof, bool is_dict_filter, size_t skip_nums = 
0,
+                            size_t* skipped_nums = nullptr) override;
     Status read_dict_values_to_column(MutableColumnPtr& doris_column, bool* 
has_dict) override;
     Status get_dict_codes(const ColumnString* column_string,
                           std::vector<int32_t>* dict_codes) override;
@@ -203,7 +205,8 @@ public:
     Status init(std::unique_ptr<ParquetColumnReader> element_reader, 
FieldSchema* field);
     Status read_column_data(ColumnPtr& doris_column, DataTypePtr& type,
                             ColumnSelectVector& select_vector, size_t 
batch_size, size_t* read_rows,
-                            bool* eof, bool is_dict_filter) override;
+                            bool* eof, bool is_dict_filter, size_t skip_nums = 
0,
+                            size_t* skipped_nums = nullptr) override;
     const std::vector<level_t>& get_rep_level() const override {
         return _element_reader->get_rep_level();
     }
@@ -229,7 +232,8 @@ public:
                 std::unique_ptr<ParquetColumnReader> value_reader, 
FieldSchema* field);
     Status read_column_data(ColumnPtr& doris_column, DataTypePtr& type,
                             ColumnSelectVector& select_vector, size_t 
batch_size, size_t* read_rows,
-                            bool* eof, bool is_dict_filter) override;
+                            bool* eof, bool is_dict_filter, size_t skip_nums = 
0,
+                            size_t* skipped_nums = nullptr) override;
 
     const std::vector<level_t>& get_rep_level() const override {
         return _key_reader->get_rep_level();
@@ -264,7 +268,8 @@ public:
                 FieldSchema* field);
     Status read_column_data(ColumnPtr& doris_column, DataTypePtr& type,
                             ColumnSelectVector& select_vector, size_t 
batch_size, size_t* read_rows,
-                            bool* eof, bool is_dict_filter) override;
+                            bool* eof, bool is_dict_filter, size_t skip_nums = 
0,
+                            size_t* skipped_nums = nullptr) override;
 
     const std::vector<level_t>& get_rep_level() const override {
         return _child_readers[0]->get_rep_level();
diff --git a/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp 
b/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp
index d7253d10de0..0c05dcf8f1c 100644
--- a/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp
+++ b/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp
@@ -309,7 +309,7 @@ Status RowGroupReader::next_batch(Block* block, size_t 
batch_size, size_t* read_
     } else {
         ColumnSelectVector run_length_vector;
         RETURN_IF_ERROR(_read_column_data(block, 
_lazy_read_ctx.all_read_columns, batch_size,
-                                          read_rows, batch_eof, 
run_length_vector));
+                                          read_rows, batch_eof, 
run_length_vector, 0));
         RETURN_IF_ERROR(
                 _fill_partition_columns(block, *read_rows, 
_lazy_read_ctx.partition_columns));
         RETURN_IF_ERROR(_fill_missing_columns(block, *read_rows, 
_lazy_read_ctx.missing_columns));
@@ -373,7 +373,7 @@ void 
RowGroupReader::_merge_read_ranges(std::vector<RowRange>& row_ranges) {
 
 Status RowGroupReader::_read_column_data(Block* block, const 
std::vector<std::string>& columns,
                                          size_t batch_size, size_t* read_rows, 
bool* batch_eof,
-                                         ColumnSelectVector& select_vector) {
+                                         ColumnSelectVector& select_vector, 
size_t skip_nums) {
     size_t batch_read_rows = 0;
     bool has_eof = false;
     for (auto& read_col_name : columns) {
@@ -405,11 +405,15 @@ Status RowGroupReader::_read_column_data(Block* block, 
const std::vector<std::st
         bool col_eof = false;
         // Should reset _filter_map_index to 0 when reading next column.
         select_vector.reset();
+        size_t col_skip_nums = skip_nums;
         while (!col_eof && col_read_rows < batch_size) {
             size_t loop_rows = 0;
+            size_t skipped_nums = 0;
             RETURN_IF_ERROR(_column_readers[read_col_name]->read_column_data(
                     column_ptr, column_type, select_vector, batch_size - 
col_read_rows, &loop_rows,
-                    &col_eof, is_dict_filter));
+                    &col_eof, is_dict_filter, col_skip_nums, &skipped_nums));
+            //fprintf(stderr, "batch_size: %ld, col_read_rows: %ld, loop_rows: 
%ld\n", batch_size, col_read_rows, loop_rows);
+            col_skip_nums -= skipped_nums;
             col_read_rows += loop_rows;
         }
         if (batch_read_rows > 0 && batch_read_rows != col_read_rows) {
@@ -442,8 +446,12 @@ Status RowGroupReader::_do_lazy_read(Block* block, size_t 
batch_size, size_t* re
         pre_read_rows = 0;
         pre_eof = false;
         ColumnSelectVector run_length_vector;
+        //fprintf(stderr, "pre column batch_size: %ld, pre_read_rows: %ld\n", 
batch_size,
+        //        pre_read_rows);
         RETURN_IF_ERROR(_read_column_data(block, 
_lazy_read_ctx.predicate_columns.first, batch_size,
-                                          &pre_read_rows, &pre_eof, 
run_length_vector));
+                                          &pre_read_rows, &pre_eof, 
run_length_vector, 0));
+        //fprintf(stderr, "pre column batch_size: %ld, pre_read_rows %ld 
finished\n", batch_size,
+        //        pre_read_rows);
         if (pre_read_rows == 0) {
             DCHECK_EQ(pre_eof, true);
             break;
@@ -518,35 +526,41 @@ Status RowGroupReader::_do_lazy_read(Block* block, size_t 
batch_size, size_t* re
     }
 
     ColumnSelectVector& select_vector = *select_vector_ptr;
-    std::unique_ptr<uint8_t[]> rebuild_filter_map = nullptr;
+    /*std::unique_ptr<uint8_t[]> rebuild_filter_map = nullptr;
     if (_cached_filtered_rows != 0) {
         _rebuild_select_vector(select_vector, rebuild_filter_map, 
pre_read_rows);
         pre_read_rows += _cached_filtered_rows;
         _cached_filtered_rows = 0;
-    }
+    }*/
 
     // lazy read columns
     size_t lazy_read_rows;
     bool lazy_eof;
+    //fprintf(stderr, "lazy column pre_read_rows: %ld\n", pre_read_rows);
     RETURN_IF_ERROR(_read_column_data(block, _lazy_read_ctx.lazy_read_columns, 
pre_read_rows,
-                                      &lazy_read_rows, &lazy_eof, 
select_vector));
+                                      &lazy_read_rows, &lazy_eof, 
select_vector,
+                                      _cached_filtered_rows));
+    //fprintf(stderr, "lazy column pre_read_rows: %ld finished\n", 
pre_read_rows);
     if (pre_read_rows != lazy_read_rows) {
         return Status::Corruption("Can't read the same number of rows when 
doing lazy read");
     }
+    if (_cached_filtered_rows != 0) {
+        _cached_filtered_rows = 0;
+    }
     // pre_eof ^ lazy_eof
     // we set pre_read_rows as batch_size for lazy read columns, so pre_eof != 
lazy_eof
 
     // filter data in predicate columns, and remove filter column
     if (select_vector.has_filter()) {
-        if (block->columns() == origin_column_num) {
-            // the whole row group has been filtered by 
_lazy_read_ctx.vconjunct_ctx, and batch_eof is
-            // generated from next batch, so the filter column is removed 
ahead.
-            DCHECK_EQ(block->rows(), 0);
-        } else {
-            RETURN_IF_CATCH_EXCEPTION(Block::filter_block_internal(
-                    block, _lazy_read_ctx.all_predicate_col_ids, 
result_filter));
-            Block::erase_useless_column(block, origin_column_num);
-        }
+        //if (block->columns() == origin_column_num) {
+        // the whole row group has been filtered by 
_lazy_read_ctx.vconjunct_ctx, and batch_eof is
+        // generated from next batch, so the filter column is removed ahead.
+        //    DCHECK_EQ(block->rows(), 0);
+        //} else {
+        RETURN_IF_CATCH_EXCEPTION(
+                Block::filter_block_internal(block, columns_to_filter, 
result_filter));
+        Block::erase_useless_column(block, origin_column_num);
+        //}
     } else {
         Block::erase_useless_column(block, origin_column_num);
     }
diff --git a/be/src/vec/exec/format/parquet/vparquet_group_reader.h 
b/be/src/vec/exec/format/parquet/vparquet_group_reader.h
index ad918bd97fc..6f6b8f571bc 100644
--- a/be/src/vec/exec/format/parquet/vparquet_group_reader.h
+++ b/be/src/vec/exec/format/parquet/vparquet_group_reader.h
@@ -167,7 +167,7 @@ private:
     Status _read_empty_batch(size_t batch_size, size_t* read_rows, bool* 
batch_eof);
     Status _read_column_data(Block* block, const std::vector<std::string>& 
columns,
                              size_t batch_size, size_t* read_rows, bool* 
batch_eof,
-                             ColumnSelectVector& select_vector);
+                             ColumnSelectVector& select_vector, size_t 
skip_nums);
     Status _do_lazy_read(Block* block, size_t batch_size, size_t* read_rows, 
bool* batch_eof);
     void _rebuild_select_vector(ColumnSelectVector& select_vector,
                                 std::unique_ptr<uint8_t[]>& filter_map, size_t 
pre_read_rows) const;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to