Repository: parquet-cpp Updated Branches: refs/heads/master 5af54bec2 -> 89f5a5439
PARQUET-428: Support INT96 and FIXED_LEN_BYTE_ARRAY types This PR adds support for INT96 and FIXED_LEN_BYTE_ARRAY types. It modifies the examples and DebugPrint to handle these types. Author: Deepak Majeti <[email protected]> Closes #27 from majetideepak/master and squashes the following commits: 5ba0a03 [Deepak Majeti] PARQUET-428: Support INT96 and FIXED_LEN_BYTE_ARRAY types Project: http://git-wip-us.apache.org/repos/asf/parquet-cpp/repo Commit: http://git-wip-us.apache.org/repos/asf/parquet-cpp/commit/89f5a543 Tree: http://git-wip-us.apache.org/repos/asf/parquet-cpp/tree/89f5a543 Diff: http://git-wip-us.apache.org/repos/asf/parquet-cpp/diff/89f5a543 Branch: refs/heads/master Commit: 89f5a543929b7d2bb3249626fb03cf9130540439 Parents: 5af54be Author: Deepak Majeti <[email protected]> Authored: Thu Jan 28 17:38:27 2016 -0800 Committer: Julien Le Dem <[email protected]> Committed: Thu Jan 28 17:38:27 2016 -0800 ---------------------------------------------------------------------- src/parquet/column_reader.cc | 2 ++ src/parquet/column_reader.h | 1 + src/parquet/encodings/dictionary-encoding.h | 19 ++++++++++++ src/parquet/encodings/plain-encoding.h | 16 ++++++++++ src/parquet/reader.cc | 38 ++++++++++++++++-------- src/parquet/types.h | 38 ++++++++++++++++++++++-- src/parquet/util/compiler-util.h | 21 +++++++++++++ 7 files changed, 120 insertions(+), 15 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/89f5a543/src/parquet/column_reader.cc ---------------------------------------------------------------------- diff --git a/src/parquet/column_reader.cc b/src/parquet/column_reader.cc index 168bc94..4f565cf 100644 --- a/src/parquet/column_reader.cc +++ b/src/parquet/column_reader.cc @@ -187,6 +187,8 @@ std::shared_ptr<ColumnReader> ColumnReader::Make(const parquet::ColumnMetaData* return std::make_shared<DoubleReader>(metadata, element, stream); case Type::BYTE_ARRAY: return std::make_shared<ByteArrayReader>(metadata, element, stream); + case Type::FIXED_LEN_BYTE_ARRAY: + return std::make_shared<FixedLenByteArrayReader>(metadata, element, stream); default: ParquetException::NYI("type reader not implemented"); } http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/89f5a543/src/parquet/column_reader.h ---------------------------------------------------------------------- diff --git a/src/parquet/column_reader.h b/src/parquet/column_reader.h index cd6cc02..00722f5 100644 --- a/src/parquet/column_reader.h +++ b/src/parquet/column_reader.h @@ -159,6 +159,7 @@ typedef TypedColumnReader<parquet::Type::INT96> Int96Reader; typedef TypedColumnReader<parquet::Type::FLOAT> FloatReader; typedef TypedColumnReader<parquet::Type::DOUBLE> DoubleReader; typedef TypedColumnReader<parquet::Type::BYTE_ARRAY> ByteArrayReader; +typedef TypedColumnReader<parquet::Type::FIXED_LEN_BYTE_ARRAY> FixedLenByteArrayReader; template <int TYPE> http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/89f5a543/src/parquet/encodings/dictionary-encoding.h ---------------------------------------------------------------------- diff --git a/src/parquet/encodings/dictionary-encoding.h b/src/parquet/encodings/dictionary-encoding.h index a4ef342..9641e7e 100644 --- a/src/parquet/encodings/dictionary-encoding.h +++ b/src/parquet/encodings/dictionary-encoding.h @@ -111,6 +111,25 @@ inline void DictionaryDecoder<parquet::Type::BYTE_ARRAY>::Init( } } +template <> +inline void DictionaryDecoder<parquet::Type::FIXED_LEN_BYTE_ARRAY>::Init( + Decoder<parquet::Type::FIXED_LEN_BYTE_ARRAY>* dictionary) { + int num_dictionary_values = dictionary->values_left(); + dictionary_.resize(num_dictionary_values); + dictionary->Decode(&dictionary_[0], num_dictionary_values); + + int fixed_len = schema_->type_length; + int total_size = num_dictionary_values*fixed_len; + + byte_array_data_.resize(total_size); + int offset = 0; + for (int i = 0; i < num_dictionary_values; ++i) { + memcpy(&byte_array_data_[offset], dictionary_[i].ptr, fixed_len); + dictionary_[i].ptr = &byte_array_data_[offset]; + offset += fixed_len; + } +} + } // namespace parquet_cpp #endif http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/89f5a543/src/parquet/encodings/plain-encoding.h ---------------------------------------------------------------------- diff --git a/src/parquet/encodings/plain-encoding.h b/src/parquet/encodings/plain-encoding.h index 4ddf878..dc71e39 100644 --- a/src/parquet/encodings/plain-encoding.h +++ b/src/parquet/encodings/plain-encoding.h @@ -74,6 +74,22 @@ inline int PlainDecoder<parquet::Type::BYTE_ARRAY>::Decode(ByteArray* buffer, return max_values; } +// Template specialization for FIXED_LEN_BYTE_ARRAY +template <> +inline int PlainDecoder<parquet::Type::FIXED_LEN_BYTE_ARRAY>::Decode(FixedLenByteArray* buffer, + int max_values) { + max_values = std::min(max_values, num_values_); + int len = schema_->type_length; + for (int i = 0; i < max_values; ++i) { + if (len_ < len) ParquetException::EofException(); + buffer[i].ptr = data_; + data_ += len; + len_ -= len; + } + num_values_ -= max_values; + return max_values; +} + template <> class PlainDecoder<parquet::Type::BOOLEAN> : public Decoder<parquet::Type::BOOLEAN> { public: http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/89f5a543/src/parquet/reader.cc ---------------------------------------------------------------------- diff --git a/src/parquet/reader.cc b/src/parquet/reader.cc index 7c727ba..4654f14 100644 --- a/src/parquet/reader.cc +++ b/src/parquet/reader.cc @@ -217,6 +217,9 @@ static string parquet_type_to_string(Type::type t) { case Type::INT64: return "INT64"; break; + case Type::INT96: + return "INT96"; + break; case Type::FLOAT: return "FLOAT"; break; @@ -226,9 +229,6 @@ static string parquet_type_to_string(Type::type t) { case Type::BYTE_ARRAY: return "BYTE_ARRAY"; break; - case Type::INT96: - return "INT96"; - break; case Type::FIXED_LEN_BYTE_ARRAY: return "FIXED_LEN_BYTE_ARRAY"; break; @@ -239,7 +239,7 @@ static string parquet_type_to_string(Type::type t) { } // the fixed initial size is just for an example -#define COL_WIDTH "17" +#define COL_WIDTH "20" void ParquetFileReader::DebugPrint(std::ostream& stream, bool print_values) { if (!parsed_metadata_) { @@ -251,10 +251,6 @@ void ParquetFileReader::DebugPrint(std::ostream& stream, bool print_values) { for (int c = 1; c < metadata_.schema.size(); ++c) { stream << "Column " << c-1 << ": " << metadata_.schema[c].name << " (" << parquet_type_to_string(metadata_.schema[c].type); - if (metadata_.schema[c].type == Type::INT96 || - metadata_.schema[c].type == Type::FIXED_LEN_BYTE_ARRAY) { - stream << " - not supported"; - } stream << ")\n"; } @@ -291,10 +287,6 @@ void ParquetFileReader::DebugPrint(std::ostream& stream, bool print_values) { printf("%-" COL_WIDTH"s", metadata_.schema[c+1].name.c_str()); - if (col_type == Type::INT96 || col_type == Type::FIXED_LEN_BYTE_ARRAY) { - continue; - } - // This is OK in this method as long as the RowGroupReader does not get deleted readers[c] = col_reader; } @@ -345,6 +337,16 @@ void ParquetFileReader::DebugPrint(std::ostream& stream, bool print_values) { } break; } + case Type::INT96: { + Int96 val = reinterpret_cast<Int96Reader*>(readers[c])->NextValue( + &def_level[c], &rep_level[c]); + if (def_level[c] >= rep_level[c]) { + string result = Int96ToString(val); + snprintf(buffer, bufsize, "%-" COL_WIDTH"s", result.c_str()); + stream << buffer; + } + break; + } case Type::FLOAT: { float val = reinterpret_cast<FloatReader*>(readers[c])->NextValue( &def_level[c], &rep_level[c]); @@ -373,7 +375,17 @@ void ParquetFileReader::DebugPrint(std::ostream& stream, bool print_values) { } break; } - default: + case Type::FIXED_LEN_BYTE_ARRAY: { + FixedLenByteArray val = reinterpret_cast<FixedLenByteArrayReader*>( + readers[c])->NextValue(&def_level[c], &rep_level[c]); + if (def_level[c] >= rep_level[c]) { + string result = FixedLenByteArrayToString(val, metadata_.schema[c+1].type_length); + snprintf(buffer, bufsize, "%-" COL_WIDTH"s", result.c_str()); + stream << buffer; + } + break; + } + default: continue; } } http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/89f5a543/src/parquet/types.h ---------------------------------------------------------------------- diff --git a/src/parquet/types.h b/src/parquet/types.h index 37f538a..409e335 100644 --- a/src/parquet/types.h +++ b/src/parquet/types.h @@ -22,8 +22,10 @@ #include <cstdint> #include <cstring> #include <string> +#include <sstream> #include "parquet/thrift/parquet_types.h" +#include "parquet/util/compiler-util.h" namespace parquet_cpp { @@ -32,11 +34,36 @@ struct ByteArray { const uint8_t* ptr; }; +struct FixedLenByteArray { + const uint8_t* ptr; +}; + +MANUALLY_ALIGNED_STRUCT(1) Int96 { + uint32_t value[3]; +}; +STRUCT_END(Int96, 12); static inline std::string ByteArrayToString(const ByteArray& a) { return std::string(reinterpret_cast<const char*>(a.ptr), a.len); } +static inline std::string Int96ToString(const Int96& a) { + std::stringstream result; + for (int i = 0; i < 3; i++) { + result << a.value[i] << " "; + } + return result.str(); +} + +static inline std::string FixedLenByteArrayToString(const FixedLenByteArray& a, int len) { + const uint8_t *bytes = reinterpret_cast<const uint8_t*>(a.ptr); + std::stringstream result; + for (int i = 0; i < len; i++) { + result << (uint32_t)bytes[i] << " "; + } + return result.str(); +} + static inline int ByteCompare(const ByteArray& x1, const ByteArray& x2) { int len = std::min(x1.len, x2.len); int cmp = memcmp(x1.ptr, x2.ptr, len); @@ -76,8 +103,7 @@ struct type_traits<parquet::Type::INT64> { template <> struct type_traits<parquet::Type::INT96> { - // TODO - typedef void* value_type; + typedef Int96 value_type; static constexpr parquet::Type::type parquet_type = parquet::Type::INT96; static constexpr size_t value_byte_size = 12; @@ -107,6 +133,14 @@ struct type_traits<parquet::Type::BYTE_ARRAY> { static constexpr size_t value_byte_size = sizeof(ByteArray); }; +template <> +struct type_traits<parquet::Type::FIXED_LEN_BYTE_ARRAY> { + typedef FixedLenByteArray value_type; + static constexpr parquet::Type::type parquet_type = parquet::Type::FIXED_LEN_BYTE_ARRAY; + + static constexpr size_t value_byte_size = sizeof(FixedLenByteArray); +}; + } // namespace parquet_cpp #endif // PARQUET_TYPES_H http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/89f5a543/src/parquet/util/compiler-util.h ---------------------------------------------------------------------- diff --git a/src/parquet/util/compiler-util.h b/src/parquet/util/compiler-util.h index bd0dbed..9048ba1 100644 --- a/src/parquet/util/compiler-util.h +++ b/src/parquet/util/compiler-util.h @@ -36,4 +36,25 @@ #define PREFETCH(addr) __builtin_prefetch(addr) +//macros to disable padding +//these macros are portable across different compilers and platforms +//[https://github.com/google/flatbuffers/blob/master/include/flatbuffers/flatbuffers.h#L1355] +#if defined(_MSC_VER) + #define MANUALLY_ALIGNED_STRUCT(alignment) \ + __pragma(pack(1)); \ + struct __declspec(align(alignment)) + #define STRUCT_END(name, size) \ + __pragma(pack()); \ + static_assert(sizeof(name) == size, "compiler breaks packing rules") +#elif defined(__GNUC__) || defined(__clang__) + #define MANUALLY_ALIGNED_STRUCT(alignment) \ + _Pragma("pack(1)") \ + struct __attribute__((aligned(alignment))) + #define STRUCT_END(name, size) \ + _Pragma("pack()") \ + static_assert(sizeof(name) == size, "compiler breaks packing rules") +#else + #error Unknown compiler, please define structure alignment macros +#endif + #endif // PARQUET_UTIL_COMPILER_UTIL_H
