http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/337cf584/src/impala/rle-encoding.h ---------------------------------------------------------------------- diff --git a/src/impala/rle-encoding.h b/src/impala/rle-encoding.h deleted file mode 100644 index 759f917..0000000 --- a/src/impala/rle-encoding.h +++ /dev/null @@ -1,417 +0,0 @@ -// Copyright 2012 Cloudera Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#ifndef IMPALA_RLE_ENCODING_H -#define IMPALA_RLE_ENCODING_H - -#include <math.h> - -#include "impala/compiler-util.h" -#include "impala/bit-stream-utils.inline.h" -#include "impala/bit-util.h" -#include "impala/logging.h" - -namespace impala { - -// Utility classes to do run length encoding (RLE) for fixed bit width values. If runs -// are sufficiently long, RLE is used, otherwise, the values are just bit-packed -// (literal encoding). -// For both types of runs, there is a byte-aligned indicator which encodes the length -// of the run and the type of the run. -// This encoding has the benefit that when there aren't any long enough runs, values -// are always decoded at fixed (can be precomputed) bit offsets OR both the value and -// the run length are byte aligned. This allows for very efficient decoding -// implementations. -// The encoding is: -// encoded-block := run* -// run := literal-run | repeated-run -// literal-run := literal-indicator < literal bytes > -// repeated-run := repeated-indicator < repeated value. 
padded to byte boundary > -// literal-indicator := varint_encode( number_of_groups << 1 | 1) -// repeated-indicator := varint_encode( number_of_repetitions << 1 ) -// -// Each run is preceded by a varint. The varint's least significant bit is -// used to indicate whether the run is a literal run or a repeated run. The rest -// of the varint is used to determine the length of the run (eg how many times the -// value repeats). -// -// In the case of literal runs, the run length is always a multiple of 8 (i.e. encode -// in groups of 8), so that no matter the bit-width of the value, the sequence will end -// on a byte boundary without padding. -// Given that we know it is a multiple of 8, we store the number of 8-groups rather than -// the actual number of encoded ints. (This means that the total number of encoded values -// can not be determined from the encoded data, since the number of values in the last -// group may not be a multiple of 8). For the last group of literal runs, we pad -// the group to 8 with zeros. This allows for 8 at a time decoding on the read side -// without the need for additional checks. -// -// There is a break-even point when it is more storage efficient to do run length -// encoding. For 1 bit-width values, that point is 8 values. They require 2 bytes -// for both the repeated encoding or the literal encoding. This value can always -// be computed based on the bit-width. -// TODO: think about how to use this for strings. The bit packing isn't quite the same. -// -// Examples with bit-width 1 (eg encoding booleans): -// ---------------------------------------- -// 100 1s followed by 100 0s: -// <varint(100 << 1)> <1, padded to 1 byte> <varint(100 << 1)> <0, padded to 1 byte> -// - (total 4 bytes) -// -// alternating 1s and 0s (200 total): -// 200 ints = 25 groups of 8 -// <varint((25 << 1) | 1)> <25 bytes of values, bitpacked> -// (total 26 bytes, 1 byte overhead) -// - -// Decoder class for RLE encoded data. 
-class RleDecoder { - public: - // Create a decoder object. buffer/buffer_len is the decoded data. - // bit_width is the width of each value (before encoding). - RleDecoder(const uint8_t* buffer, int buffer_len, int bit_width) - : bit_reader_(buffer, buffer_len), - bit_width_(bit_width), - current_value_(0), - repeat_count_(0), - literal_count_(0) { - DCHECK_GE(bit_width_, 0); - DCHECK_LE(bit_width_, 64); - } - - RleDecoder() {} - - // Gets the next value. Returns false if there are no more. - template<typename T> - bool Get(T* val); - - private: - BitReader bit_reader_; - int bit_width_; - uint64_t current_value_; - uint32_t repeat_count_; - uint32_t literal_count_; -}; - -// Class to incrementally build the rle data. This class does not allocate any memory. -// The encoding has two modes: encoding repeated runs and literal runs. -// If the run is sufficiently short, it is more efficient to encode as a literal run. -// This class does so by buffering 8 values at a time. If they are not all the same -// they are added to the literal run. If they are the same, they are added to the -// repeated run. When we switch modes, the previous run is flushed out. -class RleEncoder { - public: - // buffer/buffer_len: preallocated output buffer. - // bit_width: max number of bits for value. - // TODO: consider adding a min_repeated_run_length so the caller can control - // when values should be encoded as repeated runs. Currently this is derived - // based on the bit_width, which can determine a storage optimal choice. 
- // TODO: allow 0 bit_width (and have dict encoder use it) - RleEncoder(uint8_t* buffer, int buffer_len, int bit_width) - : bit_width_(bit_width), - bit_writer_(buffer, buffer_len) { - DCHECK_GE(bit_width_, 1); - DCHECK_LE(bit_width_, 64); - max_run_byte_size_ = MinBufferSize(bit_width); - DCHECK_GE(buffer_len, max_run_byte_size_) << "Input buffer not big enough."; - Clear(); - } - - // Returns the minimum buffer size needed to use the encoder for 'bit_width' - // This is the maximum length of a single run for 'bit_width'. - // It is not valid to pass a buffer less than this length. - static int MinBufferSize(int bit_width) { - // 1 indicator byte and MAX_VALUES_PER_LITERAL_RUN 'bit_width' values. - int max_literal_run_size = 1 + - BitUtil::Ceil(MAX_VALUES_PER_LITERAL_RUN * bit_width, 8); - // Up to MAX_VLQ_BYTE_LEN indicator and a single 'bit_width' value. - int max_repeated_run_size = BitReader::MAX_VLQ_BYTE_LEN + BitUtil::Ceil(bit_width, 8); - return std::max(max_literal_run_size, max_repeated_run_size); - } - - // Returns the maximum byte size it could take to encode 'num_values'. - static int MaxBufferSize(int bit_width, int num_values) { - int bytes_per_run = BitUtil::Ceil(bit_width * MAX_VALUES_PER_LITERAL_RUN, 8.0); - int num_runs = BitUtil::Ceil(num_values, MAX_VALUES_PER_LITERAL_RUN); - int literal_max_size = num_runs + num_runs * bytes_per_run; - int min_run_size = MinBufferSize(bit_width); - return std::max(min_run_size, literal_max_size) + min_run_size; - } - - // Encode value. Returns true if the value fits in buffer, false otherwise. - // This value must be representable with bit_width_ bits. - bool Put(uint64_t value); - - // Flushes any pending values to the underlying buffer. - // Returns the total number of bytes written - int Flush(); - - // Resets all the state in the encoder. 
- void Clear(); - - // Returns pointer to underlying buffer - uint8_t* buffer() { return bit_writer_.buffer(); } - int32_t len() { return bit_writer_.bytes_written(); } - - private: - // Flushes any buffered values. If this is part of a repeated run, this is largely - // a no-op. - // If it is part of a literal run, this will call FlushLiteralRun, which writes - // out the buffered literal values. - // If 'done' is true, the current run would be written even if it would normally - // have been buffered more. This should only be called at the end, when the - // encoder has received all values even if it would normally continue to be - // buffered. - void FlushBufferedValues(bool done); - - // Flushes literal values to the underlying buffer. If update_indicator_byte, - // then the current literal run is complete and the indicator byte is updated. - void FlushLiteralRun(bool update_indicator_byte); - - // Flushes a repeated run to the underlying buffer. - void FlushRepeatedRun(); - - // Checks and sets buffer_full_. This must be called after flushing a run to - // make sure there are enough bytes remaining to encode the next run. - void CheckBufferFull(); - - // The maximum number of values in a single literal run - // (number of groups encodable by a 1-byte indicator * 8) - static const int MAX_VALUES_PER_LITERAL_RUN = (1 << 6) * 8; - - // Number of bits needed to encode the value. - const int bit_width_; - - // Underlying buffer. - BitWriter bit_writer_; - - // If true, the buffer is full and subsequent Put()'s will fail. - bool buffer_full_; - - // The maximum byte size a single run can take. - int max_run_byte_size_; - - // We need to buffer at most 8 values for literals. This happens when the - // bit_width is 1 (so 8 values fit in one byte). 
- // TODO: generalize this to other bit widths - int64_t buffered_values_[8]; - - // Number of values in buffered_values_ - int num_buffered_values_; - - // The current (also last) value that was written and the count of how - // many times in a row that value has been seen. This is maintained even - // if we are in a literal run. If the repeat_count_ get high enough, we switch - // to encoding repeated runs. - int64_t current_value_; - int repeat_count_; - - // Number of literals in the current run. This does not include the literals - // that might be in buffered_values_. Only after we've got a group big enough - // can we decide if they should part of the literal_count_ or repeat_count_ - int literal_count_; - - // Pointer to a byte in the underlying buffer that stores the indicator byte. - // This is reserved as soon as we need a literal run but the value is written - // when the literal run is complete. - uint8_t* literal_indicator_byte_; -}; - -template<typename T> -inline bool RleDecoder::Get(T* val) { - if (UNLIKELY(literal_count_ == 0 && repeat_count_ == 0)) { - // Read the next run's indicator int, it could be a literal or repeated run - // The int is encoded as a vlq-encoded value. - uint64_t indicator_value = 0; - bool result = bit_reader_.GetVlqInt(&indicator_value); - if (!result) return false; - - // lsb indicates if it is a literal run or repeated run - bool is_literal = indicator_value & 1; - if (is_literal) { - literal_count_ = (indicator_value >> 1) * 8; - } else { - repeat_count_ = indicator_value >> 1; - bool result = bit_reader_.GetAligned<T>( - BitUtil::Ceil(bit_width_, 8), reinterpret_cast<T*>(&current_value_)); - DCHECK(result); - } - } - - if (LIKELY(repeat_count_ > 0)) { - *val = current_value_; - --repeat_count_; - } else { - DCHECK(literal_count_ > 0); - bool result = bit_reader_.GetValue(bit_width_, val); - DCHECK(result); - --literal_count_; - } - - return true; -} - -// This function buffers input values 8 at a time. 
After seeing all 8 values, -// it decides whether they should be encoded as a literal or repeated run. -inline bool RleEncoder::Put(uint64_t value) { - DCHECK(bit_width_ == 64 || value < (1LL << bit_width_)); - if (UNLIKELY(buffer_full_)) return false; - - if (LIKELY(current_value_ == value)) { - ++repeat_count_; - if (repeat_count_ > 8) { - // This is just a continuation of the current run, no need to buffer the - // values. - // Note that this is the fast path for long repeated runs. - return true; - } - } else { - if (repeat_count_ >= 8) { - // We had a run that was long enough but it has ended. Flush the - // current repeated run. - DCHECK_EQ(literal_count_, 0); - FlushRepeatedRun(); - } - repeat_count_ = 1; - current_value_ = value; - } - - buffered_values_[num_buffered_values_] = value; - if (++num_buffered_values_ == 8) { - DCHECK_EQ(literal_count_ % 8, 0); - FlushBufferedValues(false); - } - return true; -} - -inline void RleEncoder::FlushLiteralRun(bool update_indicator_byte) { - if (literal_indicator_byte_ == NULL) { - // The literal indicator byte has not been reserved yet, get one now. - literal_indicator_byte_ = bit_writer_.GetNextBytePtr(); - DCHECK(literal_indicator_byte_ != NULL); - } - - // Write all the buffered values as bit packed literals - for (int i = 0; i < num_buffered_values_; ++i) { - bool success = bit_writer_.PutValue(buffered_values_[i], bit_width_); - DCHECK(success) << "There is a bug in using CheckBufferFull()"; - } - num_buffered_values_ = 0; - - if (update_indicator_byte) { - // At this point we need to write the indicator byte for the literal run. - // We only reserve one byte, to allow for streaming writes of literal values. - // The logic makes sure we flush literal runs often enough to not overrun - // the 1 byte. 
- DCHECK_EQ(literal_count_ % 8, 0); - int num_groups = literal_count_ / 8; - int32_t indicator_value = (num_groups << 1) | 1; - DCHECK_EQ(indicator_value & 0xFFFFFF00, 0); - *literal_indicator_byte_ = indicator_value; - literal_indicator_byte_ = NULL; - literal_count_ = 0; - CheckBufferFull(); - } -} - -inline void RleEncoder::FlushRepeatedRun() { - DCHECK_GT(repeat_count_, 0); - bool result = true; - // The lsb of 0 indicates this is a repeated run - int32_t indicator_value = repeat_count_ << 1 | 0; - result &= bit_writer_.PutVlqInt(indicator_value); - result &= bit_writer_.PutAligned(current_value_, BitUtil::Ceil(bit_width_, 8)); - DCHECK(result); - num_buffered_values_ = 0; - repeat_count_ = 0; - CheckBufferFull(); -} - -// Flush the values that have been buffered. At this point we decide whether -// we need to switch between the run types or continue the current one. -inline void RleEncoder::FlushBufferedValues(bool done) { - if (repeat_count_ >= 8) { - // Clear the buffered values. They are part of the repeated run now and we - // don't want to flush them out as literals. - num_buffered_values_ = 0; - if (literal_count_ != 0) { - // There was a current literal run. All the values in it have been flushed - // but we still need to update the indicator byte. - DCHECK_EQ(literal_count_ % 8, 0); - DCHECK_EQ(repeat_count_, 8); - FlushLiteralRun(true); - } - DCHECK_EQ(literal_count_, 0); - return; - } - - literal_count_ += num_buffered_values_; - DCHECK_EQ(literal_count_ % 8, 0); - int num_groups = literal_count_ / 8; - if (num_groups + 1 >= (1 << 6)) { - // We need to start a new literal run because the indicator byte we've reserved - // cannot store more values. 
- DCHECK(literal_indicator_byte_ != NULL); - FlushLiteralRun(true); - } else { - FlushLiteralRun(done); - } - repeat_count_ = 0; -} - -inline int RleEncoder::Flush() { - if (literal_count_ > 0 || repeat_count_ > 0 || num_buffered_values_ > 0) { - bool all_repeat = literal_count_ == 0 && - (repeat_count_ == num_buffered_values_ || num_buffered_values_ == 0); - // There is something pending, figure out if it's a repeated or literal run - if (repeat_count_ > 0 && all_repeat) { - FlushRepeatedRun(); - } else { - DCHECK_EQ(literal_count_ % 8, 0); - // Buffer the last group of literals to 8 by padding with 0s. - for (; num_buffered_values_ != 0 && num_buffered_values_ < 8; - ++num_buffered_values_) { - buffered_values_[num_buffered_values_] = 0; - } - literal_count_ += num_buffered_values_; - FlushLiteralRun(true); - repeat_count_ = 0; - } - } - bit_writer_.Flush(); - DCHECK_EQ(num_buffered_values_, 0); - DCHECK_EQ(literal_count_, 0); - DCHECK_EQ(repeat_count_, 0); - - return bit_writer_.bytes_written(); -} - -inline void RleEncoder::CheckBufferFull() { - int bytes_written = bit_writer_.bytes_written(); - if (bytes_written + max_run_byte_size_ > bit_writer_.buffer_len()) { - buffer_full_ = true; - } -} - -inline void RleEncoder::Clear() { - buffer_full_ = false; - current_value_ = 0; - repeat_count_ = 0; - num_buffered_values_ = 0; - literal_count_ = 0; - literal_indicator_byte_ = NULL; - bit_writer_.Clear(); -} - -} -#endif
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/337cf584/src/parquet.cc ---------------------------------------------------------------------- diff --git a/src/parquet.cc b/src/parquet.cc index 6c939ae..f71d32b 100644 --- a/src/parquet.cc +++ b/src/parquet.cc @@ -13,9 +13,10 @@ // limitations under the License. #include "parquet/parquet.h" -#include "encodings/encodings.h" -#include "compression/codec.h" +#include "parquet/encodings/encodings.h" +#include "parquet/compression/codec.h" +#include <algorithm> #include <string> #include <string.h> @@ -23,18 +24,21 @@ const int DATA_PAGE_SIZE = 64 * 1024; -using namespace boost; -using namespace parquet; -using namespace std; - namespace parquet_cpp { +using parquet::CompressionCodec; +using parquet::Encoding; +using parquet::FieldRepetitionType; +using parquet::PageType; +using parquet::SchemaElement; +using parquet::Type; + InMemoryInputStream::InMemoryInputStream(const uint8_t* buffer, int64_t len) : buffer_(buffer), len_(len), offset_(0) { } const uint8_t* InMemoryInputStream::Peek(int num_to_peek, int* num_bytes) { - *num_bytes = ::min(static_cast<int64_t>(num_to_peek), len_ - offset_); + *num_bytes = std::min(static_cast<int64_t>(num_to_peek), len_ - offset_); return buffer_ + offset_; } @@ -47,7 +51,7 @@ const uint8_t* InMemoryInputStream::Read(int num_to_read, int* num_bytes) { ColumnReader::~ColumnReader() { } -ColumnReader::ColumnReader(const ColumnMetaData* metadata, +ColumnReader::ColumnReader(const parquet::ColumnMetaData* metadata, const SchemaElement* schema, InputStream* stream) : metadata_(metadata), schema_(schema), @@ -96,7 +100,7 @@ ColumnReader::ColumnReader(const ColumnMetaData* metadata, void ColumnReader::BatchDecode() { buffered_values_offset_ = 0; - uint8_t* buf= &values_buffer_[0]; + uint8_t* buf = &values_buffer_[0]; int batch_size = config_.batch_size; switch (metadata_->type) { case parquet::Type::BOOLEAN: @@ -164,7 +168,7 @@ bool ColumnReader::ReadNewPage() { } if 
(current_page_header_.type == PageType::DICTIONARY_PAGE) { - boost::unordered_map<Encoding::type, boost::shared_ptr<Decoder> >::iterator it = + std::unordered_map<Encoding::type, std::shared_ptr<Decoder> >::iterator it = decoders_.find(Encoding::RLE_DICTIONARY); if (it != decoders_.end()) { throw ParquetException("Column cannot have more than one dictionary."); @@ -173,7 +177,7 @@ bool ColumnReader::ReadNewPage() { PlainDecoder dictionary(schema_->type); dictionary.SetData(current_page_header_.dictionary_page_header.num_values, buffer, uncompressed_len); - boost::shared_ptr<Decoder> decoder( + std::shared_ptr<Decoder> decoder( new DictionaryDecoder(schema_->type, &dictionary)); decoders_[Encoding::RLE_DICTIONARY] = decoder; current_decoder_ = decoders_[Encoding::RLE_DICTIONARY].get(); @@ -187,7 +191,7 @@ bool ColumnReader::ReadNewPage() { int num_definition_bytes = *reinterpret_cast<const uint32_t*>(buffer); buffer += sizeof(uint32_t); definition_level_decoder_.reset( - new impala::RleDecoder(buffer, num_definition_bytes, 1)); + new RleDecoder(buffer, num_definition_bytes, 1)); buffer += num_definition_bytes; uncompressed_len -= sizeof(uint32_t); uncompressed_len -= num_definition_bytes; @@ -200,14 +204,14 @@ bool ColumnReader::ReadNewPage() { Encoding::type encoding = current_page_header_.data_page_header.encoding; if (IsDictionaryIndexEncoding(encoding)) encoding = Encoding::RLE_DICTIONARY; - boost::unordered_map<Encoding::type, boost::shared_ptr<Decoder> >::iterator it = + std::unordered_map<Encoding::type, std::shared_ptr<Decoder> >::iterator it = decoders_.find(encoding); if (it != decoders_.end()) { current_decoder_ = it->second.get(); } else { switch (encoding) { case Encoding::PLAIN: { - boost::shared_ptr<Decoder> decoder; + std::shared_ptr<Decoder> decoder; if (schema_->type == Type::BOOLEAN) { decoder.reset(new BoolDecoder()); } else { @@ -239,5 +243,4 @@ bool ColumnReader::ReadNewPage() { return true; } -} - +} // namespace parquet_cpp 
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/337cf584/src/parquet/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/src/parquet/CMakeLists.txt b/src/parquet/CMakeLists.txt new file mode 100644 index 0000000..11eaeb6 --- /dev/null +++ b/src/parquet/CMakeLists.txt @@ -0,0 +1,18 @@ +# Copyright 2015 Cloudera Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# Headers: top level +install(FILES + parquet.h + DESTINATION include/parquet) http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/337cf584/src/parquet/compression/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/src/parquet/compression/CMakeLists.txt b/src/parquet/compression/CMakeLists.txt new file mode 100644 index 0000000..291ef03 --- /dev/null +++ b/src/parquet/compression/CMakeLists.txt @@ -0,0 +1,30 @@ +# Copyright 2012 Cloudera Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +add_library(parquet_compression STATIC + lz4-codec.cc + snappy-codec.cc +) +target_link_libraries(parquet_compression + lz4static + snappystatic) + +set_target_properties(parquet_compression + PROPERTIES + LIBRARY_OUTPUT_DIRECTORY "${BUILD_OUTPUT_ROOT_DIRECTORY}") + +# Headers: compression +install(FILES + codec.h + DESTINATION include/parquet/compression) http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/337cf584/src/parquet/compression/codec.h ---------------------------------------------------------------------- diff --git a/src/parquet/compression/codec.h b/src/parquet/compression/codec.h new file mode 100644 index 0000000..8166847 --- /dev/null +++ b/src/parquet/compression/codec.h @@ -0,0 +1,71 @@ +// Copyright 2012 Cloudera Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef PARQUET_COMPRESSION_CODEC_H +#define PARQUET_COMPRESSION_CODEC_H + +#include "parquet/parquet.h" + +#include <cstdint> +#include "parquet/thrift/parquet_constants.h" +#include "parquet/thrift/parquet_types.h" + +namespace parquet_cpp { + +class Codec { + public: + virtual ~Codec() {} + virtual void Decompress(int input_len, const uint8_t* input, + int output_len, uint8_t* output_buffer) = 0; + + virtual int Compress(int input_len, const uint8_t* input, + int output_buffer_len, uint8_t* output_buffer) = 0; + + virtual int MaxCompressedLen(int input_len, const uint8_t* input) = 0; + + virtual const char* name() const = 0; +}; + + +// Snappy codec. 
+class SnappyCodec : public Codec { + public: + virtual void Decompress(int input_len, const uint8_t* input, + int output_len, uint8_t* output_buffer); + + virtual int Compress(int input_len, const uint8_t* input, + int output_buffer_len, uint8_t* output_buffer); + + virtual int MaxCompressedLen(int input_len, const uint8_t* input); + + virtual const char* name() const { return "snappy"; } +}; + +// Lz4 codec. +class Lz4Codec : public Codec { + public: + virtual void Decompress(int input_len, const uint8_t* input, + int output_len, uint8_t* output_buffer); + + virtual int Compress(int input_len, const uint8_t* input, + int output_buffer_len, uint8_t* output_buffer); + + virtual int MaxCompressedLen(int input_len, const uint8_t* input); + + virtual const char* name() const { return "lz4"; } +}; + +} // namespace parquet_cpp + +#endif http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/337cf584/src/parquet/compression/lz4-codec.cc ---------------------------------------------------------------------- diff --git a/src/parquet/compression/lz4-codec.cc b/src/parquet/compression/lz4-codec.cc new file mode 100644 index 0000000..6655387 --- /dev/null +++ b/src/parquet/compression/lz4-codec.cc @@ -0,0 +1,40 @@ +// Copyright 2012 Cloudera Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +#include "parquet/compression/codec.h" + +#include <lz4.h> + +namespace parquet_cpp { + +void Lz4Codec::Decompress(int input_len, const uint8_t* input, + int output_len, uint8_t* output_buffer) { + int n = LZ4_uncompress(reinterpret_cast<const char*>(input), + reinterpret_cast<char*>(output_buffer), output_len); + if (n != input_len) { + throw parquet_cpp::ParquetException("Corrupt lz4 compressed data."); + } +} + +int Lz4Codec::MaxCompressedLen(int input_len, const uint8_t* input) { + return LZ4_compressBound(input_len); +} + +int Lz4Codec::Compress(int input_len, const uint8_t* input, + int output_buffer_len, uint8_t* output_buffer) { + return LZ4_compress(reinterpret_cast<const char*>(input), + reinterpret_cast<char*>(output_buffer), input_len); +} + +} // namespace parquet_cpp http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/337cf584/src/parquet/compression/snappy-codec.cc ---------------------------------------------------------------------- diff --git a/src/parquet/compression/snappy-codec.cc b/src/parquet/compression/snappy-codec.cc new file mode 100644 index 0000000..0633d47 --- /dev/null +++ b/src/parquet/compression/snappy-codec.cc @@ -0,0 +1,42 @@ +// Copyright 2012 Cloudera Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +#include "parquet/compression/codec.h" + +#include <snappy.h> + +namespace parquet_cpp { + +void SnappyCodec::Decompress(int input_len, const uint8_t* input, + int output_len, uint8_t* output_buffer) { + if (!snappy::RawUncompress(reinterpret_cast<const char*>(input), + static_cast<size_t>(input_len), reinterpret_cast<char*>(output_buffer))) { + throw parquet_cpp::ParquetException("Corrupt snappy compressed data."); + } +} + +int SnappyCodec::MaxCompressedLen(int input_len, const uint8_t* input) { + return snappy::MaxCompressedLength(input_len); +} + +int SnappyCodec::Compress(int input_len, const uint8_t* input, + int output_buffer_len, uint8_t* output_buffer) { + size_t output_len; + snappy::RawCompress(reinterpret_cast<const char*>(input), + static_cast<size_t>(input_len), reinterpret_cast<char*>(output_buffer), + &output_len); + return output_len; +} + +} // namespace parquet_cpp http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/337cf584/src/parquet/encodings/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/src/parquet/encodings/CMakeLists.txt b/src/parquet/encodings/CMakeLists.txt new file mode 100644 index 0000000..72baf48 --- /dev/null +++ b/src/parquet/encodings/CMakeLists.txt @@ -0,0 +1,24 @@ +# Copyright 2015 Cloudera Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +# Headers: encodings +install(FILES + encodings.h + bool-encoding.h + delta-bit-pack-encoding.h + delta-byte-array-encoding.h + delta-length-byte-array-encoding.h + dictionary-encoding.h + plain-encoding.h + DESTINATION include/parquet/encodings) http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/337cf584/src/parquet/encodings/bool-encoding.h ---------------------------------------------------------------------- diff --git a/src/parquet/encodings/bool-encoding.h b/src/parquet/encodings/bool-encoding.h new file mode 100644 index 0000000..8eb55bc --- /dev/null +++ b/src/parquet/encodings/bool-encoding.h @@ -0,0 +1,48 @@ +// Copyright 2012 Cloudera Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +#ifndef PARQUET_BOOL_ENCODING_H +#define PARQUET_BOOL_ENCODING_H + +#include "parquet/encodings/encodings.h" + +#include <algorithm> + +namespace parquet_cpp { + +class BoolDecoder : public Decoder { + public: + BoolDecoder() : Decoder(parquet::Type::BOOLEAN, parquet::Encoding::PLAIN) { } + + virtual void SetData(int num_values, const uint8_t* data, int len) { + num_values_ = num_values; + decoder_ = RleDecoder(data, len, 1); + } + + virtual int GetBool(bool* buffer, int max_values) { + max_values = std::min(max_values, num_values_); + for (int i = 0; i < max_values; ++i) { + if (!decoder_.Get(&buffer[i])) ParquetException::EofException(); + } + num_values_ -= max_values; + return max_values; + } + + private: + RleDecoder decoder_; +}; + +} // namespace parquet_cpp + +#endif http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/337cf584/src/parquet/encodings/delta-bit-pack-encoding.h ---------------------------------------------------------------------- diff --git a/src/parquet/encodings/delta-bit-pack-encoding.h b/src/parquet/encodings/delta-bit-pack-encoding.h new file mode 100644 index 0000000..77a3b26 --- /dev/null +++ b/src/parquet/encodings/delta-bit-pack-encoding.h @@ -0,0 +1,116 @@ +// Copyright 2012 Cloudera Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +#ifndef PARQUET_DELTA_BIT_PACK_ENCODING_H +#define PARQUET_DELTA_BIT_PACK_ENCODING_H + +#include "parquet/encodings/encodings.h" + +#include <algorithm> +#include <vector> + +namespace parquet_cpp { + +class DeltaBitPackDecoder : public Decoder { + public: + explicit DeltaBitPackDecoder(const parquet::Type::type& type) + : Decoder(type, parquet::Encoding::DELTA_BINARY_PACKED) { + if (type != parquet::Type::INT32 && type != parquet::Type::INT64) { + throw ParquetException("Delta bit pack encoding should only be for integer data."); + } + } + + virtual void SetData(int num_values, const uint8_t* data, int len) { + num_values_ = num_values; + decoder_ = BitReader(data, len); + values_current_block_ = 0; + values_current_mini_block_ = 0; + } + + virtual int GetInt32(int32_t* buffer, int max_values) { + return GetInternal(buffer, max_values); + } + + virtual int GetInt64(int64_t* buffer, int max_values) { + return GetInternal(buffer, max_values); + } + + private: + void InitBlock() { + uint64_t block_size; + if (!decoder_.GetVlqInt(&block_size)) ParquetException::EofException(); + if (!decoder_.GetVlqInt(&num_mini_blocks_)) ParquetException::EofException(); + if (!decoder_.GetVlqInt(&values_current_block_)) { + ParquetException::EofException(); + } + if (!decoder_.GetZigZagVlqInt(&last_value_)) ParquetException::EofException(); + delta_bit_widths_.resize(num_mini_blocks_); + + if (!decoder_.GetZigZagVlqInt(&min_delta_)) ParquetException::EofException(); + for (int i = 0; i < num_mini_blocks_; ++i) { + if (!decoder_.GetAligned<uint8_t>(1, &delta_bit_widths_[i])) { + ParquetException::EofException(); + } + } + values_per_mini_block_ = block_size / num_mini_blocks_; + mini_block_idx_ = 0; + delta_bit_width_ = delta_bit_widths_[0]; + values_current_mini_block_ = values_per_mini_block_; + } + + template <typename T> + int GetInternal(T* buffer, int max_values) { + max_values = std::min(max_values, num_values_); + for (int i = 0; i < max_values; ++i) { + if 
(UNLIKELY(values_current_mini_block_ == 0)) { + ++mini_block_idx_; + if (mini_block_idx_ < delta_bit_widths_.size()) { + delta_bit_width_ = delta_bit_widths_[mini_block_idx_]; + values_current_mini_block_ = values_per_mini_block_; + } else { + InitBlock(); + buffer[i] = last_value_; + continue; + } + } + + // TODO: the key to this algorithm is to decode the entire miniblock at once. + int64_t delta; + if (!decoder_.GetValue(delta_bit_width_, &delta)) ParquetException::EofException(); + delta += min_delta_; + last_value_ += delta; + buffer[i] = last_value_; + --values_current_mini_block_; + } + num_values_ -= max_values; + return max_values; + } + + BitReader decoder_; + uint64_t values_current_block_; + uint64_t num_mini_blocks_; + uint64_t values_per_mini_block_; + uint64_t values_current_mini_block_; + + int64_t min_delta_; + int mini_block_idx_; + std::vector<uint8_t> delta_bit_widths_; + int delta_bit_width_; + + int64_t last_value_; +}; + +} // namespace parquet_cpp + +#endif http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/337cf584/src/parquet/encodings/delta-byte-array-encoding.h ---------------------------------------------------------------------- diff --git a/src/parquet/encodings/delta-byte-array-encoding.h b/src/parquet/encodings/delta-byte-array-encoding.h new file mode 100644 index 0000000..3396586 --- /dev/null +++ b/src/parquet/encodings/delta-byte-array-encoding.h @@ -0,0 +1,74 @@ +// Copyright 2012 Cloudera Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef PARQUET_DELTA_BYTE_ARRAY_ENCODING_H +#define PARQUET_DELTA_BYTE_ARRAY_ENCODING_H + +#include "parquet/encodings/encodings.h" + +#include <algorithm> + +namespace parquet_cpp { + +class DeltaByteArrayDecoder : public Decoder { + public: + DeltaByteArrayDecoder() + : Decoder(parquet::Type::BYTE_ARRAY, parquet::Encoding::DELTA_BYTE_ARRAY), + prefix_len_decoder_(parquet::Type::INT32), + suffix_decoder_() { + } + + virtual void SetData(int num_values, const uint8_t* data, int len) { + num_values_ = num_values; + if (len == 0) return; + int prefix_len_length = *reinterpret_cast<const int*>(data); + data += 4; + len -= 4; + prefix_len_decoder_.SetData(num_values, data, prefix_len_length); + data += prefix_len_length; + len -= prefix_len_length; + suffix_decoder_.SetData(num_values, data, len); + } + + // TODO: this doesn't work and requires memory management. We need to allocate + // new strings to store the results. 
+ virtual int GetByteArray(ByteArray* buffer, int max_values) { + max_values = std::min(max_values, num_values_); + for (int i = 0; i < max_values; ++i) { + int prefix_len = 0; + prefix_len_decoder_.GetInt32(&prefix_len, 1); + ByteArray suffix; + suffix_decoder_.GetByteArray(&suffix, 1); + buffer[i].len = prefix_len + suffix.len; + + uint8_t* result = reinterpret_cast<uint8_t*>(malloc(buffer[i].len)); + memcpy(result, last_value_.ptr, prefix_len); + memcpy(result + prefix_len, suffix.ptr, suffix.len); + + buffer[i].ptr = result; + last_value_ = buffer[i]; + } + num_values_ -= max_values; + return max_values; + } + + private: + DeltaBitPackDecoder prefix_len_decoder_; + DeltaLengthByteArrayDecoder suffix_decoder_; + ByteArray last_value_; +}; + +} // namespace parquet_cpp + +#endif http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/337cf584/src/parquet/encodings/delta-length-byte-array-encoding.h ---------------------------------------------------------------------- diff --git a/src/parquet/encodings/delta-length-byte-array-encoding.h b/src/parquet/encodings/delta-length-byte-array-encoding.h new file mode 100644 index 0000000..06bf39d --- /dev/null +++ b/src/parquet/encodings/delta-length-byte-array-encoding.h @@ -0,0 +1,63 @@ +// Copyright 2012 Cloudera Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +#ifndef PARQUET_DELTA_LENGTH_BYTE_ARRAY_ENCODING_H +#define PARQUET_DELTA_LENGTH_BYTE_ARRAY_ENCODING_H + +#include "parquet/encodings/encodings.h" + +#include <algorithm> + +namespace parquet_cpp { + +class DeltaLengthByteArrayDecoder : public Decoder { + public: + DeltaLengthByteArrayDecoder() + : Decoder(parquet::Type::BYTE_ARRAY, parquet::Encoding::DELTA_LENGTH_BYTE_ARRAY), + len_decoder_(parquet::Type::INT32) { + } + + virtual void SetData(int num_values, const uint8_t* data, int len) { + num_values_ = num_values; + if (len == 0) return; + int total_lengths_len = *reinterpret_cast<const int*>(data); + data += 4; + len_decoder_.SetData(num_values, data, total_lengths_len); + data_ = data + total_lengths_len; + len_ = len - 4 - total_lengths_len; + } + + virtual int GetByteArray(ByteArray* buffer, int max_values) { + max_values = std::min(max_values, num_values_); + int lengths[max_values]; + len_decoder_.GetInt32(lengths, max_values); + for (int i = 0; i < max_values; ++i) { + buffer[i].len = lengths[i]; + buffer[i].ptr = data_; + data_ += lengths[i]; + len_ -= lengths[i]; + } + num_values_ -= max_values; + return max_values; + } + + private: + DeltaBitPackDecoder len_decoder_; + const uint8_t* data_; + int len_; +}; + +} // namespace parquet_cpp + +#endif http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/337cf584/src/parquet/encodings/dictionary-encoding.h ---------------------------------------------------------------------- diff --git a/src/parquet/encodings/dictionary-encoding.h b/src/parquet/encodings/dictionary-encoding.h new file mode 100644 index 0000000..2501b2a --- /dev/null +++ b/src/parquet/encodings/dictionary-encoding.h @@ -0,0 +1,148 @@ +// Copyright 2012 Cloudera Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef PARQUET_DICTIONARY_ENCODING_H +#define PARQUET_DICTIONARY_ENCODING_H + +#include "parquet/encodings/encodings.h" + +#include <algorithm> +#include <vector> + +namespace parquet_cpp { + +class DictionaryDecoder : public Decoder { + public: + // Initializes the dictionary with values from 'dictionary'. The data in dictionary + // is not guaranteed to persist in memory after this call so the dictionary decoder + // needs to copy the data out if necessary. + DictionaryDecoder(const parquet::Type::type& type, Decoder* dictionary) + : Decoder(type, parquet::Encoding::RLE_DICTIONARY) { + int num_dictionary_values = dictionary->values_left(); + switch (type) { + case parquet::Type::BOOLEAN: + throw ParquetException("Boolean cols should not be dictionary encoded."); + + case parquet::Type::INT32: + int32_dictionary_.resize(num_dictionary_values); + dictionary->GetInt32(&int32_dictionary_[0], num_dictionary_values); + break; + case parquet::Type::INT64: + int64_dictionary_.resize(num_dictionary_values); + dictionary->GetInt64(&int64_dictionary_[0], num_dictionary_values); + break; + case parquet::Type::FLOAT: + float_dictionary_.resize(num_dictionary_values); + dictionary->GetFloat(&float_dictionary_[0], num_dictionary_values); + break; + case parquet::Type::DOUBLE: + double_dictionary_.resize(num_dictionary_values); + dictionary->GetDouble(&double_dictionary_[0], num_dictionary_values); + break; + case parquet::Type::BYTE_ARRAY: { + byte_array_dictionary_.resize(num_dictionary_values); + dictionary->GetByteArray(&byte_array_dictionary_[0], 
num_dictionary_values); + int total_size = 0; + for (int i = 0; i < num_dictionary_values; ++i) { + total_size += byte_array_dictionary_[i].len; + } + byte_array_data_.resize(total_size); + int offset = 0; + for (int i = 0; i < num_dictionary_values; ++i) { + memcpy(&byte_array_data_[offset], + byte_array_dictionary_[i].ptr, byte_array_dictionary_[i].len); + byte_array_dictionary_[i].ptr = &byte_array_data_[offset]; + offset += byte_array_dictionary_[i].len; + } + break; + } + default: + ParquetException::NYI("Unsupported dictionary type"); + } + } + + virtual void SetData(int num_values, const uint8_t* data, int len) { + num_values_ = num_values; + if (len == 0) return; + uint8_t bit_width = *data; + ++data; + --len; + idx_decoder_ = RleDecoder(data, len, bit_width); + } + + virtual int GetInt32(int32_t* buffer, int max_values) { + max_values = std::min(max_values, num_values_); + for (int i = 0; i < max_values; ++i) { + buffer[i] = int32_dictionary_[index()]; + } + return max_values; + } + + virtual int GetInt64(int64_t* buffer, int max_values) { + max_values = std::min(max_values, num_values_); + for (int i = 0; i < max_values; ++i) { + buffer[i] = int64_dictionary_[index()]; + } + return max_values; + } + + virtual int GetFloat(float* buffer, int max_values) { + max_values = std::min(max_values, num_values_); + for (int i = 0; i < max_values; ++i) { + buffer[i] = float_dictionary_[index()]; + } + return max_values; + } + + virtual int GetDouble(double* buffer, int max_values) { + max_values = std::min(max_values, num_values_); + for (int i = 0; i < max_values; ++i) { + buffer[i] = double_dictionary_[index()]; + } + return max_values; + } + + virtual int GetByteArray(ByteArray* buffer, int max_values) { + max_values = std::min(max_values, num_values_); + for (int i = 0; i < max_values; ++i) { + buffer[i] = byte_array_dictionary_[index()]; + } + return max_values; + } + + private: + int index() { + int idx = 0; + if (!idx_decoder_.Get(&idx)) 
ParquetException::EofException(); + --num_values_; + return idx; + } + + // Only one is set. + std::vector<int32_t> int32_dictionary_; + std::vector<int64_t> int64_dictionary_; + std::vector<float> float_dictionary_; + std::vector<double> double_dictionary_; + std::vector<ByteArray> byte_array_dictionary_; + + // Data that contains the byte array data (byte_array_dictionary_ just has the + // pointers). + std::vector<uint8_t> byte_array_data_; + + RleDecoder idx_decoder_; +}; + +} // namespace parquet_cpp + +#endif http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/337cf584/src/parquet/encodings/encodings.h ---------------------------------------------------------------------- diff --git a/src/parquet/encodings/encodings.h b/src/parquet/encodings/encodings.h new file mode 100644 index 0000000..9211bf8 --- /dev/null +++ b/src/parquet/encodings/encodings.h @@ -0,0 +1,82 @@ +// Copyright 2012 Cloudera Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef PARQUET_ENCODINGS_ENCODINGS_H +#define PARQUET_ENCODINGS_ENCODINGS_H + +#include <cstdint> + +#include "parquet/thrift/parquet_constants.h" +#include "parquet/thrift/parquet_types.h" +#include "parquet/util/rle-encoding.h" +#include "parquet/util/bit-stream-utils.inline.h" + +namespace parquet_cpp { + +class Decoder { + public: + virtual ~Decoder() {} + + // Sets the data for a new page. This will be called multiple times on the same + // decoder and should reset all internal state. 
+ virtual void SetData(int num_values, const uint8_t* data, int len) = 0; + + // Subclasses should override the ones they support. In each of these functions, + // the decoder would decode put to 'max_values', storing the result in 'buffer'. + // The function returns the number of values decoded, which should be max_values + // except for end of the current data page. + virtual int GetBool(bool* buffer, int max_values) { + throw ParquetException("Decoder does not implement this type."); + } + virtual int GetInt32(int32_t* buffer, int max_values) { + throw ParquetException("Decoder does not implement this type."); + } + virtual int GetInt64(int64_t* buffer, int max_values) { + throw ParquetException("Decoder does not implement this type."); + } + virtual int GetFloat(float* buffer, int max_values) { + throw ParquetException("Decoder does not implement this type."); + } + virtual int GetDouble(double* buffer, int max_values) { + throw ParquetException("Decoder does not implement this type."); + } + virtual int GetByteArray(ByteArray* buffer, int max_values) { + throw ParquetException("Decoder does not implement this type."); + } + + // Returns the number of values left (for the last call to SetData()). This is + // the number of values left in this page. 
+ int values_left() const { return num_values_; } + + const parquet::Encoding::type encoding() const { return encoding_; } + + protected: + Decoder(const parquet::Type::type& type, const parquet::Encoding::type& encoding) + : type_(type), encoding_(encoding), num_values_(0) {} + + const parquet::Type::type type_; + const parquet::Encoding::type encoding_; + int num_values_; +}; + +} // namespace parquet_cpp + +#include "parquet/encodings/bool-encoding.h" +#include "parquet/encodings/plain-encoding.h" +#include "parquet/encodings/dictionary-encoding.h" +#include "parquet/encodings/delta-bit-pack-encoding.h" +#include "parquet/encodings/delta-length-byte-array-encoding.h" +#include "parquet/encodings/delta-byte-array-encoding.h" + +#endif // PARQUET_ENCODINGS_ENCODINGS_H http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/337cf584/src/parquet/encodings/plain-encoding.h ---------------------------------------------------------------------- diff --git a/src/parquet/encodings/plain-encoding.h b/src/parquet/encodings/plain-encoding.h new file mode 100644 index 0000000..b094cdb --- /dev/null +++ b/src/parquet/encodings/plain-encoding.h @@ -0,0 +1,83 @@ +// Copyright 2012 Cloudera Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +#ifndef PARQUET_PLAIN_ENCODING_H +#define PARQUET_PLAIN_ENCODING_H + +#include "parquet/encodings/encodings.h" + +#include <algorithm> + +namespace parquet_cpp { + +class PlainDecoder : public Decoder { + public: + explicit PlainDecoder(const parquet::Type::type& type) + : Decoder(type, parquet::Encoding::PLAIN), data_(NULL), len_(0) { + } + + virtual void SetData(int num_values, const uint8_t* data, int len) { + num_values_ = num_values; + data_ = data; + len_ = len; + } + + int GetValues(void* buffer, int max_values, int byte_size) { + max_values = std::min(max_values, num_values_); + int size = max_values * byte_size; + if (len_ < size) ParquetException::EofException(); + memcpy(buffer, data_, size); + data_ += size; + len_ -= size; + num_values_ -= max_values; + return max_values; + } + + virtual int GetInt32(int32_t* buffer, int max_values) { + return GetValues(buffer, max_values, sizeof(int32_t)); + } + + virtual int GetInt64(int64_t* buffer, int max_values) { + return GetValues(buffer, max_values, sizeof(int64_t)); + } + + virtual int GetFloat(float* buffer, int max_values) { + return GetValues(buffer, max_values, sizeof(float)); + } + + virtual int GetDouble(double* buffer, int max_values) { + return GetValues(buffer, max_values, sizeof(double)); + } + + virtual int GetByteArray(ByteArray* buffer, int max_values) { + max_values = std::min(max_values, num_values_); + for (int i = 0; i < max_values; ++i) { + buffer[i].len = *reinterpret_cast<const uint32_t*>(data_); + if (len_ < sizeof(uint32_t) + buffer[i].len) ParquetException::EofException(); + buffer[i].ptr = data_ + sizeof(uint32_t); + data_ += sizeof(uint32_t) + buffer[i].len; + len_ -= sizeof(uint32_t) + buffer[i].len; + } + num_values_ -= max_values; + return max_values; + } + + private: + const uint8_t* data_; + int len_; +}; + +} // namespace parquet_cpp + +#endif http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/337cf584/src/parquet/parquet.h 
---------------------------------------------------------------------- diff --git a/src/parquet/parquet.h b/src/parquet/parquet.h index c1a73b7..320f003 100644 --- a/src/parquet/parquet.h +++ b/src/parquet/parquet.h @@ -17,14 +17,18 @@ #include <exception> #include <sstream> -#include <boost/cstdint.hpp> -#include <boost/scoped_ptr.hpp> +#include <cstdint> +#include <memory> +#include <string> +#include <unordered_map> +#include <vector> + +// Needed for thrift #include <boost/shared_ptr.hpp> -#include <boost/unordered_map.hpp> -#include "gen-cpp/parquet_constants.h" -#include "gen-cpp/parquet_types.h" -#include "impala/rle-encoding.h" +#include "parquet/thrift/parquet_constants.h" +#include "parquet/thrift/parquet_types.h" +#include "parquet/util/rle-encoding.h" // TCompactProtocol requires some #defines to work right. #define SIGNED_RIGHT_SHIFT_IS 1 @@ -36,6 +40,17 @@ #include <thrift/protocol/TBinaryProtocol.h> #include <thrift/transport/TBufferTransports.h> +namespace std { + +template <> +struct hash<parquet::Encoding::type> { + std::size_t operator()(const parquet::Encoding::type& k) const { + return hash<int>()(static_cast<int>(k)); + } +}; + +} // namespace std + namespace parquet_cpp { class Codec; @@ -146,18 +161,18 @@ class ColumnReader { InputStream* stream_; // Compression codec to use. - boost::scoped_ptr<Codec> decompressor_; + std::unique_ptr<Codec> decompressor_; std::vector<uint8_t> decompression_buffer_; // Map of compression type to decompressor object. - boost::unordered_map<parquet::Encoding::type, boost::shared_ptr<Decoder> > decoders_; + std::unordered_map<parquet::Encoding::type, std::shared_ptr<Decoder> > decoders_; parquet::PageHeader current_page_header_; // Not set if field is required. - boost::scoped_ptr<impala::RleDecoder> definition_level_decoder_; + std::unique_ptr<RleDecoder> definition_level_decoder_; // Not set for flat schemas. 
- boost::scoped_ptr<impala::RleDecoder> repetition_level_decoder_; + std::unique_ptr<RleDecoder> repetition_level_decoder_; Decoder* current_decoder_; int num_buffered_values_; @@ -241,7 +256,6 @@ inline void DeserializeThriftMsg(const uint8_t* buf, uint32_t* len, T* deseriali *len = *len - bytes_left; } -} +} // namespace parquet_cpp #endif - http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/337cf584/src/parquet/thrift/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/src/parquet/thrift/CMakeLists.txt b/src/parquet/thrift/CMakeLists.txt new file mode 100644 index 0000000..e2a00c9 --- /dev/null +++ b/src/parquet/thrift/CMakeLists.txt @@ -0,0 +1,29 @@ +# Copyright 2012 Cloudera Inc. + +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +add_library(parquet_thrift STATIC + parquet_constants.cpp + parquet_types.cpp +) +set_target_properties(parquet_thrift + PROPERTIES + LIBRARY_OUTPUT_DIRECTORY "${BUILD_OUTPUT_ROOT_DIRECTORY}") + + +# Headers: thrift +install(FILES + parquet_types.h + parquet_constants.h + DESTINATION include/parquet/thrift) http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/337cf584/src/parquet/thrift/parquet_constants.cpp ---------------------------------------------------------------------- diff --git a/src/parquet/thrift/parquet_constants.cpp b/src/parquet/thrift/parquet_constants.cpp new file mode 100644 index 0000000..caa5af6 --- /dev/null +++ b/src/parquet/thrift/parquet_constants.cpp @@ -0,0 +1,17 @@ +/** + * Autogenerated by Thrift Compiler (0.9.0) + * + * DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING + * @generated + */ +#include "parquet_constants.h" + +namespace parquet { + +const parquetConstants g_parquet_constants; + +parquetConstants::parquetConstants() { +} + +} // namespace + http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/337cf584/src/parquet/thrift/parquet_constants.h ---------------------------------------------------------------------- diff --git a/src/parquet/thrift/parquet_constants.h b/src/parquet/thrift/parquet_constants.h new file mode 100644 index 0000000..71d6f58 --- /dev/null +++ b/src/parquet/thrift/parquet_constants.h @@ -0,0 +1,24 @@ +/** + * Autogenerated by Thrift Compiler (0.9.0) + * + * DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING + * @generated + */ +#ifndef parquet_CONSTANTS_H +#define parquet_CONSTANTS_H + +#include "parquet_types.h" + +namespace parquet { + +class parquetConstants { + public: + parquetConstants(); + +}; + +extern const parquetConstants g_parquet_constants; + +} // namespace + +#endif
