This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 124b4f7694 [feature-wip](parquet-reader) row group reader ut finish (#11887)
124b4f7694 is described below
commit 124b4f769466dfb8b4272bacbe40dfd49b0364bf
Author: slothever <[email protected]>
AuthorDate: Thu Aug 18 17:18:14 2022 +0800
[feature-wip](parquet-reader) row group reader ut finish (#11887)
Co-authored-by: jinzhe <[email protected]>
---
.../vec/exec/format/parquet/parquet_thrift_util.h | 2 +-
.../parquet/vparquet_column_chunk_reader.cpp | 5 -
.../exec/format/parquet/vparquet_column_reader.cpp | 31 ++--
.../exec/format/parquet/vparquet_column_reader.h | 12 +-
.../exec/format/parquet/vparquet_group_reader.cpp | 17 +-
be/src/vec/exec/format/parquet/vparquet_reader.cpp | 7 +-
be/src/vec/exec/format/parquet/vparquet_reader.h | 6 +-
be/test/vec/exec/parquet/parquet_thrift_test.cpp | 177 +++++++++++++++++++++
8 files changed, 215 insertions(+), 42 deletions(-)
diff --git a/be/src/vec/exec/format/parquet/parquet_thrift_util.h b/be/src/vec/exec/format/parquet/parquet_thrift_util.h
index cb5dc1558b..7852926509 100644
--- a/be/src/vec/exec/format/parquet/parquet_thrift_util.h
+++ b/be/src/vec/exec/format/parquet/parquet_thrift_util.h
@@ -34,7 +34,7 @@ constexpr uint8_t PARQUET_VERSION_NUMBER[4] = {'P', 'A', 'R', '1'};
constexpr int64_t PARQUET_FOOTER_READ_SIZE = 64 * 1024;
constexpr uint32_t PARQUET_FOOTER_SIZE = 8;
-Status parse_thrift_footer(FileReader* file, std::shared_ptr<FileMetaData>&
file_metadata) {
+static Status parse_thrift_footer(FileReader* file,
std::shared_ptr<FileMetaData>& file_metadata) {
// try with buffer on stack
uint8_t buff[PARQUET_FOOTER_READ_SIZE];
int64_t file_size = file->size();
diff --git a/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.cpp
index 751780fbae..0cb4e8229c 100644
--- a/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.cpp
+++ b/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.cpp
@@ -32,19 +32,14 @@ Status ColumnChunkReader::init() {
? _metadata.dictionary_page_offset
: _metadata.data_page_offset;
size_t chunk_size = _metadata.total_compressed_size;
- VLOG_DEBUG << "create _page_reader";
_page_reader = std::make_unique<PageReader>(_stream_reader, start_offset,
chunk_size);
-
if (_metadata.__isset.dictionary_page_offset) {
RETURN_IF_ERROR(_decode_dict_page());
}
// seek to the first data page
_page_reader->seek_to_page(_metadata.data_page_offset);
-
// get the block compression codec
RETURN_IF_ERROR(get_block_compression_codec(_metadata.codec,
_block_compress_codec));
-
- VLOG_DEBUG << "initColumnChunkReader finish";
return Status::OK();
}
diff --git a/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp
index e7b189e40c..3daf80e7c8 100644
--- a/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp
+++ b/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp
@@ -37,7 +37,6 @@ Status ParquetColumnReader::create(FileReader* file, FieldSchema* field,
if (field->type.type == TYPE_ARRAY) {
return Status::Corruption("not supported array type yet");
} else {
- VLOG_DEBUG << "field->physical_column_index: " <<
field->physical_column_index;
tparquet::ColumnChunk chunk =
row_group.columns[field->physical_column_index];
ScalarColumnReader* scalar_reader = new ScalarColumnReader(column);
scalar_reader->init_column_metadata(chunk);
@@ -60,23 +59,27 @@ void ParquetColumnReader::_skipped_pages() {}
Status ScalarColumnReader::init(FileReader* file, FieldSchema* field,
tparquet::ColumnChunk* chunk,
std::vector<RowRange>& row_ranges) {
- BufferedFileStreamReader stream_reader(file, _metadata->start_offset(),
_metadata->size());
- _row_ranges.reset(&row_ranges);
- _chunk_reader.reset(new ColumnChunkReader(&stream_reader, chunk, field));
- _chunk_reader->init();
+ _stream_reader =
+ new BufferedFileStreamReader(file, _metadata->start_offset(),
_metadata->size());
+ _row_ranges = &row_ranges;
+ _chunk_reader.reset(new ColumnChunkReader(_stream_reader, chunk, field));
+ RETURN_IF_ERROR(_chunk_reader->init());
+ RETURN_IF_ERROR(_chunk_reader->next_page());
+ if (_row_ranges->size() != 0) {
+ _skipped_pages();
+ }
+ RETURN_IF_ERROR(_chunk_reader->load_page_data());
return Status::OK();
}
Status ScalarColumnReader::read_column_data(ColumnPtr& doris_column,
DataTypePtr& type,
size_t batch_size, size_t*
read_rows, bool* eof) {
if (_chunk_reader->remaining_num_values() <= 0) {
- // seek to next page header
- _chunk_reader->next_page();
+ RETURN_IF_ERROR(_chunk_reader->next_page());
if (_row_ranges->size() != 0) {
_skipped_pages();
}
- // load data to decoder
- _chunk_reader->load_page_data();
+ RETURN_IF_ERROR(_chunk_reader->load_page_data());
}
size_t read_values = _chunk_reader->remaining_num_values() < batch_size
? _chunk_reader->remaining_num_values()
@@ -84,14 +87,14 @@ Status ScalarColumnReader::read_column_data(ColumnPtr& doris_column, DataTypePtr
*read_rows = read_values;
WhichDataType which_type(type);
switch (_metadata->t_metadata().type) {
- case tparquet::Type::INT32: {
+ case tparquet::Type::INT32:
+ case tparquet::Type::INT64:
+ case tparquet::Type::FLOAT:
+ case tparquet::Type::DOUBLE:
+ case tparquet::Type::BOOLEAN: {
_chunk_reader->decode_values(doris_column, type, read_values);
return Status::OK();
}
- case tparquet::Type::INT64: {
- // todo: test int64
- return Status::OK();
- }
default:
return Status::Corruption("unsupported parquet data type");
}
diff --git a/be/src/vec/exec/format/parquet/vparquet_column_reader.h b/be/src/vec/exec/format/parquet/vparquet_column_reader.h
index 696fbe5db0..6c6a0e4013 100644
--- a/be/src/vec/exec/format/parquet/vparquet_column_reader.h
+++ b/be/src/vec/exec/format/parquet/vparquet_column_reader.h
@@ -50,7 +50,12 @@ private:
class ParquetColumnReader {
public:
ParquetColumnReader(const ParquetReadColumn& column) : _column(column) {};
- virtual ~ParquetColumnReader() = default;
+ virtual ~ParquetColumnReader() {
+ if (_stream_reader != nullptr) {
+ delete _stream_reader;
+ _stream_reader = nullptr;
+ }
+ };
virtual Status read_column_data(ColumnPtr& doris_column, DataTypePtr&
type, size_t batch_size,
size_t* read_rows, bool* eof) = 0;
static Status create(FileReader* file, FieldSchema* field, const
ParquetReadColumn& column,
@@ -64,14 +69,15 @@ protected:
protected:
const ParquetReadColumn& _column;
+ BufferedFileStreamReader* _stream_reader;
std::unique_ptr<ParquetColumnMetadata> _metadata;
- std::unique_ptr<std::vector<RowRange>> _row_ranges;
+ std::vector<RowRange>* _row_ranges;
};
class ScalarColumnReader : public ParquetColumnReader {
public:
ScalarColumnReader(const ParquetReadColumn& column) :
ParquetColumnReader(column) {};
- ~ScalarColumnReader() override = default;
+ ~ScalarColumnReader() override { close(); };
Status init(FileReader* file, FieldSchema* field, tparquet::ColumnChunk*
chunk,
std::vector<RowRange>& row_ranges);
Status read_column_data(ColumnPtr& doris_column, DataTypePtr& type, size_t
batch_size,
diff --git a/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp
index 751e43863a..0ac58ce6ed 100644
--- a/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp
+++ b/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp
@@ -45,10 +45,9 @@ Status RowGroupReader::init(const FieldDescriptor& schema, std::vector<RowRange>
Status RowGroupReader::_init_column_readers(const FieldDescriptor& schema,
std::vector<RowRange>& row_ranges)
{
for (auto& read_col : _read_columns) {
- SlotDescriptor* slot_desc = read_col.slot_desc;
+ SlotDescriptor* slot_desc = read_col._slot_desc;
TypeDescriptor col_type = slot_desc->type();
auto field =
const_cast<FieldSchema*>(schema.get_column(slot_desc->col_name()));
- VLOG_DEBUG << "field: " << field->debug_string();
std::unique_ptr<ParquetColumnReader> reader;
RETURN_IF_ERROR(ParquetColumnReader::create(_file_reader, field,
read_col, _row_group_meta,
row_ranges, reader));
@@ -62,20 +61,18 @@ Status RowGroupReader::_init_column_readers(const FieldDescriptor& schema,
}
Status RowGroupReader::next_batch(Block* block, size_t batch_size, bool*
_batch_eof) {
- if (_read_rows >= _total_rows) {
- *_batch_eof = true;
- }
for (auto& read_col : _read_columns) {
- auto slot_desc = read_col.slot_desc;
+ auto slot_desc = read_col._slot_desc;
auto& column_with_type_and_name =
block->get_by_name(slot_desc->col_name());
- auto column_ptr = column_with_type_and_name.column;
- auto column_type = column_with_type_and_name.type;
+ auto& column_ptr = column_with_type_and_name.column;
+ auto& column_type = column_with_type_and_name.type;
size_t batch_read_rows = 0;
RETURN_IF_ERROR(_column_readers[slot_desc->id()]->read_column_data(
column_ptr, column_type, batch_size, &batch_read_rows,
_batch_eof));
_read_rows += batch_read_rows;
- VLOG_DEBUG << "read column: " << column_with_type_and_name.name;
- VLOG_DEBUG << "read rows in column: " << batch_read_rows;
+ if (_read_rows >= _total_rows) {
+ *_batch_eof = true;
+ }
}
// use data fill utils read column data to column ptr
return Status::OK();
diff --git a/be/src/vec/exec/format/parquet/vparquet_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_reader.cpp
index b16df6b557..13fb9e8cad 100644
--- a/be/src/vec/exec/format/parquet/vparquet_reader.cpp
+++ b/be/src/vec/exec/format/parquet/vparquet_reader.cpp
@@ -62,7 +62,6 @@ Status ParquetReader::init_reader(const TupleDescriptor* tuple_desc,
}
auto schema_desc = _file_metadata->schema();
for (int i = 0; i < _file_metadata->num_columns(); ++i) {
- // for test
VLOG_DEBUG << schema_desc.debug_string();
// Get the Column Reader for the boolean column
_map_column.emplace(schema_desc.get_column(i)->name, i);
@@ -89,11 +88,7 @@ Status ParquetReader::_init_read_columns(const std::vector<SlotDescriptor*>& tup
VLOG_DEBUG << str_error.str();
return Status::InvalidArgument(str_error.str());
}
- ParquetReadColumn column;
- column.slot_desc = slot_desc;
- column.parquet_column_id = parquet_col_id;
- auto physical_type =
_file_metadata->schema().get_column(parquet_col_id)->physical_type;
- column.parquet_type = physical_type;
+ ParquetReadColumn column(slot_desc);
_read_columns.emplace_back(column);
VLOG_DEBUG << "slot_desc " << slot_desc->debug_string();
}
diff --git a/be/src/vec/exec/format/parquet/vparquet_reader.h b/be/src/vec/exec/format/parquet/vparquet_reader.h
index c1d0ec4247..a979daf692 100644
--- a/be/src/vec/exec/format/parquet/vparquet_reader.h
+++ b/be/src/vec/exec/format/parquet/vparquet_reader.h
@@ -53,11 +53,11 @@ class ParquetReadColumn {
public:
friend class ParquetReader;
friend class RowGroupReader;
+ ParquetReadColumn(SlotDescriptor* slot_desc) : _slot_desc(slot_desc) {};
+ ~ParquetReadColumn() = default;
private:
- SlotDescriptor* slot_desc;
- int parquet_column_id;
- tparquet::Type::type parquet_type;
+ SlotDescriptor* _slot_desc;
// int64_t start_offset;
// int64_t chunk_size;
};
diff --git a/be/test/vec/exec/parquet/parquet_thrift_test.cpp b/be/test/vec/exec/parquet/parquet_thrift_test.cpp
index c334b105ed..7aa0f8cbd3 100644
--- a/be/test/vec/exec/parquet/parquet_thrift_test.cpp
+++ b/be/test/vec/exec/parquet/parquet_thrift_test.cpp
@@ -33,6 +33,7 @@
#include "vec/data_types/data_type_factory.hpp"
#include "vec/exec/format/parquet/parquet_thrift_util.h"
#include "vec/exec/format/parquet/vparquet_column_chunk_reader.h"
+#include "vec/exec/format/parquet/vparquet_column_reader.h"
#include "vec/exec/format/parquet/vparquet_file_metadata.h"
namespace doris {
@@ -353,6 +354,182 @@ TEST_F(ParquetThriftReaderTest, type_decoder) {
}
}
+TEST_F(ParquetThriftReaderTest, column_reader) {
+ LocalFileReader
file_reader("./be/test/exec/test_data/parquet_scanner/type-decoder.parquet", 0);
+ auto st = file_reader.open();
+ EXPECT_TRUE(st.ok());
+
+ // prepare metadata
+ std::shared_ptr<FileMetaData> meta_data;
+ parse_thrift_footer(&file_reader, meta_data);
+ tparquet::FileMetaData t_metadata = meta_data->to_thrift_metadata();
+
+ FieldDescriptor schema_descriptor;
+ // todo use schema of meta_data
+ schema_descriptor.parse_from_thrift(t_metadata.schema);
+ // create scalar column reader
+ std::unique_ptr<ParquetColumnReader> reader;
+ auto field = const_cast<FieldSchema*>(schema_descriptor.get_column(0));
+ // create read model
+ TDescriptorTable t_desc_table;
+ // table descriptors
+ TTableDescriptor t_table_desc;
+
+ t_table_desc.id = 0;
+ t_table_desc.tableType = TTableType::OLAP_TABLE;
+ t_table_desc.numCols = 0;
+ t_table_desc.numClusteringCols = 0;
+ t_desc_table.tableDescriptors.push_back(t_table_desc);
+ t_desc_table.__isset.tableDescriptors = true;
+ TSlotDescriptor tslot_desc;
+ {
+ tslot_desc.id = 0;
+ tslot_desc.parent = 0;
+ TTypeDesc type;
+ {
+ TTypeNode node;
+ node.__set_type(TTypeNodeType::SCALAR);
+ TScalarType scalar_type;
+ scalar_type.__set_type(TPrimitiveType::TINYINT);
+ node.__set_scalar_type(scalar_type);
+ type.types.push_back(node);
+ }
+ tslot_desc.slotType = type;
+ tslot_desc.columnPos = 0;
+ tslot_desc.byteOffset = 0;
+ tslot_desc.nullIndicatorByte = 0;
+ tslot_desc.nullIndicatorBit = -1;
+ tslot_desc.colName = "tinyint_col";
+ tslot_desc.slotIdx = 0;
+ tslot_desc.isMaterialized = true;
+ t_desc_table.slotDescriptors.push_back(tslot_desc);
+ }
+ t_desc_table.__isset.slotDescriptors = true;
+ {
+ // TTupleDescriptor dest
+ TTupleDescriptor t_tuple_desc;
+ t_tuple_desc.id = 0;
+ t_tuple_desc.byteSize = 16;
+ t_tuple_desc.numNullBytes = 0;
+ t_tuple_desc.tableId = 0;
+ t_tuple_desc.__isset.tableId = true;
+ t_desc_table.tupleDescriptors.push_back(t_tuple_desc);
+ }
+ DescriptorTbl* desc_tbl;
+ ObjectPool obj_pool;
+ DescriptorTbl::create(&obj_pool, t_desc_table, &desc_tbl);
+ auto slot_desc = desc_tbl->get_slot_descriptor(0);
+ ParquetReadColumn column(slot_desc);
+ std::vector<RowRange> row_ranges = std::vector<RowRange>();
+ ParquetColumnReader::create(&file_reader, field, column,
t_metadata.row_groups[0], row_ranges,
+ reader);
+ std::unique_ptr<vectorized::Block> block;
+ create_block(block);
+ auto& column_with_type_and_name =
block->get_by_name(slot_desc->col_name());
+ auto& column_ptr = column_with_type_and_name.column;
+ auto& column_type = column_with_type_and_name.type;
+ size_t batch_read_rows = 0;
+ bool batch_eof = false;
+ ASSERT_EQ(column_ptr->size(), 0);
+
+ reader->read_column_data(column_ptr, column_type, 1024, &batch_read_rows,
&batch_eof);
+ EXPECT_TRUE(!batch_eof);
+ ASSERT_EQ(batch_read_rows, 10);
+ ASSERT_EQ(column_ptr->size(), 10);
+
+ auto* nullable_column =
+
reinterpret_cast<vectorized::ColumnNullable*>((*std::move(column_ptr)).mutate().get());
+ MutableColumnPtr nested_column = nullable_column->get_nested_column_ptr();
+ int int_sum = 0;
+ for (int i = 0; i < column_ptr->size(); i++) {
+ int_sum += (int8_t)column_ptr->get64(i);
+ }
+ ASSERT_EQ(int_sum, 5);
+}
+
+TEST_F(ParquetThriftReaderTest, group_reader) {
+ TDescriptorTable t_desc_table;
+ TTableDescriptor t_table_desc;
+ std::vector<std::string> int_types = {"boolean_col", "tinyint_col",
"smallint_col", "int_col",
+ "bigint_col", "float_col",
"double_col"};
+ // "string_col"
+ t_table_desc.id = 0;
+ t_table_desc.tableType = TTableType::OLAP_TABLE;
+ t_table_desc.numCols = 0;
+ t_table_desc.numClusteringCols = 0;
+ t_desc_table.tableDescriptors.push_back(t_table_desc);
+ t_desc_table.__isset.tableDescriptors = true;
+
+ for (int i = 0; i < int_types.size(); i++) {
+ TSlotDescriptor tslot_desc;
+ {
+ tslot_desc.id = i;
+ tslot_desc.parent = 0;
+ TTypeDesc type;
+ {
+ TTypeNode node;
+ node.__set_type(TTypeNodeType::SCALAR);
+ TScalarType scalar_type;
+ scalar_type.__set_type(TPrimitiveType::type(i + 2));
+ node.__set_scalar_type(scalar_type);
+ type.types.push_back(node);
+ }
+ tslot_desc.slotType = type;
+ tslot_desc.columnPos = 0;
+ tslot_desc.byteOffset = 0;
+ tslot_desc.nullIndicatorByte = 0;
+ tslot_desc.nullIndicatorBit = -1;
+ tslot_desc.colName = int_types[i];
+ tslot_desc.slotIdx = 0;
+ tslot_desc.isMaterialized = true;
+ t_desc_table.slotDescriptors.push_back(tslot_desc);
+ }
+ }
+
+ t_desc_table.__isset.slotDescriptors = true;
+ {
+ // TTupleDescriptor dest
+ TTupleDescriptor t_tuple_desc;
+ t_tuple_desc.id = 0;
+ t_tuple_desc.byteSize = 16;
+ t_tuple_desc.numNullBytes = 0;
+ t_tuple_desc.tableId = 0;
+ t_tuple_desc.__isset.tableId = true;
+ t_desc_table.tupleDescriptors.push_back(t_tuple_desc);
+ }
+ DescriptorTbl* desc_tbl;
+ ObjectPool obj_pool;
+ DescriptorTbl::create(&obj_pool, t_desc_table, &desc_tbl);
+ std::vector<ParquetReadColumn> read_columns;
+ for (int i = 0; i < int_types.size(); i++) {
+ auto slot_desc = desc_tbl->get_slot_descriptor(i);
+ ParquetReadColumn column(slot_desc);
+ read_columns.emplace_back(column);
+ }
+
+ LocalFileReader
file_reader("./be/test/exec/test_data/parquet_scanner/type-decoder.parquet", 0);
+ auto st = file_reader.open();
+ EXPECT_TRUE(st.ok());
+
+ // prepare metadata
+ std::shared_ptr<FileMetaData> meta_data;
+ parse_thrift_footer(&file_reader, meta_data);
+ tparquet::FileMetaData t_metadata = meta_data->to_thrift_metadata();
+
+ auto row_group = t_metadata.row_groups[0];
+ std::shared_ptr<RowGroupReader> row_group_reader;
+ row_group_reader.reset(new RowGroupReader(&file_reader, read_columns, 0,
row_group));
+ std::vector<RowRange> row_ranges = std::vector<RowRange>();
+ auto stg = row_group_reader->init(meta_data->schema(), row_ranges);
+ EXPECT_TRUE(stg.ok());
+
+ std::unique_ptr<vectorized::Block> block;
+ create_block(block);
+ bool batch_eof = false;
+ auto stb = row_group_reader->next_batch(block.get(), 1024, &batch_eof);
+ EXPECT_TRUE(stb.ok());
+ LOG(WARNING) << "block data: " << block->dump_structure();
+}
} // namespace vectorized
} // namespace doris
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]