This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 0c16740f5c [feature-wip](parquet-reader) parquet scanner can read 
data (#11970)
0c16740f5c is described below

commit 0c16740f5c7690183427043e6b8ad09391fac5b8
Author: slothever <[email protected]>
AuthorDate: Fri Aug 26 09:43:46 2022 +0800

    [feature-wip](parquet-reader) parquet scanner can read data (#11970)
    
    Co-authored-by: jinzhe <[email protected]>
---
 be/src/io/file_factory.cpp                         |   4 +
 be/src/vec/exec/file_hdfs_scanner.cpp              |  12 +-
 be/src/vec/exec/file_hdfs_scanner.h                |   2 +-
 .../exec/format/parquet/vparquet_column_reader.cpp |  14 +-
 be/src/vec/exec/format/parquet/vparquet_reader.cpp |  81 ++++----
 be/src/vec/exec/format/parquet/vparquet_reader.h   |  18 +-
 be/test/vec/exec/parquet/parquet_reader_test.cpp   | 223 +++++++++++++++++++--
 7 files changed, 277 insertions(+), 77 deletions(-)

diff --git a/be/src/io/file_factory.cpp b/be/src/io/file_factory.cpp
index 46d19669b9..66e136a8fd 100644
--- a/be/src/io/file_factory.cpp
+++ b/be/src/io/file_factory.cpp
@@ -142,6 +142,10 @@ doris::Status 
doris::FileFactory::_new_file_reader(doris::ExecEnv* env, RuntimeP
     }
 
     switch (type) {
+    case TFileType::FILE_LOCAL: {
+        file_reader_ptr = new LocalFileReader(range.path, range.start_offset);
+        break;
+    }
     case TFileType::FILE_S3: {
         file_reader_ptr = new BufferedReader(
                 profile, new S3Reader(params.properties, range.path, 
range.start_offset));
diff --git a/be/src/vec/exec/file_hdfs_scanner.cpp 
b/be/src/vec/exec/file_hdfs_scanner.cpp
index 6459da6fff..1ba650dedd 100644
--- a/be/src/vec/exec/file_hdfs_scanner.cpp
+++ b/be/src/vec/exec/file_hdfs_scanner.cpp
@@ -28,6 +28,10 @@ ParquetFileHdfsScanner::ParquetFileHdfsScanner(RuntimeState* 
state, RuntimeProfi
                                                ScannerCounter* counter)
         : HdfsFileScanner(state, profile, params, ranges, pre_filter_texprs, 
counter) {}
 
+ParquetFileHdfsScanner::~ParquetFileHdfsScanner() {
+    ParquetFileHdfsScanner::close();
+}
+
 Status ParquetFileHdfsScanner::open() {
     RETURN_IF_ERROR(FileScanner::open());
     if (_ranges.empty()) {
@@ -48,14 +52,18 @@ Status ParquetFileHdfsScanner::get_next(vectorized::Block* 
block, bool* eof) {
     bool range_eof = false;
     RETURN_IF_ERROR(_reader->read_next_batch(block, &range_eof));
     if (range_eof) {
-        RETURN_IF_ERROR(_get_next_reader(_next_range++));
+        _next_range++;
+        RETURN_IF_ERROR(_get_next_reader(_next_range));
     }
     return Status::OK();
 }
 
 Status ParquetFileHdfsScanner::_get_next_reader(int _next_range) {
+    if (_next_range >= _ranges.size()) {
+        _scanner_eof = true;
+        return Status::OK();
+    }
     const TFileRangeDesc& range = _ranges[_next_range];
-    _current_range_offset = range.start_offset;
     std::unique_ptr<FileReader> file_reader;
     RETURN_IF_ERROR(FileFactory::create_file_reader(_state->exec_env(), 
_profile, _params, range,
                                                     file_reader));
diff --git a/be/src/vec/exec/file_hdfs_scanner.h 
b/be/src/vec/exec/file_hdfs_scanner.h
index e24063c89b..ac73444fcd 100644
--- a/be/src/vec/exec/file_hdfs_scanner.h
+++ b/be/src/vec/exec/file_hdfs_scanner.h
@@ -38,6 +38,7 @@ public:
                            const TFileScanRangeParams& params,
                            const std::vector<TFileRangeDesc>& ranges,
                            const std::vector<TExpr>& pre_filter_texprs, 
ScannerCounter* counter);
+    ~ParquetFileHdfsScanner();
     Status open() override;
 
     Status get_next(vectorized::Block* block, bool* eof) override;
@@ -51,7 +52,6 @@ private:
 
 private:
     std::shared_ptr<ParquetReader> _reader;
-    int64_t _current_range_offset;
 };
 
 } // namespace doris::vectorized
\ No newline at end of file
diff --git a/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp 
b/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp
index f15a7d5623..66d9793ab5 100644
--- a/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp
+++ b/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp
@@ -84,20 +84,8 @@ Status ScalarColumnReader::read_column_data(ColumnPtr& 
doris_column, DataTypePtr
     size_t read_values = _chunk_reader->remaining_num_values() < batch_size
                                  ? _chunk_reader->remaining_num_values()
                                  : batch_size;
+    RETURN_IF_ERROR(_chunk_reader->decode_values(doris_column, type, 
read_values));
     *read_rows = read_values;
-    WhichDataType which_type(type);
-    switch (_metadata->t_metadata().type) {
-    case tparquet::Type::INT32:
-    case tparquet::Type::INT64:
-    case tparquet::Type::FLOAT:
-    case tparquet::Type::DOUBLE:
-    case tparquet::Type::BOOLEAN: {
-        _chunk_reader->decode_values(doris_column, type, read_values);
-        return Status::OK();
-    }
-    default:
-        return Status::Corruption("unsupported parquet data type");
-    }
     return Status::OK();
 }
 
diff --git a/be/src/vec/exec/format/parquet/vparquet_reader.cpp 
b/be/src/vec/exec/format/parquet/vparquet_reader.cpp
index 0127aae1d2..a58cbc825a 100644
--- a/be/src/vec/exec/format/parquet/vparquet_reader.cpp
+++ b/be/src/vec/exec/format/parquet/vparquet_reader.cpp
@@ -28,6 +28,7 @@ ParquetReader::ParquetReader(FileReader* file_reader, int32_t 
num_of_columns_fro
           _range_start_offset(range_start_offset),
           _range_size(range_size),
           _ctz(ctz) {
+    DCHECK(range_start_offset < range_size);
     _file_reader = file_reader;
     _total_groups = 0;
     _current_row_group_id = 0;
@@ -40,14 +41,16 @@ ParquetReader::~ParquetReader() {
 
 void ParquetReader::close() {
     for (auto& conjuncts : _slot_conjuncts) {
+        for (auto expr : conjuncts.second) {
+            delete expr;
+            expr = nullptr;
+        }
         conjuncts.second.clear();
     }
+    _row_group_readers.clear();
     _slot_conjuncts.clear();
-    if (_file_reader != nullptr) {
-        _file_reader->close();
-        delete _file_reader;
-        _file_reader = nullptr;
-    }
+    _file_reader->close();
+    delete _file_reader;
 }
 
 Status ParquetReader::init_reader(const TupleDescriptor* tuple_desc,
@@ -55,9 +58,12 @@ Status ParquetReader::init_reader(const TupleDescriptor* 
tuple_desc,
                                   std::vector<ExprContext*>& conjunct_ctxs,
                                   const std::string& timezone) {
     _file_reader->open();
-    _conjunct_ctxs.reset(&conjunct_ctxs);
+    _tuple_desc = tuple_desc;
+    if (_tuple_desc->slots().size() == 0) {
+        return Status::EndOfFile("No Parquet column need load");
+    }
     RETURN_IF_ERROR(parse_thrift_footer(_file_reader, _file_metadata));
-    _t_metadata.reset(&_file_metadata->to_thrift_metadata());
+    _t_metadata = &_file_metadata->to_thrift_metadata();
     _total_groups = _file_metadata->num_row_groups();
     if (_total_groups == 0) {
         return Status::EndOfFile("Empty Parquet File");
@@ -69,8 +75,7 @@ Status ParquetReader::init_reader(const TupleDescriptor* 
tuple_desc,
         _map_column.emplace(schema_desc.get_column(i)->name, i);
     }
     RETURN_IF_ERROR(_init_read_columns(tuple_slot_descs));
-    RETURN_IF_ERROR(
-            _init_row_group_readers(tuple_desc, _range_start_offset, 
_range_size, conjunct_ctxs));
+    RETURN_IF_ERROR(_init_row_group_readers(conjunct_ctxs));
     return Status::OK();
 }
 
@@ -92,41 +97,46 @@ Status ParquetReader::_init_read_columns(const 
std::vector<SlotDescriptor*>& tup
         }
         ParquetReadColumn column(slot_desc);
         _read_columns.emplace_back(column);
-        VLOG_DEBUG << "slot_desc " << slot_desc->debug_string();
     }
     return Status::OK();
 }
 
 Status ParquetReader::read_next_batch(Block* block, bool* eof) {
-    DCHECK(_total_groups == _row_group_readers.size());
-    if (_total_groups == 0) {
+    if (_row_group_readers.empty()) {
         *eof = true;
+        return Status::OK();
     }
+    int32_t num_of_readers = _row_group_readers.size();
+    DCHECK(num_of_readers <= _total_groups);
     bool _batch_eof = false;
     auto row_group_reader = _row_group_readers[_current_row_group_id];
     RETURN_IF_ERROR(row_group_reader->next_batch(block, _batch_size, 
&_batch_eof));
     if (_batch_eof) {
-        _current_row_group_id++;
-        if (_current_row_group_id > _total_groups) {
+        _current_row_group_id = _next_row_group_id();
+        if (_current_row_group_id == -1 || _current_row_group_id >= 
num_of_readers) {
             *eof = true;
         }
     }
     return Status::OK();
 }
 
-Status ParquetReader::_init_row_group_readers(const TupleDescriptor* 
tuple_desc,
-                                              int64_t range_start_offset, 
int64_t range_size,
-                                              const std::vector<ExprContext*>& 
conjunct_ctxs) {
-    std::vector<int32_t> read_row_groups;
-    RETURN_IF_ERROR(_filter_row_groups(&read_row_groups));
-    _init_conjuncts(tuple_desc, conjunct_ctxs);
-    for (auto row_group_id : read_row_groups) {
-        VLOG_DEBUG << "_has_page_index";
+int32_t ParquetReader::_next_row_group_id() {
+    if (_read_row_groups.empty()) {
+        return -1;
+    }
+    auto group_id = _read_row_groups.front();
+    _read_row_groups.pop_front();
+    return group_id;
+}
+
+Status ParquetReader::_init_row_group_readers(const std::vector<ExprContext*>& 
conjunct_ctxs) {
+    _init_conjuncts(conjunct_ctxs);
+    RETURN_IF_ERROR(_filter_row_groups());
+    for (auto row_group_id : _read_row_groups) {
         auto row_group = _t_metadata->row_groups[row_group_id];
         auto column_chunks = row_group.columns;
         std::vector<RowRange> skipped_row_ranges;
         if (_has_page_index(column_chunks)) {
-            VLOG_DEBUG << "_process_page_index";
             RETURN_IF_ERROR(_process_page_index(row_group, 
skipped_row_ranges));
         }
         std::shared_ptr<RowGroupReader> row_group_reader;
@@ -136,18 +146,17 @@ Status ParquetReader::_init_row_group_readers(const 
TupleDescriptor* tuple_desc,
         RETURN_IF_ERROR(row_group_reader->init(_file_metadata->schema(), 
skipped_row_ranges));
         _row_group_readers.emplace_back(row_group_reader);
     }
-    VLOG_DEBUG << "_init_row_group_reader finished";
+    _current_row_group_id = _next_row_group_id();
     return Status::OK();
 }
 
-void ParquetReader::_init_conjuncts(const TupleDescriptor* tuple_desc,
-                                    const std::vector<ExprContext*>& 
conjunct_ctxs) {
-    if (tuple_desc->slots().empty()) {
+void ParquetReader::_init_conjuncts(const std::vector<ExprContext*>& 
conjunct_ctxs) {
+    if (_tuple_desc->slots().empty()) {
         return;
     }
     std::unordered_set<int> parquet_col_ids(_include_column_ids.begin(), 
_include_column_ids.end());
-    for (int i = 0; i < tuple_desc->slots().size(); i++) {
-        auto col_iter = _map_column.find(tuple_desc->slots()[i]->col_name());
+    for (int i = 0; i < _tuple_desc->slots().size(); i++) {
+        auto col_iter = _map_column.find(_tuple_desc->slots()[i]->col_name());
         if (col_iter == _map_column.end()) {
             continue;
         }
@@ -166,7 +175,7 @@ void ParquetReader::_init_conjuncts(const TupleDescriptor* 
tuple_desc,
             }
             SlotRef* slot_ref = (SlotRef*)raw_slot;
             SlotId conjunct_slot_id = slot_ref->slot_id();
-            if (conjunct_slot_id == tuple_desc->slots()[i]->id()) {
+            if (conjunct_slot_id == _tuple_desc->slots()[i]->id()) {
                 // Get conjuncts by conjunct_slot_id
                 auto iter = _slot_conjuncts.find(conjunct_slot_id);
                 if (_slot_conjuncts.end() == iter) {
@@ -182,23 +191,25 @@ void ParquetReader::_init_conjuncts(const 
TupleDescriptor* tuple_desc,
     }
 }
 
-Status ParquetReader::_filter_row_groups(std::vector<int32_t>* 
read_row_group_ids) {
+Status ParquetReader::_filter_row_groups() {
     if (_total_groups == 0 || _file_metadata->num_rows() == 0 || _range_size < 
0) {
         return Status::EndOfFile("No row group need read");
     }
-    int32_t row_group_idx = -1;
+    int32_t row_group_idx = 0;
     while (row_group_idx < _total_groups) {
-        row_group_idx++;
         const tparquet::RowGroup& row_group = 
_t_metadata->row_groups[row_group_idx];
         if (_is_misaligned_range_group(row_group)) {
+            row_group_idx++;
             continue;
         }
         bool filter_group = false;
         RETURN_IF_ERROR(_process_row_group_filter(row_group, &filter_group));
         if (!filter_group) {
-            read_row_group_ids->emplace_back(row_group_idx);
+            _read_row_groups.push_back(row_group_idx);
+            row_group_idx++;
             break;
         }
+        row_group_idx++;
     }
     return Status::OK();
 }
@@ -217,7 +228,7 @@ bool ParquetReader::_is_misaligned_range_group(const 
tparquet::RowGroup& row_gro
     return false;
 }
 
-bool ParquetReader::_has_page_index(std::vector<tparquet::ColumnChunk> 
columns) {
+bool ParquetReader::_has_page_index(std::vector<tparquet::ColumnChunk>& 
columns) {
     _page_index.reset(new PageIndex());
     return _page_index->check_and_get_page_index_ranges(columns);
 }
diff --git a/be/src/vec/exec/format/parquet/vparquet_reader.h 
b/be/src/vec/exec/format/parquet/vparquet_reader.h
index 213ed4dc82..d98825b6f0 100644
--- a/be/src/vec/exec/format/parquet/vparquet_reader.h
+++ b/be/src/vec/exec/format/parquet/vparquet_reader.h
@@ -19,6 +19,7 @@
 
 #include <stdint.h>
 
+#include <queue>
 #include <string>
 #include <vector>
 
@@ -81,14 +82,12 @@ public:
     int64_t size() const { return _file_reader->size(); }
 
 private:
+    int32_t _next_row_group_id();
     Status _init_read_columns(const std::vector<SlotDescriptor*>& 
tuple_slot_descs);
-    Status _init_row_group_readers(const TupleDescriptor* tuple_desc, int64_t 
range_start_offset,
-                                   int64_t range_size,
-                                   const std::vector<ExprContext*>& 
conjunct_ctxs);
-    void _init_conjuncts(const TupleDescriptor* tuple_desc,
-                         const std::vector<ExprContext*>& conjunct_ctxs);
+    Status _init_row_group_readers(const std::vector<ExprContext*>& 
conjunct_ctxs);
+    void _init_conjuncts(const std::vector<ExprContext*>& conjunct_ctxs);
     // Page Index Filter
-    bool _has_page_index(std::vector<tparquet::ColumnChunk> columns);
+    bool _has_page_index(std::vector<tparquet::ColumnChunk>& columns);
     Status _process_page_index(tparquet::RowGroup& row_group,
                                std::vector<RowRange>& skipped_row_ranges);
 
@@ -101,7 +100,7 @@ private:
     Status _process_dict_filter(bool* filter_group);
     void _init_bloom_filter();
     Status _process_bloom_filter(bool* filter_group);
-    Status _filter_row_groups(std::vector<int32_t>* read_row_group_ids);
+    Status _filter_row_groups();
     int64_t _get_column_start_offset(const tparquet::ColumnMetaData& 
column_init_column_readers);
     bool _determine_filter_min_max(const std::vector<ExprContext*>& conjuncts,
                                    const std::string& encoded_min, const 
std::string& encoded_max);
@@ -113,7 +112,7 @@ private:
 private:
     FileReader* _file_reader;
     std::shared_ptr<FileMetaData> _file_metadata;
-    std::unique_ptr<tparquet::FileMetaData> _t_metadata;
+    tparquet::FileMetaData* _t_metadata;
     std::shared_ptr<PageIndex> _page_index;
     std::vector<std::shared_ptr<RowGroupReader>> _row_group_readers;
     int32_t _total_groups; // num of groups(stripes) of a parquet(orc) file
@@ -121,11 +120,10 @@ private:
     //        std::shared_ptr<Statistics> _statistics;
     const int32_t _num_of_columns_from_file;
     std::map<std::string, int> _map_column; // column-name <---> column-index
-    std::shared_ptr<std::vector<ExprContext*>> _conjunct_ctxs;
     std::unordered_map<int, std::vector<ExprContext*>> _slot_conjuncts;
     std::vector<int> _include_column_ids; // columns that need to get from file
     std::vector<ParquetReadColumn> _read_columns;
-    bool* _file_eof;
+    std::list<int32_t> _read_row_groups;
     // parquet file reader object
     size_t _batch_size;
     int64_t _range_start_offset;
diff --git a/be/test/vec/exec/parquet/parquet_reader_test.cpp 
b/be/test/vec/exec/parquet/parquet_reader_test.cpp
index 63d783f613..895fbbd4ac 100644
--- a/be/test/vec/exec/parquet/parquet_reader_test.cpp
+++ b/be/test/vec/exec/parquet/parquet_reader_test.cpp
@@ -18,11 +18,12 @@
 #include <glog/logging.h>
 #include <gtest/gtest.h>
 
-//#include "io/buffered_reader.h"
-//#include "io/file_reader.h"
-//#include "io/local_file_reader.h"
-//#include "util/runtime_profile.h"
-//#include "vec/exec/format/parquet/vparquet_file_metadata.h"
+#include "io/local_file_reader.h"
+#include "runtime/runtime_state.h"
+#include "util/runtime_profile.h"
+#include "vec/data_types/data_type_factory.hpp"
+#include "vec/exec/file_hdfs_scanner.h"
+#include "vec/exec/format/parquet/vparquet_reader.h"
 
 namespace doris {
 namespace vectorized {
@@ -32,17 +33,207 @@ public:
     ParquetReaderTest() {}
 };
 
-//TEST_F(ParquetReaderTest, normal) {
-//    LocalFileReader 
reader("./be/test/exec/test_data/parquet_scanner/localfile.parquet", 0);
-//    auto st = reader.open();
-//    EXPECT_TRUE(st.ok());
-//    std::shared_ptr<FileMetaData> metaData;
-//    parse_thrift_footer(&reader, metaData);
-//    tparquet::FileMetaData t_metadata = metaData->to_thrift_metadata();
-//    for (auto value : t_metadata.row_groups) {
-//        LOG(WARNING) << "row group num_rows: " << value.num_rows;
-//    }
-//}
+TEST_F(ParquetReaderTest, normal) {
+    TDescriptorTable t_desc_table;
+    TTableDescriptor t_table_desc;
+
+    t_table_desc.id = 0;
+    t_table_desc.tableType = TTableType::OLAP_TABLE;
+    t_table_desc.numCols = 0;
+    t_table_desc.numClusteringCols = 0;
+    t_desc_table.tableDescriptors.push_back(t_table_desc);
+    t_desc_table.__isset.tableDescriptors = true;
+
+    // init boolean and numeric slot
+    std::vector<std::string> numeric_types = {"boolean_col", "tinyint_col", 
"smallint_col",
+                                              "int_col",     "bigint_col",  
"float_col",
+                                              "double_col"};
+    for (int i = 0; i < numeric_types.size(); i++) {
+        TSlotDescriptor tslot_desc;
+        {
+            tslot_desc.id = i;
+            tslot_desc.parent = 0;
+            TTypeDesc type;
+            {
+                TTypeNode node;
+                node.__set_type(TTypeNodeType::SCALAR);
+                TScalarType scalar_type;
+                scalar_type.__set_type(TPrimitiveType::type(i + 2));
+                node.__set_scalar_type(scalar_type);
+                type.types.push_back(node);
+            }
+            tslot_desc.slotType = type;
+            tslot_desc.columnPos = 0;
+            tslot_desc.byteOffset = 0;
+            tslot_desc.nullIndicatorByte = 0;
+            tslot_desc.nullIndicatorBit = -1;
+            tslot_desc.colName = numeric_types[i];
+            tslot_desc.slotIdx = 0;
+            tslot_desc.isMaterialized = true;
+            t_desc_table.slotDescriptors.push_back(tslot_desc);
+        }
+    }
+
+    t_desc_table.__isset.slotDescriptors = true;
+    {
+        // TTupleDescriptor dest
+        TTupleDescriptor t_tuple_desc;
+        t_tuple_desc.id = 0;
+        t_tuple_desc.byteSize = 16;
+        t_tuple_desc.numNullBytes = 0;
+        t_tuple_desc.tableId = 0;
+        t_tuple_desc.__isset.tableId = true;
+        t_desc_table.tupleDescriptors.push_back(t_tuple_desc);
+    }
+    DescriptorTbl* desc_tbl;
+    ObjectPool obj_pool;
+    DescriptorTbl::create(&obj_pool, t_desc_table, &desc_tbl);
+
+    auto slot_descs = desc_tbl->get_tuple_descriptor(0)->slots();
+    LocalFileReader* reader =
+            new 
LocalFileReader("./be/test/exec/test_data/parquet_scanner/type-decoder.parquet",
 0);
+
+    cctz::time_zone ctz;
+    TimezoneUtils::find_cctz_time_zone(TimezoneUtils::default_time_zone, ctz);
+    auto p_reader = new ParquetReader(reader, slot_descs.size(), 1024, 0, 
1000, &ctz);
+    RuntimeState runtime_state((TQueryGlobals()));
+    runtime_state.set_desc_tbl(desc_tbl);
+    runtime_state.init_instance_mem_tracker();
+
+    auto tuple_desc = desc_tbl->get_tuple_descriptor(0);
+    std::vector<ExprContext*> conjunct_ctxs = std::vector<ExprContext*>();
+    p_reader->init_reader(tuple_desc, slot_descs, conjunct_ctxs, 
runtime_state.timezone());
+    Block* block = new Block();
+    for (const auto& slot_desc : tuple_desc->slots()) {
+        auto data_type =
+                
vectorized::DataTypeFactory::instance().create_data_type(slot_desc->type(), 
true);
+        MutableColumnPtr data_column = data_type->create_column();
+        block->insert(
+                ColumnWithTypeAndName(std::move(data_column), data_type, 
slot_desc->col_name()));
+    }
+    bool eof = false;
+    p_reader->read_next_batch(block, &eof);
+    for (auto& col : block->get_columns_with_type_and_name()) {
+        ASSERT_EQ(col.column->size(), 10);
+    }
+    EXPECT_TRUE(eof);
+    delete block;
+    delete p_reader;
+}
+
+TEST_F(ParquetReaderTest, scanner) {
+    TDescriptorTable t_desc_table;
+    TTableDescriptor t_table_desc;
+
+    t_table_desc.id = 0;
+    t_table_desc.tableType = TTableType::OLAP_TABLE;
+    t_table_desc.numCols = 7;
+    t_table_desc.numClusteringCols = 0;
+    t_desc_table.tableDescriptors.push_back(t_table_desc);
+    t_desc_table.__isset.tableDescriptors = true;
+
+    // init boolean and numeric slot
+    std::vector<std::string> numeric_types = {"boolean_col", "tinyint_col", 
"smallint_col",
+                                              "int_col",     "bigint_col",  
"float_col",
+                                              "double_col"};
+    for (int i = 0; i < numeric_types.size(); i++) {
+        TSlotDescriptor tslot_desc;
+        {
+            tslot_desc.id = i;
+            tslot_desc.parent = 0;
+            TTypeDesc type;
+            {
+                TTypeNode node;
+                node.__set_type(TTypeNodeType::SCALAR);
+                TScalarType scalar_type;
+                scalar_type.__set_type(TPrimitiveType::type(i + 2));
+                node.__set_scalar_type(scalar_type);
+                type.types.push_back(node);
+            }
+            tslot_desc.slotType = type;
+            tslot_desc.columnPos = 0;
+            tslot_desc.byteOffset = 0;
+            tslot_desc.nullIndicatorByte = 1;
+            tslot_desc.nullIndicatorBit = 1;
+            tslot_desc.colName = numeric_types[i];
+            tslot_desc.slotIdx = 0;
+            tslot_desc.isMaterialized = true;
+            t_desc_table.slotDescriptors.push_back(tslot_desc);
+        }
+    }
+
+    t_desc_table.__isset.slotDescriptors = true;
+    {
+        TTupleDescriptor t_tuple_desc;
+        t_tuple_desc.id = 0;
+        t_tuple_desc.byteSize = 16;
+        t_tuple_desc.numNullBytes = 0;
+        t_tuple_desc.tableId = 0;
+        t_tuple_desc.__isset.tableId = true;
+        t_desc_table.tupleDescriptors.push_back(t_tuple_desc);
+    }
+
+    // set scan range
+    //    std::vector<TScanRangeParams> scan_ranges;
+    TFileScanRange file_scan_range;
+    {
+        //        TScanRangeParams scan_range_params;
+        //        TFileScanRange file_scan_range;
+        TFileScanRangeParams params;
+        {
+            params.__set_src_tuple_id(0);
+            params.__set_num_of_columns_from_file(7);
+            params.file_type = TFileType::FILE_LOCAL;
+            params.format_type = TFileFormatType::FORMAT_PARQUET;
+            std::vector<TFileScanSlotInfo> file_slots;
+            for (int i = 0; i < numeric_types.size(); i++) {
+                TFileScanSlotInfo slot_info;
+                slot_info.slot_id = i;
+                slot_info.is_file_slot = true;
+                file_slots.emplace_back(slot_info);
+            }
+            params.__set_required_slots(file_slots);
+        }
+        file_scan_range.params = params;
+        TFileRangeDesc range;
+        {
+            range.start_offset = 0;
+            range.size = 1000;
+            range.path = 
"./be/test/exec/test_data/parquet_scanner/type-decoder.parquet";
+            std::vector<std::string> columns_from_path {"value"};
+            range.__set_columns_from_path(columns_from_path);
+        }
+        file_scan_range.ranges.push_back(range);
+        //        
scan_range_params.scan_range.ext_scan_range.__set_file_scan_range(broker_scan_range);
+        //        scan_ranges.push_back(scan_range_params);
+    }
+
+    std::vector<TExpr> pre_filter_texprs = std::vector<TExpr>();
+    RuntimeState runtime_state((TQueryGlobals()));
+    runtime_state.init_instance_mem_tracker();
+
+    DescriptorTbl* desc_tbl;
+    ObjectPool obj_pool;
+    DescriptorTbl::create(&obj_pool, t_desc_table, &desc_tbl);
+    runtime_state.set_desc_tbl(desc_tbl);
+    ScannerCounter counter;
+    std::vector<ExprContext*> conjunct_ctxs = std::vector<ExprContext*>();
+    auto scan = new ParquetFileHdfsScanner(&runtime_state, 
runtime_state.runtime_profile(),
+                                           file_scan_range.params, 
file_scan_range.ranges,
+                                           pre_filter_texprs, &counter);
+    scan->reg_conjunct_ctxs(0, conjunct_ctxs);
+    Status st = scan->open();
+    EXPECT_TRUE(st.ok());
+
+    bool eof = false;
+    Block* block = new Block();
+    scan->get_next(block, &eof);
+    for (auto& col : block->get_columns_with_type_and_name()) {
+        ASSERT_EQ(col.column->size(), 10);
+    }
+    delete block;
+    delete scan;
+}
 
 } // namespace vectorized
 } // namespace doris
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to