This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 0c16740f5c [feature-wip](parquet-reader) parquet scanner can read
data (#11970)
0c16740f5c is described below
commit 0c16740f5c7690183427043e6b8ad09391fac5b8
Author: slothever <[email protected]>
AuthorDate: Fri Aug 26 09:43:46 2022 +0800
[feature-wip](parquet-reader) parquet scanner can read data (#11970)
Co-authored-by: jinzhe <[email protected]>
---
be/src/io/file_factory.cpp | 4 +
be/src/vec/exec/file_hdfs_scanner.cpp | 12 +-
be/src/vec/exec/file_hdfs_scanner.h | 2 +-
.../exec/format/parquet/vparquet_column_reader.cpp | 14 +-
be/src/vec/exec/format/parquet/vparquet_reader.cpp | 81 ++++----
be/src/vec/exec/format/parquet/vparquet_reader.h | 18 +-
be/test/vec/exec/parquet/parquet_reader_test.cpp | 223 +++++++++++++++++++--
7 files changed, 277 insertions(+), 77 deletions(-)
diff --git a/be/src/io/file_factory.cpp b/be/src/io/file_factory.cpp
index 46d19669b9..66e136a8fd 100644
--- a/be/src/io/file_factory.cpp
+++ b/be/src/io/file_factory.cpp
@@ -142,6 +142,10 @@ doris::Status
doris::FileFactory::_new_file_reader(doris::ExecEnv* env, RuntimeP
}
switch (type) {
+ case TFileType::FILE_LOCAL: {
+ file_reader_ptr = new LocalFileReader(range.path, range.start_offset);
+ break;
+ }
case TFileType::FILE_S3: {
file_reader_ptr = new BufferedReader(
profile, new S3Reader(params.properties, range.path,
range.start_offset));
diff --git a/be/src/vec/exec/file_hdfs_scanner.cpp
b/be/src/vec/exec/file_hdfs_scanner.cpp
index 6459da6fff..1ba650dedd 100644
--- a/be/src/vec/exec/file_hdfs_scanner.cpp
+++ b/be/src/vec/exec/file_hdfs_scanner.cpp
@@ -28,6 +28,10 @@ ParquetFileHdfsScanner::ParquetFileHdfsScanner(RuntimeState*
state, RuntimeProfi
ScannerCounter* counter)
: HdfsFileScanner(state, profile, params, ranges, pre_filter_texprs,
counter) {}
+ParquetFileHdfsScanner::~ParquetFileHdfsScanner() {
+ ParquetFileHdfsScanner::close();
+}
+
Status ParquetFileHdfsScanner::open() {
RETURN_IF_ERROR(FileScanner::open());
if (_ranges.empty()) {
@@ -48,14 +52,18 @@ Status ParquetFileHdfsScanner::get_next(vectorized::Block*
block, bool* eof) {
bool range_eof = false;
RETURN_IF_ERROR(_reader->read_next_batch(block, &range_eof));
if (range_eof) {
- RETURN_IF_ERROR(_get_next_reader(_next_range++));
+ _next_range++;
+ RETURN_IF_ERROR(_get_next_reader(_next_range));
}
return Status::OK();
}
Status ParquetFileHdfsScanner::_get_next_reader(int _next_range) {
+ if (_next_range >= _ranges.size()) {
+ _scanner_eof = true;
+ return Status::OK();
+ }
const TFileRangeDesc& range = _ranges[_next_range];
- _current_range_offset = range.start_offset;
std::unique_ptr<FileReader> file_reader;
RETURN_IF_ERROR(FileFactory::create_file_reader(_state->exec_env(),
_profile, _params, range,
file_reader));
diff --git a/be/src/vec/exec/file_hdfs_scanner.h
b/be/src/vec/exec/file_hdfs_scanner.h
index e24063c89b..ac73444fcd 100644
--- a/be/src/vec/exec/file_hdfs_scanner.h
+++ b/be/src/vec/exec/file_hdfs_scanner.h
@@ -38,6 +38,7 @@ public:
const TFileScanRangeParams& params,
const std::vector<TFileRangeDesc>& ranges,
const std::vector<TExpr>& pre_filter_texprs,
ScannerCounter* counter);
+ ~ParquetFileHdfsScanner();
Status open() override;
Status get_next(vectorized::Block* block, bool* eof) override;
@@ -51,7 +52,6 @@ private:
private:
std::shared_ptr<ParquetReader> _reader;
- int64_t _current_range_offset;
};
} // namespace doris::vectorized
\ No newline at end of file
diff --git a/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp
b/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp
index f15a7d5623..66d9793ab5 100644
--- a/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp
+++ b/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp
@@ -84,20 +84,8 @@ Status ScalarColumnReader::read_column_data(ColumnPtr&
doris_column, DataTypePtr
size_t read_values = _chunk_reader->remaining_num_values() < batch_size
? _chunk_reader->remaining_num_values()
: batch_size;
+ RETURN_IF_ERROR(_chunk_reader->decode_values(doris_column, type,
read_values));
*read_rows = read_values;
- WhichDataType which_type(type);
- switch (_metadata->t_metadata().type) {
- case tparquet::Type::INT32:
- case tparquet::Type::INT64:
- case tparquet::Type::FLOAT:
- case tparquet::Type::DOUBLE:
- case tparquet::Type::BOOLEAN: {
- _chunk_reader->decode_values(doris_column, type, read_values);
- return Status::OK();
- }
- default:
- return Status::Corruption("unsupported parquet data type");
- }
return Status::OK();
}
diff --git a/be/src/vec/exec/format/parquet/vparquet_reader.cpp
b/be/src/vec/exec/format/parquet/vparquet_reader.cpp
index 0127aae1d2..a58cbc825a 100644
--- a/be/src/vec/exec/format/parquet/vparquet_reader.cpp
+++ b/be/src/vec/exec/format/parquet/vparquet_reader.cpp
@@ -28,6 +28,7 @@ ParquetReader::ParquetReader(FileReader* file_reader, int32_t
num_of_columns_fro
_range_start_offset(range_start_offset),
_range_size(range_size),
_ctz(ctz) {
+ DCHECK(range_start_offset < range_size);
_file_reader = file_reader;
_total_groups = 0;
_current_row_group_id = 0;
@@ -40,14 +41,16 @@ ParquetReader::~ParquetReader() {
void ParquetReader::close() {
for (auto& conjuncts : _slot_conjuncts) {
+ for (auto expr : conjuncts.second) {
+ delete expr;
+ expr = nullptr;
+ }
conjuncts.second.clear();
}
+ _row_group_readers.clear();
_slot_conjuncts.clear();
- if (_file_reader != nullptr) {
- _file_reader->close();
- delete _file_reader;
- _file_reader = nullptr;
- }
+ _file_reader->close();
+ delete _file_reader;
}
Status ParquetReader::init_reader(const TupleDescriptor* tuple_desc,
@@ -55,9 +58,12 @@ Status ParquetReader::init_reader(const TupleDescriptor*
tuple_desc,
std::vector<ExprContext*>& conjunct_ctxs,
const std::string& timezone) {
_file_reader->open();
- _conjunct_ctxs.reset(&conjunct_ctxs);
+ _tuple_desc = tuple_desc;
+ if (_tuple_desc->slots().size() == 0) {
+ return Status::EndOfFile("No Parquet column need load");
+ }
RETURN_IF_ERROR(parse_thrift_footer(_file_reader, _file_metadata));
- _t_metadata.reset(&_file_metadata->to_thrift_metadata());
+ _t_metadata = &_file_metadata->to_thrift_metadata();
_total_groups = _file_metadata->num_row_groups();
if (_total_groups == 0) {
return Status::EndOfFile("Empty Parquet File");
@@ -69,8 +75,7 @@ Status ParquetReader::init_reader(const TupleDescriptor*
tuple_desc,
_map_column.emplace(schema_desc.get_column(i)->name, i);
}
RETURN_IF_ERROR(_init_read_columns(tuple_slot_descs));
- RETURN_IF_ERROR(
- _init_row_group_readers(tuple_desc, _range_start_offset,
_range_size, conjunct_ctxs));
+ RETURN_IF_ERROR(_init_row_group_readers(conjunct_ctxs));
return Status::OK();
}
@@ -92,41 +97,46 @@ Status ParquetReader::_init_read_columns(const
std::vector<SlotDescriptor*>& tup
}
ParquetReadColumn column(slot_desc);
_read_columns.emplace_back(column);
- VLOG_DEBUG << "slot_desc " << slot_desc->debug_string();
}
return Status::OK();
}
Status ParquetReader::read_next_batch(Block* block, bool* eof) {
- DCHECK(_total_groups == _row_group_readers.size());
- if (_total_groups == 0) {
+ if (_row_group_readers.empty()) {
*eof = true;
+ return Status::OK();
}
+ int32_t num_of_readers = _row_group_readers.size();
+ DCHECK(num_of_readers <= _total_groups);
bool _batch_eof = false;
auto row_group_reader = _row_group_readers[_current_row_group_id];
RETURN_IF_ERROR(row_group_reader->next_batch(block, _batch_size,
&_batch_eof));
if (_batch_eof) {
- _current_row_group_id++;
- if (_current_row_group_id > _total_groups) {
+ _current_row_group_id = _next_row_group_id();
+ if (_current_row_group_id == -1 || _current_row_group_id >=
num_of_readers) {
*eof = true;
}
}
return Status::OK();
}
-Status ParquetReader::_init_row_group_readers(const TupleDescriptor*
tuple_desc,
- int64_t range_start_offset,
int64_t range_size,
- const std::vector<ExprContext*>&
conjunct_ctxs) {
- std::vector<int32_t> read_row_groups;
- RETURN_IF_ERROR(_filter_row_groups(&read_row_groups));
- _init_conjuncts(tuple_desc, conjunct_ctxs);
- for (auto row_group_id : read_row_groups) {
- VLOG_DEBUG << "_has_page_index";
+int32_t ParquetReader::_next_row_group_id() {
+ if (_read_row_groups.empty()) {
+ return -1;
+ }
+ auto group_id = _read_row_groups.front();
+ _read_row_groups.pop_front();
+ return group_id;
+}
+
+Status ParquetReader::_init_row_group_readers(const std::vector<ExprContext*>&
conjunct_ctxs) {
+ _init_conjuncts(conjunct_ctxs);
+ RETURN_IF_ERROR(_filter_row_groups());
+ for (auto row_group_id : _read_row_groups) {
auto row_group = _t_metadata->row_groups[row_group_id];
auto column_chunks = row_group.columns;
std::vector<RowRange> skipped_row_ranges;
if (_has_page_index(column_chunks)) {
- VLOG_DEBUG << "_process_page_index";
RETURN_IF_ERROR(_process_page_index(row_group,
skipped_row_ranges));
}
std::shared_ptr<RowGroupReader> row_group_reader;
@@ -136,18 +146,17 @@ Status ParquetReader::_init_row_group_readers(const
TupleDescriptor* tuple_desc,
RETURN_IF_ERROR(row_group_reader->init(_file_metadata->schema(),
skipped_row_ranges));
_row_group_readers.emplace_back(row_group_reader);
}
- VLOG_DEBUG << "_init_row_group_reader finished";
+ _current_row_group_id = _next_row_group_id();
return Status::OK();
}
-void ParquetReader::_init_conjuncts(const TupleDescriptor* tuple_desc,
- const std::vector<ExprContext*>&
conjunct_ctxs) {
- if (tuple_desc->slots().empty()) {
+void ParquetReader::_init_conjuncts(const std::vector<ExprContext*>&
conjunct_ctxs) {
+ if (_tuple_desc->slots().empty()) {
return;
}
std::unordered_set<int> parquet_col_ids(_include_column_ids.begin(),
_include_column_ids.end());
- for (int i = 0; i < tuple_desc->slots().size(); i++) {
- auto col_iter = _map_column.find(tuple_desc->slots()[i]->col_name());
+ for (int i = 0; i < _tuple_desc->slots().size(); i++) {
+ auto col_iter = _map_column.find(_tuple_desc->slots()[i]->col_name());
if (col_iter == _map_column.end()) {
continue;
}
@@ -166,7 +175,7 @@ void ParquetReader::_init_conjuncts(const TupleDescriptor*
tuple_desc,
}
SlotRef* slot_ref = (SlotRef*)raw_slot;
SlotId conjunct_slot_id = slot_ref->slot_id();
- if (conjunct_slot_id == tuple_desc->slots()[i]->id()) {
+ if (conjunct_slot_id == _tuple_desc->slots()[i]->id()) {
// Get conjuncts by conjunct_slot_id
auto iter = _slot_conjuncts.find(conjunct_slot_id);
if (_slot_conjuncts.end() == iter) {
@@ -182,23 +191,25 @@ void ParquetReader::_init_conjuncts(const
TupleDescriptor* tuple_desc,
}
}
-Status ParquetReader::_filter_row_groups(std::vector<int32_t>*
read_row_group_ids) {
+Status ParquetReader::_filter_row_groups() {
if (_total_groups == 0 || _file_metadata->num_rows() == 0 || _range_size <
0) {
return Status::EndOfFile("No row group need read");
}
- int32_t row_group_idx = -1;
+ int32_t row_group_idx = 0;
while (row_group_idx < _total_groups) {
- row_group_idx++;
const tparquet::RowGroup& row_group =
_t_metadata->row_groups[row_group_idx];
if (_is_misaligned_range_group(row_group)) {
+ row_group_idx++;
continue;
}
bool filter_group = false;
RETURN_IF_ERROR(_process_row_group_filter(row_group, &filter_group));
if (!filter_group) {
- read_row_group_ids->emplace_back(row_group_idx);
+ _read_row_groups.push_back(row_group_idx);
+ row_group_idx++;
break;
}
+ row_group_idx++;
}
return Status::OK();
}
@@ -217,7 +228,7 @@ bool ParquetReader::_is_misaligned_range_group(const
tparquet::RowGroup& row_gro
return false;
}
-bool ParquetReader::_has_page_index(std::vector<tparquet::ColumnChunk>
columns) {
+bool ParquetReader::_has_page_index(std::vector<tparquet::ColumnChunk>&
columns) {
_page_index.reset(new PageIndex());
return _page_index->check_and_get_page_index_ranges(columns);
}
diff --git a/be/src/vec/exec/format/parquet/vparquet_reader.h
b/be/src/vec/exec/format/parquet/vparquet_reader.h
index 213ed4dc82..d98825b6f0 100644
--- a/be/src/vec/exec/format/parquet/vparquet_reader.h
+++ b/be/src/vec/exec/format/parquet/vparquet_reader.h
@@ -19,6 +19,7 @@
#include <stdint.h>
+#include <queue>
#include <string>
#include <vector>
@@ -81,14 +82,12 @@ public:
int64_t size() const { return _file_reader->size(); }
private:
+ int32_t _next_row_group_id();
Status _init_read_columns(const std::vector<SlotDescriptor*>&
tuple_slot_descs);
- Status _init_row_group_readers(const TupleDescriptor* tuple_desc, int64_t
range_start_offset,
- int64_t range_size,
- const std::vector<ExprContext*>&
conjunct_ctxs);
- void _init_conjuncts(const TupleDescriptor* tuple_desc,
- const std::vector<ExprContext*>& conjunct_ctxs);
+ Status _init_row_group_readers(const std::vector<ExprContext*>&
conjunct_ctxs);
+ void _init_conjuncts(const std::vector<ExprContext*>& conjunct_ctxs);
// Page Index Filter
- bool _has_page_index(std::vector<tparquet::ColumnChunk> columns);
+ bool _has_page_index(std::vector<tparquet::ColumnChunk>& columns);
Status _process_page_index(tparquet::RowGroup& row_group,
std::vector<RowRange>& skipped_row_ranges);
@@ -101,7 +100,7 @@ private:
Status _process_dict_filter(bool* filter_group);
void _init_bloom_filter();
Status _process_bloom_filter(bool* filter_group);
- Status _filter_row_groups(std::vector<int32_t>* read_row_group_ids);
+ Status _filter_row_groups();
int64_t _get_column_start_offset(const tparquet::ColumnMetaData&
column_init_column_readers);
bool _determine_filter_min_max(const std::vector<ExprContext*>& conjuncts,
const std::string& encoded_min, const
std::string& encoded_max);
@@ -113,7 +112,7 @@ private:
private:
FileReader* _file_reader;
std::shared_ptr<FileMetaData> _file_metadata;
- std::unique_ptr<tparquet::FileMetaData> _t_metadata;
+ tparquet::FileMetaData* _t_metadata;
std::shared_ptr<PageIndex> _page_index;
std::vector<std::shared_ptr<RowGroupReader>> _row_group_readers;
int32_t _total_groups; // num of groups(stripes) of a parquet(orc) file
@@ -121,11 +120,10 @@ private:
// std::shared_ptr<Statistics> _statistics;
const int32_t _num_of_columns_from_file;
std::map<std::string, int> _map_column; // column-name <---> column-index
- std::shared_ptr<std::vector<ExprContext*>> _conjunct_ctxs;
std::unordered_map<int, std::vector<ExprContext*>> _slot_conjuncts;
std::vector<int> _include_column_ids; // columns that need to get from file
std::vector<ParquetReadColumn> _read_columns;
- bool* _file_eof;
+ std::list<int32_t> _read_row_groups;
// parquet file reader object
size_t _batch_size;
int64_t _range_start_offset;
diff --git a/be/test/vec/exec/parquet/parquet_reader_test.cpp
b/be/test/vec/exec/parquet/parquet_reader_test.cpp
index 63d783f613..895fbbd4ac 100644
--- a/be/test/vec/exec/parquet/parquet_reader_test.cpp
+++ b/be/test/vec/exec/parquet/parquet_reader_test.cpp
@@ -18,11 +18,12 @@
#include <glog/logging.h>
#include <gtest/gtest.h>
-//#include "io/buffered_reader.h"
-//#include "io/file_reader.h"
-//#include "io/local_file_reader.h"
-//#include "util/runtime_profile.h"
-//#include "vec/exec/format/parquet/vparquet_file_metadata.h"
+#include "io/local_file_reader.h"
+#include "runtime/runtime_state.h"
+#include "util/runtime_profile.h"
+#include "vec/data_types/data_type_factory.hpp"
+#include "vec/exec/file_hdfs_scanner.h"
+#include "vec/exec/format/parquet/vparquet_reader.h"
namespace doris {
namespace vectorized {
@@ -32,17 +33,207 @@ public:
ParquetReaderTest() {}
};
-//TEST_F(ParquetReaderTest, normal) {
-// LocalFileReader
reader("./be/test/exec/test_data/parquet_scanner/localfile.parquet", 0);
-// auto st = reader.open();
-// EXPECT_TRUE(st.ok());
-// std::shared_ptr<FileMetaData> metaData;
-// parse_thrift_footer(&reader, metaData);
-// tparquet::FileMetaData t_metadata = metaData->to_thrift_metadata();
-// for (auto value : t_metadata.row_groups) {
-// LOG(WARNING) << "row group num_rows: " << value.num_rows;
-// }
-//}
+TEST_F(ParquetReaderTest, normal) {
+ TDescriptorTable t_desc_table;
+ TTableDescriptor t_table_desc;
+
+ t_table_desc.id = 0;
+ t_table_desc.tableType = TTableType::OLAP_TABLE;
+ t_table_desc.numCols = 0;
+ t_table_desc.numClusteringCols = 0;
+ t_desc_table.tableDescriptors.push_back(t_table_desc);
+ t_desc_table.__isset.tableDescriptors = true;
+
+ // init boolean and numeric slot
+ std::vector<std::string> numeric_types = {"boolean_col", "tinyint_col",
"smallint_col",
+ "int_col", "bigint_col",
"float_col",
+ "double_col"};
+ for (int i = 0; i < numeric_types.size(); i++) {
+ TSlotDescriptor tslot_desc;
+ {
+ tslot_desc.id = i;
+ tslot_desc.parent = 0;
+ TTypeDesc type;
+ {
+ TTypeNode node;
+ node.__set_type(TTypeNodeType::SCALAR);
+ TScalarType scalar_type;
+ scalar_type.__set_type(TPrimitiveType::type(i + 2));
+ node.__set_scalar_type(scalar_type);
+ type.types.push_back(node);
+ }
+ tslot_desc.slotType = type;
+ tslot_desc.columnPos = 0;
+ tslot_desc.byteOffset = 0;
+ tslot_desc.nullIndicatorByte = 0;
+ tslot_desc.nullIndicatorBit = -1;
+ tslot_desc.colName = numeric_types[i];
+ tslot_desc.slotIdx = 0;
+ tslot_desc.isMaterialized = true;
+ t_desc_table.slotDescriptors.push_back(tslot_desc);
+ }
+ }
+
+ t_desc_table.__isset.slotDescriptors = true;
+ {
+ // TTupleDescriptor dest
+ TTupleDescriptor t_tuple_desc;
+ t_tuple_desc.id = 0;
+ t_tuple_desc.byteSize = 16;
+ t_tuple_desc.numNullBytes = 0;
+ t_tuple_desc.tableId = 0;
+ t_tuple_desc.__isset.tableId = true;
+ t_desc_table.tupleDescriptors.push_back(t_tuple_desc);
+ }
+ DescriptorTbl* desc_tbl;
+ ObjectPool obj_pool;
+ DescriptorTbl::create(&obj_pool, t_desc_table, &desc_tbl);
+
+ auto slot_descs = desc_tbl->get_tuple_descriptor(0)->slots();
+ LocalFileReader* reader =
+ new
LocalFileReader("./be/test/exec/test_data/parquet_scanner/type-decoder.parquet",
0);
+
+ cctz::time_zone ctz;
+ TimezoneUtils::find_cctz_time_zone(TimezoneUtils::default_time_zone, ctz);
+ auto p_reader = new ParquetReader(reader, slot_descs.size(), 1024, 0,
1000, &ctz);
+ RuntimeState runtime_state((TQueryGlobals()));
+ runtime_state.set_desc_tbl(desc_tbl);
+ runtime_state.init_instance_mem_tracker();
+
+ auto tuple_desc = desc_tbl->get_tuple_descriptor(0);
+ std::vector<ExprContext*> conjunct_ctxs = std::vector<ExprContext*>();
+ p_reader->init_reader(tuple_desc, slot_descs, conjunct_ctxs,
runtime_state.timezone());
+ Block* block = new Block();
+ for (const auto& slot_desc : tuple_desc->slots()) {
+ auto data_type =
+
vectorized::DataTypeFactory::instance().create_data_type(slot_desc->type(),
true);
+ MutableColumnPtr data_column = data_type->create_column();
+ block->insert(
+ ColumnWithTypeAndName(std::move(data_column), data_type,
slot_desc->col_name()));
+ }
+ bool eof = false;
+ p_reader->read_next_batch(block, &eof);
+ for (auto& col : block->get_columns_with_type_and_name()) {
+ ASSERT_EQ(col.column->size(), 10);
+ }
+ EXPECT_TRUE(eof);
+ delete block;
+ delete p_reader;
+}
+
+TEST_F(ParquetReaderTest, scanner) {
+ TDescriptorTable t_desc_table;
+ TTableDescriptor t_table_desc;
+
+ t_table_desc.id = 0;
+ t_table_desc.tableType = TTableType::OLAP_TABLE;
+ t_table_desc.numCols = 7;
+ t_table_desc.numClusteringCols = 0;
+ t_desc_table.tableDescriptors.push_back(t_table_desc);
+ t_desc_table.__isset.tableDescriptors = true;
+
+ // init boolean and numeric slot
+ std::vector<std::string> numeric_types = {"boolean_col", "tinyint_col",
"smallint_col",
+ "int_col", "bigint_col",
"float_col",
+ "double_col"};
+ for (int i = 0; i < numeric_types.size(); i++) {
+ TSlotDescriptor tslot_desc;
+ {
+ tslot_desc.id = i;
+ tslot_desc.parent = 0;
+ TTypeDesc type;
+ {
+ TTypeNode node;
+ node.__set_type(TTypeNodeType::SCALAR);
+ TScalarType scalar_type;
+ scalar_type.__set_type(TPrimitiveType::type(i + 2));
+ node.__set_scalar_type(scalar_type);
+ type.types.push_back(node);
+ }
+ tslot_desc.slotType = type;
+ tslot_desc.columnPos = 0;
+ tslot_desc.byteOffset = 0;
+ tslot_desc.nullIndicatorByte = 1;
+ tslot_desc.nullIndicatorBit = 1;
+ tslot_desc.colName = numeric_types[i];
+ tslot_desc.slotIdx = 0;
+ tslot_desc.isMaterialized = true;
+ t_desc_table.slotDescriptors.push_back(tslot_desc);
+ }
+ }
+
+ t_desc_table.__isset.slotDescriptors = true;
+ {
+ TTupleDescriptor t_tuple_desc;
+ t_tuple_desc.id = 0;
+ t_tuple_desc.byteSize = 16;
+ t_tuple_desc.numNullBytes = 0;
+ t_tuple_desc.tableId = 0;
+ t_tuple_desc.__isset.tableId = true;
+ t_desc_table.tupleDescriptors.push_back(t_tuple_desc);
+ }
+
+ // set scan range
+ // std::vector<TScanRangeParams> scan_ranges;
+ TFileScanRange file_scan_range;
+ {
+ // TScanRangeParams scan_range_params;
+ // TFileScanRange file_scan_range;
+ TFileScanRangeParams params;
+ {
+ params.__set_src_tuple_id(0);
+ params.__set_num_of_columns_from_file(7);
+ params.file_type = TFileType::FILE_LOCAL;
+ params.format_type = TFileFormatType::FORMAT_PARQUET;
+ std::vector<TFileScanSlotInfo> file_slots;
+ for (int i = 0; i < numeric_types.size(); i++) {
+ TFileScanSlotInfo slot_info;
+ slot_info.slot_id = i;
+ slot_info.is_file_slot = true;
+ file_slots.emplace_back(slot_info);
+ }
+ params.__set_required_slots(file_slots);
+ }
+ file_scan_range.params = params;
+ TFileRangeDesc range;
+ {
+ range.start_offset = 0;
+ range.size = 1000;
+ range.path =
"./be/test/exec/test_data/parquet_scanner/type-decoder.parquet";
+ std::vector<std::string> columns_from_path {"value"};
+ range.__set_columns_from_path(columns_from_path);
+ }
+ file_scan_range.ranges.push_back(range);
+ //
scan_range_params.scan_range.ext_scan_range.__set_file_scan_range(broker_scan_range);
+ // scan_ranges.push_back(scan_range_params);
+ }
+
+ std::vector<TExpr> pre_filter_texprs = std::vector<TExpr>();
+ RuntimeState runtime_state((TQueryGlobals()));
+ runtime_state.init_instance_mem_tracker();
+
+ DescriptorTbl* desc_tbl;
+ ObjectPool obj_pool;
+ DescriptorTbl::create(&obj_pool, t_desc_table, &desc_tbl);
+ runtime_state.set_desc_tbl(desc_tbl);
+ ScannerCounter counter;
+ std::vector<ExprContext*> conjunct_ctxs = std::vector<ExprContext*>();
+ auto scan = new ParquetFileHdfsScanner(&runtime_state,
runtime_state.runtime_profile(),
+ file_scan_range.params,
file_scan_range.ranges,
+ pre_filter_texprs, &counter);
+ scan->reg_conjunct_ctxs(0, conjunct_ctxs);
+ Status st = scan->open();
+ EXPECT_TRUE(st.ok());
+
+ bool eof = false;
+ Block* block = new Block();
+ scan->get_next(block, &eof);
+ for (auto& col : block->get_columns_with_type_and_name()) {
+ ASSERT_EQ(col.column->size(), 10);
+ }
+ delete block;
+ delete scan;
+}
} // namespace vectorized
} // namespace doris
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]