PARQUET-857: Flatten parquet/encodings directory, consolidate code Author: Wes McKinney <[email protected]>
Closes #233 from wesm/code-consolidation and squashes the following commits: c20c022 [Wes McKinney] Flatten parquet/encodings directory, consolidate code Project: http://git-wip-us.apache.org/repos/asf/parquet-cpp/repo Commit: http://git-wip-us.apache.org/repos/asf/parquet-cpp/commit/782049ba Tree: http://git-wip-us.apache.org/repos/asf/parquet-cpp/tree/782049ba Diff: http://git-wip-us.apache.org/repos/asf/parquet-cpp/diff/782049ba Branch: refs/heads/master Commit: 782049baca3bed48488b188f0292e968e024cf14 Parents: ad56e7a Author: Wes McKinney <[email protected]> Authored: Fri Feb 3 11:17:55 2017 -0500 Committer: Wes McKinney <[email protected]> Committed: Fri Feb 3 11:17:55 2017 -0500 ---------------------------------------------------------------------- CMakeLists.txt | 1 - benchmarks/decode_benchmark.cc | 6 +- src/parquet/CMakeLists.txt | 4 + src/parquet/arrow/arrow-reader-writer-test.cc | 7 +- src/parquet/column/column-writer-test.cc | 3 +- src/parquet/column/reader.cc | 4 +- src/parquet/column/reader.h | 4 +- src/parquet/column/scanner-test.cc | 3 +- src/parquet/column/statistics.cc | 2 +- src/parquet/column/test-util.h | 5 +- src/parquet/column/writer.cc | 4 +- src/parquet/column/writer.h | 2 +- src/parquet/encoding-benchmark.cc | 161 ++++ src/parquet/encoding-internal.h | 929 +++++++++++++++++++ src/parquet/encoding-test.cc | 305 ++++++ src/parquet/encoding.h | 141 +++ src/parquet/encodings/CMakeLists.txt | 30 - src/parquet/encodings/decoder.h | 95 -- src/parquet/encodings/delta-bit-pack-encoding.h | 125 --- .../encodings/delta-byte-array-encoding.h | 81 -- .../delta-length-byte-array-encoding.h | 70 -- src/parquet/encodings/dictionary-encoding.h | 473 ---------- src/parquet/encodings/encoder.h | 78 -- src/parquet/encodings/encoding-benchmark.cc | 161 ---- src/parquet/encodings/encoding-test.cc | 307 ------ src/parquet/encodings/plain-encoding.h | 290 ------ src/parquet/file/file-serialize-test.cc | 3 +- src/parquet/thrift.h | 6 +- 
src/parquet/util/test-common.h | 3 +- 29 files changed, 1559 insertions(+), 1744 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/782049ba/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/CMakeLists.txt b/CMakeLists.txt index ffda1ad..52a44e1 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -609,7 +609,6 @@ endif() add_subdirectory(src/parquet) add_subdirectory(src/parquet/api) add_subdirectory(src/parquet/column) -add_subdirectory(src/parquet/encodings) add_subdirectory(src/parquet/file) add_subdirectory(src/parquet/util) http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/782049ba/benchmarks/decode_benchmark.cc ---------------------------------------------------------------------- diff --git a/benchmarks/decode_benchmark.cc b/benchmarks/decode_benchmark.cc index d767622..5601af9 100644 --- a/benchmarks/decode_benchmark.cc +++ b/benchmarks/decode_benchmark.cc @@ -20,11 +20,7 @@ #include <stdio.h> #include "parquet/compression.h" -#include "parquet/encodings/delta-bit-pack-encoding.h" -#include "parquet/encodings/delta-byte-array-encoding.h" -#include "parquet/encodings/delta-length-byte-array-encoding.h" -#include "parquet/encodings/dictionary-encoding.h" -#include "parquet/encodings/plain-encoding.h" +#include "parquet/encoding-internal.h" #include "parquet/util/stopwatch.h" /** http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/782049ba/src/parquet/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/src/parquet/CMakeLists.txt b/src/parquet/CMakeLists.txt index f49a54d..1022095 100644 --- a/src/parquet/CMakeLists.txt +++ b/src/parquet/CMakeLists.txt @@ -18,13 +18,17 @@ # Headers: top level install(FILES compression.h + encoding.h exception.h schema.h types.h DESTINATION include/parquet) ADD_PARQUET_TEST(compression-test) +ADD_PARQUET_TEST(encoding-test) 
ADD_PARQUET_TEST(public-api-test) ADD_PARQUET_TEST(types-test) ADD_PARQUET_TEST(reader-test) ADD_PARQUET_TEST(schema-test) + +ADD_PARQUET_BENCHMARK(encoding-benchmark) http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/782049ba/src/parquet/arrow/arrow-reader-writer-test.cc ---------------------------------------------------------------------- diff --git a/src/parquet/arrow/arrow-reader-writer-test.cc b/src/parquet/arrow/arrow-reader-writer-test.cc index 619d5a3..9a7fea9 100644 --- a/src/parquet/arrow/arrow-reader-writer-test.cc +++ b/src/parquet/arrow/arrow-reader-writer-test.cc @@ -308,8 +308,7 @@ class TestParquetIO : public ::testing::Test { typedef ::testing::Types<::arrow::BooleanType, ::arrow::UInt8Type, ::arrow::Int8Type, ::arrow::UInt16Type, ::arrow::Int16Type, ::arrow::Int32Type, ::arrow::UInt64Type, ::arrow::Int64Type, ::arrow::TimestampType, ::arrow::FloatType, ::arrow::DoubleType, - ::arrow::StringType, ::arrow::BinaryType> - TestTypes; + ::arrow::StringType, ::arrow::BinaryType> TestTypes; TYPED_TEST_CASE(TestParquetIO, TestTypes); @@ -690,8 +689,8 @@ class TestPrimitiveParquetIO : public TestParquetIO<TestType> { typedef ::testing::Types<::arrow::BooleanType, ::arrow::UInt8Type, ::arrow::Int8Type, ::arrow::UInt16Type, ::arrow::Int16Type, ::arrow::UInt32Type, ::arrow::Int32Type, - ::arrow::UInt64Type, ::arrow::Int64Type, ::arrow::FloatType, ::arrow::DoubleType> - PrimitiveTestTypes; + ::arrow::UInt64Type, ::arrow::Int64Type, ::arrow::FloatType, + ::arrow::DoubleType> PrimitiveTestTypes; TYPED_TEST_CASE(TestPrimitiveParquetIO, PrimitiveTestTypes); http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/782049ba/src/parquet/column/column-writer-test.cc ---------------------------------------------------------------------- diff --git a/src/parquet/column/column-writer-test.cc b/src/parquet/column/column-writer-test.cc index dedb2c2..19a7c49 100644 --- a/src/parquet/column/column-writer-test.cc +++ b/src/parquet/column/column-writer-test.cc @@ -244,8 
+244,7 @@ void TestPrimitiveWriter<FLBAType>::ReadColumnFully(Compression::type compressio } typedef ::testing::Types<Int32Type, Int64Type, Int96Type, FloatType, DoubleType, - BooleanType, ByteArrayType, FLBAType> - TestTypes; + BooleanType, ByteArrayType, FLBAType> TestTypes; TYPED_TEST_CASE(TestPrimitiveWriter, TestTypes); http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/782049ba/src/parquet/column/reader.cc ---------------------------------------------------------------------- diff --git a/src/parquet/column/reader.cc b/src/parquet/column/reader.cc index 88dc670..a547fdc 100644 --- a/src/parquet/column/reader.cc +++ b/src/parquet/column/reader.cc @@ -23,9 +23,7 @@ #include "parquet/column/page.h" #include "parquet/column/properties.h" - -#include "parquet/encodings/dictionary-encoding.h" -#include "parquet/encodings/plain-encoding.h" +#include "parquet/encoding-internal.h" namespace parquet { http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/782049ba/src/parquet/column/reader.h ---------------------------------------------------------------------- diff --git a/src/parquet/column/reader.h b/src/parquet/column/reader.h index 7924e55..0c1e25c 100644 --- a/src/parquet/column/reader.h +++ b/src/parquet/column/reader.h @@ -25,9 +25,11 @@ #include <unordered_map> #include <vector> +#include <arrow/util/bit-util.h> + #include "parquet/column/levels.h" #include "parquet/column/page.h" -#include "parquet/encodings/decoder.h" +#include "parquet/encoding.h" #include "parquet/exception.h" #include "parquet/schema.h" #include "parquet/types.h" http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/782049ba/src/parquet/column/scanner-test.cc ---------------------------------------------------------------------- diff --git a/src/parquet/column/scanner-test.cc b/src/parquet/column/scanner-test.cc index 5d137b7..fa31e62 100644 --- a/src/parquet/column/scanner-test.cc +++ b/src/parquet/column/scanner-test.cc @@ -145,8 +145,7 @@ static int num_pages = 20; static int 
batch_size = 32; typedef ::testing::Types<Int32Type, Int64Type, Int96Type, FloatType, DoubleType, - ByteArrayType> - TestTypes; + ByteArrayType> TestTypes; using TestBooleanFlatScanner = TestFlatScanner<BooleanType>; using TestFLBAFlatScanner = TestFlatScanner<FLBAType>; http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/782049ba/src/parquet/column/statistics.cc ---------------------------------------------------------------------- diff --git a/src/parquet/column/statistics.cc b/src/parquet/column/statistics.cc index 7d6aebb..897287e 100644 --- a/src/parquet/column/statistics.cc +++ b/src/parquet/column/statistics.cc @@ -19,7 +19,7 @@ #include <cstring> #include "parquet/column/statistics.h" -#include "parquet/encodings/plain-encoding.h" +#include "parquet/encoding-internal.h" #include "parquet/exception.h" #include "parquet/util/comparison.h" #include "parquet/util/memory.h" http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/782049ba/src/parquet/column/test-util.h ---------------------------------------------------------------------- diff --git a/src/parquet/column/test-util.h b/src/parquet/column/test-util.h index f0580f5..12b818d 100644 --- a/src/parquet/column/test-util.h +++ b/src/parquet/column/test-util.h @@ -32,10 +32,7 @@ #include "parquet/column/levels.h" #include "parquet/column/page.h" - -// Depended on by SerializedPageReader test utilities for now -#include "parquet/encodings/dictionary-encoding.h" -#include "parquet/encodings/plain-encoding.h" +#include "parquet/encoding-internal.h" #include "parquet/util/memory.h" #include "parquet/util/test-common.h" http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/782049ba/src/parquet/column/writer.cc ---------------------------------------------------------------------- diff --git a/src/parquet/column/writer.cc b/src/parquet/column/writer.cc index 315c42f..7c85905 100644 --- a/src/parquet/column/writer.cc +++ b/src/parquet/column/writer.cc @@ -19,8 +19,8 @@ #include "parquet/column/properties.h" 
#include "parquet/column/statistics.h" -#include "parquet/encodings/dictionary-encoding.h" -#include "parquet/encodings/plain-encoding.h" +#include "parquet/encoding-internal.h" +#include "parquet/util/logging.h" #include "parquet/util/memory.h" namespace parquet { http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/782049ba/src/parquet/column/writer.h ---------------------------------------------------------------------- diff --git a/src/parquet/column/writer.h b/src/parquet/column/writer.h index 6ab84b2..3ee8fd6 100644 --- a/src/parquet/column/writer.h +++ b/src/parquet/column/writer.h @@ -24,7 +24,7 @@ #include "parquet/column/page.h" #include "parquet/column/properties.h" #include "parquet/column/statistics.h" -#include "parquet/encodings/encoder.h" +#include "parquet/encoding.h" #include "parquet/file/metadata.h" #include "parquet/schema.h" #include "parquet/types.h" http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/782049ba/src/parquet/encoding-benchmark.cc ---------------------------------------------------------------------- diff --git a/src/parquet/encoding-benchmark.cc b/src/parquet/encoding-benchmark.cc new file mode 100644 index 0000000..cf65dd9 --- /dev/null +++ b/src/parquet/encoding-benchmark.cc @@ -0,0 +1,161 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. 
See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "benchmark/benchmark.h" + +#include "parquet/encoding-internal.h" +#include "parquet/file/reader-internal.h" +#include "parquet/util/memory.h" + +namespace parquet { + +using format::ColumnChunk; +using schema::PrimitiveNode; + +namespace benchmark { + +std::shared_ptr<ColumnDescriptor> Int64Schema(Repetition::type repetition) { + auto node = PrimitiveNode::Make("int64", repetition, Type::INT64); + return std::make_shared<ColumnDescriptor>( + node, repetition != Repetition::REQUIRED, repetition == Repetition::REPEATED); +} + +static void BM_PlainEncodingBoolean(::benchmark::State& state) { + std::vector<bool> values(state.range_x(), 64); + PlainEncoder<BooleanType> encoder(nullptr); + + while (state.KeepRunning()) { + encoder.Put(values, values.size()); + encoder.FlushValues(); + } + state.SetBytesProcessed(state.iterations() * state.range_x() * sizeof(bool)); +} + +BENCHMARK(BM_PlainEncodingBoolean)->Range(1024, 65536); + +static void BM_PlainDecodingBoolean(::benchmark::State& state) { + std::vector<bool> values(state.range_x(), 64); + bool* output = new bool[state.range_x()]; + PlainEncoder<BooleanType> encoder(nullptr); + encoder.Put(values, values.size()); + std::shared_ptr<Buffer> buf = encoder.FlushValues(); + + while (state.KeepRunning()) { + PlainDecoder<BooleanType> decoder(nullptr); + decoder.SetData(values.size(), buf->data(), buf->size()); + decoder.Decode(output, values.size()); + } + + state.SetBytesProcessed(state.iterations() * state.range_x() * sizeof(bool)); + delete[] output; +} + +BENCHMARK(BM_PlainDecodingBoolean)->Range(1024, 65536); + +static void BM_PlainEncodingInt64(::benchmark::State& state) { + std::vector<int64_t> values(state.range_x(), 64); + PlainEncoder<Int64Type> encoder(nullptr); + + while (state.KeepRunning()) { + encoder.Put(values.data(), values.size()); + encoder.FlushValues(); + } + 
state.SetBytesProcessed(state.iterations() * state.range_x() * sizeof(int64_t)); +} + +BENCHMARK(BM_PlainEncodingInt64)->Range(1024, 65536); + +static void BM_PlainDecodingInt64(::benchmark::State& state) { + std::vector<int64_t> values(state.range_x(), 64); + PlainEncoder<Int64Type> encoder(nullptr); + encoder.Put(values.data(), values.size()); + std::shared_ptr<Buffer> buf = encoder.FlushValues(); + + while (state.KeepRunning()) { + PlainDecoder<Int64Type> decoder(nullptr); + decoder.SetData(values.size(), buf->data(), buf->size()); + decoder.Decode(values.data(), values.size()); + } + state.SetBytesProcessed(state.iterations() * state.range_x() * sizeof(int64_t)); +} + +BENCHMARK(BM_PlainDecodingInt64)->Range(1024, 65536); + +template <typename Type> +static void DecodeDict( + std::vector<typename Type::c_type>& values, ::benchmark::State& state) { + typedef typename Type::c_type T; + int num_values = values.size(); + + ChunkedAllocator pool; + MemoryAllocator* allocator = default_allocator(); + std::shared_ptr<ColumnDescriptor> descr = Int64Schema(Repetition::REQUIRED); + + DictEncoder<Type> encoder(descr.get(), &pool, allocator); + for (int i = 0; i < num_values; ++i) { + encoder.Put(values[i]); + } + + std::shared_ptr<PoolBuffer> dict_buffer = + AllocateBuffer(allocator, encoder.dict_encoded_size()); + + std::shared_ptr<PoolBuffer> indices = + AllocateBuffer(allocator, encoder.EstimatedDataEncodedSize()); + + encoder.WriteDict(dict_buffer->mutable_data()); + int actual_bytes = encoder.WriteIndices(indices->mutable_data(), indices->size()); + + PARQUET_THROW_NOT_OK(indices->Resize(actual_bytes)); + + while (state.KeepRunning()) { + PlainDecoder<Type> dict_decoder(descr.get()); + dict_decoder.SetData(encoder.num_entries(), dict_buffer->data(), dict_buffer->size()); + DictionaryDecoder<Type> decoder(descr.get()); + decoder.SetDict(&dict_decoder); + decoder.SetData(num_values, indices->data(), indices->size()); + decoder.Decode(values.data(), num_values); + } + + 
state.SetBytesProcessed(state.iterations() * state.range_x() * sizeof(T)); +} + +static void BM_DictDecodingInt64_repeats(::benchmark::State& state) { + typedef Int64Type Type; + typedef typename Type::c_type T; + + std::vector<T> values(state.range_x(), 64); + DecodeDict<Type>(values, state); +} + +BENCHMARK(BM_DictDecodingInt64_repeats)->Range(1024, 65536); + +static void BM_DictDecodingInt64_literals(::benchmark::State& state) { + typedef Int64Type Type; + typedef typename Type::c_type T; + + std::vector<T> values(state.range_x()); + for (size_t i = 0; i < values.size(); ++i) { + values[i] = i; + } + DecodeDict<Type>(values, state); +} + +BENCHMARK(BM_DictDecodingInt64_literals)->Range(1024, 65536); + +} // namespace benchmark + +} // namespace parquet http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/782049ba/src/parquet/encoding-internal.h ---------------------------------------------------------------------- diff --git a/src/parquet/encoding-internal.h b/src/parquet/encoding-internal.h new file mode 100644 index 0000000..ad4a78f --- /dev/null +++ b/src/parquet/encoding-internal.h @@ -0,0 +1,929 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#ifndef PARQUET_ENCODING_INTERNAL_H +#define PARQUET_ENCODING_INTERNAL_H + +#include <algorithm> +#include <cstdint> +#include <limits> +#include <memory> +#include <vector> + +#include <arrow/util/bit-util.h> + +#include "parquet/exception.h" +#include "parquet/encoding.h" +#include "parquet/schema.h" +#include "parquet/types.h" +#include "parquet/util/bit-util.h" +#include "parquet/util/bit-stream-utils.inline.h" +#include "parquet/util/cpu-info.h" +#include "parquet/util/hash-util.h" +#include "parquet/util/memory.h" +#include "parquet/util/rle-encoding.h" + +namespace parquet { + +class ColumnDescriptor; + +// ---------------------------------------------------------------------- +// Encoding::PLAIN decoder implementation + +template <typename DType> +class PlainDecoder : public Decoder<DType> { + public: + typedef typename DType::c_type T; + using Decoder<DType>::num_values_; + + explicit PlainDecoder(const ColumnDescriptor* descr) + : Decoder<DType>(descr, Encoding::PLAIN), data_(NULL), len_(0) { + if (descr_ && descr_->physical_type() == Type::FIXED_LEN_BYTE_ARRAY) { + type_length_ = descr_->type_length(); + } else { + type_length_ = -1; + } + } + + virtual void SetData(int num_values, const uint8_t* data, int len) { + num_values_ = num_values; + data_ = data; + len_ = len; + } + + virtual int Decode(T* buffer, int max_values); + + private: + using Decoder<DType>::descr_; + const uint8_t* data_; + int len_; + int type_length_; +}; + +// Decode routine templated on C++ type rather than type enum +template <typename T> +inline int DecodePlain( + const uint8_t* data, int64_t data_size, int num_values, int type_length, T* out) { + int bytes_to_decode = num_values * sizeof(T); + if (data_size < bytes_to_decode) { ParquetException::EofException(); } + memcpy(out, data, bytes_to_decode); + return bytes_to_decode; +} + +// Template specialization for BYTE_ARRAY. The written values do not own their +// own data. 
+template <> +inline int DecodePlain<ByteArray>(const uint8_t* data, int64_t data_size, int num_values, + int type_length, ByteArray* out) { + int bytes_decoded = 0; + int increment; + for (int i = 0; i < num_values; ++i) { + uint32_t len = out[i].len = *reinterpret_cast<const uint32_t*>(data); + increment = sizeof(uint32_t) + len; + if (data_size < increment) ParquetException::EofException(); + out[i].ptr = data + sizeof(uint32_t); + data += increment; + data_size -= increment; + bytes_decoded += increment; + } + return bytes_decoded; +} + +// Template specialization for FIXED_LEN_BYTE_ARRAY. The written values do not +// own their own data. +template <> +inline int DecodePlain<FixedLenByteArray>(const uint8_t* data, int64_t data_size, + int num_values, int type_length, FixedLenByteArray* out) { + int bytes_to_decode = type_length * num_values; + if (data_size < bytes_to_decode) { ParquetException::EofException(); } + for (int i = 0; i < num_values; ++i) { + out[i].ptr = data; + data += type_length; + data_size -= type_length; + } + return bytes_to_decode; +} + +template <typename DType> +inline int PlainDecoder<DType>::Decode(T* buffer, int max_values) { + max_values = std::min(max_values, num_values_); + int bytes_consumed = DecodePlain<T>(data_, len_, max_values, type_length_, buffer); + data_ += bytes_consumed; + len_ -= bytes_consumed; + num_values_ -= max_values; + return max_values; +} + +template <> +class PlainDecoder<BooleanType> : public Decoder<BooleanType> { + public: + explicit PlainDecoder(const ColumnDescriptor* descr) + : Decoder<BooleanType>(descr, Encoding::PLAIN) {} + + virtual void SetData(int num_values, const uint8_t* data, int len) { + num_values_ = num_values; + bit_reader_ = BitReader(data, len); + } + + // Two flavors of bool decoding + int Decode(uint8_t* buffer, int max_values) { + max_values = std::min(max_values, num_values_); + bool val; + for (int i = 0; i < max_values; ++i) { + if (!bit_reader_.GetValue(1, &val)) { 
ParquetException::EofException(); } + BitUtil::SetArrayBit(buffer, i, val); + } + num_values_ -= max_values; + return max_values; + } + + virtual int Decode(bool* buffer, int max_values) { + max_values = std::min(max_values, num_values_); + if (bit_reader_.GetBatch(1, buffer, max_values) != max_values) { + ParquetException::EofException(); + } + num_values_ -= max_values; + return max_values; + } + + private: + BitReader bit_reader_; +}; + +// ---------------------------------------------------------------------- +// Encoding::PLAIN encoder implementation + +template <typename DType> +class PlainEncoder : public Encoder<DType> { + public: + typedef typename DType::c_type T; + + explicit PlainEncoder( + const ColumnDescriptor* descr, MemoryAllocator* allocator = default_allocator()) + : Encoder<DType>(descr, Encoding::PLAIN, allocator) { + values_sink_.reset(new InMemoryOutputStream(allocator)); + } + + int64_t EstimatedDataEncodedSize() override { return values_sink_->Tell(); } + + std::shared_ptr<Buffer> FlushValues() override; + void Put(const T* src, int num_values) override; + + protected: + std::unique_ptr<InMemoryOutputStream> values_sink_; +}; + +template <> +class PlainEncoder<BooleanType> : public Encoder<BooleanType> { + public: + explicit PlainEncoder( + const ColumnDescriptor* descr, MemoryAllocator* allocator = default_allocator()) + : Encoder<BooleanType>(descr, Encoding::PLAIN, allocator), + bits_available_(kInMemoryDefaultCapacity * 8), + bits_buffer_(AllocateBuffer(allocator, kInMemoryDefaultCapacity)), + values_sink_(new InMemoryOutputStream(allocator)) { + bit_writer_.reset(new BitWriter(bits_buffer_->mutable_data(), bits_buffer_->size())); + } + + int64_t EstimatedDataEncodedSize() override { + return values_sink_->Tell() + bit_writer_->bytes_written(); + } + + std::shared_ptr<Buffer> FlushValues() override { + if (bits_available_ > 0) { + bit_writer_->Flush(); + values_sink_->Write(bit_writer_->buffer(), bit_writer_->bytes_written()); + 
bit_writer_->Clear(); + bits_available_ = bits_buffer_->size() * 8; + } + + std::shared_ptr<Buffer> buffer = values_sink_->GetBuffer(); + values_sink_.reset(new InMemoryOutputStream(this->allocator_)); + return buffer; + } + +#define PLAINDECODER_BOOLEAN_PUT(input_type, function_attributes) \ + void Put(input_type src, int num_values) function_attributes { \ + int bit_offset = 0; \ + if (bits_available_ > 0) { \ + int bits_to_write = std::min(bits_available_, num_values); \ + for (int i = 0; i < bits_to_write; i++) { \ + bit_writer_->PutValue(src[i], 1); \ + } \ + bits_available_ -= bits_to_write; \ + bit_offset = bits_to_write; \ + \ + if (bits_available_ == 0) { \ + bit_writer_->Flush(); \ + values_sink_->Write(bit_writer_->buffer(), bit_writer_->bytes_written()); \ + bit_writer_->Clear(); \ + } \ + } \ + \ + int bits_remaining = num_values - bit_offset; \ + while (bit_offset < num_values) { \ + bits_available_ = bits_buffer_->size() * 8; \ + \ + int bits_to_write = std::min(bits_available_, bits_remaining); \ + for (int i = bit_offset; i < bit_offset + bits_to_write; i++) { \ + bit_writer_->PutValue(src[i], 1); \ + } \ + bit_offset += bits_to_write; \ + bits_available_ -= bits_to_write; \ + bits_remaining -= bits_to_write; \ + \ + if (bits_available_ == 0) { \ + bit_writer_->Flush(); \ + values_sink_->Write(bit_writer_->buffer(), bit_writer_->bytes_written()); \ + bit_writer_->Clear(); \ + } \ + } \ + } + + PLAINDECODER_BOOLEAN_PUT(const bool*, override) + PLAINDECODER_BOOLEAN_PUT(const std::vector<bool>&, ) + + protected: + int bits_available_; + std::unique_ptr<BitWriter> bit_writer_; + std::shared_ptr<PoolBuffer> bits_buffer_; + std::unique_ptr<InMemoryOutputStream> values_sink_; +}; + +template <typename DType> +inline std::shared_ptr<Buffer> PlainEncoder<DType>::FlushValues() { + std::shared_ptr<Buffer> buffer = values_sink_->GetBuffer(); + values_sink_.reset(new InMemoryOutputStream(this->allocator_)); + return buffer; +} + +template <typename DType> 
+inline void PlainEncoder<DType>::Put(const T* buffer, int num_values) { + values_sink_->Write(reinterpret_cast<const uint8_t*>(buffer), num_values * sizeof(T)); +} + +template <> +inline void PlainEncoder<ByteArrayType>::Put(const ByteArray* src, int num_values) { + for (int i = 0; i < num_values; ++i) { + // Write the result to the output stream + values_sink_->Write(reinterpret_cast<const uint8_t*>(&src[i].len), sizeof(uint32_t)); + if (src[i].len > 0) { DCHECK(nullptr != src[i].ptr) << "Value ptr cannot be NULL"; } + values_sink_->Write(reinterpret_cast<const uint8_t*>(src[i].ptr), src[i].len); + } +} + +template <> +inline void PlainEncoder<FLBAType>::Put(const FixedLenByteArray* src, int num_values) { + for (int i = 0; i < num_values; ++i) { + // Write the result to the output stream + if (descr_->type_length() > 0) { + DCHECK(nullptr != src[i].ptr) << "Value ptr cannot be NULL"; + } + values_sink_->Write( + reinterpret_cast<const uint8_t*>(src[i].ptr), descr_->type_length()); + } +} + +// ---------------------------------------------------------------------- +// Dictionary encoding and decoding + +template <typename Type> +class DictionaryDecoder : public Decoder<Type> { + public: + typedef typename Type::c_type T; + + // Initializes the dictionary with values from 'dictionary'. The data in + // dictionary is not guaranteed to persist in memory after this call so the + // dictionary decoder needs to copy the data out if necessary. 
+ explicit DictionaryDecoder( + const ColumnDescriptor* descr, MemoryAllocator* allocator = default_allocator()) + : Decoder<Type>(descr, Encoding::RLE_DICTIONARY), + dictionary_(0, allocator), + byte_array_data_(AllocateBuffer(allocator, 0)) {} + + // Perform type-specific initiatialization + void SetDict(Decoder<Type>* dictionary); + + void SetData(int num_values, const uint8_t* data, int len) override { + num_values_ = num_values; + if (len == 0) return; + uint8_t bit_width = *data; + ++data; + --len; + idx_decoder_ = RleDecoder(data, len, bit_width); + } + + int Decode(T* buffer, int max_values) override { + max_values = std::min(max_values, num_values_); + int decoded_values = idx_decoder_.GetBatchWithDict(dictionary_, buffer, max_values); + if (decoded_values != max_values) { ParquetException::EofException(); } + num_values_ -= max_values; + return max_values; + } + + int DecodeSpaced(T* buffer, int num_values, int null_count, const uint8_t* valid_bits, + int64_t valid_bits_offset) override { + int decoded_values = idx_decoder_.GetBatchWithDictSpaced( + dictionary_, buffer, num_values, null_count, valid_bits, valid_bits_offset); + if (decoded_values != num_values) { ParquetException::EofException(); } + return decoded_values; + } + + private: + using Decoder<Type>::num_values_; + + // Only one is set. + Vector<T> dictionary_; + + // Data that contains the byte array data (byte_array_dictionary_ just has the + // pointers). 
+  std::shared_ptr<PoolBuffer> byte_array_data_;
+
+  RleDecoder idx_decoder_;
+};
+
+template <typename Type>
+inline void DictionaryDecoder<Type>::SetDict(Decoder<Type>* dictionary) {
+  int num_dictionary_values = dictionary->values_left();
+  dictionary_.Resize(num_dictionary_values);
+  dictionary->Decode(&dictionary_[0], num_dictionary_values);
+}
+
+template <>
+inline void DictionaryDecoder<BooleanType>::SetDict(Decoder<BooleanType>* dictionary) {
+  ParquetException::NYI("Dictionary encoding is not implemented for boolean values");
+}
+
+// Decodes the dictionary, then copies every value's bytes into byte_array_data_
+// and repoints the entries there, so the dictionary no longer references the
+// memory the freshly decoded values pointed into.
+template <>
+inline void DictionaryDecoder<ByteArrayType>::SetDict(
+    Decoder<ByteArrayType>* dictionary) {
+  int num_dictionary_values = dictionary->values_left();
+  dictionary_.Resize(num_dictionary_values);
+  dictionary->Decode(&dictionary_[0], num_dictionary_values);
+
+  // Sum in 64 bits: many uint32_t lengths can add up to more than INT_MAX
+  int64_t total_size = 0;
+  for (int i = 0; i < num_dictionary_values; ++i) {
+    total_size += dictionary_[i].len;
+  }
+  PARQUET_THROW_NOT_OK(byte_array_data_->Resize(total_size));
+  int64_t offset = 0;
+
+  uint8_t* bytes_data = byte_array_data_->mutable_data();
+  for (int i = 0; i < num_dictionary_values; ++i) {
+    memcpy(bytes_data + offset, dictionary_[i].ptr, dictionary_[i].len);
+    dictionary_[i].ptr = bytes_data + offset;
+    offset += dictionary_[i].len;
+  }
+}
+
+// Same as the ByteArray case, but every value is descr_->type_length() bytes long
+template <>
+inline void DictionaryDecoder<FLBAType>::SetDict(Decoder<FLBAType>* dictionary) {
+  int num_dictionary_values = dictionary->values_left();
+  dictionary_.Resize(num_dictionary_values);
+  dictionary->Decode(&dictionary_[0], num_dictionary_values);
+
+  int fixed_len = descr_->type_length();
+  // 64-bit product so large dictionaries of wide values cannot overflow int
+  int64_t total_size = static_cast<int64_t>(num_dictionary_values) * fixed_len;
+
+  PARQUET_THROW_NOT_OK(byte_array_data_->Resize(total_size));
+  uint8_t* bytes_data = byte_array_data_->mutable_data();
+  int64_t offset = 0;
+  for (int i = 0; i < num_dictionary_values; ++i) {
+    memcpy(bytes_data + offset, dictionary_[i].ptr, fixed_len);
+    dictionary_[i].ptr = bytes_data + offset;
+    offset += fixed_len;
+  }
+}
+
+// ----------------------------------------------------------------------
+// Dictionary encoder
+
+// Initially imported from Apache Impala on 2016-02-22, and has been modified
+// since for parquet-cpp
+
+// Initially 1024 elements
+static constexpr int INITIAL_HASH_TABLE_SIZE = 1 << 10;
+
+typedef int32_t hash_slot_t;
+static constexpr hash_slot_t HASH_SLOT_EMPTY = std::numeric_limits<int32_t>::max();
+
+// The maximum load factor for the hash table before resizing.
+static constexpr double MAX_HASH_LOAD = 0.7;
+
+/// See the dictionary encoding section of https://github.com/Parquet/parquet-format.
+/// The encoding supports streaming encoding. Values are encoded as they are added while
+/// the dictionary is being constructed. At any time, the buffered values can be
+/// written out with the current dictionary size. More values can then be added to
+/// the encoder, including new dictionary entries.
+template <typename DType>
+class DictEncoder : public Encoder<DType> {
+ public:
+  typedef typename DType::c_type T;
+
+  explicit DictEncoder(const ColumnDescriptor* desc, ChunkedAllocator* pool = nullptr,
+      MemoryAllocator* allocator = default_allocator())
+      : Encoder<DType>(desc, Encoding::PLAIN_DICTIONARY, allocator),
+        allocator_(allocator),
+        pool_(pool),
+        hash_table_size_(INITIAL_HASH_TABLE_SIZE),
+        mod_bitmask_(hash_table_size_ - 1),
+        hash_slots_(0, allocator),
+        dict_encoded_size_(0),
+        type_length_(desc->type_length()) {
+    hash_slots_.Assign(hash_table_size_, HASH_SLOT_EMPTY);
+    if (!CpuInfo::initialized()) { CpuInfo::Init(); }
+  }
+
+  virtual ~DictEncoder() { DCHECK(buffered_indices_.empty()); }
+
+  // TODO(wesm): think about how to address the construction semantics of
+  // DictEncoder (the memory pool can also be supplied after construction)
+  void set_mem_pool(ChunkedAllocator* pool) { pool_ = pool; }
+
+  void set_type_length(int type_length) { type_length_ = type_length; }
+
+  /// Returns a conservative estimate of the number of bytes needed to encode the buffered
+  /// indices. Used to size the buffer passed to WriteIndices().
+  int64_t EstimatedDataEncodedSize() override {
+    // Note: because of the way RleEncoder::CheckBufferFull() is called, we have to
+    // reserve an extra "RleEncoder::MinBufferSize" bytes. These extra bytes won't be
+    // used but not reserving them would cause the encoder to fail.
+    return 1 + RleEncoder::MaxBufferSize(bit_width(), buffered_indices_.size()) +
+           RleEncoder::MinBufferSize(bit_width());
+  }
+
+  /// The minimum bit width required to encode the currently buffered indices.
+  int bit_width() const {
+    if (UNLIKELY(num_entries() == 0)) return 0;
+    if (UNLIKELY(num_entries() == 1)) return 1;
+    return BitUtil::Log2(num_entries());
+  }
+
+  /// Writes out any buffered indices to buffer preceded by the bit width of this data.
+  /// Returns the number of bytes written.
+  /// If the supplied buffer is not big enough, returns -1.
+  /// buffer must be preallocated with buffer_len bytes. Use EstimatedDataEncodedSize()
+  /// to size buffer.
+  int WriteIndices(uint8_t* buffer, int buffer_len);
+
+  int hash_table_size() { return hash_table_size_; }
+  int dict_encoded_size() { return dict_encoded_size_; }
+  /// Clears all the indices (but leaves the dictionary).
+  void ClearIndices() { buffered_indices_.clear(); }
+
+  /// Encode value. Note that this does not actually write any data, just
+  /// buffers the value's index to be written later.
+  void Put(const T& value);
+
+  /// RLE-encodes the buffered indices into a new buffer and clears them.
+  /// Throws ParquetException if the indices do not fit the estimated size.
+  std::shared_ptr<Buffer> FlushValues() override {
+    const int64_t buffer_size = EstimatedDataEncodedSize();
+    std::shared_ptr<PoolBuffer> buffer = AllocateBuffer(this->allocator_, buffer_size);
+    int result_size =
+        WriteIndices(buffer->mutable_data(), static_cast<int>(buffer_size));
+    ClearIndices();
+    // WriteIndices() reports an undersized buffer with -1; never Resize() to that
+    if (result_size < 0) {
+      throw ParquetException("DictEncoder: index buffer too small");
+    }
+    PARQUET_THROW_NOT_OK(buffer->Resize(result_size));
+    return buffer;
+  }
+
+  void Put(const T* values, int num_values) override {
+    for (int i = 0; i < num_values; i++) {
+      Put(values[i]);
+    }
+  }
+
+  /// Like Put(values, num_values) but skips entries whose validity bit is unset.
+  void PutSpaced(const T* src, int num_values, const uint8_t* valid_bits,
+      int64_t valid_bits_offset) override {
+    INIT_BITSET(valid_bits, valid_bits_offset);
+    for (int32_t i = 0; i < num_values; i++) {
+      if (bitset_valid_bits & (1 << bit_offset_valid_bits)) { Put(src[i]); }
+      READ_NEXT_BITSET(valid_bits);
+    }
+  }
+
+  /// Writes out the encoded dictionary to buffer. buffer must be preallocated to
+  /// dict_encoded_size() bytes.
+  void WriteDict(uint8_t* buffer);
+
+  ChunkedAllocator* mem_pool() { return pool_; }
+
+  /// The number of entries in the dictionary.
+  int num_entries() const { return uniques_.size(); }
+
+ private:
+  MemoryAllocator* allocator_;
+
+  // For ByteArray / FixedLenByteArray data. Not owned
+  ChunkedAllocator* pool_;
+
+  /// Size of the table. Must be a power of 2.
+  int hash_table_size_;
+
+  // Store hash_table_size_ - 1, so that j & mod_bitmask_ is equivalent to j %
+  // hash_table_size_, but uses far fewer CPU cycles
+  int mod_bitmask_;
+
+  // We use a fixed-size hash table with linear probing
+  //
+  // These values correspond to the uniques_ array
+  Vector<hash_slot_t> hash_slots_;
+
+  /// Indices that have not yet be written out by WriteIndices().
+  std::vector<int> buffered_indices_;
+
+  /// The number of bytes needed to encode the dictionary.
+  int dict_encoded_size_;
+
+  // The unique observed values
+  std::vector<T> uniques_;
+
+  bool SlotDifferent(const T& v, hash_slot_t slot);
+  void DoubleTableSize();
+
+  /// Size of each encoded dictionary value. -1 for variable-length types.
+  int type_length_;
+
+  /// Hash function for mapping a value to a bucket.
+  inline int Hash(const T& value) const;
+
+  /// Adds value to the hash table and updates dict_encoded_size_
+  void AddDictKey(const T& value);
+};
+
+template <typename DType>
+inline int DictEncoder<DType>::Hash(const typename DType::c_type& value) const {
+  return HashUtil::Hash(&value, sizeof(value), 0);
+}
+
+template <>
+inline int DictEncoder<ByteArrayType>::Hash(const ByteArray& value) const {
+  if (value.len > 0) { DCHECK(nullptr != value.ptr) << "Value ptr cannot be NULL"; }
+  return HashUtil::Hash(value.ptr, value.len, 0);
+}
+
+template <>
+inline int DictEncoder<FLBAType>::Hash(const FixedLenByteArray& value) const {
+  if (type_length_ > 0) { DCHECK(nullptr != value.ptr) << "Value ptr cannot be NULL"; }
+  return HashUtil::Hash(value.ptr, type_length_, 0);
+}
+
+// Fixed-width values are compared bitwise so that equality agrees with Hash(),
+// which hashes the raw bytes. A value comparison (operator!=) treats 0.0 and -0.0
+// as equal (dropping the sign bit when they share a probe chain) and NaN as unequal
+// to itself (adding a new dictionary entry for every NaN that is Put()).
+template <typename DType>
+inline bool DictEncoder<DType>::SlotDifferent(
+    const typename DType::c_type& v, hash_slot_t slot) {
+  // Copy out first: uniques_[slot] is a proxy, not an lvalue, for std::vector<bool>
+  const typename DType::c_type stored = uniques_[slot];
+  return 0 != memcmp(&v, &stored, sizeof(stored));
+}
+
+// ByteArray only holds a pointer; compare the referenced bytes, not the struct
+template <>
+inline bool DictEncoder<ByteArrayType>::SlotDifferent(
+    const ByteArray& v, hash_slot_t slot) {
+  return v != uniques_[slot];
+}
+
+template <>
+inline bool DictEncoder<FLBAType>::SlotDifferent(
+    const FixedLenByteArray& v, hash_slot_t slot) {
+  return 0 != memcmp(v.ptr, uniques_[slot].ptr, type_length_);
+}
+
+template <typename DType>
+inline void DictEncoder<DType>::Put(const typename DType::c_type& v) {
+  int j = Hash(v) & mod_bitmask_;
+  hash_slot_t index = hash_slots_[j];
+
+  // Find an empty slot
+  while (HASH_SLOT_EMPTY != index && SlotDifferent(v, index)) {
+    // Linear probing
+    ++j;
+    if (j == hash_table_size_) j = 0;
+    index = hash_slots_[j];
+  }
+
+  if (index == HASH_SLOT_EMPTY) {
+    // Not in the hash table, so we insert it now
+    index = static_cast<hash_slot_t>(uniques_.size());
+    hash_slots_[j] = index;
+    AddDictKey(v);
+
+    if (UNLIKELY(static_cast<int>(uniques_.size()) > hash_table_size_ * MAX_HASH_LOAD)) {
+      DoubleTableSize();
+    }
+  }
+
+  buffered_indices_.push_back(index);
+}
+
+// Rehashes every occupied slot into a table twice the size.
+template <typename DType>
+inline void DictEncoder<DType>::DoubleTableSize() {
+  int new_size = hash_table_size_ * 2;
+  Vector<hash_slot_t> new_hash_slots(0, allocator_);
+  new_hash_slots.Assign(new_size, HASH_SLOT_EMPTY);
+  hash_slot_t index, slot;
+  int j;
+  for (int i = 0; i < hash_table_size_; ++i) {
+    index = hash_slots_[i];
+
+    if (index == HASH_SLOT_EMPTY) { continue; }
+
+    // Compute the hash value mod the new table size to start looking for an
+    // empty slot
+    const typename DType::c_type& v = uniques_[index];
+
+    // Find an empty slot in the new hash table. Each dictionary entry occupies
+    // exactly one slot, so no equality check is needed while reinserting.
+    j = Hash(v) & (new_size - 1);
+    slot = new_hash_slots[j];
+    while (HASH_SLOT_EMPTY != slot) {
+      ++j;
+      if (j == new_size) j = 0;
+      slot = new_hash_slots[j];
+    }
+
+    // Copy the old slot index to the new hash table
+    new_hash_slots[j] = index;
+  }
+
+  hash_table_size_ = new_size;
+  mod_bitmask_ = new_size - 1;
+
+  hash_slots_.Swap(new_hash_slots);
+}
+
+template <typename DType>
+inline void DictEncoder<DType>::AddDictKey(const typename DType::c_type& v) {
+  uniques_.push_back(v);
+  dict_encoded_size_ += sizeof(typename DType::c_type);
+}
+
+// Copies the bytes into pool_ so the dictionary does not alias caller memory
+template <>
+inline void DictEncoder<ByteArrayType>::AddDictKey(const ByteArray& v) {
+  uint8_t* heap = pool_->Allocate(v.len);
+  if (UNLIKELY(v.len > 0 && heap == nullptr)) { throw ParquetException("out of memory"); }
+  memcpy(heap, v.ptr, v.len);
+  uniques_.push_back(ByteArray(v.len, heap));
+  dict_encoded_size_ += v.len + sizeof(uint32_t);
+}
+
+template <>
+inline void DictEncoder<FLBAType>::AddDictKey(const FixedLenByteArray& v) {
+  uint8_t* heap = pool_->Allocate(type_length_);
+  if (UNLIKELY(type_length_ > 0 && heap == nullptr)) {
+    throw ParquetException("out of memory");
+  }
+  memcpy(heap, v.ptr, type_length_);
+
+  uniques_.push_back(FixedLenByteArray(heap));
+  dict_encoded_size_ += type_length_;
+}
+
+template <typename DType>
+inline void DictEncoder<DType>::WriteDict(uint8_t* buffer) {
+  // For primitive types, only a memcpy
+  memcpy(buffer, uniques_.data(), sizeof(typename DType::c_type) * uniques_.size());
+}
+
+// std::vector<bool> has no contiguous data(), so write one byte per value
+template <>
+inline void DictEncoder<BooleanType>::WriteDict(uint8_t* buffer) {
+  for (size_t i = 0; i < uniques_.size(); i++) {
+    buffer[i] = uniques_[i];
+  }
+}
+
+// ByteArray and FLBA already have the dictionary encoded in their data heaps
+template <>
+inline void DictEncoder<ByteArrayType>::WriteDict(uint8_t* buffer) {
+  for (const ByteArray& v : uniques_) {
+    memcpy(buffer, reinterpret_cast<const void*>(&v.len), sizeof(uint32_t));
+    buffer += sizeof(uint32_t);
+    if (v.len > 0) { DCHECK(nullptr != v.ptr) << "Value ptr cannot be NULL"; }
+    memcpy(buffer, v.ptr, v.len);
+    buffer += v.len;
+  }
+}
+
+template <>
+inline void DictEncoder<FLBAType>::WriteDict(uint8_t* buffer) {
+  for (const FixedLenByteArray& v : uniques_) {
+    if (type_length_ > 0) { DCHECK(nullptr != v.ptr) << "Value ptr cannot be NULL"; }
+    memcpy(buffer, v.ptr, type_length_);
+    buffer += type_length_;
+  }
+}
+
+template <typename DType>
+inline int DictEncoder<DType>::WriteIndices(uint8_t* buffer, int buffer_len) {
+  // Write bit width in first byte
+  *buffer = bit_width();
+  ++buffer;
+  --buffer_len;
+
+  RleEncoder encoder(buffer, buffer_len, bit_width());
+  for (int index : buffered_indices_) {
+    if (!encoder.Put(index)) return -1;
+  }
+  encoder.Flush();
+
+  ClearIndices();
+  return 1 + encoder.len();
+}
+
+// ----------------------------------------------------------------------
+// DeltaBitPackDecoder
+
+// Decoder for Encoding::DELTA_BINARY_PACKED INT32/INT64 data. Each block header
+// (block size, mini-block count, value count, first value, min delta, mini-block
+// bit widths) is read lazily by InitBlock() when the current block is exhausted.
+template <typename DType>
+class DeltaBitPackDecoder : public Decoder<DType> {
+ public:
+  typedef typename DType::c_type T;
+
+  explicit DeltaBitPackDecoder(
+      const ColumnDescriptor* descr, MemoryAllocator* allocator = default_allocator())
+      : Decoder<DType>(descr, Encoding::DELTA_BINARY_PACKED),
+        delta_bit_widths_(new PoolBuffer(allocator)) {
+    if (DType::type_num != Type::INT32 && DType::type_num != Type::INT64) {
+      throw ParquetException("Delta bit pack encoding should only be for integer data.");
+    }
+  }
+
+  virtual void SetData(int num_values, const uint8_t* data, int len) {
+    num_values_ = num_values;
+    decoder_ = BitReader(data, len);
+    values_current_block_ = 0;
+    values_current_mini_block_ = 0;
+    // Forget the previous page's block layout so the first Decode() reads a fresh
+    // block header instead of reusing stale (or never-initialized) mini-block state.
+    mini_block_idx_ = 0;
+    num_mini_blocks_ = 0;
+  }
+
+  virtual int Decode(T* buffer, int max_values) {
+    return GetInternal(buffer, max_values);
+  }
+
+ private:
+  using Decoder<DType>::num_values_;
+
+  void InitBlock() {
+    int32_t block_size;
+    if (!decoder_.GetVlqInt(&block_size)) ParquetException::EofException();
+    if (!decoder_.GetVlqInt(&num_mini_blocks_)) ParquetException::EofException();
+    if (!decoder_.GetVlqInt(&values_current_block_)) { ParquetException::EofException(); }
+    if (!decoder_.GetZigZagVlqInt(&last_value_)) ParquetException::EofException();
+    // Guards the division and the bit_width_data[0] read below against corrupt input
+    if (num_mini_blocks_ <= 0) {
+      throw ParquetException("Delta bit pack: invalid number of mini blocks");
+    }
+    PARQUET_THROW_NOT_OK(delta_bit_widths_->Resize(num_mini_blocks_));
+
+    uint8_t* bit_width_data = delta_bit_widths_->mutable_data();
+
+    if (!decoder_.GetZigZagVlqInt(&min_delta_)) ParquetException::EofException();
+    for (int i = 0; i < num_mini_blocks_; ++i) {
+      if (!decoder_.GetAligned<uint8_t>(1, bit_width_data + i)) {
+        ParquetException::EofException();
+      }
+    }
+    values_per_mini_block_ = block_size / num_mini_blocks_;
+    mini_block_idx_ = 0;
+    delta_bit_width_ = bit_width_data[0];
+    values_current_mini_block_ = values_per_mini_block_;
+  }
+
+  int GetInternal(T* buffer, int max_values) {
+    max_values = std::min(max_values, num_values_);
+    for (int i = 0; i < max_values; ++i) {
+      if (UNLIKELY(values_current_mini_block_ == 0)) {
+        ++mini_block_idx_;
+        if (mini_block_idx_ < static_cast<size_t>(num_mini_blocks_)) {
+          // Re-fetch data(): InitBlock() may have reallocated delta_bit_widths_
+          delta_bit_width_ = delta_bit_widths_->data()[mini_block_idx_];
+          values_current_mini_block_ = values_per_mini_block_;
+        } else {
+          InitBlock();
+          buffer[i] = last_value_;
+          continue;
+        }
+      }
+
+      // TODO: the key to this algorithm is to decode the entire miniblock at once.
+      int64_t delta;
+      if (!decoder_.GetValue(delta_bit_width_, &delta)) ParquetException::EofException();
+      delta += min_delta_;
+      last_value_ += delta;
+      buffer[i] = last_value_;
+      --values_current_mini_block_;
+    }
+    num_values_ -= max_values;
+    return max_values;
+  }
+
+  BitReader decoder_;
+  int32_t values_current_block_ = 0;
+  int32_t num_mini_blocks_ = 0;
+  uint64_t values_per_mini_block_ = 0;
+  uint64_t values_current_mini_block_ = 0;
+
+  int32_t min_delta_ = 0;
+  size_t mini_block_idx_ = 0;
+  std::unique_ptr<PoolBuffer> delta_bit_widths_;
+  int delta_bit_width_ = 0;
+
+  // TODO: min_delta_/last_value_ are 32-bit, which truncates Int64Type data;
+  // presumably needs a 64-bit zig-zag VLQ reader in BitReader.
+  int32_t last_value_ = 0;
+};
+
+// ----------------------------------------------------------------------
+// DELTA_LENGTH_BYTE_ARRAY
+
+// Page layout read by SetData(): a 4-byte size L, L bytes of DELTA_BINARY_PACKED
+// value lengths, then the concatenated value bytes.
+class DeltaLengthByteArrayDecoder : public Decoder<ByteArrayType> {
+ public:
+  explicit DeltaLengthByteArrayDecoder(
+      const ColumnDescriptor* descr, MemoryAllocator* allocator = default_allocator())
+      : Decoder<ByteArrayType>(descr, Encoding::DELTA_LENGTH_BYTE_ARRAY),
+        len_decoder_(nullptr, allocator) {}
+
+  virtual void SetData(int num_values, const uint8_t* data, int len) {
+    num_values_ = num_values;
+    if (len == 0) return;
+    if (len < 4) ParquetException::EofException();
+    // memcpy instead of *reinterpret_cast<const int*>: data need not be aligned
+    int32_t total_lengths_len;
+    memcpy(&total_lengths_len, data, sizeof(total_lengths_len));
+    data += 4;
+    len -= 4;
+    if (total_lengths_len < 0 || total_lengths_len > len) {
+      ParquetException::EofException();
+    }
+    len_decoder_.SetData(num_values, data, total_lengths_len);
+    data_ = data + total_lengths_len;
+    len_ = len - total_lengths_len;
+  }
+
+  // Returned values point into the page data passed to SetData() (no copy).
+  virtual int Decode(ByteArray* buffer, int max_values) {
+    max_values = std::min(max_values, num_values_);
+    if (max_values <= 0) return 0;
+    // Reused scratch space; replaces a non-standard variable-length stack array
+    lengths_.resize(max_values);
+    len_decoder_.Decode(lengths_.data(), max_values);
+    for (int i = 0; i < max_values; ++i) {
+      // Reject lengths that would run past the end of the page
+      if (lengths_[i] < 0 || lengths_[i] > len_) ParquetException::EofException();
+      buffer[i].len = lengths_[i];
+      buffer[i].ptr = data_;
+      data_ += lengths_[i];
+      len_ -= lengths_[i];
+    }
+    num_values_ -= max_values;
+    return max_values;
+  }
+
+ private:
+  using Decoder<ByteArrayType>::num_values_;
+  DeltaBitPackDecoder<Int32Type> len_decoder_;
+  const uint8_t* data_ = nullptr;
+  int len_ = 0;
+  std::vector<int32_t> lengths_;
+};
+
+// ----------------------------------------------------------------------
+// DELTA_BYTE_ARRAY
+
+// Page layout read by SetData(): a 4-byte size P, P bytes of DELTA_BINARY_PACKED
+// prefix lengths, then the suffixes in DELTA_LENGTH_BYTE_ARRAY form. Each value is
+// the first prefix_len bytes of the previous value followed by its suffix.
+class DeltaByteArrayDecoder : public Decoder<ByteArrayType> {
+ public:
+  explicit DeltaByteArrayDecoder(
+      const ColumnDescriptor* descr, MemoryAllocator* allocator = default_allocator())
+      : Decoder<ByteArrayType>(descr, Encoding::DELTA_BYTE_ARRAY),
+        prefix_len_decoder_(nullptr, allocator),
+        suffix_decoder_(nullptr, allocator) {}
+
+  virtual void SetData(int num_values, const uint8_t* data, int len) {
+    num_values_ = num_values;
+    // The first value of a page has no predecessor to share a prefix with
+    last_value_ = ByteArray(0, nullptr);
+    if (len == 0) return;
+    if (len < 4) ParquetException::EofException();
+    // memcpy instead of *reinterpret_cast<const int*>: data need not be aligned
+    int32_t prefix_len_length;
+    memcpy(&prefix_len_length, data, sizeof(prefix_len_length));
+    data += 4;
+    len -= 4;
+    if (prefix_len_length < 0 || prefix_len_length > len) {
+      ParquetException::EofException();
+    }
+    prefix_len_decoder_.SetData(num_values, data, prefix_len_length);
+    data += prefix_len_length;
+    len -= prefix_len_length;
+    suffix_decoder_.SetData(num_values, data, len);
+  }
+
+  // TODO: this doesn't work and requires memory management. We need to allocate
+  // new strings to store the results.
+  // NOTE(review): every decoded value is malloc'ed here and never freed (leak).
+  virtual int Decode(ByteArray* buffer, int max_values) {
+    max_values = std::min(max_values, num_values_);
+    for (int i = 0; i < max_values; ++i) {
+      int32_t prefix_len = 0;
+      prefix_len_decoder_.Decode(&prefix_len, 1);
+      ByteArray suffix(0, nullptr);
+      suffix_decoder_.Decode(&suffix, 1);
+      // A prefix may only reuse bytes the previous value actually has
+      if (prefix_len < 0 || static_cast<uint32_t>(prefix_len) > last_value_.len) {
+        throw ParquetException("Delta byte array: invalid prefix length");
+      }
+      buffer[i].len = prefix_len + suffix.len;
+
+      uint8_t* result = reinterpret_cast<uint8_t*>(malloc(buffer[i].len));
+      if (UNLIKELY(buffer[i].len > 0 && result == nullptr)) {
+        throw ParquetException("out of memory");
+      }
+      // Skip zero-length copies: source/destination may legitimately be null then
+      if (prefix_len > 0) { memcpy(result, last_value_.ptr, prefix_len); }
+      if (suffix.len > 0) { memcpy(result + prefix_len, suffix.ptr, suffix.len); }
+
+      buffer[i].ptr = result;
+      last_value_ = buffer[i];
+    }
+    num_values_ -= max_values;
+    return max_values;
+  }
+
+ private:
+  using Decoder<ByteArrayType>::num_values_;
+
+  DeltaBitPackDecoder<Int32Type> prefix_len_decoder_;
+  DeltaLengthByteArrayDecoder suffix_decoder_;
+  ByteArray last_value_ = ByteArray(0, nullptr);
+};
+
+}  // namespace parquet
+
+#endif  // PARQUET_ENCODING_INTERNAL_H
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/782049ba/src/parquet/encoding-test.cc
----------------------------------------------------------------------
diff --git a/src/parquet/encoding-test.cc b/src/parquet/encoding-test.cc
new file mode 100644
index 0000000..8f266b4
--- /dev/null
+++ 
b/src/parquet/encoding-test.cc
@@ -0,0 +1,307 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <cstdint>
+#include <cstdlib>
+#include <cstring>
+#include <gtest/gtest.h>
+#include <string>
+#include <vector>
+
+#include "parquet/encoding-internal.h"
+#include "parquet/schema.h"
+#include "parquet/types.h"
+#include "parquet/util/bit-util.h"
+#include "parquet/util/memory.h"
+#include "parquet/util/test-common.h"
+
+using std::string;
+using std::vector;
+
+namespace parquet {
+
+namespace test {
+
+TEST(VectorBooleanTest, TestEncodeDecode) {
+  // PARQUET-454
+  int nvalues = 10000;
+  int nbytes = BitUtil::Ceil(nvalues, 8);
+
+  // seed the prng so failure is deterministic
+  vector<bool> draws = flip_coins_seed(nvalues, 0.5, 0);
+
+  PlainEncoder<BooleanType> encoder(nullptr);
+  PlainDecoder<BooleanType> decoder(nullptr);
+
+  encoder.Put(draws, nvalues);
+
+  std::shared_ptr<Buffer> encode_buffer = encoder.FlushValues();
+  ASSERT_EQ(nbytes, encode_buffer->size());
+
+  vector<uint8_t> decode_buffer(nbytes);
+  const uint8_t* decode_data = &decode_buffer[0];
+
+  decoder.SetData(nvalues, encode_buffer->data(), encode_buffer->size());
+  int values_decoded = decoder.Decode(&decode_buffer[0], nvalues);
+  ASSERT_EQ(nvalues, values_decoded);
+
+  for (int i = 0; i < nvalues; ++i) {
+    ASSERT_EQ(draws[i], BitUtil::GetArrayBit(decode_data, i)) << i;
+  }
+}
+
+// ----------------------------------------------------------------------
+// test data generation
+
+template <typename T>
+void GenerateData(int num_values, T* out, vector<uint8_t>* heap) {
+  // seed the prng so failure is deterministic
+  random_numbers(
+      num_values, 0, std::numeric_limits<T>::min(), std::numeric_limits<T>::max(), out);
+}
+
+template <>
+void GenerateData<bool>(int num_values, bool* out, vector<uint8_t>* heap) {
+  // seed the prng so failure is deterministic
+  random_bools(num_values, 0.5, 0, out);
+}
+
+template <>
+void GenerateData<Int96>(int num_values, Int96* out, vector<uint8_t>* heap) {
+  // seed the prng so failure is deterministic
+  random_Int96_numbers(num_values, 0, std::numeric_limits<int32_t>::min(),
+      std::numeric_limits<int32_t>::max(), out);
+}
+
+template <>
+void GenerateData<ByteArray>(int num_values, ByteArray* out, vector<uint8_t>* heap) {
+  // seed the prng so failure is deterministic
+  int max_byte_array_len = 12;
+  heap->resize(num_values * max_byte_array_len);
+  random_byte_array(num_values, 0, heap->data(), out, 2, max_byte_array_len);
+}
+
+static int flba_length = 8;
+
+template <>
+void GenerateData<FLBA>(int num_values, FLBA* out, vector<uint8_t>* heap) {
+  // seed the prng so failure is deterministic
+  heap->resize(num_values * flba_length);
+  random_fixed_byte_array(num_values, 0, heap->data(), flba_length, out);
+}
+
+// VerifyResults() reports mismatches with ASSERT_*, which only returns from the
+// helper; callers wrap it in ASSERT_NO_FATAL_FAILURE so the calling test stops too.
+template <typename T>
+void VerifyResults(T* result, T* expected, int num_values) {
+  for (int i = 0; i < num_values; ++i) {
+    ASSERT_EQ(expected[i], result[i]) << i;
+  }
+}
+
+template <>
+void VerifyResults<FLBA>(FLBA* result, FLBA* expected, int num_values) {
+  for (int i = 0; i < num_values; ++i) {
+    ASSERT_EQ(0, memcmp(expected[i].ptr, result[i].ptr, flba_length)) << i;
+  }
+}
+
+// ----------------------------------------------------------------------
+// Create some column descriptors
+
+template <typename DType>
+std::shared_ptr<ColumnDescriptor> ExampleDescr() {
+  auto node = schema::PrimitiveNode::Make("name", Repetition::OPTIONAL, DType::type_num);
+  return std::make_shared<ColumnDescriptor>(node, 0, 0);
+}
+
+template <>
+std::shared_ptr<ColumnDescriptor> ExampleDescr<FLBAType>() {
+  auto node = schema::PrimitiveNode::Make("name", Repetition::OPTIONAL,
+      Type::FIXED_LEN_BYTE_ARRAY, LogicalType::DECIMAL, flba_length, 10, 2);
+  return std::make_shared<ColumnDescriptor>(node, 0, 0);
+}
+
+// ----------------------------------------------------------------------
+// Plain encoding tests
+
+template <typename Type>
+class TestEncodingBase : public ::testing::Test {
+ public:
+  typedef typename Type::c_type T;
+  static constexpr int TYPE = Type::type_num;
+
+  void SetUp() {
+    descr_ = ExampleDescr<Type>();
+    type_length_ = descr_->type_length();
+    allocator_ = default_allocator();
+  }
+
+  void TearDown() { pool_.FreeAll(); }
+
+  void InitData(int nvalues, int repeats) {
+    num_values_ = nvalues * repeats;
+    input_bytes_.resize(num_values_ * sizeof(T));
+    output_bytes_.resize(num_values_ * sizeof(T));
+    draws_ = reinterpret_cast<T*>(input_bytes_.data());
+    decode_buf_ = reinterpret_cast<T*>(output_bytes_.data());
+    GenerateData<T>(nvalues, draws_, &data_buffer_);
+
+    // add some repeated values
+    for (int j = 1; j < repeats; ++j) {
+      for (int i = 0; i < nvalues; ++i) {
+        draws_[nvalues * j + i] = draws_[i];
+      }
+    }
+  }
+
+  virtual void CheckRoundtrip() = 0;
+
+  void Execute(int nvalues, int repeats) {
+    InitData(nvalues, repeats);
+    CheckRoundtrip();
+  }
+
+ protected:
+  ChunkedAllocator pool_;
+  MemoryAllocator* allocator_;
+
+  int num_values_;
+  int type_length_;
+  T* draws_;
+  T* decode_buf_;
+  vector<uint8_t> input_bytes_;
+  vector<uint8_t> output_bytes_;
+  vector<uint8_t> data_buffer_;
+
+  std::shared_ptr<Buffer> encode_buffer_;
+  std::shared_ptr<ColumnDescriptor> descr_;
+};
+
+// Member variables are not visible to templated subclasses. Possibly figure
+// out an alternative to this class layering at some point
+#define USING_BASE_MEMBERS()                   \
+  using TestEncodingBase<Type>::pool_;         \
+  using TestEncodingBase<Type>::allocator_;    \
+  using TestEncodingBase<Type>::descr_;        \
+  using TestEncodingBase<Type>::num_values_;   \
+  using TestEncodingBase<Type>::draws_;        \
+  using TestEncodingBase<Type>::data_buffer_;  \
+  using TestEncodingBase<Type>::type_length_;  \
+  using TestEncodingBase<Type>::encode_buffer_; \
+  using TestEncodingBase<Type>::decode_buf_;
+
+template <typename Type>
+class TestPlainEncoding : public TestEncodingBase<Type> {
+ public:
+  typedef typename Type::c_type T;
+  static constexpr int TYPE = Type::type_num;
+
+  virtual void CheckRoundtrip() {
+    PlainEncoder<Type> encoder(descr_.get());
+    PlainDecoder<Type> decoder(descr_.get());
+    encoder.Put(draws_, num_values_);
+    encode_buffer_ = encoder.FlushValues();
+
+    decoder.SetData(num_values_, encode_buffer_->data(), encode_buffer_->size());
+    int values_decoded = decoder.Decode(decode_buf_, num_values_);
+    ASSERT_EQ(num_values_, values_decoded);
+    ASSERT_NO_FATAL_FAILURE(VerifyResults<T>(decode_buf_, draws_, num_values_));
+  }
+
+ protected:
+  USING_BASE_MEMBERS();
+};
+
+TYPED_TEST_CASE(TestPlainEncoding, ParquetTypes);
+
+TYPED_TEST(TestPlainEncoding, BasicRoundTrip) {
+  this->Execute(10000, 1);
+}
+
+// ----------------------------------------------------------------------
+// Dictionary encoding tests
+
+typedef ::testing::Types<Int32Type, Int64Type, Int96Type, FloatType, DoubleType,
+    ByteArrayType, FLBAType> DictEncodedTypes;
+
+template <typename Type>
+class TestDictionaryEncoding : public TestEncodingBase<Type> {
+ public:
+  typedef typename Type::c_type T;
+  static constexpr int TYPE = Type::type_num;
+
+  void CheckRoundtrip() {
+    std::vector<uint8_t> valid_bits(BitUtil::RoundUpNumBytes(num_values_) + 1, 255);
+    DictEncoder<Type> encoder(descr_.get(), &pool_);
+
+    ASSERT_NO_THROW(encoder.Put(draws_, num_values_));
+    dict_buffer_ = AllocateBuffer(default_allocator(), encoder.dict_encoded_size());
+    encoder.WriteDict(dict_buffer_->mutable_data());
+    std::shared_ptr<Buffer> indices = encoder.FlushValues();
+
+    DictEncoder<Type> spaced_encoder(descr_.get(), &pool_);
+    // PutSpaced should lead to the same results
+    ASSERT_NO_THROW(spaced_encoder.PutSpaced(draws_, num_values_, valid_bits.data(), 0));
+    std::shared_ptr<Buffer> indices_from_spaced = spaced_encoder.FlushValues();
+    ASSERT_TRUE(indices_from_spaced->Equals(*indices));
+
+    PlainDecoder<Type> dict_decoder(descr_.get());
+    dict_decoder.SetData(
+        encoder.num_entries(), dict_buffer_->data(), dict_buffer_->size());
+
+    DictionaryDecoder<Type> decoder(descr_.get());
+    decoder.SetDict(&dict_decoder);
+
+    decoder.SetData(num_values_, indices->data(), indices->size());
+    int values_decoded = decoder.Decode(decode_buf_, num_values_);
+    ASSERT_EQ(num_values_, values_decoded);
+
+    // TODO(wesm): The DictionaryDecoder must stay alive because the decoded
+    // values' data is owned by a buffer inside the DictionaryDecoder. We
+    // should revisit when data lifetime is reviewed more generally.
+    ASSERT_NO_FATAL_FAILURE(VerifyResults<T>(decode_buf_, draws_, num_values_));
+
+    // Also test spaced decoding
+    decoder.SetData(num_values_, indices->data(), indices->size());
+    values_decoded =
+        decoder.DecodeSpaced(decode_buf_, num_values_, 0, valid_bits.data(), 0);
+    ASSERT_EQ(num_values_, values_decoded);
+    ASSERT_NO_FATAL_FAILURE(VerifyResults<T>(decode_buf_, draws_, num_values_));
+  }
+
+ protected:
+  USING_BASE_MEMBERS();
+  std::shared_ptr<PoolBuffer> dict_buffer_;
+};
+
+TYPED_TEST_CASE(TestDictionaryEncoding, DictEncodedTypes);
+
+TYPED_TEST(TestDictionaryEncoding, BasicRoundTrip) {
+  this->Execute(2500, 2);
+}
+
+TEST(TestDictionaryEncoding, CannotDictDecodeBoolean) {
+  PlainDecoder<BooleanType> dict_decoder(nullptr);
+  DictionaryDecoder<BooleanType> decoder(nullptr);
+
+  ASSERT_THROW(decoder.SetDict(&dict_decoder), ParquetException);
+}
+
+}  // namespace test
+
+}  // namespace parquet
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/782049ba/src/parquet/encoding.h
----------------------------------------------------------------------
diff --git a/src/parquet/encoding.h b/src/parquet/encoding.h
new file mode 100644
index 0000000..2e2bf68
--- /dev/null
+++ b/src/parquet/encoding.h
@@ -0,0 +1,141 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef PARQUET_ENCODING_H
+#define PARQUET_ENCODING_H
+
+#include <cstdint>
+#include <memory>
+
+#include <arrow/util/bit-util.h>
+
+#include "parquet/exception.h"
+#include "parquet/schema.h"
+#include "parquet/types.h"
+#include "parquet/util/bit-util.h"
+#include "parquet/util/memory.h"
+
+namespace parquet {
+
+class ColumnDescriptor;
+
+// Base class for value encoders. Since encoders may or may not have state (e.g.,
+// dictionary encoding) we use a class instance to maintain any state.
+//
+// TODO(wesm): Encode interface API is temporary
+template <typename DType>
+class Encoder {
+ public:
+  typedef typename DType::c_type T;
+
+  virtual ~Encoder() {}
+
+  virtual int64_t EstimatedDataEncodedSize() = 0;
+  virtual std::shared_ptr<Buffer> FlushValues() = 0;
+  virtual void Put(const T* src, int num_values) = 0;
+  // Encodes only the entries of src whose bit in valid_bits (starting at
+  // valid_bits_offset) is set: they are compacted into a scratch buffer and then
+  // passed to Put(). Throws ParquetException if the scratch buffer cannot be sized.
+  virtual void PutSpaced(const T* src, int num_values, const uint8_t* valid_bits,
+      int64_t valid_bits_offset) {
+    PoolBuffer buffer(allocator_);
+    PARQUET_THROW_NOT_OK(buffer.Resize(num_values * sizeof(T)));
+    int32_t num_valid_values = 0;
+    INIT_BITSET(valid_bits, valid_bits_offset);
+    T* data = reinterpret_cast<T*>(buffer.mutable_data());
+    for (int32_t i = 0; i < num_values; i++) {
+      if (bitset_valid_bits & (1 << bit_offset_valid_bits)) {
+        data[num_valid_values++] = src[i];
+      }
+      READ_NEXT_BITSET(valid_bits);
+    }
+    Put(data, num_valid_values);
+  }
+
+  Encoding::type encoding() const { return encoding_; }
+
+ protected:
+  explicit Encoder(const ColumnDescriptor* descr, const Encoding::type& encoding,
+      MemoryAllocator* allocator)
+      : descr_(descr), encoding_(encoding), allocator_(allocator) {}
+
+  // For accessing type-specific metadata, like FIXED_LEN_BYTE_ARRAY
+  const ColumnDescriptor* descr_;
+  const Encoding::type encoding_;
+  MemoryAllocator* allocator_;
+};
+
+// The Decoder template is parameterized on parquet::DataType subclasses
+template <typename DType>
+class Decoder {
+ public:
+  typedef typename DType::c_type T;
+
+  virtual ~Decoder() {}
+
+  // Sets the data for a new page. This will be called multiple times on the same
+  // decoder and should reset all internal state.
+  virtual void SetData(int num_values, const uint8_t* data, int len) = 0;
+
+  // Subclasses should override the ones they support. In each of these functions,
+  // the decoder would decode up to 'max_values', storing the result in 'buffer'.
+  // The function returns the number of values decoded, which should be max_values
+  // except for end of the current data page.
+  virtual int Decode(T* buffer, int max_values) {
+    throw ParquetException("Decoder does not implement this type.");
+  }
+
+  // Decode the values in this data page but leave spaces for null entries.
+  //
+  // num_values is the size of the def_levels and buffer arrays including the number of
+  // null values.
+  virtual int DecodeSpaced(T* buffer, int num_values, int null_count,
+      const uint8_t* valid_bits, int64_t valid_bits_offset) {
+    int values_to_read = num_values - null_count;
+    int values_read = Decode(buffer, values_to_read);
+    if (values_read != values_to_read) {
+      throw ParquetException("Number of values / definition_levels read did not match");
+    }
+
+    // Add spacing for null entries. As we have filled the buffer from the front,
+    // we need to add the spacing from the back.
+    int values_to_move = values_read;
+    for (int i = num_values - 1; i >= 0; i--) {
+      if (::arrow::BitUtil::GetBit(valid_bits, valid_bits_offset + i)) {
+        buffer[i] = buffer[--values_to_move];
+      }
+    }
+    return num_values;
+  }
+
+  // Returns the number of values left (for the last call to SetData()). This is
+  // the number of values left in this page.
+  int values_left() const { return num_values_; }
+
+  Encoding::type encoding() const { return encoding_; }
+
+ protected:
+  explicit Decoder(const ColumnDescriptor* descr, const Encoding::type& encoding)
+      : descr_(descr), encoding_(encoding), num_values_(0) {}
+
+  // For accessing type-specific metadata, like FIXED_LEN_BYTE_ARRAY
+  const ColumnDescriptor* descr_;
+
+  const Encoding::type encoding_;
+  int num_values_;
+};
+
+}  // namespace parquet
+
+#endif  // PARQUET_ENCODING_H
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/782049ba/src/parquet/encodings/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/src/parquet/encodings/CMakeLists.txt b/src/parquet/encodings/CMakeLists.txt
deleted file mode 100644
index 00565b2..0000000
--- a/src/parquet/encodings/CMakeLists.txt
+++ /dev/null
@@ -1,30 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
- -# Headers: encodings -install(FILES - decoder.h - encoder.h - delta-bit-pack-encoding.h - delta-byte-array-encoding.h - delta-length-byte-array-encoding.h - dictionary-encoding.h - plain-encoding.h - DESTINATION include/parquet/encodings) - -ADD_PARQUET_TEST(encoding-test) -ADD_PARQUET_BENCHMARK(encoding-benchmark) http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/782049ba/src/parquet/encodings/decoder.h ---------------------------------------------------------------------- diff --git a/src/parquet/encodings/decoder.h b/src/parquet/encodings/decoder.h deleted file mode 100644 index 58989e5..0000000 --- a/src/parquet/encodings/decoder.h +++ /dev/null @@ -1,95 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. 
- -#ifndef PARQUET_ENCODINGS_DECODER_H -#define PARQUET_ENCODINGS_DECODER_H - -#include <cstdint> - -#include <arrow/util/bit-util.h> - -#include "parquet/exception.h" -#include "parquet/types.h" -#include "parquet/util/memory.h" - -namespace parquet { - -class ColumnDescriptor; - -// The Decoder template is parameterized on parquet::DataType subclasses -template <typename DType> -class Decoder { - public: - typedef typename DType::c_type T; - - virtual ~Decoder() {} - - // Sets the data for a new page. This will be called multiple times on the same - // decoder and should reset all internal state. - virtual void SetData(int num_values, const uint8_t* data, int len) = 0; - - // Subclasses should override the ones they support. In each of these functions, - // the decoder would decode put to 'max_values', storing the result in 'buffer'. - // The function returns the number of values decoded, which should be max_values - // except for end of the current data page. - virtual int Decode(T* buffer, int max_values) { - throw ParquetException("Decoder does not implement this type."); - } - - // Decode the values in this data page but leave spaces for null entries. - // - // num_values is the size of the def_levels and buffer arrays including the number of - // null values. - virtual int DecodeSpaced(T* buffer, int num_values, int null_count, - const uint8_t* valid_bits, int64_t valid_bits_offset) { - int values_to_read = num_values - null_count; - int values_read = Decode(buffer, values_to_read); - if (values_read != values_to_read) { - throw ParquetException("Number of values / definition_levels read did not match"); - } - - // Add spacing for null entries. As we have filled the buffer from the front, - // we need to add the spacing from the back. 
- int values_to_move = values_read; - for (int i = num_values - 1; i >= 0; i--) { - if (::arrow::BitUtil::GetBit(valid_bits, valid_bits_offset + i)) { - buffer[i] = buffer[--values_to_move]; - } - } - return num_values; - } - - // Returns the number of values left (for the last call to SetData()). This is - // the number of values left in this page. - int values_left() const { return num_values_; } - - const Encoding::type encoding() const { return encoding_; } - - protected: - explicit Decoder(const ColumnDescriptor* descr, const Encoding::type& encoding) - : descr_(descr), encoding_(encoding), num_values_(0) {} - - // For accessing type-specific metadata, like FIXED_LEN_BYTE_ARRAY - const ColumnDescriptor* descr_; - - const Encoding::type encoding_; - int num_values_; -}; - -} // namespace parquet - -#endif // PARQUET_ENCODINGS_DECODER_H http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/782049ba/src/parquet/encodings/delta-bit-pack-encoding.h ---------------------------------------------------------------------- diff --git a/src/parquet/encodings/delta-bit-pack-encoding.h b/src/parquet/encodings/delta-bit-pack-encoding.h deleted file mode 100644 index 59774a4..0000000 --- a/src/parquet/encodings/delta-bit-pack-encoding.h +++ /dev/null @@ -1,125 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. 
See the License for the -// specific language governing permissions and limitations -// under the License. - -#ifndef PARQUET_DELTA_BIT_PACK_ENCODING_H -#define PARQUET_DELTA_BIT_PACK_ENCODING_H - -#include <algorithm> -#include <cstdint> -#include <vector> - -#include "parquet/encodings/decoder.h" -#include "parquet/util/bit-stream-utils.inline.h" -#include "parquet/util/memory.h" - -namespace parquet { - -template <typename DType> -class DeltaBitPackDecoder : public Decoder<DType> { - public: - typedef typename DType::c_type T; - - explicit DeltaBitPackDecoder( - const ColumnDescriptor* descr, MemoryAllocator* allocator = default_allocator()) - : Decoder<DType>(descr, Encoding::DELTA_BINARY_PACKED), - delta_bit_widths_(new PoolBuffer(allocator)) { - if (DType::type_num != Type::INT32 && DType::type_num != Type::INT64) { - throw ParquetException("Delta bit pack encoding should only be for integer data."); - } - } - - virtual void SetData(int num_values, const uint8_t* data, int len) { - num_values_ = num_values; - decoder_ = BitReader(data, len); - values_current_block_ = 0; - values_current_mini_block_ = 0; - } - - virtual int Decode(T* buffer, int max_values) { - return GetInternal(buffer, max_values); - } - - private: - using Decoder<DType>::num_values_; - - void InitBlock() { - int32_t block_size; - if (!decoder_.GetVlqInt(&block_size)) ParquetException::EofException(); - if (!decoder_.GetVlqInt(&num_mini_blocks_)) ParquetException::EofException(); - if (!decoder_.GetVlqInt(&values_current_block_)) { ParquetException::EofException(); } - if (!decoder_.GetZigZagVlqInt(&last_value_)) ParquetException::EofException(); - PARQUET_THROW_NOT_OK(delta_bit_widths_->Resize(num_mini_blocks_)); - - uint8_t* bit_width_data = delta_bit_widths_->mutable_data(); - - if (!decoder_.GetZigZagVlqInt(&min_delta_)) ParquetException::EofException(); - for (int i = 0; i < num_mini_blocks_; ++i) { - if (!decoder_.GetAligned<uint8_t>(1, bit_width_data + i)) { - 
ParquetException::EofException(); - } - } - values_per_mini_block_ = block_size / num_mini_blocks_; - mini_block_idx_ = 0; - delta_bit_width_ = bit_width_data[0]; - values_current_mini_block_ = values_per_mini_block_; - } - - template <typename T> - int GetInternal(T* buffer, int max_values) { - max_values = std::min(max_values, num_values_); - const uint8_t* bit_width_data = delta_bit_widths_->data(); - for (int i = 0; i < max_values; ++i) { - if (UNLIKELY(values_current_mini_block_ == 0)) { - ++mini_block_idx_; - if (mini_block_idx_ < static_cast<size_t>(delta_bit_widths_->size())) { - delta_bit_width_ = bit_width_data[mini_block_idx_]; - values_current_mini_block_ = values_per_mini_block_; - } else { - InitBlock(); - buffer[i] = last_value_; - continue; - } - } - - // TODO: the key to this algorithm is to decode the entire miniblock at once. - int64_t delta; - if (!decoder_.GetValue(delta_bit_width_, &delta)) ParquetException::EofException(); - delta += min_delta_; - last_value_ += delta; - buffer[i] = last_value_; - --values_current_mini_block_; - } - num_values_ -= max_values; - return max_values; - } - - BitReader decoder_; - int32_t values_current_block_; - int32_t num_mini_blocks_; - uint64_t values_per_mini_block_; - uint64_t values_current_mini_block_; - - int32_t min_delta_; - size_t mini_block_idx_; - std::unique_ptr<PoolBuffer> delta_bit_widths_; - int delta_bit_width_; - - int32_t last_value_; -}; -} // namespace parquet - -#endif http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/782049ba/src/parquet/encodings/delta-byte-array-encoding.h ---------------------------------------------------------------------- diff --git a/src/parquet/encodings/delta-byte-array-encoding.h b/src/parquet/encodings/delta-byte-array-encoding.h deleted file mode 100644 index 827789d..0000000 --- a/src/parquet/encodings/delta-byte-array-encoding.h +++ /dev/null @@ -1,81 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license 
agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#ifndef PARQUET_DELTA_BYTE_ARRAY_ENCODING_H -#define PARQUET_DELTA_BYTE_ARRAY_ENCODING_H - -#include <algorithm> - -#include "parquet/encodings/decoder.h" -#include "parquet/encodings/delta-bit-pack-encoding.h" -#include "parquet/encodings/delta-length-byte-array-encoding.h" - -namespace parquet { - -class DeltaByteArrayDecoder : public Decoder<ByteArrayType> { - public: - explicit DeltaByteArrayDecoder( - const ColumnDescriptor* descr, MemoryAllocator* allocator = default_allocator()) - : Decoder<ByteArrayType>(descr, Encoding::DELTA_BYTE_ARRAY), - prefix_len_decoder_(nullptr, allocator), - suffix_decoder_(nullptr, allocator) {} - - virtual void SetData(int num_values, const uint8_t* data, int len) { - num_values_ = num_values; - if (len == 0) return; - int prefix_len_length = *reinterpret_cast<const int*>(data); - data += 4; - len -= 4; - prefix_len_decoder_.SetData(num_values, data, prefix_len_length); - data += prefix_len_length; - len -= prefix_len_length; - suffix_decoder_.SetData(num_values, data, len); - } - - // TODO: this doesn't work and requires memory management. We need to allocate - // new strings to store the results. 
- virtual int Decode(ByteArray* buffer, int max_values) { - max_values = std::min(max_values, num_values_); - for (int i = 0; i < max_values; ++i) { - int prefix_len = 0; - prefix_len_decoder_.Decode(&prefix_len, 1); - ByteArray suffix = {0, NULL}; - suffix_decoder_.Decode(&suffix, 1); - buffer[i].len = prefix_len + suffix.len; - - uint8_t* result = reinterpret_cast<uint8_t*>(malloc(buffer[i].len)); - memcpy(result, last_value_.ptr, prefix_len); - memcpy(result + prefix_len, suffix.ptr, suffix.len); - - buffer[i].ptr = result; - last_value_ = buffer[i]; - } - num_values_ -= max_values; - return max_values; - } - - private: - using Decoder<ByteArrayType>::num_values_; - - DeltaBitPackDecoder<Int32Type> prefix_len_decoder_; - DeltaLengthByteArrayDecoder suffix_decoder_; - ByteArray last_value_; -}; - -} // namespace parquet - -#endif http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/782049ba/src/parquet/encodings/delta-length-byte-array-encoding.h ---------------------------------------------------------------------- diff --git a/src/parquet/encodings/delta-length-byte-array-encoding.h b/src/parquet/encodings/delta-length-byte-array-encoding.h deleted file mode 100644 index b159171..0000000 --- a/src/parquet/encodings/delta-length-byte-array-encoding.h +++ /dev/null @@ -1,70 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. 
You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#ifndef PARQUET_DELTA_LENGTH_BYTE_ARRAY_ENCODING_H -#define PARQUET_DELTA_LENGTH_BYTE_ARRAY_ENCODING_H - -#include <algorithm> -#include <cstdint> -#include <vector> - -#include "parquet/encodings/decoder.h" -#include "parquet/encodings/delta-bit-pack-encoding.h" - -namespace parquet { - -class DeltaLengthByteArrayDecoder : public Decoder<ByteArrayType> { - public: - explicit DeltaLengthByteArrayDecoder( - const ColumnDescriptor* descr, MemoryAllocator* allocator = default_allocator()) - : Decoder<ByteArrayType>(descr, Encoding::DELTA_LENGTH_BYTE_ARRAY), - len_decoder_(nullptr, allocator) {} - - virtual void SetData(int num_values, const uint8_t* data, int len) { - num_values_ = num_values; - if (len == 0) return; - int total_lengths_len = *reinterpret_cast<const int*>(data); - data += 4; - len_decoder_.SetData(num_values, data, total_lengths_len); - data_ = data + total_lengths_len; - len_ = len - 4 - total_lengths_len; - } - - virtual int Decode(ByteArray* buffer, int max_values) { - max_values = std::min(max_values, num_values_); - int lengths[max_values]; - len_decoder_.Decode(lengths, max_values); - for (int i = 0; i < max_values; ++i) { - buffer[i].len = lengths[i]; - buffer[i].ptr = data_; - data_ += lengths[i]; - len_ -= lengths[i]; - } - num_values_ -= max_values; - return max_values; - } - - private: - using Decoder<ByteArrayType>::num_values_; - DeltaBitPackDecoder<Int32Type> len_decoder_; - const uint8_t* data_; - int len_; -}; - -} // namespace parquet - -#endif
