This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new dec576a991 [feature-wip](parquet-reader) generate null values and
NullMap for parquet column (#12115)
dec576a991 is described below
commit dec576a9918436ec8e80e899651e0f7db879885a
Author: Ashin Gau <[email protected]>
AuthorDate: Mon Aug 29 09:30:32 2022 +0800
[feature-wip](parquet-reader) generate null values and NullMap for parquet
column (#12115)
Generate null values and NullMap for the nullable column by analyzing the
definition levels.
---
be/src/common/config.h | 1 +
be/src/exec/schema_scanner.cpp | 1 +
be/src/vec/exec/file_scan_node.cpp | 11 +-
be/src/vec/exec/format/parquet/parquet_common.cpp | 5 +-
.../format/parquet/vparquet_column_chunk_reader.h | 7 +-
.../exec/format/parquet/vparquet_column_reader.cpp | 62 ++++++-
.../exec/format/parquet/vparquet_group_reader.cpp | 4 -
.../exec/format/parquet/vparquet_group_reader.h | 1 -
be/test/vec/exec/parquet/parquet_thrift_test.cpp | 188 ++++-----------------
9 files changed, 102 insertions(+), 178 deletions(-)
diff --git a/be/src/common/config.h b/be/src/common/config.h
index a293dc64a4..fa84f5e11a 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -805,6 +805,7 @@ CONF_Int32(object_pool_buffer_size, "100");
CONF_Int32(parquet_reader_max_buffer_size, "50");
CONF_Bool(parquet_predicate_push_down, "true");
CONF_Int32(parquet_header_max_size, "8388608");
+CONF_Bool(parquet_reader_using_internal, "false");
// When the rows number reached this limit, will check the filter rate of the
bloomfilter
// if it is lower than a specific threshold, the predicate will be disabled.
diff --git a/be/src/exec/schema_scanner.cpp b/be/src/exec/schema_scanner.cpp
index 9071293ff1..fb4623114c 100644
--- a/be/src/exec/schema_scanner.cpp
+++ b/be/src/exec/schema_scanner.cpp
@@ -162,6 +162,7 @@ Status SchemaScanner::create_tuple_desc(ObjectPool* pool) {
t_slot_desc.__set_nullIndicatorBit(-1);
}
+ t_slot_desc.id = i;
t_slot_desc.__set_slotIdx(i);
t_slot_desc.__set_isMaterialized(true);
diff --git a/be/src/vec/exec/file_scan_node.cpp
b/be/src/vec/exec/file_scan_node.cpp
index e84c0e9371..8dc3cc5222 100644
--- a/be/src/vec/exec/file_scan_node.cpp
+++ b/be/src/vec/exec/file_scan_node.cpp
@@ -466,10 +466,13 @@ std::unique_ptr<FileScanner>
FileScanNode::create_scanner(const TFileScanRange&
FileScanner* scan = nullptr;
switch (scan_range.params.format_type) {
case TFileFormatType::FORMAT_PARQUET:
- scan = new VFileParquetScanner(_runtime_state, runtime_profile(),
scan_range.params,
- scan_range.ranges, _pre_filter_texprs,
counter);
- // scan = new ParquetFileHdfsScanner(_runtime_state,
runtime_profile(), scan_range.params,
- // scan_range.ranges,
_pre_filter_texprs, counter);
+ if (config::parquet_reader_using_internal) {
+ scan = new ParquetFileHdfsScanner(_runtime_state,
runtime_profile(), scan_range.params,
+ scan_range.ranges,
_pre_filter_texprs, counter);
+ } else {
+ scan = new VFileParquetScanner(_runtime_state, runtime_profile(),
scan_range.params,
+ scan_range.ranges,
_pre_filter_texprs, counter);
+ }
break;
case TFileFormatType::FORMAT_ORC:
scan = new VFileORCScanner(_runtime_state, runtime_profile(),
scan_range.params,
diff --git a/be/src/vec/exec/format/parquet/parquet_common.cpp
b/be/src/vec/exec/format/parquet/parquet_common.cpp
index 347db41d86..26c1ab7735 100644
--- a/be/src/vec/exec/format/parquet/parquet_common.cpp
+++ b/be/src/vec/exec/format/parquet/parquet_common.cpp
@@ -258,8 +258,7 @@ Status FixLengthDecoder::decode_values(MutableColumnPtr&
doris_column, DataTypeP
}
return Status::InvalidArgument("Can't decode parquet physical type {} to
doris logical type {}",
- tparquet::to_string(_physical_type),
- getTypeName(data_type->get_type_id()));
+ tparquet::to_string(_physical_type),
getTypeName(logical_type));
}
Status ByteArrayDecoder::set_dict(std::unique_ptr<uint8_t[]>& dict, size_t
dict_size) {
@@ -351,7 +350,7 @@ Status ByteArrayDecoder::decode_values(MutableColumnPtr&
doris_column, DataTypeP
}
return Status::InvalidArgument(
"Can't decode parquet physical type BYTE_ARRAY to doris logical
type {}",
- getTypeName(data_type->get_type_id()));
+ getTypeName(logical_type));
}
Status BoolPlainDecoder::skip_values(size_t num_values) {
diff --git a/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.h
b/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.h
index 79fdc204dc..4319ba6689 100644
--- a/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.h
+++ b/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.h
@@ -105,6 +105,9 @@ public:
// Get the definition level decoder of current page.
LevelDecoder& def_level_decoder() { return _def_level_decoder; }
+ level_t max_rep_level() const { return _max_rep_level; }
+ level_t max_def_level() const { return _max_def_level; }
+
// Get page decoder
Decoder* get_page_decoder() { return _page_decoder; }
@@ -119,9 +122,7 @@ private:
tparquet::LogicalType _parquet_logical_type;
BufferedStreamReader* _stream_reader;
- // tparquet::ColumnChunk* _column_chunk;
- tparquet::ColumnMetaData& _metadata;
- // FieldSchema* _field_schema;
+ tparquet::ColumnMetaData _metadata;
cctz::time_zone* _ctz;
std::unique_ptr<PageReader> _page_reader = nullptr;
diff --git a/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp
b/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp
index 66d9793ab5..9cdf60f378 100644
--- a/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp
+++ b/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp
@@ -63,18 +63,17 @@ Status ScalarColumnReader::init(FileReader* file,
FieldSchema* field, tparquet::
new BufferedFileStreamReader(file, _metadata->start_offset(),
_metadata->size());
_row_ranges = &row_ranges;
_chunk_reader.reset(new ColumnChunkReader(_stream_reader, chunk, field,
_ctz));
- RETURN_IF_ERROR(_chunk_reader->init());
- RETURN_IF_ERROR(_chunk_reader->next_page());
- if (_row_ranges->size() != 0) {
- _skipped_pages();
- }
- RETURN_IF_ERROR(_chunk_reader->load_page_data());
- return Status::OK();
+ return _chunk_reader->init();
}
Status ScalarColumnReader::read_column_data(ColumnPtr& doris_column,
DataTypePtr& type,
size_t batch_size, size_t*
read_rows, bool* eof) {
- if (_chunk_reader->remaining_num_values() <= 0) {
+ if (_chunk_reader->remaining_num_values() == 0) {
+ if (!_chunk_reader->has_next_page()) {
+ *eof = true;
+ *read_rows = 0;
+ return Status::OK();
+ }
RETURN_IF_ERROR(_chunk_reader->next_page());
if (_row_ranges->size() != 0) {
_skipped_pages();
@@ -84,8 +83,53 @@ Status ScalarColumnReader::read_column_data(ColumnPtr&
doris_column, DataTypePtr
size_t read_values = _chunk_reader->remaining_num_values() < batch_size
? _chunk_reader->remaining_num_values()
: batch_size;
- RETURN_IF_ERROR(_chunk_reader->decode_values(doris_column, type,
read_values));
+ // get definition levels, and generate null values
+ level_t definitions[read_values];
+ if (_chunk_reader->max_def_level() == 0) { // required field
+ std::fill(definitions, definitions + read_values, 1);
+ } else {
+ _chunk_reader->get_def_levels(definitions, read_values);
+ }
+ // fill NullMap
+ CHECK(doris_column->is_nullable());
+ auto* nullable_column = reinterpret_cast<vectorized::ColumnNullable*>(
+ (*std::move(doris_column)).mutate().get());
+ NullMap& map_data = nullable_column->get_null_map_data();
+ for (int i = 0; i < read_values; ++i) {
+ map_data.emplace_back(definitions[i] == 0);
+ }
+ // decode data
+ if (_chunk_reader->max_def_level() == 0) {
+ RETURN_IF_ERROR(_chunk_reader->decode_values(doris_column, type,
read_values));
+ } else if (read_values > 0) {
+ // column with null values
+ level_t level_type = definitions[0];
+ int num_values = 1;
+ for (int i = 1; i < read_values; ++i) {
+ if (definitions[i] != level_type) {
+ if (level_type == 0) {
+ // null values
+ _chunk_reader->insert_null_values(doris_column,
num_values);
+ } else {
+ RETURN_IF_ERROR(_chunk_reader->decode_values(doris_column,
type, num_values));
+ }
+ level_type = definitions[i];
+ num_values = 1;
+ } else {
+ num_values++;
+ }
+ }
+ if (level_type == 0) {
+ // null values
+ _chunk_reader->insert_null_values(doris_column, num_values);
+ } else {
+ RETURN_IF_ERROR(_chunk_reader->decode_values(doris_column, type,
num_values));
+ }
+ }
*read_rows = read_values;
+ if (_chunk_reader->remaining_num_values() == 0 &&
!_chunk_reader->has_next_page()) {
+ *eof = true;
+ }
return Status::OK();
}
diff --git a/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp
b/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp
index 7443434cfb..0f6b0d084b 100644
--- a/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp
+++ b/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp
@@ -31,7 +31,6 @@ RowGroupReader::RowGroupReader(doris::FileReader* file_reader,
_read_columns(read_columns),
_row_group_id(row_group_id),
_row_group_meta(row_group),
- _total_rows(row_group.num_rows),
_ctz(ctz) {}
RowGroupReader::~RowGroupReader() {
@@ -72,9 +71,6 @@ Status RowGroupReader::next_batch(Block* block, size_t
batch_size, bool* _batch_
RETURN_IF_ERROR(_column_readers[slot_desc->id()]->read_column_data(
column_ptr, column_type, batch_size, &batch_read_rows,
_batch_eof));
_read_rows += batch_read_rows;
- if (_read_rows >= _total_rows) {
- *_batch_eof = true;
- }
}
// use data fill utils read column data to column ptr
return Status::OK();
diff --git a/be/src/vec/exec/format/parquet/vparquet_group_reader.h
b/be/src/vec/exec/format/parquet/vparquet_group_reader.h
index 5ed99cd4e3..57c72d4863 100644
--- a/be/src/vec/exec/format/parquet/vparquet_group_reader.h
+++ b/be/src/vec/exec/format/parquet/vparquet_group_reader.h
@@ -48,7 +48,6 @@ private:
const int32_t _row_group_id;
tparquet::RowGroup& _row_group_meta;
int64_t _read_rows = 0;
- int64_t _total_rows;
cctz::time_zone* _ctz;
};
} // namespace doris::vectorized
diff --git a/be/test/vec/exec/parquet/parquet_thrift_test.cpp
b/be/test/vec/exec/parquet/parquet_thrift_test.cpp
index 75cc087d12..3bcc125178 100644
--- a/be/test/vec/exec/parquet/parquet_thrift_test.cpp
+++ b/be/test/vec/exec/parquet/parquet_thrift_test.cpp
@@ -329,158 +329,31 @@ TEST_F(ParquetThriftReaderTest, dict_decoder) {
"./be/test/exec/test_data/parquet_scanner/dict-decoder.txt", 12);
}
-TEST_F(ParquetThriftReaderTest, column_reader) {
- LocalFileReader
file_reader("./be/test/exec/test_data/parquet_scanner/type-decoder.parquet", 0);
- auto st = file_reader.open();
- EXPECT_TRUE(st.ok());
-
- // prepare metadata
- std::shared_ptr<FileMetaData> meta_data;
- parse_thrift_footer(&file_reader, meta_data);
- tparquet::FileMetaData t_metadata = meta_data->to_thrift_metadata();
-
- FieldDescriptor schema_descriptor;
- // todo use schema of meta_data
- schema_descriptor.parse_from_thrift(t_metadata.schema);
- // create scalar column reader
- std::unique_ptr<ParquetColumnReader> reader;
- auto field = const_cast<FieldSchema*>(schema_descriptor.get_column(0));
- // create read model
- TDescriptorTable t_desc_table;
- // table descriptors
- TTableDescriptor t_table_desc;
- cctz::time_zone ctz;
- TimezoneUtils::find_cctz_time_zone(TimezoneUtils::default_time_zone, ctz);
-
- t_table_desc.id = 0;
- t_table_desc.tableType = TTableType::OLAP_TABLE;
- t_table_desc.numCols = 0;
- t_table_desc.numClusteringCols = 0;
- t_desc_table.tableDescriptors.push_back(t_table_desc);
- t_desc_table.__isset.tableDescriptors = true;
- TSlotDescriptor tslot_desc;
- {
- tslot_desc.id = 0;
- tslot_desc.parent = 0;
- TTypeDesc type;
- {
- TTypeNode node;
- node.__set_type(TTypeNodeType::SCALAR);
- TScalarType scalar_type;
- scalar_type.__set_type(TPrimitiveType::TINYINT);
- node.__set_scalar_type(scalar_type);
- type.types.push_back(node);
- }
- tslot_desc.slotType = type;
- tslot_desc.columnPos = 0;
- tslot_desc.byteOffset = 0;
- tslot_desc.nullIndicatorByte = 0;
- tslot_desc.nullIndicatorBit = -1;
- tslot_desc.colName = "tinyint_col";
- tslot_desc.slotIdx = 0;
- tslot_desc.isMaterialized = true;
- t_desc_table.slotDescriptors.push_back(tslot_desc);
- }
- t_desc_table.__isset.slotDescriptors = true;
- {
- // TTupleDescriptor dest
- TTupleDescriptor t_tuple_desc;
- t_tuple_desc.id = 0;
- t_tuple_desc.byteSize = 16;
- t_tuple_desc.numNullBytes = 0;
- t_tuple_desc.tableId = 0;
- t_tuple_desc.__isset.tableId = true;
- t_desc_table.tupleDescriptors.push_back(t_tuple_desc);
- }
- DescriptorTbl* desc_tbl;
- ObjectPool obj_pool;
- DescriptorTbl::create(&obj_pool, t_desc_table, &desc_tbl);
- auto slot_desc = desc_tbl->get_slot_descriptor(0);
- ParquetReadColumn column(slot_desc);
- std::vector<RowRange> row_ranges = std::vector<RowRange>();
- ParquetColumnReader::create(&file_reader, field, column,
t_metadata.row_groups[0], row_ranges,
- &ctz, reader);
- std::unique_ptr<vectorized::Block> block;
- create_block(block);
- auto& column_with_type_and_name =
block->get_by_name(slot_desc->col_name());
- auto& column_ptr = column_with_type_and_name.column;
- auto& column_type = column_with_type_and_name.type;
- size_t batch_read_rows = 0;
- bool batch_eof = false;
- ASSERT_EQ(column_ptr->size(), 0);
-
- reader->read_column_data(column_ptr, column_type, 1024, &batch_read_rows,
&batch_eof);
- EXPECT_TRUE(!batch_eof);
- ASSERT_EQ(batch_read_rows, 10);
- ASSERT_EQ(column_ptr->size(), 10);
-
- auto* nullable_column =
-
reinterpret_cast<vectorized::ColumnNullable*>((*std::move(column_ptr)).mutate().get());
- MutableColumnPtr nested_column = nullable_column->get_nested_column_ptr();
- int int_sum = 0;
- for (int i = 0; i < column_ptr->size(); i++) {
- int_sum += (int8_t)column_ptr->get64(i);
- }
- ASSERT_EQ(int_sum, 5);
-}
-
TEST_F(ParquetThriftReaderTest, group_reader) {
- TDescriptorTable t_desc_table;
- TTableDescriptor t_table_desc;
- std::vector<std::string> int_types = {"boolean_col", "tinyint_col",
"smallint_col", "int_col",
- "bigint_col", "float_col",
"double_col"};
- // "string_col"
- t_table_desc.id = 0;
- t_table_desc.tableType = TTableType::OLAP_TABLE;
- t_table_desc.numCols = 0;
- t_table_desc.numClusteringCols = 0;
- t_desc_table.tableDescriptors.push_back(t_table_desc);
- t_desc_table.__isset.tableDescriptors = true;
-
- for (int i = 0; i < int_types.size(); i++) {
- TSlotDescriptor tslot_desc;
- {
- tslot_desc.id = i;
- tslot_desc.parent = 0;
- TTypeDesc type;
- {
- TTypeNode node;
- node.__set_type(TTypeNodeType::SCALAR);
- TScalarType scalar_type;
- scalar_type.__set_type(TPrimitiveType::type(i + 2));
- node.__set_scalar_type(scalar_type);
- type.types.push_back(node);
- }
- tslot_desc.slotType = type;
- tslot_desc.columnPos = 0;
- tslot_desc.byteOffset = 0;
- tslot_desc.nullIndicatorByte = 0;
- tslot_desc.nullIndicatorBit = -1;
- tslot_desc.colName = int_types[i];
- tslot_desc.slotIdx = 0;
- tslot_desc.isMaterialized = true;
- t_desc_table.slotDescriptors.push_back(tslot_desc);
- }
- }
-
- t_desc_table.__isset.slotDescriptors = true;
- {
- // TTupleDescriptor dest
- TTupleDescriptor t_tuple_desc;
- t_tuple_desc.id = 0;
- t_tuple_desc.byteSize = 16;
- t_tuple_desc.numNullBytes = 0;
- t_tuple_desc.tableId = 0;
- t_tuple_desc.__isset.tableId = true;
- t_desc_table.tupleDescriptors.push_back(t_tuple_desc);
- }
- DescriptorTbl* desc_tbl;
- ObjectPool obj_pool;
- DescriptorTbl::create(&obj_pool, t_desc_table, &desc_tbl);
+ SchemaScanner::ColumnDesc column_descs[] = {
+ {"tinyint_col", TYPE_TINYINT, sizeof(int8_t), true},
+ {"smallint_col", TYPE_SMALLINT, sizeof(int16_t), true},
+ {"int_col", TYPE_INT, sizeof(int32_t), true},
+ {"bigint_col", TYPE_BIGINT, sizeof(int64_t), true},
+ {"boolean_col", TYPE_BOOLEAN, sizeof(bool), true},
+ {"float_col", TYPE_FLOAT, sizeof(float_t), true},
+ {"double_col", TYPE_DOUBLE, sizeof(double_t), true},
+ {"string_col", TYPE_STRING, sizeof(StringValue), true},
+ {"binary_col", TYPE_STRING, sizeof(StringValue), true},
+ {"timestamp_col", TYPE_DATETIME, sizeof(DateTimeValue), true},
+ {"decimal_col", TYPE_DECIMALV2, sizeof(DecimalV2Value), true},
+ {"char_col", TYPE_CHAR, sizeof(StringValue), true},
+ {"varchar_col", TYPE_VARCHAR, sizeof(StringValue), true},
+ {"date_col", TYPE_DATE, sizeof(DateTimeValue), true}};
+ int num_cols = sizeof(column_descs) / sizeof(SchemaScanner::ColumnDesc);
+ SchemaScanner schema_scanner(column_descs, num_cols);
+ ObjectPool object_pool;
+ SchemaScannerParam param;
+    schema_scanner.init(&param, &object_pool);
+ auto tuple_slots =
const_cast<TupleDescriptor*>(schema_scanner.tuple_desc())->slots();
std::vector<ParquetReadColumn> read_columns;
- for (int i = 0; i < int_types.size(); i++) {
- auto slot_desc = desc_tbl->get_slot_descriptor(i);
- ParquetReadColumn column(slot_desc);
+ for (const auto& slot : tuple_slots) {
+ ParquetReadColumn column(slot);
read_columns.emplace_back(column);
}
@@ -502,12 +375,19 @@ TEST_F(ParquetThriftReaderTest, group_reader) {
auto stg = row_group_reader->init(meta_data->schema(), row_ranges);
EXPECT_TRUE(stg.ok());
- std::unique_ptr<vectorized::Block> block;
- create_block(block);
+ vectorized::Block block;
+ for (const auto& slot_desc : tuple_slots) {
+ auto is_nullable = slot_desc->is_nullable();
+ auto data_type =
vectorized::DataTypeFactory::instance().create_data_type(slot_desc->type(),
+
is_nullable);
+ MutableColumnPtr data_column = data_type->create_column();
+ block.insert(
+ ColumnWithTypeAndName(std::move(data_column), data_type,
slot_desc->col_name()));
+ }
bool batch_eof = false;
- auto stb = row_group_reader->next_batch(block.get(), 1024, &batch_eof);
+ auto stb = row_group_reader->next_batch(&block, 1024, &batch_eof);
EXPECT_TRUE(stb.ok());
- LOG(WARNING) << "block data: " << block->dump_structure();
+ LOG(WARNING) << "block data: " << block.dump_data(0, 10);
}
} // namespace vectorized
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]