This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 37d1180cca [feature-wip](parquet-reader)decode parquet data (#11536)
37d1180cca is described below
commit 37d1180cca26494f65ec6074ab68a4350cf083e6
Author: Ashin Gau <[email protected]>
AuthorDate: Mon Aug 8 12:44:06 2022 +0800
[feature-wip](parquet-reader)decode parquet data (#11536)
---
be/src/common/config.h | 1 +
be/src/io/buffered_reader.cpp | 19 ++-
be/src/io/buffered_reader.h | 22 ++--
be/src/util/block_compression.cpp | 30 +++++
be/src/util/block_compression.h | 4 +
be/src/util/rle_encoding.h | 56 +++++++++
be/src/vec/CMakeLists.txt | 4 +-
be/src/vec/exec/format/parquet/level_decoder.cpp | 76 ++++++++++++
be/src/vec/exec/format/parquet/level_decoder.h | 61 ++++++++++
be/src/vec/exec/format/parquet/parquet_common.cpp | 60 ++++++++++
be/src/vec/exec/format/parquet/parquet_common.h | 108 +++++++++++++++++
be/src/vec/exec/format/parquet/schema_desc.cpp | 11 +-
.../parquet/vparquet_column_chunk_reader.cpp | 130 +++++++++++++++++++--
.../format/parquet/vparquet_column_chunk_reader.h | 111 +++++++++++++++++-
.../exec/format/parquet/vparquet_page_reader.cpp | 30 +++--
.../vec/exec/format/parquet/vparquet_page_reader.h | 16 ++-
.../test_data/parquet_scanner/type-decoder.parquet | Bin 0 -> 338 bytes
be/test/vec/exec/parquet/parquet_thrift_test.cpp | 55 +++++++++
18 files changed, 734 insertions(+), 60 deletions(-)
diff --git a/be/src/common/config.h b/be/src/common/config.h
index 258000e540..59e56bcde3 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -792,6 +792,7 @@ CONF_Int32(object_pool_buffer_size, "100");
// ParquetReaderWrap prefetch buffer size
CONF_Int32(parquet_reader_max_buffer_size, "50");
CONF_Bool(parquet_predicate_push_down, "true");
+CONF_Int32(parquet_header_max_size, "8388608");
// When the rows number reached this limit, will check the filter rate the of
bloomfilter
// if it is lower than a specific threshold, the predicate will be disabled.
diff --git a/be/src/io/buffered_reader.cpp b/be/src/io/buffered_reader.cpp
index 8e2446b9e1..ca40979321 100644
--- a/be/src/io/buffered_reader.cpp
+++ b/be/src/io/buffered_reader.cpp
@@ -185,10 +185,11 @@ bool BufferedReader::closed() {
return _reader->closed();
}
-BufferedFileStreamReader::BufferedFileStreamReader(FileReader* file, int64_t
offset, int64_t length)
+BufferedFileStreamReader::BufferedFileStreamReader(FileReader* file, uint64_t
offset,
+ uint64_t length)
: _file(file), _file_start_offset(offset), _file_end_offset(offset +
length) {}
-Status BufferedFileStreamReader::seek(int64_t position) {
+Status BufferedFileStreamReader::seek(uint64_t position) {
if (_file_position != position) {
RETURN_IF_ERROR(_file->seek(position));
_file_position = position;
@@ -196,8 +197,8 @@ Status BufferedFileStreamReader::seek(int64_t position) {
return Status::OK();
}
-Status BufferedFileStreamReader::read_bytes(const uint8_t** buf, int64_t
offset,
- int64_t* bytes_to_read) {
+Status BufferedFileStreamReader::read_bytes(const uint8_t** buf, uint64_t
offset,
+ size_t* bytes_to_read) {
if (offset < _file_start_offset) {
return Status::IOError("Out-of-bounds Access");
}
@@ -230,19 +231,15 @@ Status BufferedFileStreamReader::read_bytes(const
uint8_t** buf, int64_t offset,
RETURN_IF_ERROR(seek(_buf_end_offset));
bool eof = false;
int64_t buf_remaining = _buf_end_offset - _buf_start_offset;
- RETURN_IF_ERROR(
- _file->read(_buf.get() + buf_remaining, _buf_size - buf_remaining,
&to_read, &eof));
+ RETURN_IF_ERROR(_file->read(_buf.get() + buf_remaining, to_read, &to_read,
&eof));
*bytes_to_read = buf_remaining + to_read;
_buf_end_offset += to_read;
*buf = _buf.get();
return Status::OK();
}
-Status BufferedFileStreamReader::read_bytes(Slice& slice, int64_t offset) {
- int64_t bytes_to_read = slice.size;
- Status st = read_bytes((const uint8_t**)&slice.data, offset,
&bytes_to_read);
- slice.size = bytes_to_read;
- return st;
+Status BufferedFileStreamReader::read_bytes(Slice& slice, uint64_t offset) {
+ return read_bytes((const uint8_t**)&slice.data, offset, &slice.size);
}
} // namespace doris
diff --git a/be/src/io/buffered_reader.h b/be/src/io/buffered_reader.h
index d4a5f37927..2cfcaaa413 100644
--- a/be/src/io/buffered_reader.h
+++ b/be/src/io/buffered_reader.h
@@ -93,34 +93,34 @@ public:
* @param offset start offset ot read in stream
* @param bytes_to_read bytes to read
*/
- virtual Status read_bytes(const uint8_t** buf, int64_t offset, int64_t*
bytes_to_read) = 0;
+ virtual Status read_bytes(const uint8_t** buf, uint64_t offset, size_t*
bytes_to_read) = 0;
/**
* Save the data address to slice.data, and the slice.size is the bytes to
read.
*/
- virtual Status read_bytes(Slice& slice, int64_t offset) = 0;
+ virtual Status read_bytes(Slice& slice, uint64_t offset) = 0;
virtual ~BufferedStreamReader() = default;
};
class BufferedFileStreamReader : public BufferedStreamReader {
public:
- BufferedFileStreamReader(FileReader* file, int64_t offset, int64_t length);
+ BufferedFileStreamReader(FileReader* file, uint64_t offset, uint64_t
length);
~BufferedFileStreamReader() override = default;
- Status read_bytes(const uint8_t** buf, int64_t stream_offset, int64_t*
bytes_to_read) override;
- Status read_bytes(Slice& slice, int64_t stream_offset) override;
+ Status read_bytes(const uint8_t** buf, uint64_t offset, size_t*
bytes_to_read) override;
+ Status read_bytes(Slice& slice, uint64_t offset) override;
private:
std::unique_ptr<uint8_t[]> _buf;
FileReader* _file;
- int64_t _file_start_offset;
- int64_t _file_end_offset;
+ uint64_t _file_start_offset;
+ uint64_t _file_end_offset;
int64_t _file_position = -1;
- int64_t _buf_start_offset = 0;
- int64_t _buf_end_offset = 0;
- int64_t _buf_size = 0;
+ uint64_t _buf_start_offset = 0;
+ uint64_t _buf_end_offset = 0;
+ size_t _buf_size = 0;
- Status seek(int64_t position);
+ Status seek(uint64_t position);
};
} // namespace doris
diff --git a/be/src/util/block_compression.cpp
b/be/src/util/block_compression.cpp
index 4279080b21..1dafd20f54 100644
--- a/be/src/util/block_compression.cpp
+++ b/be/src/util/block_compression.cpp
@@ -550,4 +550,34 @@ Status
get_block_compression_codec(segment_v2::CompressionTypePB type,
return st;
}
+// Create the block compression codec that matches a parquet column chunk codec.
+// For UNCOMPRESSED, `codec` is reset to nullptr and OK is returned, so a null codec
+// means "page data is stored uncompressed".
+// NOTE(review): parquet's deprecated LZ4 codec may use Hadoop-style framing rather than
+// a raw LZ4 block (see parquet-format Compression.md); confirm Lz4BlockCompression can
+// read such pages.
+Status get_block_compression_codec(tparquet::CompressionCodec::type parquet_codec,
+                                   std::unique_ptr<BlockCompressionCodec>& codec) {
+    BlockCompressionCodec* ptr = nullptr;
+    switch (parquet_codec) {
+    case tparquet::CompressionCodec::UNCOMPRESSED:
+        codec.reset(nullptr);
+        return Status::OK();
+    case tparquet::CompressionCodec::SNAPPY:
+        ptr = new SnappyBlockCompression();
+        break;
+    case tparquet::CompressionCodec::LZ4:
+        ptr = new Lz4BlockCompression();
+        break;
+    case tparquet::CompressionCodec::ZSTD:
+        ptr = new ZstdBlockCompression();
+        break;
+    default:
+        // Report the codec by name, consistent with the parquet decoder's error messages.
+        return Status::NotFound("unknown compression type({})",
+                                tparquet::to_string(parquet_codec));
+    }
+
+    // Only hand the codec to the caller once it has initialized successfully.
+    Status st = ptr->init();
+    if (st.ok()) {
+        codec.reset(ptr);
+    } else {
+        delete ptr;
+    }
+
+    return st;
+}
+
} // namespace doris
diff --git a/be/src/util/block_compression.h b/be/src/util/block_compression.h
index ddda23a3ba..9de8eb16ff 100644
--- a/be/src/util/block_compression.h
+++ b/be/src/util/block_compression.h
@@ -21,6 +21,7 @@
#include <vector>
#include "common/status.h"
+#include "gen_cpp/parquet_types.h"
#include "gen_cpp/segment_v2.pb.h"
#include "util/slice.h"
@@ -70,4 +71,7 @@ public:
Status get_block_compression_codec(segment_v2::CompressionTypePB type,
std::unique_ptr<BlockCompressionCodec>&
codec);
+Status get_block_compression_codec(tparquet::CompressionCodec::type
parquet_codec,
+ std::unique_ptr<BlockCompressionCodec>&
codec);
+
} // namespace doris
diff --git a/be/src/util/rle_encoding.h b/be/src/util/rle_encoding.h
index 3340669b78..08a7a23a4d 100644
--- a/be/src/util/rle_encoding.h
+++ b/be/src/util/rle_encoding.h
@@ -110,6 +110,14 @@ public:
// GetNextRun will return more from the same run.
size_t GetNextRun(T* val, size_t max_run);
+ size_t get_values(T* values, size_t num_values);
+
+ // Get the count of current repeated value
+ size_t repeated_count();
+
+ // Get current repeated value, make sure that count equals repeated_count()
+ T get_repeated_value(size_t count);
+
private:
bool ReadHeader();
@@ -334,6 +342,54 @@ inline size_t RleDecoder<T>::GetNextRun(T* val, size_t
max_run) {
return ret;
}
+// Decode up to num_values values into `values`. Drains the current repeated run or
+// literal run, and reads the next run header when both are exhausted.
+// Returns the number of values written. The result is smaller than num_values only
+// when no further run header can be read.
+template <typename T>
+inline size_t RleDecoder<T>::get_values(T* values, size_t num_values) {
+    size_t read_num = 0;
+    while (read_num < num_values) {
+        size_t read_this_time = num_values - read_num;
+
+        if (LIKELY(repeat_count_ > 0)) {
+            // Repeated run: every value equals current_value_.
+            read_this_time = std::min((size_t)repeat_count_, read_this_time);
+            std::fill(values, values + read_this_time, current_value_);
+            values += read_this_time;
+            repeat_count_ -= read_this_time;
+            read_num += read_this_time;
+        } else if (literal_count_ > 0) {
+            // Literal run: read each value from the bit reader with bit_width_ bits.
+            read_this_time = std::min((size_t)literal_count_, read_this_time);
+            // size_t index: avoids a signed/unsigned comparison against read_this_time.
+            for (size_t i = 0; i < read_this_time; ++i) {
+                bool result = bit_reader_.GetValue(bit_width_, values);
+                DCHECK(result);
+                values++;
+            }
+            literal_count_ -= read_this_time;
+            read_num += read_this_time;
+        } else {
+            if (!ReadHeader()) {
+                return read_num;
+            }
+        }
+    }
+    return read_num;
+}
+
+template <typename T>
+inline size_t RleDecoder<T>::repeated_count() {
+ if (repeat_count_ > 0) {
+ return repeat_count_;
+ }
+ if (literal_count_ == 0) {
+ ReadHeader();
+ }
+ return repeat_count_;
+}
+
+template <typename T>
+inline T RleDecoder<T>::get_repeated_value(size_t count) {
+ DCHECK_GE(repeat_count_, count);
+ repeat_count_ -= count;
+ return current_value_;
+}
+
template <typename T>
inline size_t RleDecoder<T>::Skip(size_t to_skip) {
DCHECK(bit_reader_.is_initialized());
diff --git a/be/src/vec/CMakeLists.txt b/be/src/vec/CMakeLists.txt
index 56fdb70a4b..bc37c49310 100644
--- a/be/src/vec/CMakeLists.txt
+++ b/be/src/vec/CMakeLists.txt
@@ -229,7 +229,9 @@ set(VEC_FILES
exec/format/parquet/vparquet_file_metadata.cpp
exec/format/parquet/vparquet_page_reader.cpp
exec/format/parquet/schema_desc.cpp
- exec/format/parquet/vparquet_column_reader.cpp)
+ exec/format/parquet/vparquet_column_reader.cpp
+ exec/format/parquet/level_decoder.cpp
+ exec/format/parquet/parquet_common.cpp)
add_library(Vec STATIC
${VEC_FILES}
diff --git a/be/src/vec/exec/format/parquet/level_decoder.cpp
b/be/src/vec/exec/format/parquet/level_decoder.cpp
new file mode 100644
index 0000000000..616da10b8e
--- /dev/null
+++ b/be/src/vec/exec/format/parquet/level_decoder.cpp
@@ -0,0 +1,76 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "level_decoder.h"
+
+#include "util/bit_util.h"
+#include "util/coding.h"
+
+doris::Status doris::vectorized::LevelDecoder::init(doris::Slice* slice,
+ tparquet::Encoding::type
encoding,
+ doris::vectorized::level_t
max_level,
+ uint32_t num_levels) {
+ _encoding = encoding;
+ _bit_width = BitUtil::log2(max_level + 1);
+ _max_level = max_level;
+ _num_levels = num_levels;
+ switch (encoding) {
+ case tparquet::Encoding::RLE: {
+ if (slice->size < 4) {
+ return Status::Corruption("Wrong parquet level format");
+ }
+
+ uint8_t* data = (uint8_t*)slice->data;
+ uint32_t num_bytes = decode_fixed32_le(data);
+ if (num_bytes > slice->size - 4) {
+ return Status::Corruption("Wrong parquet level format");
+ }
+ _rle_decoder = RleDecoder<level_t>(data + 4, num_bytes, _bit_width);
+
+ slice->data += 4 + num_bytes;
+ slice->size -= 4 + num_bytes;
+ break;
+ }
+ case tparquet::Encoding::BIT_PACKED: {
+ uint32_t num_bits = num_levels * _bit_width;
+ uint32_t num_bytes = BitUtil::RoundUpNumBytes(num_bits);
+ if (num_bytes > slice->size) {
+ return Status::Corruption("Wrong parquet level format");
+ }
+ _bit_packed_decoder = BitReader((uint8_t*)slice->data, num_bytes);
+
+ slice->data += num_bytes;
+ slice->size -= num_bytes;
+ break;
+ }
+ default:
+ return Status::IOError("Unsupported encoding for parquet level");
+ }
+ return Status::OK();
+}
+
+size_t doris::vectorized::LevelDecoder::get_levels(doris::vectorized::level_t*
levels, size_t n) {
+ if (_encoding == tparquet::Encoding::RLE) {
+ n = std::min((size_t)_num_levels, n);
+ auto num_decoded = _rle_decoder.get_values(levels, n);
+ _num_levels -= num_decoded;
+ return num_decoded;
+ } else if (_encoding == tparquet::Encoding::BIT_PACKED) {
+ // TODO(gaoxin): BIT_PACKED encoding
+ }
+ return 0;
+}
diff --git a/be/src/vec/exec/format/parquet/level_decoder.h
b/be/src/vec/exec/format/parquet/level_decoder.h
new file mode 100644
index 0000000000..be6c6c6154
--- /dev/null
+++ b/be/src/vec/exec/format/parquet/level_decoder.h
@@ -0,0 +1,61 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <cstdint>
+
+#include "common/status.h"
+#include "gen_cpp/parquet_types.h"
+#include "parquet_common.h"
+#include "util/bit_stream_utils.h"
+#include "util/rle_encoding.h"
+
+namespace doris::vectorized {
+
+class LevelDecoder {
+public:
+ LevelDecoder() = default;
+ ~LevelDecoder() = default;
+
+ Status init(Slice* slice, tparquet::Encoding::type encoding, level_t
max_level,
+ uint32_t num_levels);
+
+ bool has_levels() const { return _num_levels > 0; }
+
+ size_t get_levels(level_t* levels, size_t n);
+
+ size_t next_repeated_count() {
+ DCHECK_EQ(_encoding, tparquet::Encoding::RLE);
+ return _rle_decoder.repeated_count();
+ }
+
+ level_t get_repeated_value(size_t count) {
+ DCHECK_EQ(_encoding, tparquet::Encoding::RLE);
+ return _rle_decoder.get_repeated_value(count);
+ }
+
+private:
+ tparquet::Encoding::type _encoding;
+ level_t _bit_width = 0;
+ level_t _max_level = 0;
+ uint32_t _num_levels = 0;
+ RleDecoder<level_t> _rle_decoder;
+ BitReader _bit_packed_decoder;
+};
+
+} // namespace doris::vectorized
\ No newline at end of file
diff --git a/be/src/vec/exec/format/parquet/parquet_common.cpp
b/be/src/vec/exec/format/parquet/parquet_common.cpp
new file mode 100644
index 0000000000..ec0c3ce411
--- /dev/null
+++ b/be/src/vec/exec/format/parquet/parquet_common.cpp
@@ -0,0 +1,60 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "parquet_common.h"
+
+namespace doris::vectorized {
+
+// Create the value decoder for a parquet physical type and page encoding.
+// Returns InternalError for unsupported combinations. On success, `decoder` holds a
+// freshly created decoder.
+Status Decoder::getDecoder(tparquet::Type::type type, tparquet::Encoding::type encoding,
+                           std::unique_ptr<Decoder>& decoder) {
+    switch (encoding) {
+    case tparquet::Encoding::PLAIN:
+        switch (type) {
+        case tparquet::Type::INT32:
+            decoder = std::make_unique<PlainDecoder<Int32>>();
+            break;
+        case tparquet::Type::INT64:
+            decoder = std::make_unique<PlainDecoder<Int64>>();
+            break;
+        case tparquet::Type::FLOAT:
+            decoder = std::make_unique<PlainDecoder<Float32>>();
+            break;
+        case tparquet::Type::DOUBLE:
+            decoder = std::make_unique<PlainDecoder<Float64>>();
+            break;
+        default:
+            return Status::InternalError("Unsupported plain type {} in parquet decoder",
+                                         tparquet::to_string(type));
+        }
+        // Leave the outer switch; without this the PLAIN case falls through to the next label.
+        break;
+    // TODO: RLE_DICTIONARY (dictionary-encoded data pages) is not decoded yet. It is reported
+    // by the default branch instead of returning OK without creating a decoder.
+    default:
+        return Status::InternalError("Unsupported encoding {} in parquet decoder",
+                                     tparquet::to_string(encoding));
+    }
+    return Status::OK();
+}
+
+// Return the nested (non-null) data column of the nullable `doris_column` as a mutable column.
+// mutate() may hand back a copy when doris_column is shared (copy-on-write). That column is
+// stored back into doris_column, so it stays alive and the caller observes the appended data.
+MutableColumnPtr Decoder::getMutableColumnPtr(ColumnPtr& doris_column) {
+    // src column always be nullable for simple converting
+    CHECK(doris_column->is_nullable());
+    auto mutable_column = (*std::move(doris_column)).mutate();
+    auto* nullable_column = static_cast<vectorized::ColumnNullable*>(mutable_column.get());
+    MutableColumnPtr nested_column = nullable_column->get_nested_column_ptr();
+    doris_column = std::move(mutable_column);
+    return nested_column;
+}
+
+} // namespace doris::vectorized
diff --git a/be/src/vec/exec/format/parquet/parquet_common.h
b/be/src/vec/exec/format/parquet/parquet_common.h
new file mode 100644
index 0000000000..0f0e9f6e3d
--- /dev/null
+++ b/be/src/vec/exec/format/parquet/parquet_common.h
@@ -0,0 +1,108 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <cstdint>
+
+#include "common/status.h"
+#include "gen_cpp/parquet_types.h"
+#include "vec/columns/column_array.h"
+#include "vec/columns/column_nullable.h"
+
+namespace doris::vectorized {
+
+using level_t = int16_t;
+
+class Decoder {
+public:
+ Decoder() = default;
+ virtual ~Decoder() = default;
+
+ static Status getDecoder(tparquet::Type::type type,
tparquet::Encoding::type encoding,
+ std::unique_ptr<Decoder>& decoder);
+
+ static MutableColumnPtr getMutableColumnPtr(ColumnPtr& doris_column);
+
+ // The type with fix length
+ void set_type_length(int32_t type_length) { _type_length = type_length; }
+
+ // Set the data to be decoded
+ void set_data(Slice* data) {
+ _data = data;
+ _offset = 0;
+ }
+
+ // Write the decoded values batch to doris's column
+ virtual Status decode_values(ColumnPtr& doris_column, size_t num_values) =
0;
+
+ virtual Status decode_values(Slice& slice, size_t num_values) = 0;
+
+ virtual Status skip_values(size_t num_values) = 0;
+
+protected:
+ int32_t _type_length;
+ Slice* _data = nullptr;
+ uint32_t _offset = 0;
+};
+
+// Decoder for PLAIN-encoded fixed-width values of type T (sizeof(T) bytes per value),
+// read from the slice set by set_data() starting at _offset.
+template <typename T>
+class PlainDecoder final : public Decoder {
+public:
+    PlainDecoder() = default;
+    ~PlainDecoder() override = default;
+
+    // Append num_values values to the nested data column of `doris_column`.
+    // NOTE(review): only the nested column is extended here; the null map of the nullable
+    // column is not touched. Confirm that the caller keeps the two in sync.
+    Status decode_values(ColumnPtr& doris_column, size_t num_values) override {
+        size_t to_read_bytes = TYPE_LENGTH * num_values;
+        if (UNLIKELY(_offset + to_read_bytes > _data->size)) {
+            return Status::IOError("Out-of-bounds access in parquet data decoder");
+        }
+        auto data_column = getMutableColumnPtr(doris_column);
+        auto& column_data = static_cast<ColumnVector<T>&>(*data_column).get_data();
+        const auto* raw_data = reinterpret_cast<const T*>(_data->data + _offset);
+        column_data.insert(raw_data, raw_data + num_values);
+        _offset += to_read_bytes;
+        return Status::OK();
+    }
+
+    // Copy the raw bytes of num_values values into `slice`, which must be large enough.
+    Status decode_values(Slice& slice, size_t num_values) override {
+        size_t to_read_bytes = TYPE_LENGTH * num_values;
+        if (UNLIKELY(_offset + to_read_bytes > _data->size)) {
+            return Status::IOError("Out-of-bounds access in parquet data decoder");
+        }
+        if (UNLIKELY(to_read_bytes > slice.size)) {
+            return Status::IOError(
+                    "Slice does not have enough space to write out the decoding data");
+        }
+        memcpy(slice.data, _data->data + _offset, to_read_bytes);
+        _offset += to_read_bytes;
+        return Status::OK();
+    }
+
+    // Skip num_values values. The bounds check runs before _offset is advanced. This keeps
+    // a failed skip from moving the position, and keeps the uint32_t _offset from wrapping
+    // and slipping past the check.
+    Status skip_values(size_t num_values) override {
+        size_t to_skip_bytes = TYPE_LENGTH * num_values;
+        if (UNLIKELY(_offset + to_skip_bytes > _data->size)) {
+            return Status::IOError("Out-of-bounds access in parquet data decoder");
+        }
+        _offset += to_skip_bytes;
+        return Status::OK();
+    }
+
+protected:
+    enum { TYPE_LENGTH = sizeof(T) };
+};
+
+} // namespace doris::vectorized
diff --git a/be/src/vec/exec/format/parquet/schema_desc.cpp
b/be/src/vec/exec/format/parquet/schema_desc.cpp
index 51969220ed..7f6f36f023 100644
--- a/be/src/vec/exec/format/parquet/schema_desc.cpp
+++ b/be/src/vec/exec/format/parquet/schema_desc.cpp
@@ -17,8 +17,6 @@
#include "schema_desc.h"
-#include "gutil/strings/substitute.h"
-
namespace doris::vectorized {
static bool is_group_node(const tparquet::SchemaElement& schema) {
@@ -98,15 +96,14 @@ Status FieldDescriptor::parse_from_thrift(const
std::vector<tparquet::SchemaElem
for (int i = 0; i < root_schema.num_children; ++i) {
RETURN_IF_ERROR(parse_node_field(t_schemas, _next_schema_pos,
&_fields[i]));
if (_name_to_field.find(_fields[i].name) != _name_to_field.end()) {
- return Status::InvalidArgument(
- strings::Substitute("Duplicated field name: {}",
_fields[i].name));
+ return Status::InvalidArgument("Duplicated field name: {}",
_fields[i].name);
}
_name_to_field.emplace(_fields[i].name, &_fields[i]);
}
if (_next_schema_pos != t_schemas.size()) {
- return Status::InvalidArgument(strings::Substitute("Remaining {}
unparsed schema elements",
- t_schemas.size() -
_next_schema_pos));
+ return Status::InvalidArgument("Remaining {} unparsed schema elements",
+ t_schemas.size() - _next_schema_pos);
}
return Status::OK();
@@ -355,7 +352,7 @@ int FieldDescriptor::get_column_index(const std::string&
column) const {
return -1;
}
-const FieldSchema* FieldDescriptor::get_column(const string& name) const {
+const FieldSchema* FieldDescriptor::get_column(const std::string& name) const {
auto it = _name_to_field.find(name);
if (it != _name_to_field.end()) {
return it->second;
diff --git a/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.cpp
b/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.cpp
index 274882179b..11d6bfaf94 100644
--- a/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.cpp
+++ b/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.cpp
@@ -14,27 +14,135 @@
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
+
#include "vparquet_column_chunk_reader.h"
namespace doris::vectorized {
-Status ColumnChunkReader::init() {
- return Status();
+ColumnChunkReader::ColumnChunkReader(BufferedStreamReader* reader,
+ tparquet::ColumnChunk* column_chunk,
FieldSchema* fieldSchema)
+ : _max_rep_level(fieldSchema->repetition_level),
+ _max_def_level(fieldSchema->definition_level),
+ _stream_reader(reader),
+ _metadata(column_chunk->meta_data) {}
+
+Status ColumnChunkReader::init(size_t type_length) {
+ size_t start_offset = _metadata.__isset.dictionary_page_offset
+ ? _metadata.dictionary_page_offset
+ : _metadata.data_page_offset;
+ size_t chunk_size = _metadata.total_compressed_size;
+ _page_reader = std::make_unique<PageReader>(_stream_reader, start_offset,
chunk_size);
+
+ if (_metadata.__isset.dictionary_page_offset) {
+ RETURN_IF_ERROR(_decode_dict_page());
+ }
+ // seek to the first data page
+ _page_reader->seek_to_page(_metadata.data_page_offset);
+
+ // get the block compression codec
+ RETURN_IF_ERROR(get_block_compression_codec(_metadata.codec,
_block_compress_codec));
+ // -1 means unfixed length type
+ _type_length = type_length;
+
+ return Status::OK();
+}
+
+Status ColumnChunkReader::next_page() {
+ RETURN_IF_ERROR(_page_reader->next_page());
+ _num_values = _page_reader->get_page_header()->data_page_header.num_values;
+ return Status::OK();
+}
+
+// Load the data of the current page, decompressing it when the chunk has a codec.
+// Then initialize the repetition/definition level decoders, which consume their bytes
+// from the front of _page_data, and create the value decoder for the page encoding.
+Status ColumnChunkReader::load_page_data() {
+    const auto& header = *_page_reader->get_page_header();
+    // int32_t compressed_size = header.compressed_page_size;
+    int32_t uncompressed_size = header.uncompressed_page_size;
+
+    if (_block_compress_codec != nullptr) {
+        // The size comes from the file. A negative value would turn into a huge size_t
+        // buffer request below.
+        if (UNLIKELY(uncompressed_size < 0)) {
+            return Status::Corruption("Wrong parquet page header: negative uncompressed size");
+        }
+        Slice compressed_data;
+        RETURN_IF_ERROR(_page_reader->get_page_date(compressed_data));
+        // check decompressed buffer size
+        _reserve_decompress_buf(uncompressed_size);
+        _page_data = Slice(_decompress_buf.get(), uncompressed_size);
+        RETURN_IF_ERROR(_block_compress_codec->decompress(compressed_data, &_page_data));
+    } else {
+        RETURN_IF_ERROR(_page_reader->get_page_date(_page_data));
+    }
+
+    // Initialize the repetition and definition level decoders. A max level of 0 means the
+    // page stores no levels of that kind, so the decoder is skipped.
+    if (_max_rep_level > 0) {
+        RETURN_IF_ERROR(_rep_level_decoder.init(&_page_data,
+                                                header.data_page_header.repetition_level_encoding,
+                                                _max_rep_level, _num_values));
+    }
+    if (_max_def_level > 0) {
+        RETURN_IF_ERROR(_def_level_decoder.init(&_page_data,
+                                                header.data_page_header.definition_level_encoding,
+                                                _max_def_level, _num_values));
+    }
+
+    auto encoding = header.data_page_header.encoding;
+    // change the deprecated encoding to RLE_DICTIONARY
+    if (encoding == tparquet::Encoding::PLAIN_DICTIONARY) {
+        encoding = tparquet::Encoding::RLE_DICTIONARY;
+    }
+    // Propagate unsupported type/encoding errors instead of dereferencing a decoder that
+    // was not created for this page.
+    RETURN_IF_ERROR(Decoder::getDecoder(_metadata.type, encoding, _page_decoder));
+    _page_decoder->set_data(&_page_data);
+    if (_type_length > 0) {
+        _page_decoder->set_type_length(_type_length);
+    }
+
+    return Status::OK();
+}
+
+// Position the page reader at the chunk's dictionary page and read its header.
+// Decoding the dictionary values is not implemented yet.
+Status ColumnChunkReader::_decode_dict_page() {
+    int64_t dict_offset = _metadata.dictionary_page_offset;
+    _page_reader->seek_to_page(dict_offset);
+    // Propagate header read/parse failures instead of inspecting a stale or absent header.
+    RETURN_IF_ERROR(_page_reader->next_page());
+    const tparquet::PageHeader& header = *_page_reader->get_page_header();
+    DCHECK_EQ(tparquet::PageType::DICTIONARY_PAGE, header.type);
+    // TODO(gaoxin): decode dictionary page
+    return Status::OK();
+}
+
+void ColumnChunkReader::_reserve_decompress_buf(size_t size) {
+ if (size > _decompress_buf_size) {
+ _decompress_buf_size = BitUtil::next_power_of_two(size);
+ _decompress_buf.reset(new uint8_t[_decompress_buf_size]);
+ }
+}
+
+Status ColumnChunkReader::skip_values(size_t num_values) {
+ if (UNLIKELY(_num_values < num_values)) {
+ return Status::IOError("Skip too many values in current page");
+ }
+ _num_values -= num_values;
+ return _page_decoder->skip_values(num_values);
}
-Status ColumnChunkReader::read_min_max_stat() {
- return Status();
+// Decode up to n repetition levels of the current page into `levels`.
+// Returns the number of levels decoded.
+size_t ColumnChunkReader::get_rep_levels(level_t* levels, size_t n) {
+    DCHECK_GT(_max_rep_level, 0);
+    return _rep_level_decoder.get_levels(levels, n);
}
-Status ColumnChunkReader::decode_dict_page() {
- return Status();
+// Decode up to n definition levels of the current page into `levels`.
+// Returns the number of levels decoded.
+size_t ColumnChunkReader::get_def_levels(level_t* levels, size_t n) {
+    DCHECK_GT(_max_def_level, 0);
+    return _def_level_decoder.get_levels(levels, n);
}
-Status ColumnChunkReader::decode_nested_page() {
- return Status();
+Status ColumnChunkReader::decode_values(ColumnPtr& doris_column, size_t
num_values) {
+ if (UNLIKELY(_num_values < num_values)) {
+ return Status::IOError("Decode too many values in current page");
+ }
+ _num_values -= num_values;
+ return _page_decoder->decode_values(doris_column, num_values);
}
-Status ColumnChunkReader::read_next_page() {
- return Status();
+Status ColumnChunkReader::decode_values(Slice& slice, size_t num_values) {
+ if (UNLIKELY(_num_values < num_values)) {
+ return Status::IOError("Decode too many values in current page");
+ }
+ _num_values -= num_values;
+ return _page_decoder->decode_values(slice, num_values);
}
-} // namespace doris::vectorized
\ No newline at end of file
+} // namespace doris::vectorized
diff --git a/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.h
b/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.h
index dac287096a..c3b58be5c0 100644
--- a/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.h
+++ b/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.h
@@ -16,20 +16,119 @@
// under the License.
#pragma once
-#include <common/status.h>
+
+#include <cstdint>
+#include <memory>
+#include <unordered_map>
+#include <vector>
+
+#include "common/status.h"
+#include "gen_cpp/parquet_types.h"
+#include "io/buffered_reader.h"
+#include "level_decoder.h"
+#include "parquet_common.h"
+#include "schema_desc.h"
+#include "util/block_compression.h"
+#include "vec/columns/column_array.h"
+#include "vec/columns/column_nullable.h"
+#include "vparquet_page_reader.h"
namespace doris::vectorized {
+/**
+ * Read and decode parquet column data into doris block column.
+ * <p>Usage:</p>
+ * // Create chunk reader
+ * ColumnChunkReader chunk_reader(BufferedStreamReader* reader,
+ * tparquet::ColumnChunk* column_chunk,
+ * FieldSchema* fieldSchema);
+ * // Initialize chunk reader, we can set the type length if the length of
column type is fixed.
+ * // If not set, default value = -1, then the decoder will infer the type
length.
+ * chunk_reader.init();
+ * while (chunk_reader.has_next_page()) {
+ * // Seek to next page header. Only read and parse the page header, not
page data.
+ * chunk_reader.next_page();
+ * // Load data to decoder. Load the page data into underlying container.
+ * // Or, we can call the chunk_reader.skip_page() to skip current page.
+ * chunk_reader.load_page_data();
+ * // Decode values into column or slice.
+ * // Or, we can call chunk_reader.skip_values(num_values) to skip some values.
+ * chunk_reader.decode_values(slice, num_values);
+ * }
+ */
class ColumnChunkReader {
public:
- Status init();
- Status read_next_page();
+ ColumnChunkReader(BufferedStreamReader* reader, tparquet::ColumnChunk*
column_chunk,
+ FieldSchema* fieldSchema);
+ ~ColumnChunkReader() = default;
+
+ // Initialize chunk reader, will generate the decoder and codec.
+ // We can set the type_length if the length of colum type if fixed,
+ // or not set, the decoder will try to infer the type_length.
+ Status init(size_t type_length = -1);
+
+ // Whether the chunk reader has a more page to read.
+ bool has_next_page() { return _page_reader->has_next_page(); }
+
+ // Seek to the specific page, page_header_offset must be the start offset
of the page header.
+ void seek_to_page(int64_t page_header_offset) {
+ _page_reader->seek_to_page(page_header_offset);
+ }
+
+ // Seek to next page. Only read and parse the page header.
+ Status next_page();
+
+ // Skip current page(will not read and parse) if the page is filtered by
predicates.
+ Status skip_page() { return _page_reader->skip_page(); }
+ // Skip some values(will not read and parse) in current page if the values
are filtered by predicates.
+ Status skip_values(size_t num_values);
- Status read_min_max_stat();
- Status decode_dict_page();
- Status decode_nested_page();
+ // Load page data into the underlying container,
+ // and initialize the repetition and definition level decoder for current
page data.
+ Status load_page_data();
+ // The remaining number of values in current page. Decreased when reading
or skipping.
+ uint32_t num_values() const { return _num_values; };
+ // Get the raw data of current page.
+ Slice& get_page_data() { return _page_data; }
+
+ // Get the repetition levels
+ size_t get_rep_levels(level_t* levels, size_t n);
+ // Get the definition levels
+ size_t get_def_levels(level_t* levels, size_t n);
+
+ // Decode values in current page into doris column.
+ Status decode_values(ColumnPtr& doris_column, size_t num_values);
+ // For test, Decode values in current page into slice.
+ Status decode_values(Slice& slice, size_t num_values);
+
+ // Get the repetition level decoder of current page.
+ LevelDecoder& rep_level_decoder() { return _rep_level_decoder; }
+ // Get the definition level decoder of current page.
+ LevelDecoder& def_level_decoder() { return _def_level_decoder; }
private:
+ Status _decode_dict_page();
+ void _reserve_decompress_buf(size_t size);
+
+ level_t _max_rep_level;
+ level_t _max_def_level;
+
+ BufferedStreamReader* _stream_reader;
+ // tparquet::ColumnChunk* _column_chunk;
+ tparquet::ColumnMetaData& _metadata;
+ // FieldSchema* _field_schema;
+
+ std::unique_ptr<PageReader> _page_reader = nullptr;
+ std::unique_ptr<BlockCompressionCodec> _block_compress_codec = nullptr;
+
+ LevelDecoder _rep_level_decoder;
+ LevelDecoder _def_level_decoder;
+ uint32_t _num_values = 0;
+ Slice _page_data;
+ std::unique_ptr<uint8_t[]> _decompress_buf;
+ size_t _decompress_buf_size = 0;
+ std::unique_ptr<Decoder> _page_decoder = nullptr;
+ size_t _type_length = -1;
};
} // namespace doris::vectorized
diff --git a/be/src/vec/exec/format/parquet/vparquet_page_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_page_reader.cpp
index 533caa3db0..f554be169e 100644
--- a/be/src/vec/exec/format/parquet/vparquet_page_reader.cpp
+++ b/be/src/vec/exec/format/parquet/vparquet_page_reader.cpp
@@ -23,12 +23,12 @@
namespace doris::vectorized {
-static constexpr int64_t initPageHeaderSize = 1024;
+// Initial number of bytes read when trying to parse a page header; next_page() multiplies it
+// by 4 after each failed parse, capped at the bytes remaining before _end_offset.
+static constexpr size_t initPageHeaderSize = 1024;
-PageReader::PageReader(BufferedStreamReader* reader, int64_t start_offset, int64_t length)
- : _reader(reader), _start_offset(start_offset), _end_offset(start_offset + length) {}
+// Reads pages from the half-open byte range [offset, offset + length); next_page() returns
+// an IOError when the current offset falls outside it.
+PageReader::PageReader(BufferedStreamReader* reader, uint64_t offset, uint64_t length)
+ : _reader(reader), _start_offset(offset), _end_offset(offset + length) {}
-Status PageReader::next_page(Slice& slice) {
+Status PageReader::next_page() {
if (_offset < _start_offset || _offset >= _end_offset) {
return Status::IOError("Out-of-bounds Access");
}
@@ -37,8 +37,8 @@ Status PageReader::next_page(Slice& slice) {
}
const uint8_t* page_header_buf = nullptr;
- int64_t max_size = _end_offset - _offset;
- int64_t header_size = std::min(initPageHeaderSize, max_size);
+ size_t max_size = _end_offset - _offset;
+ size_t header_size = std::min(initPageHeaderSize, max_size);
uint32_t real_header_size = 0;
while (true) {
header_size = std::min(header_size, max_size);
@@ -49,7 +49,8 @@ Status PageReader::next_page(Slice& slice) {
if (st.ok()) {
break;
}
- if (_offset + header_size >= _end_offset) {
+ if (_offset + header_size >= _end_offset ||
+ real_header_size > config::parquet_header_max_size) {
return Status::IOError("Failed to deserialize parquet page
header");
}
header_size <<= 2;
@@ -57,11 +58,26 @@ Status PageReader::next_page(Slice& slice) {
_offset += real_header_size;
_next_header_offset = _offset + _cur_page_header.compressed_page_size;
+ return Status::OK();
+}
+
+// Discards the body of the page whose header next_page() just parsed, by moving _offset to
+// the start of the next page header; no page bytes are read.
+Status PageReader::skip_page() {
+ // _offset == _next_header_offset means no parsed-but-unconsumed page body is pending
+ // (e.g. right after seek_to_page(), skip_page() or get_page_date()).
+ // NOTE(review): a page whose compressed_page_size is 0 also satisfies this condition right
+ // after next_page(), so it would be rejected here — confirm such pages cannot occur.
+ if (_offset == _next_header_offset) {
+ return Status::InternalError("Should call next_page() to generate page header");
+ }
+ _offset = _next_header_offset;
+ return Status::OK();
+}
+// Reads the compressed body (compressed_page_size bytes) of the page whose header next_page()
+// just parsed into `slice`, and advances _offset past it (i.e. to the next page header).
+// NOTE(review): "date" looks like a typo for "data"; renaming requires updating all callers.
+Status PageReader::get_page_date(Slice& slice) {
+ // Same "next_page() must have been called" guard as skip_page().
+ if (_offset == _next_header_offset) {
+ return Status::InternalError("Should call next_page() to generate page header");
+ }
slice.size = _cur_page_header.compressed_page_size;
RETURN_IF_ERROR(_reader->read_bytes(slice, _offset));
DCHECK_EQ(slice.size, _cur_page_header.compressed_page_size);
_offset += slice.size;
return Status::OK();
}
+
} // namespace doris::vectorized
diff --git a/be/src/vec/exec/format/parquet/vparquet_page_reader.h b/be/src/vec/exec/format/parquet/vparquet_page_reader.h
index 523098cc44..cf95812ead 100644
--- a/be/src/vec/exec/format/parquet/vparquet_page_reader.h
+++ b/be/src/vec/exec/format/parquet/vparquet_page_reader.h
@@ -29,15 +29,19 @@ namespace doris::vectorized {
class PageReader {
public:
public:
- PageReader(BufferedStreamReader* reader, int64_t start_offset, int64_t length);
+ // Reads pages from the byte range [offset, offset + length) of `reader`.
+ PageReader(BufferedStreamReader* reader, uint64_t offset, uint64_t length);
~PageReader() = default;
bool has_next_page() const { return _offset < _end_offset; }
- Status next_page(Slice& slice);
+ // Reads and parses the page header at the current offset; the page body is not read.
+ Status next_page();
+
+ // Skips the body of the page whose header next_page() just parsed.
+ Status skip_page();
const tparquet::PageHeader* get_page_header() const { return &_cur_page_header; }
+ // Reads the compressed body of the current page into `slice`
+ // ("date" presumably means "data"; kept as-is for existing callers).
+ Status get_page_date(Slice& slice);
+
void seek_to_page(int64_t page_header_offset) {
_offset = page_header_offset;
_next_header_offset = page_header_offset;
@@ -47,11 +51,11 @@ private:
BufferedStreamReader* _reader;
tparquet::PageHeader _cur_page_header;
- int64_t _offset = 0;
- int64_t _next_header_offset = 0;
+ // Current read position: at a page header after seek/skip/read of a page body,
+ // or just past the header after next_page().
+ uint64_t _offset = 0;
+ // Start of the next page header (= end of the current page body); set by next_page().
+ uint64_t _next_header_offset = 0;
- int64_t _start_offset = 0;
- int64_t _end_offset = 0;
+ // Byte range this reader may read from: [_start_offset, _end_offset).
+ // NOTE(review): seek_to_page() still takes int64_t while these offsets are uint64_t.
+ uint64_t _start_offset = 0;
+ uint64_t _end_offset = 0;
};
} // namespace doris::vectorized
\ No newline at end of file
diff --git a/be/test/exec/test_data/parquet_scanner/type-decoder.parquet b/be/test/exec/test_data/parquet_scanner/type-decoder.parquet
new file mode 100644
index 0000000000..e4b5820161
Binary files /dev/null and b/be/test/exec/test_data/parquet_scanner/type-decoder.parquet differ
diff --git a/be/test/vec/exec/parquet/parquet_thrift_test.cpp b/be/test/vec/exec/parquet/parquet_thrift_test.cpp
index 5100ea32f3..02bcef6261 100644
--- a/be/test/vec/exec/parquet/parquet_thrift_test.cpp
+++ b/be/test/vec/exec/parquet/parquet_thrift_test.cpp
@@ -27,6 +27,7 @@
#include "io/local_file_reader.h"
#include "util/runtime_profile.h"
#include "vec/exec/format/parquet/parquet_thrift_util.h"
+#include "vec/exec/format/parquet/vparquet_column_chunk_reader.h"
#include "vec/exec/format/parquet/vparquet_file_metadata.h"
namespace doris {
@@ -123,6 +124,60 @@ TEST_F(ParquetThriftReaderTest, complex_nested_file) {
ASSERT_EQ(schemaDescriptor.get_column_index("mark"), 4);
}
+TEST_F(ParquetThriftReaderTest, column_reader) {
+    // type-decoder.parquet is the part of following table:
+    // create table type-decoder (
+    //     int_col int)
+    // TODO(gaoxin): add more hive types
+    LocalFileReader reader("./be/test/exec/test_data/parquet_scanner/type-decoder.parquet", 0);
+    auto st = reader.open();
+    // ASSERT (not EXPECT): every later step dereferences data read from this file.
+    ASSERT_TRUE(st.ok());
+
+    std::shared_ptr<FileMetaData> metaData;
+    ASSERT_TRUE(parse_thrift_footer(&reader, metaData).ok());
+    tparquet::FileMetaData t_metadata = metaData->to_thrift_metadata();
+
+    // read the `int_col` column, it's the int-type column, and has ten values:
+    // -1, 2, -3, 4, -5, 6, -7, 8, -9, 10
+    tparquet::ColumnChunk column_chunk = t_metadata.row_groups[0].columns[0];
+    tparquet::ColumnMetaData chunk_meta = column_chunk.meta_data;
+    size_t start_offset = chunk_meta.__isset.dictionary_page_offset
+                                  ? chunk_meta.dictionary_page_offset
+                                  : chunk_meta.data_page_offset;
+    size_t chunk_size = chunk_meta.total_compressed_size;
+    BufferedFileStreamReader stream_reader(&reader, start_offset, chunk_size);
+
+    FieldDescriptor schema_descriptor;
+    ASSERT_TRUE(schema_descriptor.parse_from_thrift(t_metadata.schema).ok());
+    auto field_schema = const_cast<FieldSchema*>(schema_descriptor.get_column(0));
+
+    ColumnChunkReader chunk_reader(&stream_reader, &column_chunk, field_schema);
+    // constexpr sizes make `data` a standard array rather than a variable-length array,
+    // and alignas makes the Int32* view of it below well-aligned.
+    constexpr size_t batch_size = 10;
+    constexpr size_t int_length = 4;
+    alignas(Int32) char data[batch_size * int_length];
+    Slice slice(data, batch_size * int_length);
+    ASSERT_TRUE(chunk_reader.init().ok());
+    // Signed accumulator: the expected sum mixes negative and positive values.
+    int64_t int_sum = 0;
+    while (chunk_reader.has_next_page()) {
+        // seek to next page header
+        ASSERT_TRUE(chunk_reader.next_page().ok());
+        // load data to decoder
+        ASSERT_TRUE(chunk_reader.load_page_data().ok());
+        while (chunk_reader.num_values() > 0) {
+            // Decode at most one batch; the last batch of a page may hold fewer values.
+            size_t num_values = chunk_reader.num_values() < batch_size ? chunk_reader.num_values()
+                                                                        : batch_size;
+            ASSERT_TRUE(chunk_reader.decode_values(slice, num_values).ok());
+            auto out_data = reinterpret_cast<Int32*>(slice.data);
+            for (size_t i = 0; i < num_values; i++) {
+                Int32 value = out_data[i];
+                int_sum += value;
+            }
+        }
+    }
+    ASSERT_EQ(int_sum, 5);
+}
+
} // namespace vectorized
} // namespace doris
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]