This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 0b9bfd15b7 [feature-wip](parquet-reader) parquet physical type to
doris logical type (#11769)
0b9bfd15b7 is described below
commit 0b9bfd15b7ca45bcf5abe17accfcd11333a1229a
Author: Ashin Gau <[email protected]>
AuthorDate: Mon Aug 15 16:08:11 2022 +0800
[feature-wip](parquet-reader) parquet physical type to doris logical type
(#11769)
Two improvements have been added:
1. Translate parquet physical type into doris logical type.
2. Decode parquet column chunk into doris ColumnPtr, and add unit tests to
show how to use related API.
---
be/src/vec/exec/format/parquet/parquet_common.cpp | 100 +++++++------
be/src/vec/exec/format/parquet/parquet_common.h | 81 ++++-------
.../parquet/vparquet_column_chunk_reader.cpp | 49 +++++--
.../format/parquet/vparquet_column_chunk_reader.h | 17 ++-
be/test/vec/exec/parquet/parquet_thrift_test.cpp | 158 ++++++++++++++++-----
5 files changed, 255 insertions(+), 150 deletions(-)
diff --git a/be/src/vec/exec/format/parquet/parquet_common.cpp
b/be/src/vec/exec/format/parquet/parquet_common.cpp
index 1d80676b9b..082d0fc57d 100644
--- a/be/src/vec/exec/format/parquet/parquet_common.cpp
+++ b/be/src/vec/exec/format/parquet/parquet_common.cpp
@@ -18,34 +18,36 @@
#include "parquet_common.h"
#include "util/coding.h"
+#include "vec/data_types/data_type_nullable.h"
namespace doris::vectorized {
-Status Decoder::getDecoder(tparquet::Type::type type, tparquet::Encoding::type
encoding,
- std::unique_ptr<Decoder>& decoder) {
+#define FOR_LOGICAL_NUMERIC_TYPES(M) \
+ M(TypeIndex::Int32, Int32) \
+ M(TypeIndex::UInt32, UInt32) \
+ M(TypeIndex::Int64, Int64) \
+ M(TypeIndex::UInt64, UInt64) \
+ M(TypeIndex::Float32, Float32) \
+ M(TypeIndex::Float64, Float64)
+
+Status Decoder::get_decoder(tparquet::Type::type type,
tparquet::Encoding::type encoding,
+ std::unique_ptr<Decoder>& decoder) {
switch (encoding) {
case tparquet::Encoding::PLAIN:
switch (type) {
case tparquet::Type::BOOLEAN:
decoder.reset(new BoolPlainDecoder());
break;
- case tparquet::Type::INT32:
- decoder.reset(new PlainDecoder<Int32>());
+ case tparquet::Type::BYTE_ARRAY:
+ decoder.reset(new ByteArrayPlainDecoder());
break;
+ case tparquet::Type::INT32:
case tparquet::Type::INT64:
- decoder.reset(new PlainDecoder<Int64>());
- break;
+ case tparquet::Type::INT96:
case tparquet::Type::FLOAT:
- decoder.reset(new PlainDecoder<Float32>());
- break;
case tparquet::Type::DOUBLE:
- decoder.reset(new PlainDecoder<Float64>());
- break;
- case tparquet::Type::BYTE_ARRAY:
- decoder.reset(new BAPlainDecoder());
- break;
case tparquet::Type::FIXED_LEN_BYTE_ARRAY:
- decoder.reset(new FixedLengthBAPlainDecoder());
+ decoder.reset(new PlainDecoder(type));
break;
default:
return Status::InternalError("Unsupported plain type {} in parquet
decoder",
@@ -60,34 +62,28 @@ Status Decoder::getDecoder(tparquet::Type::type type,
tparquet::Encoding::type e
return Status::OK();
}
-Status Decoder::decode_values(ColumnPtr& doris_column, size_t num_values) {
+Status Decoder::decode_values(ColumnPtr& doris_column, DataTypePtr& data_type,
size_t num_values) {
CHECK(doris_column->is_nullable());
auto* nullable_column = reinterpret_cast<vectorized::ColumnNullable*>(
(*std::move(doris_column)).mutate().get());
MutableColumnPtr data_column = nullable_column->get_nested_column_ptr();
- return _decode_values(data_column, num_values);
+ return decode_values(data_column, data_type, num_values);
}
-Status FixedLengthBAPlainDecoder::decode_values(Slice& slice, size_t
num_values) {
+Status PlainDecoder::decode_values(Slice& slice, size_t num_values) {
size_t to_read_bytes = _type_length * num_values;
if (UNLIKELY(_offset + to_read_bytes > _data->size)) {
return Status::IOError("Out-of-bounds access in parquet data decoder");
}
- // insert '\0' into the end of each binary
- if (UNLIKELY(to_read_bytes + num_values > slice.size)) {
+ if (UNLIKELY(to_read_bytes > slice.size)) {
return Status::IOError("Slice does not have enough space to write out
the decoding data");
}
- uint32_t slice_offset = 0;
- for (int i = 0; i < num_values; ++i) {
- memcpy(slice.data + slice_offset, _data->data + _offset, _type_length);
- slice_offset += _type_length + 1;
- slice.data[slice_offset - 1] = '\0';
- _offset += _type_length;
- }
+ memcpy(slice.data, _data->data + _offset, to_read_bytes);
+ _offset += to_read_bytes;
return Status::OK();
}
-Status FixedLengthBAPlainDecoder::skip_values(size_t num_values) {
+Status PlainDecoder::skip_values(size_t num_values) {
_offset += _type_length * num_values;
if (UNLIKELY(_offset > _data->size)) {
return Status::IOError("Out-of-bounds access in parquet data decoder");
@@ -95,23 +91,43 @@ Status FixedLengthBAPlainDecoder::skip_values(size_t
num_values) {
return Status::OK();
}
-Status FixedLengthBAPlainDecoder::_decode_values(MutableColumnPtr&
doris_column,
- size_t num_values) {
+Status PlainDecoder::_decode_short_int(MutableColumnPtr& doris_column, size_t
num_values,
+ size_t real_length) {
if (UNLIKELY(_offset + _type_length * num_values > _data->size)) {
return Status::IOError("Out-of-bounds access in parquet data decoder");
}
- auto& column_chars_t =
assert_cast<ColumnString&>(*doris_column).get_chars();
- auto& column_offsets =
assert_cast<ColumnString&>(*doris_column).get_offsets();
for (int i = 0; i < num_values; ++i) {
- column_chars_t.insert(_data->data + _offset, _data->data + _offset +
_type_length);
- column_chars_t.emplace_back('\0');
- column_offsets.emplace_back(column_chars_t.size());
+ doris_column->insert_data(_data->data + _offset, real_length);
_offset += _type_length;
}
return Status::OK();
}
-Status BAPlainDecoder::decode_values(Slice& slice, size_t num_values) {
+Status PlainDecoder::decode_values(MutableColumnPtr& doris_column,
DataTypePtr& data_type,
+ size_t num_values) {
+ TypeIndex logical_type = remove_nullable(data_type)->get_type_id();
+ switch (logical_type) {
+ case TypeIndex::Int8:
+ case TypeIndex::UInt8:
+ return _decode_short_int(doris_column, num_values, 1);
+ case TypeIndex::Int16:
+ case TypeIndex::UInt16:
+ return _decode_short_int(doris_column, num_values, 2);
+#define DISPATCH(NUMERIC_TYPE, CPP_NUMERIC_TYPE) \
+ case NUMERIC_TYPE: \
+ return _decode_numeric<CPP_NUMERIC_TYPE>(doris_column, num_values);
+ FOR_LOGICAL_NUMERIC_TYPES(DISPATCH)
+#undef DISPATCH
+ default:
+ break;
+ }
+
+ return Status::InvalidArgument("Can't decode parquet physical type {} to
doris logical type {}",
+ tparquet::to_string(_physical_type),
+ getTypeName(data_type->get_type_id()));
+}
+
+Status ByteArrayPlainDecoder::decode_values(Slice& slice, size_t num_values) {
uint32_t slice_offset = 0;
for (int i = 0; i < num_values; ++i) {
if (UNLIKELY(_offset + 4 > _data->size)) {
@@ -131,7 +147,7 @@ Status BAPlainDecoder::decode_values(Slice& slice, size_t
num_values) {
return Status::OK();
}
-Status BAPlainDecoder::skip_values(size_t num_values) {
+Status ByteArrayPlainDecoder::skip_values(size_t num_values) {
for (int i = 0; i < num_values; ++i) {
if (UNLIKELY(_offset + 4 > _data->size)) {
return Status::IOError("Can't read byte array length from plain
decoder");
@@ -147,9 +163,8 @@ Status BAPlainDecoder::skip_values(size_t num_values) {
return Status::OK();
}
-Status BAPlainDecoder::_decode_values(MutableColumnPtr& doris_column, size_t
num_values) {
- auto& column_chars_t =
assert_cast<ColumnString&>(*doris_column).get_chars();
- auto& column_offsets =
assert_cast<ColumnString&>(*doris_column).get_offsets();
+Status ByteArrayPlainDecoder::decode_values(MutableColumnPtr& doris_column,
DataTypePtr& data_type,
+ size_t num_values) {
for (int i = 0; i < num_values; ++i) {
if (UNLIKELY(_offset + 4 > _data->size)) {
return Status::IOError("Can't read byte array length from plain
decoder");
@@ -160,9 +175,7 @@ Status BAPlainDecoder::_decode_values(MutableColumnPtr&
doris_column, size_t num
if (UNLIKELY(_offset + length) > _data->size) {
return Status::IOError("Can't read enough bytes in plain decoder");
}
- column_chars_t.insert(_data->data + _offset, _data->data + _offset +
length);
- column_chars_t.emplace_back('\0');
- column_offsets.emplace_back(column_chars_t.size());
+ doris_column->insert_data(_data->data + _offset, length);
_offset += length;
}
return Status::OK();
@@ -203,7 +216,8 @@ Status BoolPlainDecoder::skip_values(size_t num_values) {
return Status::OK();
}
-Status BoolPlainDecoder::_decode_values(MutableColumnPtr& doris_column, size_t
num_values) {
+Status BoolPlainDecoder::decode_values(MutableColumnPtr& doris_column,
DataTypePtr& data_type,
+ size_t num_values) {
auto& column_data =
static_cast<ColumnVector<UInt8>&>(*doris_column).get_data();
bool value;
for (int i = 0; i < num_values; ++i) {
diff --git a/be/src/vec/exec/format/parquet/parquet_common.h
b/be/src/vec/exec/format/parquet/parquet_common.h
index 44523ae22d..f0aed43288 100644
--- a/be/src/vec/exec/format/parquet/parquet_common.h
+++ b/be/src/vec/exec/format/parquet/parquet_common.h
@@ -25,6 +25,7 @@
#include "vec/columns/column_array.h"
#include "vec/columns/column_nullable.h"
#include "vec/columns/column_string.h"
+#include "vec/data_types/data_type.h"
namespace doris::vectorized {
@@ -35,8 +36,8 @@ public:
Decoder() = default;
virtual ~Decoder() = default;
- static Status getDecoder(tparquet::Type::type type,
tparquet::Encoding::type encoding,
- std::unique_ptr<Decoder>& decoder);
+ static Status get_decoder(tparquet::Type::type type,
tparquet::Encoding::type encoding,
+ std::unique_ptr<Decoder>& decoder);
// The type with fix length
void set_type_length(int32_t type_length) { _type_length = type_length; }
@@ -48,88 +49,63 @@ public:
}
// Write the decoded values batch to doris's column
- Status decode_values(ColumnPtr& doris_column, size_t num_values);
+ Status decode_values(ColumnPtr& doris_column, DataTypePtr& data_type,
size_t num_values);
+
+ virtual Status decode_values(MutableColumnPtr& doris_column, DataTypePtr&
data_type,
+ size_t num_values) = 0;
virtual Status decode_values(Slice& slice, size_t num_values) = 0;
virtual Status skip_values(size_t num_values) = 0;
protected:
- virtual Status _decode_values(MutableColumnPtr& doris_column, size_t
num_values) = 0;
-
int32_t _type_length;
Slice* _data = nullptr;
uint32_t _offset = 0;
};
-template <typename T>
class PlainDecoder final : public Decoder {
public:
- PlainDecoder() = default;
+ PlainDecoder(tparquet::Type::type physical_type) :
_physical_type(physical_type) {};
~PlainDecoder() override = default;
- Status decode_values(Slice& slice, size_t num_values) override {
- size_t to_read_bytes = TYPE_LENGTH * num_values;
- if (UNLIKELY(_offset + to_read_bytes > _data->size)) {
- return Status::IOError("Out-of-bounds access in parquet data
decoder");
- }
- if (UNLIKELY(to_read_bytes > slice.size)) {
- return Status::IOError(
- "Slice does not have enough space to write out the
decoding data");
- }
- memcpy(slice.data, _data->data + _offset, to_read_bytes);
- _offset += to_read_bytes;
- return Status::OK();
- }
+ Status decode_values(MutableColumnPtr& doris_column, DataTypePtr&
data_type,
+ size_t num_values) override;
- Status skip_values(size_t num_values) override {
- _offset += TYPE_LENGTH * num_values;
- if (UNLIKELY(_offset > _data->size)) {
- return Status::IOError("Out-of-bounds access in parquet data
decoder");
- }
- return Status::OK();
- }
+ Status decode_values(Slice& slice, size_t num_values) override;
+
+ Status skip_values(size_t num_values) override;
protected:
- enum { TYPE_LENGTH = sizeof(T) };
+ Status _decode_short_int(MutableColumnPtr& doris_column, size_t
num_values, size_t real_length);
- Status _decode_values(MutableColumnPtr& doris_column, size_t num_values)
override {
- size_t to_read_bytes = TYPE_LENGTH * num_values;
+ template <typename Numeric>
+ Status _decode_numeric(MutableColumnPtr& doris_column, size_t num_values) {
+ size_t to_read_bytes = _type_length * num_values;
if (UNLIKELY(_offset + to_read_bytes > _data->size)) {
return Status::IOError("Out-of-bounds access in parquet data
decoder");
}
- auto& column_data =
static_cast<ColumnVector<T>&>(*doris_column).get_data();
- const auto* raw_data = reinterpret_cast<const T*>(_data->data +
_offset);
+ auto& column_data =
static_cast<ColumnVector<Numeric>&>(*doris_column).get_data();
+ const auto* raw_data = reinterpret_cast<const Numeric*>(_data->data +
_offset);
column_data.insert(raw_data, raw_data + num_values);
_offset += to_read_bytes;
return Status::OK();
}
-};
-
-class FixedLengthBAPlainDecoder final : public Decoder {
-public:
- FixedLengthBAPlainDecoder() = default;
- ~FixedLengthBAPlainDecoder() override = default;
-
- Status decode_values(Slice& slice, size_t num_values) override;
-
- Status skip_values(size_t num_values) override;
-protected:
- Status _decode_values(MutableColumnPtr& doris_column, size_t num_values)
override;
+ tparquet::Type::type _physical_type;
};
-class BAPlainDecoder final : public Decoder {
+class ByteArrayPlainDecoder final : public Decoder {
public:
- BAPlainDecoder() = default;
- ~BAPlainDecoder() override = default;
+ ByteArrayPlainDecoder() = default;
+ ~ByteArrayPlainDecoder() override = default;
+
+ Status decode_values(MutableColumnPtr& doris_column, DataTypePtr&
data_type,
+ size_t num_values) override;
Status decode_values(Slice& slice, size_t num_values) override;
Status skip_values(size_t num_values) override;
-
-protected:
- Status _decode_values(MutableColumnPtr& doris_column, size_t num_values)
override;
};
/// Decoder bit-packed boolean-encoded values.
@@ -147,6 +123,9 @@ public:
_offset = 0;
}
+ Status decode_values(MutableColumnPtr& doris_column, DataTypePtr&
data_type,
+ size_t num_values) override;
+
Status decode_values(Slice& slice, size_t num_values) override;
Status skip_values(size_t num_values) override;
@@ -167,8 +146,6 @@ protected:
return true;
}
- Status _decode_values(MutableColumnPtr& doris_column, size_t num_values)
override;
-
/// A buffer to store unpacked values. Must be a multiple of 32 size to
use the
/// batch-oriented interface of BatchedBitReader. We use uint8_t instead
of bool because
/// bit unpacking is only supported for unsigned integers. The values are
converted to
diff --git a/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.cpp
b/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.cpp
index d4c7f534a3..a0a21b00ca 100644
--- a/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.cpp
+++ b/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.cpp
@@ -20,13 +20,14 @@
namespace doris::vectorized {
ColumnChunkReader::ColumnChunkReader(BufferedStreamReader* reader,
- tparquet::ColumnChunk* column_chunk,
FieldSchema* fieldSchema)
- : _max_rep_level(fieldSchema->repetition_level),
- _max_def_level(fieldSchema->definition_level),
+ tparquet::ColumnChunk* column_chunk,
FieldSchema* field_schema)
+ : _field_schema(field_schema),
+ _max_rep_level(field_schema->repetition_level),
+ _max_def_level(field_schema->definition_level),
_stream_reader(reader),
_metadata(column_chunk->meta_data) {}
-Status ColumnChunkReader::init(size_t type_length) {
+Status ColumnChunkReader::init() {
size_t start_offset = _metadata.__isset.dictionary_page_offset
? _metadata.dictionary_page_offset
: _metadata.data_page_offset;
@@ -41,8 +42,6 @@ Status ColumnChunkReader::init(size_t type_length) {
// get the block compression codec
RETURN_IF_ERROR(get_block_compression_codec(_metadata.codec,
_block_compress_codec));
- // -1 means unfixed length type
- _type_length = type_length;
return Status::OK();
}
@@ -91,14 +90,13 @@ Status ColumnChunkReader::load_page_data() {
_page_decoder = _decoders[static_cast<int>(encoding)].get();
} else {
std::unique_ptr<Decoder> page_decoder;
- Decoder::getDecoder(_metadata.type, encoding, page_decoder);
+ Decoder::get_decoder(_metadata.type, encoding, page_decoder);
_decoders[static_cast<int>(encoding)] = std::move(page_decoder);
_page_decoder = _decoders[static_cast<int>(encoding)].get();
}
_page_decoder->set_data(&_page_data);
- if (_type_length > 0) {
- _page_decoder->set_type_length(_type_length);
- }
+ // Set type length
+ _page_decoder->set_type_length(_get_type_length());
return Status::OK();
}
@@ -138,12 +136,22 @@ size_t ColumnChunkReader::get_def_levels(level_t* levels,
size_t n) {
return _def_level_decoder.get_levels(levels, n);
}
-Status ColumnChunkReader::decode_values(ColumnPtr& doris_column, size_t
num_values) {
+Status ColumnChunkReader::decode_values(ColumnPtr& doris_column, DataTypePtr&
data_type,
+ size_t num_values) {
+ if (UNLIKELY(_num_values < num_values)) {
+ return Status::IOError("Decode too many values in current page");
+ }
+ _num_values -= num_values;
+ return _page_decoder->decode_values(doris_column, data_type, num_values);
+}
+
+Status ColumnChunkReader::decode_values(MutableColumnPtr& doris_column,
DataTypePtr& data_type,
+ size_t num_values) {
if (UNLIKELY(_num_values < num_values)) {
return Status::IOError("Decode too many values in current page");
}
_num_values -= num_values;
- return _page_decoder->decode_values(doris_column, num_values);
+ return _page_decoder->decode_values(doris_column, data_type, num_values);
}
Status ColumnChunkReader::decode_values(Slice& slice, size_t num_values) {
@@ -153,4 +161,21 @@ Status ColumnChunkReader::decode_values(Slice& slice,
size_t num_values) {
_num_values -= num_values;
return _page_decoder->decode_values(slice, num_values);
}
+
+int32_t ColumnChunkReader::_get_type_length() {
+ switch (_field_schema->physical_type) {
+ case tparquet::Type::INT32:
+ case tparquet::Type::FLOAT:
+ return 4;
+ case tparquet::Type::INT64:
+ case tparquet::Type::DOUBLE:
+ return 8;
+ case tparquet::Type::INT96:
+ return 12;
+ case tparquet::Type::FIXED_LEN_BYTE_ARRAY:
+ return _field_schema->parquet_schema.type_length;
+ default:
+ return -1;
+ }
+}
} // namespace doris::vectorized
diff --git a/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.h
b/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.h
index 282612bd21..f8510d4b37 100644
--- a/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.h
+++ b/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.h
@@ -59,13 +59,11 @@ namespace doris::vectorized {
class ColumnChunkReader {
public:
ColumnChunkReader(BufferedStreamReader* reader, tparquet::ColumnChunk*
column_chunk,
- FieldSchema* fieldSchema);
+ FieldSchema* field_schema);
~ColumnChunkReader() = default;
// Initialize chunk reader, will generate the decoder and codec.
- // We can set the type_length if the length of colum type if fixed,
- // or not set, the decoder will try to infer the type_length.
- Status init(size_t type_length = -1);
+ Status init();
// Whether the chunk reader has a more page to read.
bool has_next_page() { return _page_reader->has_next_page(); }
@@ -86,8 +84,11 @@ public:
// Load page data into the underlying container,
// and initialize the repetition and definition level decoder for current
page data.
Status load_page_data();
- // The remaining number of values in current page. Decreased when reading
or skipping.
+ // The remaining number of values in current page(including null values).
Decreased when reading or skipping.
uint32_t num_values() const { return _num_values; };
+ // null values are not analyzing from definition levels
+ // the caller should maintain the consistency after analyzing null values
from definition levels.
+ void dec_num_values(uint32_t dec_num) { _num_values -= dec_num; };
// Get the raw data of current page.
Slice& get_page_data() { return _page_data; }
@@ -97,7 +98,8 @@ public:
size_t get_def_levels(level_t* levels, size_t n);
// Decode values in current page into doris column.
- Status decode_values(ColumnPtr& doris_column, size_t num_values);
+ Status decode_values(ColumnPtr& doris_column, DataTypePtr& data_type,
size_t num_values);
+ Status decode_values(MutableColumnPtr& doris_column, DataTypePtr&
data_type, size_t num_values);
// For test, Decode values in current page into slice.
Status decode_values(Slice& slice, size_t num_values);
@@ -109,7 +111,9 @@ public:
private:
Status _decode_dict_page();
void _reserve_decompress_buf(size_t size);
+ int32_t _get_type_length();
+ FieldSchema* _field_schema;
level_t _max_rep_level;
level_t _max_def_level;
@@ -131,7 +135,6 @@ private:
// Map: encoding -> Decoder
// Plain or Dictionary encoding. If the dictionary grows too big, the
encoding will fall back to the plain encoding
std::unordered_map<int, std::unique_ptr<Decoder>> _decoders;
- size_t _type_length = -1;
};
} // namespace doris::vectorized
diff --git a/be/test/vec/exec/parquet/parquet_thrift_test.cpp
b/be/test/vec/exec/parquet/parquet_thrift_test.cpp
index db91103c88..95df8bd9a2 100644
--- a/be/test/vec/exec/parquet/parquet_thrift_test.cpp
+++ b/be/test/vec/exec/parquet/parquet_thrift_test.cpp
@@ -22,10 +22,15 @@
#include <string>
+#include "exec/schema_scanner.h"
#include "io/buffered_reader.h"
#include "io/file_reader.h"
#include "io/local_file_reader.h"
+#include "runtime/string_value.h"
#include "util/runtime_profile.h"
+#include "vec/core/block.h"
+#include "vec/core/column_with_type_and_name.h"
+#include "vec/data_types/data_type_factory.hpp"
#include "vec/exec/format/parquet/parquet_thrift_util.h"
#include "vec/exec/format/parquet/vparquet_column_chunk_reader.h"
#include "vec/exec/format/parquet/vparquet_file_metadata.h"
@@ -125,7 +130,8 @@ TEST_F(ParquetThriftReaderTest, complex_nested_file) {
}
static Status get_column_values(FileReader* file_reader,
tparquet::ColumnChunk* column_chunk,
- FieldSchema* field_schema, Slice& slice) {
+ FieldSchema* field_schema, ColumnPtr&
doris_column,
+ DataTypePtr& data_type) {
tparquet::ColumnMetaData chunk_meta = column_chunk->meta_data;
size_t start_offset = chunk_meta.__isset.dictionary_page_offset
? chunk_meta.dictionary_page_offset
@@ -141,7 +147,35 @@ static Status get_column_values(FileReader* file_reader,
tparquet::ColumnChunk*
// load page data into underlying container
chunk_reader.load_page_data();
// decode page data
- return chunk_reader.decode_values(slice, chunk_reader.num_values());
+ return chunk_reader.decode_values(doris_column, data_type,
chunk_reader.num_values());
+}
+
+static void create_block(std::unique_ptr<vectorized::Block>& block) {
+ // Current supported column type:
+ SchemaScanner::ColumnDesc column_descs[] = {
+ {"tinyint_col", TYPE_TINYINT, sizeof(int8_t), true},
+ {"smallint_col", TYPE_SMALLINT, sizeof(int16_t), true},
+ {"int_col", TYPE_INT, sizeof(int32_t), true},
+ {"bigint_col", TYPE_BIGINT, sizeof(int64_t), true},
+ {"boolean_col", TYPE_BOOLEAN, sizeof(bool), true},
+ {"float_col", TYPE_FLOAT, sizeof(float_t), true},
+ {"double_col", TYPE_DOUBLE, sizeof(double_t), true},
+ {"string_col", TYPE_STRING, sizeof(StringValue), true}};
+ SchemaScanner schema_scanner(column_descs,
+ sizeof(column_descs) /
sizeof(SchemaScanner::ColumnDesc));
+ ObjectPool object_pool;
+ SchemaScannerParam param;
+ schema_scanner.init(&param, &object_pool);
+ auto tuple_slots =
const_cast<TupleDescriptor*>(schema_scanner.tuple_desc())->slots();
+ block.reset(new vectorized::Block());
+ for (const auto& slot_desc : tuple_slots) {
+ auto is_nullable = slot_desc->is_nullable();
+ auto data_type =
vectorized::DataTypeFactory::instance().create_data_type(slot_desc->type(),
+
is_nullable);
+ MutableColumnPtr data_column = data_type->create_column();
+ block->insert(
+ ColumnWithTypeAndName(std::move(data_column), data_type,
slot_desc->col_name()));
+ }
}
TEST_F(ParquetThriftReaderTest, type_decoder) {
@@ -164,6 +198,7 @@ TEST_F(ParquetThriftReaderTest, type_decoder) {
* `date_col` date, // 13
* `list_string` array<string>) // 14
*/
+
LocalFileReader
reader("./be/test/exec/test_data/parquet_scanner/type-decoder.parquet", 0);
/*
* Data in type-decoder.parquet:
@@ -181,6 +216,8 @@ TEST_F(ParquetThriftReaderTest, type_decoder) {
auto st = reader.open();
EXPECT_TRUE(st.ok());
+ std::unique_ptr<vectorized::Block> block;
+ create_block(block);
std::shared_ptr<FileMetaData> metaData;
parse_thrift_footer(&reader, metaData);
tparquet::FileMetaData t_metadata = metaData->to_thrift_metadata();
@@ -190,51 +227,98 @@ TEST_F(ParquetThriftReaderTest, type_decoder) {
// the physical_type of tinyint_col, smallint_col and int_col are all INT32
// they are distinguished by converted_type(in
FieldSchema.parquet_schema.converted_type)
- for (int col_idx = 0; col_idx < 3; ++col_idx) {
- char data[4 * rows];
- Slice slice(data, 4 * rows);
- get_column_values(&reader, &t_metadata.row_groups[0].columns[col_idx],
-
const_cast<FieldSchema*>(schema_descriptor.get_column(col_idx)), slice);
- auto out_data = reinterpret_cast<int32_t*>(data);
+ {
+ auto& column_name_with_type = block->get_by_position(0);
+ auto& data_column = column_name_with_type.column;
+ auto& data_type = column_name_with_type.type;
+ get_column_values(&reader, &t_metadata.row_groups[0].columns[0],
+
const_cast<FieldSchema*>(schema_descriptor.get_column(0)), data_column,
+ data_type);
int int_sum = 0;
for (int i = 0; i < rows; ++i) {
- int_sum += out_data[i];
+ int_sum += (int8_t)data_column->get64(i);
}
ASSERT_EQ(int_sum, 5);
}
- // `bigint_col` bigint, // 3
{
- char data[8 * rows];
- Slice slice(data, 8 * rows);
- get_column_values(&reader, &t_metadata.row_groups[0].columns[3],
-
const_cast<FieldSchema*>(schema_descriptor.get_column(3)), slice);
- auto out_data = reinterpret_cast<int64_t*>(data);
+ auto& column_name_with_type = block->get_by_position(1);
+ auto& data_column = column_name_with_type.column;
+ auto& data_type = column_name_with_type.type;
+ get_column_values(&reader, &t_metadata.row_groups[0].columns[1],
+
const_cast<FieldSchema*>(schema_descriptor.get_column(1)), data_column,
+ data_type);
int int_sum = 0;
for (int i = 0; i < rows; ++i) {
- int_sum += out_data[i];
+ int_sum += (int16_t)data_column->get64(i);
+ }
+ ASSERT_EQ(int_sum, 5);
+ }
+ {
+ auto& column_name_with_type = block->get_by_position(2);
+ auto& data_column = column_name_with_type.column;
+ auto& data_type = column_name_with_type.type;
+ get_column_values(&reader, &t_metadata.row_groups[0].columns[2],
+
const_cast<FieldSchema*>(schema_descriptor.get_column(2)), data_column,
+ data_type);
+ int int_sum = 0;
+ for (int i = 0; i < rows; ++i) {
+ int_sum += (int32_t)data_column->get64(i);
+ }
+ ASSERT_EQ(int_sum, 5);
+ }
+ {
+ auto& column_name_with_type = block->get_by_position(3);
+ auto& data_column = column_name_with_type.column;
+ auto& data_type = column_name_with_type.type;
+ get_column_values(&reader, &t_metadata.row_groups[0].columns[3],
+
const_cast<FieldSchema*>(schema_descriptor.get_column(3)), data_column,
+ data_type);
+ int64_t int_sum = 0;
+ for (int i = 0; i < rows; ++i) {
+ int_sum += (int64_t)data_column->get64(i);
}
ASSERT_EQ(int_sum, 5);
}
// `boolean_col` boolean, // 4
{
- char data[1 * rows];
- Slice slice(data, 1 * rows);
+ auto& column_name_with_type = block->get_by_position(4);
+ auto& data_column = column_name_with_type.column;
+ auto& data_type = column_name_with_type.type;
get_column_values(&reader, &t_metadata.row_groups[0].columns[4],
-
const_cast<FieldSchema*>(schema_descriptor.get_column(4)), slice);
- auto out_data = reinterpret_cast<bool*>(data);
- ASSERT_FALSE(out_data[0]);
- ASSERT_TRUE(out_data[1]);
- ASSERT_FALSE(out_data[2]);
- ASSERT_TRUE(out_data[3]);
- ASSERT_FALSE(out_data[4]);
- ASSERT_FALSE(out_data[5]);
- ASSERT_TRUE(out_data[6]);
- ASSERT_FALSE(out_data[7]);
- ASSERT_FALSE(out_data[8]);
- ASSERT_FALSE(out_data[9]);
+
const_cast<FieldSchema*>(schema_descriptor.get_column(4)), data_column,
+ data_type);
+ ASSERT_FALSE(static_cast<bool>(data_column->get64(0)));
+ ASSERT_TRUE(static_cast<bool>(data_column->get64(1)));
+ ASSERT_FALSE(static_cast<bool>(data_column->get64(2)));
+ ASSERT_TRUE(static_cast<bool>(data_column->get64(3)));
+ ASSERT_FALSE(static_cast<bool>(data_column->get64(4)));
+ ASSERT_FALSE(static_cast<bool>(data_column->get64(5)));
+ ASSERT_TRUE(static_cast<bool>(data_column->get64(6)));
+ ASSERT_FALSE(static_cast<bool>(data_column->get64(7)));
+ ASSERT_FALSE(static_cast<bool>(data_column->get64(8)));
+ ASSERT_FALSE(static_cast<bool>(data_column->get64(9)));
+ }
+ // `double_col` double, // 6
+ {
+ auto& column_name_with_type = block->get_by_position(6);
+ auto& data_column = column_name_with_type.column;
+ auto& data_type = column_name_with_type.type;
+ get_column_values(&reader, &t_metadata.row_groups[0].columns[6],
+
const_cast<FieldSchema*>(schema_descriptor.get_column(6)), data_column,
+ data_type);
+ auto* nullable_column = reinterpret_cast<vectorized::ColumnNullable*>(
+ (*std::move(data_column)).mutate().get());
+ MutableColumnPtr nested_column =
nullable_column->get_nested_column_ptr();
+ ASSERT_EQ(nested_column->get_float64(0), -1.14);
+ ASSERT_EQ(nested_column->get_float64(1), 2.14);
+ ASSERT_EQ(nested_column->get_float64(2), -3.14);
+ ASSERT_EQ(nested_column->get_float64(3), 4.14);
}
// `string_col` string, // 7
{
+ auto& column_name_with_type = block->get_by_position(7);
+ auto& data_column = column_name_with_type.column;
+ auto& data_type = column_name_with_type.type;
tparquet::ColumnChunk column_chunk =
t_metadata.row_groups[0].columns[7];
tparquet::ColumnMetaData chunk_meta = column_chunk.meta_data;
size_t start_offset = chunk_meta.__isset.dictionary_page_offset
@@ -242,7 +326,6 @@ TEST_F(ParquetThriftReaderTest, type_decoder) {
: chunk_meta.data_page_offset;
size_t chunk_size = chunk_meta.total_compressed_size;
BufferedFileStreamReader stream_reader(&reader, start_offset,
chunk_size);
-
ColumnChunkReader chunk_reader(&stream_reader, &column_chunk,
const_cast<FieldSchema*>(schema_descriptor.get_column(7)));
// initialize chunk reader
@@ -252,8 +335,6 @@ TEST_F(ParquetThriftReaderTest, type_decoder) {
// load page data into underlying container
chunk_reader.load_page_data();
- char data[50 * rows];
- Slice slice(data, 50 * rows);
level_t defs[rows];
// Analyze null string
chunk_reader.get_def_levels(defs, rows);
@@ -261,9 +342,14 @@ TEST_F(ParquetThriftReaderTest, type_decoder) {
ASSERT_EQ(defs[3], 0);
ASSERT_EQ(defs[7], 0);
- chunk_reader.decode_values(slice, 7);
- ASSERT_STREQ("s-row0", slice.data);
- ASSERT_STREQ("s-row2", slice.data + 7);
+ chunk_reader.decode_values(data_column, data_type, 7);
+ auto* nullable_column = reinterpret_cast<vectorized::ColumnNullable*>(
+ (*std::move(data_column)).mutate().get());
+ MutableColumnPtr nested_column =
nullable_column->get_nested_column_ptr();
+ auto row0 = nested_column->get_data_at(0).data;
+ auto row2 = nested_column->get_data_at(1).data;
+ ASSERT_STREQ("s-row0", row0);
+ ASSERT_STREQ("s-row2", row2);
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]