PARQUET-844: Schema, compression consolidation / flattening. Will look at the `encodings/`, `file/`, and `column/` directories later.
Author: Wes McKinney <[email protected]> Closes #228 from wesm/PARQUET-844 and squashes the following commits: 45b2887 [Wes McKinney] Fix include rename 88f0afe [Wes McKinney] Consolidate schema code and tests into schema.h/schema-internal.h 0385381 [Wes McKinney] Consolidate compression code into a single header Project: http://git-wip-us.apache.org/repos/asf/parquet-cpp/repo Commit: http://git-wip-us.apache.org/repos/asf/parquet-cpp/commit/13da51d3 Tree: http://git-wip-us.apache.org/repos/asf/parquet-cpp/tree/13da51d3 Diff: http://git-wip-us.apache.org/repos/asf/parquet-cpp/diff/13da51d3 Branch: refs/heads/master Commit: 13da51d3f3a4da1417beddf409eba90ce3657a68 Parents: 257e65b Author: Wes McKinney <[email protected]> Authored: Thu Jan 26 12:02:42 2017 -0500 Committer: Wes McKinney <[email protected]> Committed: Thu Jan 26 12:02:42 2017 -0500 ---------------------------------------------------------------------- CMakeLists.txt | 12 +- benchmarks/decode_benchmark.cc | 2 +- src/parquet/CMakeLists.txt | 4 + src/parquet/api/schema.h | 4 +- src/parquet/column/column-reader-test.cc | 3 +- src/parquet/column/properties.h | 2 +- src/parquet/column/reader.h | 2 +- src/parquet/column/scanner-test.cc | 3 +- src/parquet/column/scanner.h | 2 +- src/parquet/column/statistics-test.cc | 2 +- src/parquet/column/statistics.h | 2 +- src/parquet/column/writer.h | 2 +- src/parquet/compression-test.cc | 84 +++ src/parquet/compression.cc | 259 ++++++++ src/parquet/compression.h | 125 ++++ src/parquet/compression/CMakeLists.txt | 23 - src/parquet/compression/brotli-codec.cc | 53 -- src/parquet/compression/codec-test.cc | 84 --- src/parquet/compression/codec.cc | 50 -- src/parquet/compression/codec.h | 125 ---- src/parquet/compression/gzip-codec.cc | 175 ------ src/parquet/compression/snappy-codec.cc | 48 -- src/parquet/encodings/encoding-test.cc | 3 +- src/parquet/encodings/plain-encoding.h | 2 +- src/parquet/file/file-deserialize-test.cc | 2 +- src/parquet/file/file-metadata-test.cc | 
3 +- src/parquet/file/metadata.cc | 3 +- src/parquet/file/metadata.h | 4 +- src/parquet/file/reader-internal.cc | 6 +- src/parquet/file/reader-internal.h | 2 +- src/parquet/file/reader.h | 2 +- src/parquet/file/writer-internal.cc | 3 +- src/parquet/file/writer-internal.h | 2 +- src/parquet/file/writer.h | 3 +- src/parquet/schema-internal.h | 83 +++ src/parquet/schema-test.cc | 703 ++++++++++++++++++++++ src/parquet/schema.cc | 655 ++++++++++++++++++++ src/parquet/schema.h | 405 +++++++++++++ src/parquet/schema/CMakeLists.txt | 28 - src/parquet/schema/converter.cc | 124 ---- src/parquet/schema/converter.h | 91 --- src/parquet/schema/descriptor.cc | 138 ----- src/parquet/schema/descriptor.h | 142 ----- src/parquet/schema/printer.cc | 159 ----- src/parquet/schema/printer.h | 40 -- src/parquet/schema/schema-converter-test.cc | 222 ------- src/parquet/schema/schema-descriptor-test.cc | 190 ------ src/parquet/schema/schema-printer-test.cc | 76 --- src/parquet/schema/schema-types-test.cc | 311 ---------- src/parquet/schema/test-util.h | 63 -- src/parquet/schema/types.cc | 315 ---------- src/parquet/schema/types.h | 292 --------- src/parquet/util/comparison-test.cc | 2 +- src/parquet/util/comparison.h | 2 +- 54 files changed, 2348 insertions(+), 2794 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/13da51d3/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/CMakeLists.txt b/CMakeLists.txt index ac9d515..8ff1421 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -486,10 +486,7 @@ set(LIBPARQUET_SRCS src/parquet/column/scan-all.cc src/parquet/column/statistics.cc - src/parquet/compression/codec.cc - src/parquet/compression/brotli-codec.cc - src/parquet/compression/snappy-codec.cc - src/parquet/compression/gzip-codec.cc + src/parquet/compression.cc src/parquet/file/metadata.cc src/parquet/file/reader.cc @@ -497,10 +494,7 @@ 
set(LIBPARQUET_SRCS src/parquet/file/writer.cc src/parquet/file/writer-internal.cc - src/parquet/schema/converter.cc - src/parquet/schema/descriptor.cc - src/parquet/schema/printer.cc - src/parquet/schema/types.cc + src/parquet/schema.cc src/parquet/util/cpu-info.cc src/parquet/util/memory.cc @@ -578,10 +572,8 @@ endif() add_subdirectory(src/parquet) add_subdirectory(src/parquet/api) add_subdirectory(src/parquet/column) -add_subdirectory(src/parquet/compression) add_subdirectory(src/parquet/encodings) add_subdirectory(src/parquet/file) -add_subdirectory(src/parquet/schema) add_subdirectory(src/parquet/thrift) add_subdirectory(src/parquet/util) http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/13da51d3/benchmarks/decode_benchmark.cc ---------------------------------------------------------------------- diff --git a/benchmarks/decode_benchmark.cc b/benchmarks/decode_benchmark.cc index 7659234..d767622 100644 --- a/benchmarks/decode_benchmark.cc +++ b/benchmarks/decode_benchmark.cc @@ -19,7 +19,7 @@ #include <random> #include <stdio.h> -#include "parquet/compression/codec.h" +#include "parquet/compression.h" #include "parquet/encodings/delta-bit-pack-encoding.h" #include "parquet/encodings/delta-byte-array-encoding.h" #include "parquet/encodings/delta-length-byte-array-encoding.h" http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/13da51d3/src/parquet/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/src/parquet/CMakeLists.txt b/src/parquet/CMakeLists.txt index a2ebbad..f49a54d 100644 --- a/src/parquet/CMakeLists.txt +++ b/src/parquet/CMakeLists.txt @@ -17,10 +17,14 @@ # Headers: top level install(FILES + compression.h exception.h + schema.h types.h DESTINATION include/parquet) +ADD_PARQUET_TEST(compression-test) ADD_PARQUET_TEST(public-api-test) ADD_PARQUET_TEST(types-test) ADD_PARQUET_TEST(reader-test) +ADD_PARQUET_TEST(schema-test) 
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/13da51d3/src/parquet/api/schema.h ---------------------------------------------------------------------- diff --git a/src/parquet/api/schema.h b/src/parquet/api/schema.h index 523d046..2e6c3b3 100644 --- a/src/parquet/api/schema.h +++ b/src/parquet/api/schema.h @@ -19,8 +19,6 @@ #define PARQUET_API_SCHEMA_H // Schemas -#include "parquet/schema/descriptor.h" -#include "parquet/schema/printer.h" -#include "parquet/schema/types.h" +#include "parquet/schema.h" #endif // PARQUET_API_SCHEMA_H http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/13da51d3/src/parquet/column/column-reader-test.cc ---------------------------------------------------------------------- diff --git a/src/parquet/column/column-reader-test.cc b/src/parquet/column/column-reader-test.cc index 5cf3084..d410b5f 100644 --- a/src/parquet/column/column-reader-test.cc +++ b/src/parquet/column/column-reader-test.cc @@ -28,8 +28,7 @@ #include "parquet/column/page.h" #include "parquet/column/reader.h" #include "parquet/column/test-util.h" -#include "parquet/schema/descriptor.h" -#include "parquet/schema/types.h" +#include "parquet/schema.h" #include "parquet/types.h" #include "parquet/util/test-common.h" http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/13da51d3/src/parquet/column/properties.h ---------------------------------------------------------------------- diff --git a/src/parquet/column/properties.h b/src/parquet/column/properties.h index cf89226..b9b0bf3 100644 --- a/src/parquet/column/properties.h +++ b/src/parquet/column/properties.h @@ -23,7 +23,7 @@ #include <unordered_map> #include "parquet/exception.h" -#include "parquet/schema/types.h" +#include "parquet/schema.h" #include "parquet/types.h" #include "parquet/util/memory.h" #include "parquet/util/visibility.h" http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/13da51d3/src/parquet/column/reader.h ---------------------------------------------------------------------- diff 
--git a/src/parquet/column/reader.h b/src/parquet/column/reader.h index 3b6a971..90c0761 100644 --- a/src/parquet/column/reader.h +++ b/src/parquet/column/reader.h @@ -29,7 +29,7 @@ #include "parquet/column/page.h" #include "parquet/encodings/decoder.h" #include "parquet/exception.h" -#include "parquet/schema/descriptor.h" +#include "parquet/schema.h" #include "parquet/types.h" #include "parquet/util/memory.h" #include "parquet/util/visibility.h" http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/13da51d3/src/parquet/column/scanner-test.cc ---------------------------------------------------------------------- diff --git a/src/parquet/column/scanner-test.cc b/src/parquet/column/scanner-test.cc index 8eee191..5d137b7 100644 --- a/src/parquet/column/scanner-test.cc +++ b/src/parquet/column/scanner-test.cc @@ -28,8 +28,7 @@ #include "parquet/column/scanner.h" #include "parquet/column/test-specialization.h" #include "parquet/column/test-util.h" -#include "parquet/schema/descriptor.h" -#include "parquet/schema/types.h" +#include "parquet/schema.h" #include "parquet/types.h" #include "parquet/util/test-common.h" http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/13da51d3/src/parquet/column/scanner.h ---------------------------------------------------------------------- diff --git a/src/parquet/column/scanner.h b/src/parquet/column/scanner.h index 13fb01b..17340b7 100644 --- a/src/parquet/column/scanner.h +++ b/src/parquet/column/scanner.h @@ -27,7 +27,7 @@ #include "parquet/column/reader.h" #include "parquet/exception.h" -#include "parquet/schema/descriptor.h" +#include "parquet/schema.h" #include "parquet/types.h" #include "parquet/util/memory.h" #include "parquet/util/visibility.h" http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/13da51d3/src/parquet/column/statistics-test.cc ---------------------------------------------------------------------- diff --git a/src/parquet/column/statistics-test.cc b/src/parquet/column/statistics-test.cc index 
38cdc23..5eb7625 100644 --- a/src/parquet/column/statistics-test.cc +++ b/src/parquet/column/statistics-test.cc @@ -31,7 +31,7 @@ #include "parquet/column/writer.h" #include "parquet/file/reader.h" #include "parquet/file/writer.h" -#include "parquet/schema/descriptor.h" +#include "parquet/schema.h" #include "parquet/types.h" #include "parquet/util/memory.h" http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/13da51d3/src/parquet/column/statistics.h ---------------------------------------------------------------------- diff --git a/src/parquet/column/statistics.h b/src/parquet/column/statistics.h index 68bd4c8..af5ada6 100644 --- a/src/parquet/column/statistics.h +++ b/src/parquet/column/statistics.h @@ -22,7 +22,7 @@ #include <memory> #include <string> -#include "parquet/schema/descriptor.h" +#include "parquet/schema.h" #include "parquet/types.h" #include "parquet/util/memory.h" #include "parquet/util/visibility.h" http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/13da51d3/src/parquet/column/writer.h ---------------------------------------------------------------------- diff --git a/src/parquet/column/writer.h b/src/parquet/column/writer.h index b52b932..094c65b 100644 --- a/src/parquet/column/writer.h +++ b/src/parquet/column/writer.h @@ -26,7 +26,7 @@ #include "parquet/column/statistics.h" #include "parquet/encodings/encoder.h" #include "parquet/file/metadata.h" -#include "parquet/schema/descriptor.h" +#include "parquet/schema.h" #include "parquet/types.h" #include "parquet/util/memory.h" #include "parquet/util/visibility.h" http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/13da51d3/src/parquet/compression-test.cc ---------------------------------------------------------------------- diff --git a/src/parquet/compression-test.cc b/src/parquet/compression-test.cc new file mode 100644 index 0000000..f4fd3ba --- /dev/null +++ b/src/parquet/compression-test.cc @@ -0,0 +1,84 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more 
contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include <cstdint> +#include <gtest/gtest.h> +#include <string> +#include <vector> + +#include "parquet/compression.h" +#include "parquet/util/test-common.h" + +using std::string; +using std::vector; + +namespace parquet { + +template <typename T> +void CheckCodecRoundtrip(const vector<uint8_t>& data) { + // create multiple compressors to try to break them + T c1; + T c2; + + int max_compressed_len = c1.MaxCompressedLen(data.size(), &data[0]); + std::vector<uint8_t> compressed(max_compressed_len); + std::vector<uint8_t> decompressed(data.size()); + + // compress with c1 + int actual_size = + c1.Compress(data.size(), &data[0], max_compressed_len, &compressed[0]); + compressed.resize(actual_size); + + // decompress with c2 + c2.Decompress(compressed.size(), &compressed[0], decompressed.size(), &decompressed[0]); + + ASSERT_TRUE(test::vector_equal(data, decompressed)); + + // compress with c2 + int actual_size2 = + c2.Compress(data.size(), &data[0], max_compressed_len, &compressed[0]); + ASSERT_EQ(actual_size2, actual_size); + + // decompress with c1 + c1.Decompress(compressed.size(), &compressed[0], decompressed.size(), &decompressed[0]); + + ASSERT_TRUE(test::vector_equal(data, decompressed)); +} + +template <typename T> +void CheckCodec() { 
+ int sizes[] = {10000, 100000}; + for (int data_size : sizes) { + vector<uint8_t> data; + test::random_bytes(data_size, 1234, &data); + CheckCodecRoundtrip<T>(data); + } +} + +TEST(TestCompressors, Snappy) { + CheckCodec<SnappyCodec>(); +} + +TEST(TestCompressors, Brotli) { + CheckCodec<BrotliCodec>(); +} + +TEST(TestCompressors, GZip) { + CheckCodec<GZipCodec>(); +} + +} // namespace parquet http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/13da51d3/src/parquet/compression.cc ---------------------------------------------------------------------- diff --git a/src/parquet/compression.cc b/src/parquet/compression.cc new file mode 100644 index 0000000..97b5c17 --- /dev/null +++ b/src/parquet/compression.cc @@ -0,0 +1,259 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include <cstdint> +#include <memory> +#include <string> + +#include <brotli/decode.h> +#include <brotli/encode.h> +#include <snappy.h> + +#include "parquet/compression.h" +#include "parquet/exception.h" +#include "parquet/types.h" + +namespace parquet { + +std::unique_ptr<Codec> Codec::Create(Compression::type codec_type) { + std::unique_ptr<Codec> result; + switch (codec_type) { + case Compression::UNCOMPRESSED: + break; + case Compression::SNAPPY: + result.reset(new SnappyCodec()); + break; + case Compression::GZIP: + result.reset(new GZipCodec()); + break; + case Compression::LZO: + ParquetException::NYI("LZO codec not implemented"); + break; + case Compression::BROTLI: + result.reset(new BrotliCodec()); + break; + default: + ParquetException::NYI("Unrecognized codec"); + break; + } + return result; +} + +// ---------------------------------------------------------------------- +// gzip implementation + +// These are magic numbers from zlib.h. Not clear why they are not defined +// there. + +// Maximum window size +static constexpr int WINDOW_BITS = 15; + +// Output Gzip. +static constexpr int GZIP_CODEC = 16; + +// Determine if this is libz or gzip from header. 
+static constexpr int DETECT_CODEC = 32; + +GZipCodec::GZipCodec(Format format) + : format_(format), compressor_initialized_(false), decompressor_initialized_(false) {} + +GZipCodec::~GZipCodec() { + EndCompressor(); + EndDecompressor(); +} + +void GZipCodec::InitCompressor() { + EndDecompressor(); + memset(&stream_, 0, sizeof(stream_)); + + int ret; + // Initialize to run specified format + int window_bits = WINDOW_BITS; + if (format_ == DEFLATE) { + window_bits = -window_bits; + } else if (format_ == GZIP) { + window_bits += GZIP_CODEC; + } + if ((ret = deflateInit2(&stream_, Z_DEFAULT_COMPRESSION, Z_DEFLATED, window_bits, 9, + Z_DEFAULT_STRATEGY)) != Z_OK) { + throw ParquetException("zlib deflateInit failed: " + std::string(stream_.msg)); + } + + compressor_initialized_ = true; +} + +void GZipCodec::EndCompressor() { + if (compressor_initialized_) { (void)deflateEnd(&stream_); } + compressor_initialized_ = false; +} + +void GZipCodec::InitDecompressor() { + EndCompressor(); + memset(&stream_, 0, sizeof(stream_)); + int ret; + + // Initialize to run either deflate or zlib/gzip format + int window_bits = format_ == DEFLATE ? -WINDOW_BITS : WINDOW_BITS | DETECT_CODEC; + if ((ret = inflateInit2(&stream_, window_bits)) != Z_OK) { + throw ParquetException("zlib inflateInit failed: " + std::string(stream_.msg)); + } + decompressor_initialized_ = true; +} + +void GZipCodec::EndDecompressor() { + if (decompressor_initialized_) { (void)inflateEnd(&stream_); } + decompressor_initialized_ = false; +} + +void GZipCodec::Decompress( + int64_t input_length, const uint8_t* input, int64_t output_length, uint8_t* output) { + if (!decompressor_initialized_) { InitDecompressor(); } + if (output_length == 0) { + // The zlib library does not allow *output to be NULL, even when output_length + // is 0 (inflate() will return Z_STREAM_ERROR). We don't consider this an + // error, so bail early if no output is expected. 
Note that we don't signal + // an error if the input actually contains compressed data. + return; + } + + // Reset the stream for this block + if (inflateReset(&stream_) != Z_OK) { + throw ParquetException("zlib inflateReset failed: " + std::string(stream_.msg)); + } + + int ret = 0; + // gzip can run in streaming mode or non-streaming mode. We only + // support the non-streaming use case where we present it the entire + // compressed input and a buffer big enough to contain the entire + // compressed output. In the case where we don't know the output, + // we just make a bigger buffer and try the non-streaming mode + // from the beginning again. + while (ret != Z_STREAM_END) { + stream_.next_in = const_cast<Bytef*>(reinterpret_cast<const Bytef*>(input)); + stream_.avail_in = input_length; + stream_.next_out = reinterpret_cast<Bytef*>(output); + stream_.avail_out = output_length; + + // We know the output size. In this case, we can use Z_FINISH + // which is more efficient. + ret = inflate(&stream_, Z_FINISH); + if (ret == Z_STREAM_END || ret != Z_OK) break; + + // Failure, buffer was too small + std::stringstream ss; + ss << "Too small a buffer passed to GZipCodec. 
InputLength=" << input_length + << " OutputLength=" << output_length; + throw ParquetException(ss.str()); + } + + // Failure for some other reason + if (ret != Z_STREAM_END) { + std::stringstream ss; + ss << "GZipCodec failed: "; + if (stream_.msg != NULL) ss << stream_.msg; + throw ParquetException(ss.str()); + } +} + +int64_t GZipCodec::MaxCompressedLen(int64_t input_length, const uint8_t* input) { + // Most be in compression mode + if (!compressor_initialized_) { InitCompressor(); } + // TODO(wesm): deal with zlib < 1.2.3 (see Impala codebase) + return deflateBound(&stream_, static_cast<uLong>(input_length)); +} + +int64_t GZipCodec::Compress( + int64_t input_length, const uint8_t* input, int64_t output_length, uint8_t* output) { + if (!compressor_initialized_) { InitCompressor(); } + stream_.next_in = const_cast<Bytef*>(reinterpret_cast<const Bytef*>(input)); + stream_.avail_in = input_length; + stream_.next_out = reinterpret_cast<Bytef*>(output); + stream_.avail_out = output_length; + + int64_t ret = 0; + if ((ret = deflate(&stream_, Z_FINISH)) != Z_STREAM_END) { + if (ret == Z_OK) { + // will return Z_OK (and stream.msg NOT set) if stream.avail_out is too + // small + throw ParquetException("zlib deflate failed, output buffer to small"); + } + std::stringstream ss; + ss << "zlib deflate failed: " << stream_.msg; + throw ParquetException(ss.str()); + } + + if (deflateReset(&stream_) != Z_OK) { + throw ParquetException("zlib deflateReset failed: " + std::string(stream_.msg)); + } + + // Actual output length + return output_length - stream_.avail_out; +} + +// ---------------------------------------------------------------------- +// Snappy implementation + +void SnappyCodec::Decompress( + int64_t input_len, const uint8_t* input, int64_t output_len, uint8_t* output_buffer) { + if (!snappy::RawUncompress(reinterpret_cast<const char*>(input), + static_cast<size_t>(input_len), reinterpret_cast<char*>(output_buffer))) { + throw parquet::ParquetException("Corrupt 
snappy compressed data."); + } +} + +int64_t SnappyCodec::MaxCompressedLen(int64_t input_len, const uint8_t* input) { + return snappy::MaxCompressedLength(input_len); +} + +int64_t SnappyCodec::Compress(int64_t input_len, const uint8_t* input, + int64_t output_buffer_len, uint8_t* output_buffer) { + size_t output_len; + snappy::RawCompress(reinterpret_cast<const char*>(input), + static_cast<size_t>(input_len), reinterpret_cast<char*>(output_buffer), + &output_len); + return output_len; +} + +// ---------------------------------------------------------------------- +// Brotli implementation + +void BrotliCodec::Decompress( + int64_t input_len, const uint8_t* input, int64_t output_len, uint8_t* output_buffer) { + size_t output_size = output_len; + if (BrotliDecoderDecompress(input_len, input, &output_size, output_buffer) != + BROTLI_DECODER_RESULT_SUCCESS) { + throw parquet::ParquetException("Corrupt brotli compressed data."); + } +} + +int64_t BrotliCodec::MaxCompressedLen(int64_t input_len, const uint8_t* input) { + return BrotliEncoderMaxCompressedSize(input_len); +} + +int64_t BrotliCodec::Compress(int64_t input_len, const uint8_t* input, + int64_t output_buffer_len, uint8_t* output_buffer) { + size_t output_len = output_buffer_len; + // TODO: Make quality configurable. 
We use 8 as a default as it is the best + // trade-off for Parquet workload + if (BrotliEncoderCompress(8, BROTLI_DEFAULT_WINDOW, BROTLI_DEFAULT_MODE, input_len, + input, &output_len, output_buffer) == BROTLI_FALSE) { + throw parquet::ParquetException("Brotli compression failure."); + } + return output_len; +} + +} // namespace parquet http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/13da51d3/src/parquet/compression.h ---------------------------------------------------------------------- diff --git a/src/parquet/compression.h b/src/parquet/compression.h new file mode 100644 index 0000000..abd4899 --- /dev/null +++ b/src/parquet/compression.h @@ -0,0 +1,125 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#ifndef PARQUET_COMPRESSION_CODEC_H +#define PARQUET_COMPRESSION_CODEC_H + +#include <zlib.h> + +#include <cstdint> +#include <memory> + +#include "parquet/exception.h" +#include "parquet/types.h" + +namespace parquet { + +class Codec { + public: + virtual ~Codec() {} + + static std::unique_ptr<Codec> Create(Compression::type codec); + + virtual void Decompress(int64_t input_len, const uint8_t* input, int64_t output_len, + uint8_t* output_buffer) = 0; + + virtual int64_t Compress(int64_t input_len, const uint8_t* input, + int64_t output_buffer_len, uint8_t* output_buffer) = 0; + + virtual int64_t MaxCompressedLen(int64_t input_len, const uint8_t* input) = 0; + + virtual const char* name() const = 0; +}; + +// Snappy codec. +class SnappyCodec : public Codec { + public: + virtual void Decompress(int64_t input_len, const uint8_t* input, int64_t output_len, + uint8_t* output_buffer); + + virtual int64_t Compress(int64_t input_len, const uint8_t* input, + int64_t output_buffer_len, uint8_t* output_buffer); + + virtual int64_t MaxCompressedLen(int64_t input_len, const uint8_t* input); + + virtual const char* name() const { return "snappy"; } +}; + +// Brotli codec. +class BrotliCodec : public Codec { + public: + void Decompress(int64_t input_len, const uint8_t* input, int64_t output_len, + uint8_t* output_buffer) override; + + int64_t Compress(int64_t input_len, const uint8_t* input, int64_t output_buffer_len, + uint8_t* output_buffer) override; + + int64_t MaxCompressedLen(int64_t input_len, const uint8_t* input) override; + + const char* name() const override { return "brotli"; } +}; + +// GZip codec. 
+class GZipCodec : public Codec { + public: + /// Compression formats supported by the zlib library + enum Format { + ZLIB, + DEFLATE, + GZIP, + }; + + explicit GZipCodec(Format format = GZIP); + virtual ~GZipCodec(); + + virtual void Decompress(int64_t input_len, const uint8_t* input, int64_t output_len, + uint8_t* output_buffer); + + virtual int64_t Compress(int64_t input_len, const uint8_t* input, + int64_t output_buffer_len, uint8_t* output_buffer); + + virtual int64_t MaxCompressedLen(int64_t input_len, const uint8_t* input); + + virtual const char* name() const { return "gzip"; } + + private: + // zlib is stateful and the z_stream state variable must be initialized + // before + z_stream stream_; + + // Realistically, this will always be GZIP, but we leave the option open to + // configure + Format format_; + + // These variables are mutually exclusive. When the codec is in "compressor" + // state, compressor_initialized_ is true while decompressor_initialized_ is + // false. When it's decompressing, the opposite is true. + // + // Indeed, this is slightly hacky, but the alternative is having separate + // Compressor and Decompressor classes. If this ever becomes an issue, we can + // perform the refactoring then + void InitCompressor(); + void InitDecompressor(); + void EndCompressor(); + void EndDecompressor(); + bool compressor_initialized_; + bool decompressor_initialized_; +}; + +} // namespace parquet + +#endif http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/13da51d3/src/parquet/compression/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/src/parquet/compression/CMakeLists.txt b/src/parquet/compression/CMakeLists.txt deleted file mode 100644 index 82ba522..0000000 --- a/src/parquet/compression/CMakeLists.txt +++ /dev/null @@ -1,23 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. 
See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. - -# Headers: compression -install(FILES - codec.h - DESTINATION include/parquet/compression) - -ADD_PARQUET_TEST(codec-test) http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/13da51d3/src/parquet/compression/brotli-codec.cc ---------------------------------------------------------------------- diff --git a/src/parquet/compression/brotli-codec.cc b/src/parquet/compression/brotli-codec.cc deleted file mode 100644 index 8118206..0000000 --- a/src/parquet/compression/brotli-codec.cc +++ /dev/null @@ -1,53 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. 
See the License for the -// specific language governing permissions and limitations -// under the License. - -#include <brotli/decode.h> -#include <brotli/encode.h> -#include <cstdint> -#include <cstdlib> - -#include "parquet/compression/codec.h" -#include "parquet/exception.h" - -namespace parquet { - -void BrotliCodec::Decompress( - int64_t input_len, const uint8_t* input, int64_t output_len, uint8_t* output_buffer) { - size_t output_size = output_len; - if (BrotliDecoderDecompress(input_len, input, &output_size, output_buffer) != - BROTLI_DECODER_RESULT_SUCCESS) { - throw parquet::ParquetException("Corrupt brotli compressed data."); - } -} - -int64_t BrotliCodec::MaxCompressedLen(int64_t input_len, const uint8_t* input) { - return BrotliEncoderMaxCompressedSize(input_len); -} - -int64_t BrotliCodec::Compress(int64_t input_len, const uint8_t* input, - int64_t output_buffer_len, uint8_t* output_buffer) { - size_t output_len = output_buffer_len; - // TODO: Make quality configurable. We use 8 as a default as it is the best - // trade-off for Parquet workload - if (BrotliEncoderCompress(8, BROTLI_DEFAULT_WINDOW, BROTLI_DEFAULT_MODE, input_len, - input, &output_len, output_buffer) == BROTLI_FALSE) { - throw parquet::ParquetException("Brotli compression failure."); - } - return output_len; -} - -} // namespace parquet http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/13da51d3/src/parquet/compression/codec-test.cc ---------------------------------------------------------------------- diff --git a/src/parquet/compression/codec-test.cc b/src/parquet/compression/codec-test.cc deleted file mode 100644 index f2be84b..0000000 --- a/src/parquet/compression/codec-test.cc +++ /dev/null @@ -1,84 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. 
The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#include <cstdint> -#include <gtest/gtest.h> -#include <string> -#include <vector> - -#include "parquet/compression/codec.h" -#include "parquet/util/test-common.h" - -using std::string; -using std::vector; - -namespace parquet { - -template <typename T> -void CheckCodecRoundtrip(const vector<uint8_t>& data) { - // create multiple compressors to try to break them - T c1; - T c2; - - int max_compressed_len = c1.MaxCompressedLen(data.size(), &data[0]); - std::vector<uint8_t> compressed(max_compressed_len); - std::vector<uint8_t> decompressed(data.size()); - - // compress with c1 - int actual_size = - c1.Compress(data.size(), &data[0], max_compressed_len, &compressed[0]); - compressed.resize(actual_size); - - // decompress with c2 - c2.Decompress(compressed.size(), &compressed[0], decompressed.size(), &decompressed[0]); - - ASSERT_TRUE(test::vector_equal(data, decompressed)); - - // compress with c2 - int actual_size2 = - c2.Compress(data.size(), &data[0], max_compressed_len, &compressed[0]); - ASSERT_EQ(actual_size2, actual_size); - - // decompress with c1 - c1.Decompress(compressed.size(), &compressed[0], decompressed.size(), &decompressed[0]); - - ASSERT_TRUE(test::vector_equal(data, decompressed)); -} - -template <typename T> -void CheckCodec() { - int sizes[] = {10000, 100000}; - for (int data_size : sizes) { - vector<uint8_t> data; - test::random_bytes(data_size, 1234, &data); - 
CheckCodecRoundtrip<T>(data); - } -} - -TEST(TestCompressors, Snappy) { - CheckCodec<SnappyCodec>(); -} - -TEST(TestCompressors, Brotli) { - CheckCodec<BrotliCodec>(); -} - -TEST(TestCompressors, GZip) { - CheckCodec<GZipCodec>(); -} - -} // namespace parquet http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/13da51d3/src/parquet/compression/codec.cc ---------------------------------------------------------------------- diff --git a/src/parquet/compression/codec.cc b/src/parquet/compression/codec.cc deleted file mode 100644 index a7e5fba..0000000 --- a/src/parquet/compression/codec.cc +++ /dev/null @@ -1,50 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. 
- -#include <memory> - -#include "parquet/compression/codec.h" -#include "parquet/exception.h" -#include "parquet/types.h" - -namespace parquet { - -std::unique_ptr<Codec> Codec::Create(Compression::type codec_type) { - std::unique_ptr<Codec> result; - switch (codec_type) { - case Compression::UNCOMPRESSED: - break; - case Compression::SNAPPY: - result.reset(new SnappyCodec()); - break; - case Compression::GZIP: - result.reset(new GZipCodec()); - break; - case Compression::LZO: - ParquetException::NYI("LZO codec not implemented"); - break; - case Compression::BROTLI: - result.reset(new BrotliCodec()); - break; - default: - ParquetException::NYI("Unrecognized codec"); - break; - } - return result; -} - -} // namespace parquet http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/13da51d3/src/parquet/compression/codec.h ---------------------------------------------------------------------- diff --git a/src/parquet/compression/codec.h b/src/parquet/compression/codec.h deleted file mode 100644 index abd4899..0000000 --- a/src/parquet/compression/codec.h +++ /dev/null @@ -1,125 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. 
- -#ifndef PARQUET_COMPRESSION_CODEC_H -#define PARQUET_COMPRESSION_CODEC_H - -#include <zlib.h> - -#include <cstdint> -#include <memory> - -#include "parquet/exception.h" -#include "parquet/types.h" - -namespace parquet { - -class Codec { - public: - virtual ~Codec() {} - - static std::unique_ptr<Codec> Create(Compression::type codec); - - virtual void Decompress(int64_t input_len, const uint8_t* input, int64_t output_len, - uint8_t* output_buffer) = 0; - - virtual int64_t Compress(int64_t input_len, const uint8_t* input, - int64_t output_buffer_len, uint8_t* output_buffer) = 0; - - virtual int64_t MaxCompressedLen(int64_t input_len, const uint8_t* input) = 0; - - virtual const char* name() const = 0; -}; - -// Snappy codec. -class SnappyCodec : public Codec { - public: - virtual void Decompress(int64_t input_len, const uint8_t* input, int64_t output_len, - uint8_t* output_buffer); - - virtual int64_t Compress(int64_t input_len, const uint8_t* input, - int64_t output_buffer_len, uint8_t* output_buffer); - - virtual int64_t MaxCompressedLen(int64_t input_len, const uint8_t* input); - - virtual const char* name() const { return "snappy"; } -}; - -// Brotli codec. -class BrotliCodec : public Codec { - public: - void Decompress(int64_t input_len, const uint8_t* input, int64_t output_len, - uint8_t* output_buffer) override; - - int64_t Compress(int64_t input_len, const uint8_t* input, int64_t output_buffer_len, - uint8_t* output_buffer) override; - - int64_t MaxCompressedLen(int64_t input_len, const uint8_t* input) override; - - const char* name() const override { return "brotli"; } -}; - -// GZip codec. 
-class GZipCodec : public Codec { - public: - /// Compression formats supported by the zlib library - enum Format { - ZLIB, - DEFLATE, - GZIP, - }; - - explicit GZipCodec(Format format = GZIP); - virtual ~GZipCodec(); - - virtual void Decompress(int64_t input_len, const uint8_t* input, int64_t output_len, - uint8_t* output_buffer); - - virtual int64_t Compress(int64_t input_len, const uint8_t* input, - int64_t output_buffer_len, uint8_t* output_buffer); - - virtual int64_t MaxCompressedLen(int64_t input_len, const uint8_t* input); - - virtual const char* name() const { return "gzip"; } - - private: - // zlib is stateful and the z_stream state variable must be initialized - // before - z_stream stream_; - - // Realistically, this will always be GZIP, but we leave the option open to - // configure - Format format_; - - // These variables are mutually exclusive. When the codec is in "compressor" - // state, compressor_initialized_ is true while decompressor_initialized_ is - // false. When it's decompressing, the opposite is true. - // - // Indeed, this is slightly hacky, but the alternative is having separate - // Compressor and Decompressor classes. If this ever becomes an issue, we can - // perform the refactoring then - void InitCompressor(); - void InitDecompressor(); - void EndCompressor(); - void EndDecompressor(); - bool compressor_initialized_; - bool decompressor_initialized_; -}; - -} // namespace parquet - -#endif http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/13da51d3/src/parquet/compression/gzip-codec.cc ---------------------------------------------------------------------- diff --git a/src/parquet/compression/gzip-codec.cc b/src/parquet/compression/gzip-codec.cc deleted file mode 100644 index 6c27714..0000000 --- a/src/parquet/compression/gzip-codec.cc +++ /dev/null @@ -1,175 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. 
See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#include <cstring> -#include <sstream> -#include <string> - -#include "parquet/compression/codec.h" -#include "parquet/exception.h" - -namespace parquet { - -// These are magic numbers from zlib.h. Not clear why they are not defined -// there. - -// Maximum window size -static constexpr int WINDOW_BITS = 15; - -// Output Gzip. -static constexpr int GZIP_CODEC = 16; - -// Determine if this is libz or gzip from header. 
-static constexpr int DETECT_CODEC = 32; - -GZipCodec::GZipCodec(Format format) - : format_(format), compressor_initialized_(false), decompressor_initialized_(false) {} - -GZipCodec::~GZipCodec() { - EndCompressor(); - EndDecompressor(); -} - -void GZipCodec::InitCompressor() { - EndDecompressor(); - memset(&stream_, 0, sizeof(stream_)); - - int ret; - // Initialize to run specified format - int window_bits = WINDOW_BITS; - if (format_ == DEFLATE) { - window_bits = -window_bits; - } else if (format_ == GZIP) { - window_bits += GZIP_CODEC; - } - if ((ret = deflateInit2(&stream_, Z_DEFAULT_COMPRESSION, Z_DEFLATED, window_bits, 9, - Z_DEFAULT_STRATEGY)) != Z_OK) { - throw ParquetException("zlib deflateInit failed: " + std::string(stream_.msg)); - } - - compressor_initialized_ = true; -} - -void GZipCodec::EndCompressor() { - if (compressor_initialized_) { (void)deflateEnd(&stream_); } - compressor_initialized_ = false; -} - -void GZipCodec::InitDecompressor() { - EndCompressor(); - memset(&stream_, 0, sizeof(stream_)); - int ret; - - // Initialize to run either deflate or zlib/gzip format - int window_bits = format_ == DEFLATE ? -WINDOW_BITS : WINDOW_BITS | DETECT_CODEC; - if ((ret = inflateInit2(&stream_, window_bits)) != Z_OK) { - throw ParquetException("zlib inflateInit failed: " + std::string(stream_.msg)); - } - decompressor_initialized_ = true; -} - -void GZipCodec::EndDecompressor() { - if (decompressor_initialized_) { (void)inflateEnd(&stream_); } - decompressor_initialized_ = false; -} - -void GZipCodec::Decompress( - int64_t input_length, const uint8_t* input, int64_t output_length, uint8_t* output) { - if (!decompressor_initialized_) { InitDecompressor(); } - if (output_length == 0) { - // The zlib library does not allow *output to be NULL, even when output_length - // is 0 (inflate() will return Z_STREAM_ERROR). We don't consider this an - // error, so bail early if no output is expected. 
Note that we don't signal - // an error if the input actually contains compressed data. - return; - } - - // Reset the stream for this block - if (inflateReset(&stream_) != Z_OK) { - throw ParquetException("zlib inflateReset failed: " + std::string(stream_.msg)); - } - - int ret = 0; - // gzip can run in streaming mode or non-streaming mode. We only - // support the non-streaming use case where we present it the entire - // compressed input and a buffer big enough to contain the entire - // compressed output. In the case where we don't know the output, - // we just make a bigger buffer and try the non-streaming mode - // from the beginning again. - while (ret != Z_STREAM_END) { - stream_.next_in = const_cast<Bytef*>(reinterpret_cast<const Bytef*>(input)); - stream_.avail_in = input_length; - stream_.next_out = reinterpret_cast<Bytef*>(output); - stream_.avail_out = output_length; - - // We know the output size. In this case, we can use Z_FINISH - // which is more efficient. - ret = inflate(&stream_, Z_FINISH); - if (ret == Z_STREAM_END || ret != Z_OK) break; - - // Failure, buffer was too small - std::stringstream ss; - ss << "Too small a buffer passed to GZipCodec. 
InputLength=" << input_length - << " OutputLength=" << output_length; - throw ParquetException(ss.str()); - } - - // Failure for some other reason - if (ret != Z_STREAM_END) { - std::stringstream ss; - ss << "GZipCodec failed: "; - if (stream_.msg != NULL) ss << stream_.msg; - throw ParquetException(ss.str()); - } -} - -int64_t GZipCodec::MaxCompressedLen(int64_t input_length, const uint8_t* input) { - // Most be in compression mode - if (!compressor_initialized_) { InitCompressor(); } - // TODO(wesm): deal with zlib < 1.2.3 (see Impala codebase) - return deflateBound(&stream_, static_cast<uLong>(input_length)); -} - -int64_t GZipCodec::Compress( - int64_t input_length, const uint8_t* input, int64_t output_length, uint8_t* output) { - if (!compressor_initialized_) { InitCompressor(); } - stream_.next_in = const_cast<Bytef*>(reinterpret_cast<const Bytef*>(input)); - stream_.avail_in = input_length; - stream_.next_out = reinterpret_cast<Bytef*>(output); - stream_.avail_out = output_length; - - int64_t ret = 0; - if ((ret = deflate(&stream_, Z_FINISH)) != Z_STREAM_END) { - if (ret == Z_OK) { - // will return Z_OK (and stream.msg NOT set) if stream.avail_out is too - // small - throw ParquetException("zlib deflate failed, output buffer to small"); - } - std::stringstream ss; - ss << "zlib deflate failed: " << stream_.msg; - throw ParquetException(ss.str()); - } - - if (deflateReset(&stream_) != Z_OK) { - throw ParquetException("zlib deflateReset failed: " + std::string(stream_.msg)); - } - - // Actual output length - return output_length - stream_.avail_out; -} - -} // namespace parquet http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/13da51d3/src/parquet/compression/snappy-codec.cc ---------------------------------------------------------------------- diff --git a/src/parquet/compression/snappy-codec.cc b/src/parquet/compression/snappy-codec.cc deleted file mode 100644 index 70f3a41..0000000 --- a/src/parquet/compression/snappy-codec.cc +++ /dev/null @@ -1,48 
+0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#include <cstdint> -#include <cstdlib> -#include <snappy.h> - -#include "parquet/compression/codec.h" -#include "parquet/exception.h" - -namespace parquet { - -void SnappyCodec::Decompress( - int64_t input_len, const uint8_t* input, int64_t output_len, uint8_t* output_buffer) { - if (!snappy::RawUncompress(reinterpret_cast<const char*>(input), - static_cast<size_t>(input_len), reinterpret_cast<char*>(output_buffer))) { - throw parquet::ParquetException("Corrupt snappy compressed data."); - } -} - -int64_t SnappyCodec::MaxCompressedLen(int64_t input_len, const uint8_t* input) { - return snappy::MaxCompressedLength(input_len); -} - -int64_t SnappyCodec::Compress(int64_t input_len, const uint8_t* input, - int64_t output_buffer_len, uint8_t* output_buffer) { - size_t output_len; - snappy::RawCompress(reinterpret_cast<const char*>(input), - static_cast<size_t>(input_len), reinterpret_cast<char*>(output_buffer), - &output_len); - return output_len; -} - -} // namespace parquet http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/13da51d3/src/parquet/encodings/encoding-test.cc ---------------------------------------------------------------------- diff --git 
a/src/parquet/encodings/encoding-test.cc b/src/parquet/encodings/encoding-test.cc index a1aa448..1e9894d 100644 --- a/src/parquet/encodings/encoding-test.cc +++ b/src/parquet/encodings/encoding-test.cc @@ -24,8 +24,7 @@ #include "parquet/encodings/dictionary-encoding.h" #include "parquet/encodings/plain-encoding.h" -#include "parquet/schema/descriptor.h" -#include "parquet/schema/types.h" +#include "parquet/schema.h" #include "parquet/types.h" #include "parquet/util/bit-util.h" #include "parquet/util/memory.h" http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/13da51d3/src/parquet/encodings/plain-encoding.h ---------------------------------------------------------------------- diff --git a/src/parquet/encodings/plain-encoding.h b/src/parquet/encodings/plain-encoding.h index d2127ef..5e7e269 100644 --- a/src/parquet/encodings/plain-encoding.h +++ b/src/parquet/encodings/plain-encoding.h @@ -23,7 +23,7 @@ #include "parquet/encodings/decoder.h" #include "parquet/encodings/encoder.h" -#include "parquet/schema/descriptor.h" +#include "parquet/schema.h" #include "parquet/util/bit-stream-utils.inline.h" #include "parquet/util/memory.h" http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/13da51d3/src/parquet/file/file-deserialize-test.cc ---------------------------------------------------------------------- diff --git a/src/parquet/file/file-deserialize-test.cc b/src/parquet/file/file-deserialize-test.cc index fbb511a..30b3f6d 100644 --- a/src/parquet/file/file-deserialize-test.cc +++ b/src/parquet/file/file-deserialize-test.cc @@ -27,7 +27,7 @@ #include <vector> #include "parquet/column/page.h" -#include "parquet/compression/codec.h" +#include "parquet/compression.h" #include "parquet/exception.h" #include "parquet/file/reader-internal.h" #include "parquet/thrift/parquet_types.h" http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/13da51d3/src/parquet/file/file-metadata-test.cc ---------------------------------------------------------------------- diff --git 
a/src/parquet/file/file-metadata-test.cc b/src/parquet/file/file-metadata-test.cc index 9b9fffd..0edc34f 100644 --- a/src/parquet/file/file-metadata-test.cc +++ b/src/parquet/file/file-metadata-test.cc @@ -17,8 +17,7 @@ #include "parquet/column/statistics.h" #include "parquet/file/metadata.h" -#include "parquet/schema/descriptor.h" -#include "parquet/schema/types.h" +#include "parquet/schema.h" #include "parquet/types.h" #include <gtest/gtest.h> http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/13da51d3/src/parquet/file/metadata.cc ---------------------------------------------------------------------- diff --git a/src/parquet/file/metadata.cc b/src/parquet/file/metadata.cc index 2f7f9d2..b4dffde 100644 --- a/src/parquet/file/metadata.cc +++ b/src/parquet/file/metadata.cc @@ -21,7 +21,8 @@ #include "parquet/exception.h" #include "parquet/file/metadata.h" -#include "parquet/schema/converter.h" +#include "parquet/schema-internal.h" +#include "parquet/schema.h" #include "parquet/thrift/util.h" #include "parquet/util/memory.h" http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/13da51d3/src/parquet/file/metadata.h ---------------------------------------------------------------------- diff --git a/src/parquet/file/metadata.h b/src/parquet/file/metadata.h index 7307ddf..1f8b09f 100644 --- a/src/parquet/file/metadata.h +++ b/src/parquet/file/metadata.h @@ -24,8 +24,8 @@ #include "parquet/column/properties.h" #include "parquet/column/statistics.h" -#include "parquet/compression/codec.h" -#include "parquet/schema/descriptor.h" +#include "parquet/compression.h" +#include "parquet/schema.h" #include "parquet/types.h" #include "parquet/util/memory.h" #include "parquet/util/visibility.h" http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/13da51d3/src/parquet/file/reader-internal.cc ---------------------------------------------------------------------- diff --git a/src/parquet/file/reader-internal.cc b/src/parquet/file/reader-internal.cc index d6fcc48..425a001 
100644 --- a/src/parquet/file/reader-internal.cc +++ b/src/parquet/file/reader-internal.cc @@ -25,11 +25,9 @@ #include <vector> #include "parquet/column/page.h" -#include "parquet/compression/codec.h" +#include "parquet/compression.h" #include "parquet/exception.h" -#include "parquet/schema/converter.h" -#include "parquet/schema/descriptor.h" -#include "parquet/schema/types.h" +#include "parquet/schema.h" #include "parquet/thrift/util.h" #include "parquet/types.h" #include "parquet/util/memory.h" http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/13da51d3/src/parquet/file/reader-internal.h ---------------------------------------------------------------------- diff --git a/src/parquet/file/reader-internal.h b/src/parquet/file/reader-internal.h index 7b21c2e..4ff065d 100644 --- a/src/parquet/file/reader-internal.h +++ b/src/parquet/file/reader-internal.h @@ -24,7 +24,7 @@ #include "parquet/column/page.h" #include "parquet/column/properties.h" -#include "parquet/compression/codec.h" +#include "parquet/compression.h" #include "parquet/file/metadata.h" #include "parquet/file/reader.h" #include "parquet/thrift/parquet_types.h" http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/13da51d3/src/parquet/file/reader.h ---------------------------------------------------------------------- diff --git a/src/parquet/file/reader.h b/src/parquet/file/reader.h index 723595a..3cdfa9f 100644 --- a/src/parquet/file/reader.h +++ b/src/parquet/file/reader.h @@ -29,7 +29,7 @@ #include "parquet/column/properties.h" #include "parquet/column/statistics.h" #include "parquet/file/metadata.h" -#include "parquet/schema/descriptor.h" +#include "parquet/schema.h" #include "parquet/util/memory.h" #include "parquet/util/visibility.h" http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/13da51d3/src/parquet/file/writer-internal.cc ---------------------------------------------------------------------- diff --git a/src/parquet/file/writer-internal.cc b/src/parquet/file/writer-internal.cc 
index 724635c..56f08f3 100644 --- a/src/parquet/file/writer-internal.cc +++ b/src/parquet/file/writer-internal.cc @@ -18,7 +18,8 @@ #include "parquet/file/writer-internal.h" #include "parquet/column/writer.h" -#include "parquet/schema/converter.h" +#include "parquet/schema-internal.h" +#include "parquet/schema.h" #include "parquet/thrift/util.h" #include "parquet/util/memory.h" http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/13da51d3/src/parquet/file/writer-internal.h ---------------------------------------------------------------------- diff --git a/src/parquet/file/writer-internal.h b/src/parquet/file/writer-internal.h index 12002e1..e711c44 100644 --- a/src/parquet/file/writer-internal.h +++ b/src/parquet/file/writer-internal.h @@ -22,7 +22,7 @@ #include <vector> #include "parquet/column/page.h" -#include "parquet/compression/codec.h" +#include "parquet/compression.h" #include "parquet/file/metadata.h" #include "parquet/file/writer.h" #include "parquet/thrift/parquet_types.h" http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/13da51d3/src/parquet/file/writer.h ---------------------------------------------------------------------- diff --git a/src/parquet/file/writer.h b/src/parquet/file/writer.h index 6d7161b..07ccb51 100644 --- a/src/parquet/file/writer.h +++ b/src/parquet/file/writer.h @@ -22,8 +22,7 @@ #include <memory> #include "parquet/column/properties.h" -#include "parquet/schema/descriptor.h" -#include "parquet/schema/types.h" +#include "parquet/schema.h" #include "parquet/util/memory.h" #include "parquet/util/visibility.h" http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/13da51d3/src/parquet/schema-internal.h ---------------------------------------------------------------------- diff --git a/src/parquet/schema-internal.h b/src/parquet/schema-internal.h new file mode 100644 index 0000000..5ceaa0c --- /dev/null +++ b/src/parquet/schema-internal.h @@ -0,0 +1,83 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or 
more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// This module contains the logical parquet-cpp types (independent of Thrift +// structures), schema nodes, and related type tools + +#ifndef PARQUET_SCHEMA_INTERNAL_H +#define PARQUET_SCHEMA_INTERNAL_H + +#include <cstdint> +#include <memory> +#include <vector> + +#include "parquet/schema.h" +#include "parquet/thrift/parquet_types.h" +#include "parquet/types.h" +#include "parquet/util/macros.h" +#include "parquet/util/visibility.h" + +namespace parquet { +namespace schema { + +// ---------------------------------------------------------------------- +// Conversion from Parquet Thrift metadata + +std::shared_ptr<SchemaDescriptor> FromParquet( + const std::vector<format::SchemaElement>& schema); + +class FlatSchemaConverter { + public: + FlatSchemaConverter(const format::SchemaElement* elements, int length) + : elements_(elements), length_(length), pos_(0), current_id_(0) {} + + std::unique_ptr<Node> Convert(); + + private: + const format::SchemaElement* elements_; + int length_; + int pos_; + int current_id_; + + int next_id() { return current_id_++; } + + const format::SchemaElement& Next(); + + std::unique_ptr<Node> NextNode(); +}; + +// ---------------------------------------------------------------------- +// Conversion to Parquet 
Thrift metadata + +void ToParquet(const GroupNode* schema, std::vector<format::SchemaElement>* out); + +// Converts nested parquet schema back to a flat vector of Thrift structs +class SchemaFlattener { + public: + SchemaFlattener(const GroupNode* schema, std::vector<format::SchemaElement>* out); + + void Flatten(); + + private: + const GroupNode* root_; + std::vector<format::SchemaElement>* elements_; +}; + +} // namespace schema +} // namespace parquet + +#endif // PARQUET_SCHEMA_INTERNAL_H http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/13da51d3/src/parquet/schema-test.cc ---------------------------------------------------------------------- diff --git a/src/parquet/schema-test.cc b/src/parquet/schema-test.cc new file mode 100644 index 0000000..b092b38 --- /dev/null +++ b/src/parquet/schema-test.cc @@ -0,0 +1,703 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include <gtest/gtest.h> + +#include <cstdlib> +#include <iosfwd> +#include <memory> +#include <string> +#include <vector> + +#include "parquet/exception.h" +#include "parquet/schema-internal.h" +#include "parquet/schema.h" +#include "parquet/thrift/parquet_types.h" +#include "parquet/types.h" + +using std::string; +using std::vector; + +namespace parquet { + +using format::ConvertedType; +using format::FieldRepetitionType; +using format::SchemaElement; + +namespace schema { + +static inline SchemaElement NewPrimitive(const std::string& name, + FieldRepetitionType::type repetition, format::Type::type type, int id = 0) { + SchemaElement result; + result.__set_name(name); + result.__set_repetition_type(repetition); + result.__set_type(type); + result.__set_num_children(0); + + return result; +} + +static inline SchemaElement NewGroup(const std::string& name, + FieldRepetitionType::type repetition, int num_children, int id = 0) { + SchemaElement result; + result.__set_name(name); + result.__set_repetition_type(repetition); + result.__set_num_children(num_children); + + return result; +} + +// ---------------------------------------------------------------------- +// ColumnPath + +TEST(TestColumnPath, TestAttrs) { + ColumnPath path(std::vector<std::string>({"toplevel", "leaf"})); + + ASSERT_EQ(path.ToDotString(), "toplevel.leaf"); + + std::shared_ptr<ColumnPath> path_ptr = ColumnPath::FromDotString("toplevel.leaf"); + ASSERT_EQ(path_ptr->ToDotString(), "toplevel.leaf"); + + std::shared_ptr<ColumnPath> extended = path_ptr->extend("anotherlevel"); + ASSERT_EQ(extended->ToDotString(), "toplevel.leaf.anotherlevel"); +} + +// ---------------------------------------------------------------------- +// Primitive node + +class TestPrimitiveNode : public ::testing::Test { + public: + void SetUp() { + name_ = "name"; + id_ = 5; + } + + void Convert(const format::SchemaElement* element) { + node_ = PrimitiveNode::FromParquet(element, id_); + ASSERT_TRUE(node_->is_primitive()); 
+ prim_node_ = static_cast<const PrimitiveNode*>(node_.get()); + } + + protected: + std::string name_; + const PrimitiveNode* prim_node_; + + int id_; + std::unique_ptr<Node> node_; +}; + +TEST_F(TestPrimitiveNode, Attrs) { + PrimitiveNode node1("foo", Repetition::REPEATED, Type::INT32); + + PrimitiveNode node2("bar", Repetition::OPTIONAL, Type::BYTE_ARRAY, LogicalType::UTF8); + + ASSERT_EQ("foo", node1.name()); + + ASSERT_TRUE(node1.is_primitive()); + ASSERT_FALSE(node1.is_group()); + + ASSERT_EQ(Repetition::REPEATED, node1.repetition()); + ASSERT_EQ(Repetition::OPTIONAL, node2.repetition()); + + ASSERT_EQ(Node::PRIMITIVE, node1.node_type()); + + ASSERT_EQ(Type::INT32, node1.physical_type()); + ASSERT_EQ(Type::BYTE_ARRAY, node2.physical_type()); + + // logical types + ASSERT_EQ(LogicalType::NONE, node1.logical_type()); + ASSERT_EQ(LogicalType::UTF8, node2.logical_type()); + + // repetition + node1 = PrimitiveNode("foo", Repetition::REQUIRED, Type::INT32); + node2 = PrimitiveNode("foo", Repetition::OPTIONAL, Type::INT32); + PrimitiveNode node3("foo", Repetition::REPEATED, Type::INT32); + + ASSERT_TRUE(node1.is_required()); + + ASSERT_TRUE(node2.is_optional()); + ASSERT_FALSE(node2.is_required()); + + ASSERT_TRUE(node3.is_repeated()); + ASSERT_FALSE(node3.is_optional()); +} + +TEST_F(TestPrimitiveNode, FromParquet) { + SchemaElement elt = + NewPrimitive(name_, FieldRepetitionType::OPTIONAL, format::Type::INT32, 0); + Convert(&elt); + ASSERT_EQ(name_, prim_node_->name()); + ASSERT_EQ(id_, prim_node_->id()); + ASSERT_EQ(Repetition::OPTIONAL, prim_node_->repetition()); + ASSERT_EQ(Type::INT32, prim_node_->physical_type()); + ASSERT_EQ(LogicalType::NONE, prim_node_->logical_type()); + + // Test a logical type + elt = NewPrimitive(name_, FieldRepetitionType::REQUIRED, format::Type::BYTE_ARRAY, 0); + elt.__set_converted_type(ConvertedType::UTF8); + + Convert(&elt); + ASSERT_EQ(Repetition::REQUIRED, prim_node_->repetition()); + ASSERT_EQ(Type::BYTE_ARRAY, 
prim_node_->physical_type()); + ASSERT_EQ(LogicalType::UTF8, prim_node_->logical_type()); + + // FIXED_LEN_BYTE_ARRAY + elt = NewPrimitive( + name_, FieldRepetitionType::OPTIONAL, format::Type::FIXED_LEN_BYTE_ARRAY, 0); + elt.__set_type_length(16); + + Convert(&elt); + ASSERT_EQ(name_, prim_node_->name()); + ASSERT_EQ(id_, prim_node_->id()); + ASSERT_EQ(Repetition::OPTIONAL, prim_node_->repetition()); + ASSERT_EQ(Type::FIXED_LEN_BYTE_ARRAY, prim_node_->physical_type()); + ASSERT_EQ(16, prim_node_->type_length()); + + // ConvertedType::Decimal + elt = NewPrimitive( + name_, FieldRepetitionType::OPTIONAL, format::Type::FIXED_LEN_BYTE_ARRAY, 0); + elt.__set_converted_type(ConvertedType::DECIMAL); + elt.__set_type_length(6); + elt.__set_scale(2); + elt.__set_precision(12); + + Convert(&elt); + ASSERT_EQ(Type::FIXED_LEN_BYTE_ARRAY, prim_node_->physical_type()); + ASSERT_EQ(LogicalType::DECIMAL, prim_node_->logical_type()); + ASSERT_EQ(6, prim_node_->type_length()); + ASSERT_EQ(2, prim_node_->decimal_metadata().scale); + ASSERT_EQ(12, prim_node_->decimal_metadata().precision); +} + +TEST_F(TestPrimitiveNode, Equals) { + PrimitiveNode node1("foo", Repetition::REQUIRED, Type::INT32); + PrimitiveNode node2("foo", Repetition::REQUIRED, Type::INT64); + PrimitiveNode node3("bar", Repetition::REQUIRED, Type::INT32); + PrimitiveNode node4("foo", Repetition::OPTIONAL, Type::INT32); + PrimitiveNode node5("foo", Repetition::REQUIRED, Type::INT32); + + ASSERT_TRUE(node1.Equals(&node1)); + ASSERT_FALSE(node1.Equals(&node2)); + ASSERT_FALSE(node1.Equals(&node3)); + ASSERT_FALSE(node1.Equals(&node4)); + ASSERT_TRUE(node1.Equals(&node5)); + + PrimitiveNode flba1("foo", Repetition::REQUIRED, Type::FIXED_LEN_BYTE_ARRAY, + LogicalType::DECIMAL, 12, 4, 2); + + PrimitiveNode flba2("foo", Repetition::REQUIRED, Type::FIXED_LEN_BYTE_ARRAY, + LogicalType::DECIMAL, 1, 4, 2); + flba2.SetTypeLength(12); + + PrimitiveNode flba3("foo", Repetition::REQUIRED, Type::FIXED_LEN_BYTE_ARRAY, + 
LogicalType::DECIMAL, 1, 4, 2); + flba3.SetTypeLength(16); + + PrimitiveNode flba4("foo", Repetition::REQUIRED, Type::FIXED_LEN_BYTE_ARRAY, + LogicalType::DECIMAL, 12, 4, 0); + + PrimitiveNode flba5("foo", Repetition::REQUIRED, Type::FIXED_LEN_BYTE_ARRAY, + LogicalType::NONE, 12, 4, 0); + + ASSERT_TRUE(flba1.Equals(&flba2)); + ASSERT_FALSE(flba1.Equals(&flba3)); + ASSERT_FALSE(flba1.Equals(&flba4)); + ASSERT_FALSE(flba1.Equals(&flba5)); +} + +TEST_F(TestPrimitiveNode, PhysicalLogicalMapping) { + ASSERT_NO_THROW( + PrimitiveNode::Make("foo", Repetition::REQUIRED, Type::INT32, LogicalType::INT_32)); + ASSERT_NO_THROW(PrimitiveNode::Make( + "foo", Repetition::REQUIRED, Type::BYTE_ARRAY, LogicalType::JSON)); + ASSERT_THROW( + PrimitiveNode::Make("foo", Repetition::REQUIRED, Type::INT32, LogicalType::JSON), + ParquetException); + ASSERT_NO_THROW(PrimitiveNode::Make( + "foo", Repetition::REQUIRED, Type::INT64, LogicalType::TIMESTAMP_MILLIS)); + ASSERT_THROW( + PrimitiveNode::Make("foo", Repetition::REQUIRED, Type::INT32, LogicalType::INT_64), + ParquetException); + ASSERT_THROW(PrimitiveNode::Make( + "foo", Repetition::REQUIRED, Type::BYTE_ARRAY, LogicalType::INT_8), + ParquetException); + ASSERT_THROW(PrimitiveNode::Make( + "foo", Repetition::REQUIRED, Type::BYTE_ARRAY, LogicalType::INTERVAL), + ParquetException); + ASSERT_THROW(PrimitiveNode::Make("foo", Repetition::REQUIRED, + Type::FIXED_LEN_BYTE_ARRAY, LogicalType::ENUM), + ParquetException); + ASSERT_NO_THROW(PrimitiveNode::Make( + "foo", Repetition::REQUIRED, Type::BYTE_ARRAY, LogicalType::ENUM)); + ASSERT_THROW(PrimitiveNode::Make("foo", Repetition::REQUIRED, + Type::FIXED_LEN_BYTE_ARRAY, LogicalType::DECIMAL, 0, 2, 4), + ParquetException); + ASSERT_THROW(PrimitiveNode::Make("foo", Repetition::REQUIRED, Type::FLOAT, + LogicalType::DECIMAL, 0, 2, 4), + ParquetException); + ASSERT_THROW(PrimitiveNode::Make("foo", Repetition::REQUIRED, + Type::FIXED_LEN_BYTE_ARRAY, LogicalType::DECIMAL, 0, 4, 0), + 
ParquetException); + ASSERT_THROW(PrimitiveNode::Make("foo", Repetition::REQUIRED, + Type::FIXED_LEN_BYTE_ARRAY, LogicalType::DECIMAL, 10, 0, 4), + ParquetException); + ASSERT_THROW(PrimitiveNode::Make("foo", Repetition::REQUIRED, + Type::FIXED_LEN_BYTE_ARRAY, LogicalType::DECIMAL, 10, 4, -1), + ParquetException); + ASSERT_THROW(PrimitiveNode::Make("foo", Repetition::REQUIRED, + Type::FIXED_LEN_BYTE_ARRAY, LogicalType::DECIMAL, 10, 2, 4), + ParquetException); + ASSERT_NO_THROW(PrimitiveNode::Make("foo", Repetition::REQUIRED, + Type::FIXED_LEN_BYTE_ARRAY, LogicalType::DECIMAL, 10, 6, 4)); + ASSERT_NO_THROW(PrimitiveNode::Make("foo", Repetition::REQUIRED, + Type::FIXED_LEN_BYTE_ARRAY, LogicalType::INTERVAL, 12)); + ASSERT_THROW(PrimitiveNode::Make("foo", Repetition::REQUIRED, + Type::FIXED_LEN_BYTE_ARRAY, LogicalType::INTERVAL, 10), + ParquetException); +} + +// ---------------------------------------------------------------------- +// Group node + +class TestGroupNode : public ::testing::Test { + public: + NodeVector Fields1() { + NodeVector fields; + + fields.push_back(Int32("one", Repetition::REQUIRED)); + fields.push_back(Int64("two")); + fields.push_back(Double("three")); + + return fields; + } +}; + +TEST_F(TestGroupNode, Attrs) { + NodeVector fields = Fields1(); + + GroupNode node1("foo", Repetition::REPEATED, fields); + GroupNode node2("bar", Repetition::OPTIONAL, fields, LogicalType::LIST); + + ASSERT_EQ("foo", node1.name()); + + ASSERT_TRUE(node1.is_group()); + ASSERT_FALSE(node1.is_primitive()); + + ASSERT_EQ(fields.size(), node1.field_count()); + + ASSERT_TRUE(node1.is_repeated()); + ASSERT_TRUE(node2.is_optional()); + + ASSERT_EQ(Repetition::REPEATED, node1.repetition()); + ASSERT_EQ(Repetition::OPTIONAL, node2.repetition()); + + ASSERT_EQ(Node::GROUP, node1.node_type()); + + // logical types + ASSERT_EQ(LogicalType::NONE, node1.logical_type()); + ASSERT_EQ(LogicalType::LIST, node2.logical_type()); +} + +TEST_F(TestGroupNode, Equals) { + NodeVector f1 = 
Fields1(); + NodeVector f2 = Fields1(); + + GroupNode group1("group", Repetition::REPEATED, f1); + GroupNode group2("group", Repetition::REPEATED, f2); + GroupNode group3("group2", Repetition::REPEATED, f2); + + // This is copied in the GroupNode ctor, so this is okay + f2.push_back(Float("four", Repetition::OPTIONAL)); + GroupNode group4("group", Repetition::REPEATED, f2); + GroupNode group5("group", Repetition::REPEATED, Fields1()); + + ASSERT_TRUE(group1.Equals(&group1)); + ASSERT_TRUE(group1.Equals(&group2)); + ASSERT_FALSE(group1.Equals(&group3)); + + ASSERT_FALSE(group1.Equals(&group4)); + ASSERT_FALSE(group5.Equals(&group4)); +} + +// ---------------------------------------------------------------------- +// Test convert group + +class TestSchemaConverter : public ::testing::Test { + public: + void setUp() { name_ = "parquet_schema"; } + + void Convert(const parquet::format::SchemaElement* elements, int length) { + FlatSchemaConverter converter(elements, length); + node_ = converter.Convert(); + ASSERT_TRUE(node_->is_group()); + group_ = static_cast<const GroupNode*>(node_.get()); + } + + protected: + std::string name_; + const GroupNode* group_; + std::unique_ptr<Node> node_; +}; + +bool check_for_parent_consistency(const GroupNode* node) { + // Each node should have the group as parent + for (int i = 0; i < node->field_count(); i++) { + const NodePtr& field = node->field(i); + if (field->parent() != node) { return false; } + if (field->is_group()) { + const GroupNode* group = static_cast<GroupNode*>(field.get()); + if (!check_for_parent_consistency(group)) { return false; } + } + } + return true; +} + +TEST_F(TestSchemaConverter, NestedExample) { + SchemaElement elt; + std::vector<SchemaElement> elements; + elements.push_back(NewGroup(name_, FieldRepetitionType::REPEATED, 2, 0)); + + // A primitive one + elements.push_back( + NewPrimitive("a", FieldRepetitionType::REQUIRED, format::Type::INT32, 1)); + + // A group + elements.push_back(NewGroup("bag", 
FieldRepetitionType::OPTIONAL, 1, 2)); + + // 3-level list encoding, by hand + elt = NewGroup("b", FieldRepetitionType::REPEATED, 1, 3); + elt.__set_converted_type(ConvertedType::LIST); + elements.push_back(elt); + elements.push_back( + NewPrimitive("item", FieldRepetitionType::OPTIONAL, format::Type::INT64, 4)); + + Convert(&elements[0], elements.size()); + + // Construct the expected schema + NodeVector fields; + fields.push_back(Int32("a", Repetition::REQUIRED)); + + // 3-level list encoding + NodePtr item = Int64("item"); + NodePtr list(GroupNode::Make("b", Repetition::REPEATED, {item}, LogicalType::LIST)); + NodePtr bag(GroupNode::Make("bag", Repetition::OPTIONAL, {list})); + fields.push_back(bag); + + NodePtr schema = GroupNode::Make(name_, Repetition::REPEATED, fields); + + ASSERT_TRUE(schema->Equals(group_)); + + // Check that the parent relationship in each node is consitent + ASSERT_EQ(group_->parent(), nullptr); + ASSERT_TRUE(check_for_parent_consistency(group_)); +} + +TEST_F(TestSchemaConverter, InvalidRoot) { + // According to the Parquet specification, the first element in the + // list<SchemaElement> is a group whose children (and their descendants) + // contain all of the rest of the flattened schema elements. If the first + // element is not a group, it is a malformed Parquet file. + + SchemaElement elements[2]; + elements[0] = + NewPrimitive("not-a-group", FieldRepetitionType::REQUIRED, format::Type::INT32, 0); + ASSERT_THROW(Convert(elements, 2), ParquetException); + + // While the Parquet spec indicates that the root group should have REPEATED + // repetition type, some implementations may return REQUIRED or OPTIONAL + // groups as the first element. These tests check that this is okay as a + // practicality matter. 
+ elements[0] = NewGroup("not-repeated", FieldRepetitionType::REQUIRED, 1, 0); + elements[1] = NewPrimitive("a", FieldRepetitionType::REQUIRED, format::Type::INT32, 1); + Convert(elements, 2); + + elements[0] = NewGroup("not-repeated", FieldRepetitionType::OPTIONAL, 1, 0); + Convert(elements, 2); +} + +TEST_F(TestSchemaConverter, NotEnoughChildren) { + // Throw a ParquetException, but don't core dump or anything + SchemaElement elt; + std::vector<SchemaElement> elements; + elements.push_back(NewGroup(name_, FieldRepetitionType::REPEATED, 2, 0)); + ASSERT_THROW(Convert(&elements[0], 1), ParquetException); +} + +// ---------------------------------------------------------------------- +// Schema tree flatten / unflatten + +class TestSchemaFlatten : public ::testing::Test { + public: + void setUp() { name_ = "parquet_schema"; } + + void Flatten(const GroupNode* schema) { ToParquet(schema, &elements_); } + + protected: + std::string name_; + std::vector<format::SchemaElement> elements_; +}; + +TEST_F(TestSchemaFlatten, DecimalMetadata) { + // Checks that DecimalMetadata is only set for DecimalTypes + NodePtr node = PrimitiveNode::Make( + "decimal", Repetition::REQUIRED, Type::INT64, LogicalType::DECIMAL, -1, 8, 4); + NodePtr group = + GroupNode::Make("group", Repetition::REPEATED, {node}, LogicalType::LIST); + Flatten(reinterpret_cast<GroupNode*>(group.get())); + ASSERT_EQ("decimal", elements_[1].name); + ASSERT_TRUE(elements_[1].__isset.precision); + ASSERT_TRUE(elements_[1].__isset.scale); + + elements_.clear(); + // Not for integers with no logical type + group = + GroupNode::Make("group", Repetition::REPEATED, {Int64("int64")}, LogicalType::LIST); + Flatten(reinterpret_cast<GroupNode*>(group.get())); + ASSERT_EQ("int64", elements_[1].name); + ASSERT_FALSE(elements_[0].__isset.precision); + ASSERT_FALSE(elements_[0].__isset.scale); +} + +TEST_F(TestSchemaFlatten, NestedExample) { + SchemaElement elt; + std::vector<SchemaElement> elements; + 
elements.push_back(NewGroup(name_, FieldRepetitionType::REPEATED, 2, 0)); + + // A primitive one + elements.push_back( + NewPrimitive("a", FieldRepetitionType::REQUIRED, format::Type::INT32, 1)); + + // A group + elements.push_back(NewGroup("bag", FieldRepetitionType::OPTIONAL, 1, 2)); + + // 3-level list encoding, by hand + elt = NewGroup("b", FieldRepetitionType::REPEATED, 1, 3); + elt.__set_converted_type(ConvertedType::LIST); + elements.push_back(elt); + elements.push_back( + NewPrimitive("item", FieldRepetitionType::OPTIONAL, format::Type::INT64, 4)); + + // Construct the schema + NodeVector fields; + fields.push_back(Int32("a", Repetition::REQUIRED)); + + // 3-level list encoding + NodePtr item = Int64("item"); + NodePtr list(GroupNode::Make("b", Repetition::REPEATED, {item}, LogicalType::LIST)); + NodePtr bag(GroupNode::Make("bag", Repetition::OPTIONAL, {list})); + fields.push_back(bag); + + NodePtr schema = GroupNode::Make(name_, Repetition::REPEATED, fields); + + Flatten(static_cast<GroupNode*>(schema.get())); + ASSERT_EQ(elements_.size(), elements.size()); + for (size_t i = 0; i < elements_.size(); i++) { + ASSERT_EQ(elements_[i], elements[i]); + } +} + +TEST(TestColumnDescriptor, TestAttrs) { + NodePtr node = PrimitiveNode::Make( + "name", Repetition::OPTIONAL, Type::BYTE_ARRAY, LogicalType::UTF8); + ColumnDescriptor descr(node, 4, 1); + + ASSERT_EQ("name", descr.name()); + ASSERT_EQ(4, descr.max_definition_level()); + ASSERT_EQ(1, descr.max_repetition_level()); + + ASSERT_EQ(Type::BYTE_ARRAY, descr.physical_type()); + + ASSERT_EQ(-1, descr.type_length()); + + // Test FIXED_LEN_BYTE_ARRAY + node = PrimitiveNode::Make("name", Repetition::OPTIONAL, Type::FIXED_LEN_BYTE_ARRAY, + LogicalType::DECIMAL, 12, 10, 4); + descr = ColumnDescriptor(node, 4, 1); + + ASSERT_EQ(Type::FIXED_LEN_BYTE_ARRAY, descr.physical_type()); + ASSERT_EQ(12, descr.type_length()); +} + +class TestSchemaDescriptor : public ::testing::Test { + public: + void setUp() {} + + protected: + 
SchemaDescriptor descr_; +}; + +TEST_F(TestSchemaDescriptor, InitNonGroup) { + NodePtr node = PrimitiveNode::Make("field", Repetition::OPTIONAL, Type::INT32); + + ASSERT_THROW(descr_.Init(node), ParquetException); +} + +TEST_F(TestSchemaDescriptor, Equals) { + NodePtr schema; + + NodePtr inta = Int32("a", Repetition::REQUIRED); + NodePtr intb = Int64("b", Repetition::OPTIONAL); + NodePtr intb2 = Int64("b2", Repetition::OPTIONAL); + NodePtr intc = ByteArray("c", Repetition::REPEATED); + + NodePtr item1 = Int64("item1", Repetition::REQUIRED); + NodePtr item2 = Boolean("item2", Repetition::OPTIONAL); + NodePtr item3 = Int32("item3", Repetition::REPEATED); + NodePtr list(GroupNode::Make( + "records", Repetition::REPEATED, {item1, item2, item3}, LogicalType::LIST)); + + NodePtr bag(GroupNode::Make("bag", Repetition::OPTIONAL, {list})); + NodePtr bag2(GroupNode::Make("bag", Repetition::REQUIRED, {list})); + + SchemaDescriptor descr1; + descr1.Init(GroupNode::Make("schema", Repetition::REPEATED, {inta, intb, intc, bag})); + + ASSERT_TRUE(descr1.Equals(descr1)); + + SchemaDescriptor descr2; + descr2.Init(GroupNode::Make("schema", Repetition::REPEATED, {inta, intb, intc, bag2})); + ASSERT_FALSE(descr1.Equals(descr2)); + + SchemaDescriptor descr3; + descr3.Init(GroupNode::Make("schema", Repetition::REPEATED, {inta, intb2, intc, bag})); + ASSERT_FALSE(descr1.Equals(descr3)); + + // Robust to name of parent node + SchemaDescriptor descr4; + descr4.Init(GroupNode::Make("SCHEMA", Repetition::REPEATED, {inta, intb, intc, bag})); + ASSERT_TRUE(descr1.Equals(descr4)); + + SchemaDescriptor descr5; + descr5.Init( + GroupNode::Make("schema", Repetition::REPEATED, {inta, intb, intc, bag, intb2})); + ASSERT_FALSE(descr1.Equals(descr5)); + + // Different max repetition / definition levels + ColumnDescriptor col1(inta, 5, 1); + ColumnDescriptor col2(inta, 6, 1); + ColumnDescriptor col3(inta, 5, 2); + + ASSERT_TRUE(col1.Equals(col1)); + ASSERT_FALSE(col1.Equals(col2)); + 
ASSERT_FALSE(col1.Equals(col3)); +} + +TEST_F(TestSchemaDescriptor, BuildTree) { + NodeVector fields; + NodePtr schema; + + NodePtr inta = Int32("a", Repetition::REQUIRED); + fields.push_back(inta); + fields.push_back(Int64("b", Repetition::OPTIONAL)); + fields.push_back(ByteArray("c", Repetition::REPEATED)); + + // 3-level list encoding + NodePtr item1 = Int64("item1", Repetition::REQUIRED); + NodePtr item2 = Boolean("item2", Repetition::OPTIONAL); + NodePtr item3 = Int32("item3", Repetition::REPEATED); + NodePtr list(GroupNode::Make( + "records", Repetition::REPEATED, {item1, item2, item3}, LogicalType::LIST)); + NodePtr bag(GroupNode::Make("bag", Repetition::OPTIONAL, {list})); + fields.push_back(bag); + + schema = GroupNode::Make("schema", Repetition::REPEATED, fields); + + descr_.Init(schema); + + int nleaves = 6; + + // 6 leaves + ASSERT_EQ(nleaves, descr_.num_columns()); + + // mdef mrep + // required int32 a 0 0 + // optional int64 b 1 0 + // repeated byte_array c 1 1 + // optional group bag 1 0 + // repeated group records 2 1 + // required int64 item1 2 1 + // optional boolean item2 3 1 + // repeated int32 item3 3 2 + int16_t ex_max_def_levels[6] = {0, 1, 1, 2, 3, 3}; + int16_t ex_max_rep_levels[6] = {0, 0, 1, 1, 1, 2}; + + for (int i = 0; i < nleaves; ++i) { + const ColumnDescriptor* col = descr_.Column(i); + EXPECT_EQ(ex_max_def_levels[i], col->max_definition_level()) << i; + EXPECT_EQ(ex_max_rep_levels[i], col->max_repetition_level()) << i; + } + + ASSERT_EQ(descr_.Column(0)->path()->ToDotString(), "a"); + ASSERT_EQ(descr_.Column(1)->path()->ToDotString(), "b"); + ASSERT_EQ(descr_.Column(2)->path()->ToDotString(), "c"); + ASSERT_EQ(descr_.Column(3)->path()->ToDotString(), "bag.records.item1"); + ASSERT_EQ(descr_.Column(4)->path()->ToDotString(), "bag.records.item2"); + ASSERT_EQ(descr_.Column(5)->path()->ToDotString(), "bag.records.item3"); + + ASSERT_EQ(inta.get(), descr_.GetColumnRoot(0).get()); + ASSERT_EQ(bag.get(), descr_.GetColumnRoot(3).get()); + 
ASSERT_EQ(bag.get(), descr_.GetColumnRoot(4).get()); + ASSERT_EQ(bag.get(), descr_.GetColumnRoot(5).get()); + + ASSERT_EQ(schema.get(), descr_.group_node()); + + // Init clears the leaves + descr_.Init(schema); + ASSERT_EQ(nleaves, descr_.num_columns()); +} + +static std::string Print(const NodePtr& node) { + std::stringstream ss; + PrintSchema(node.get(), ss); + return ss.str(); +} + +TEST(TestSchemaPrinter, Examples) { + // Test schema 1 + NodeVector fields; + fields.push_back(Int32("a", Repetition::REQUIRED)); + + // 3-level list encoding + NodePtr item1 = Int64("item1"); + NodePtr item2 = Boolean("item2", Repetition::REQUIRED); + NodePtr list( + GroupNode::Make("b", Repetition::REPEATED, {item1, item2}, LogicalType::LIST)); + NodePtr bag(GroupNode::Make("bag", Repetition::OPTIONAL, {list})); + fields.push_back(bag); + + fields.push_back(PrimitiveNode::Make( + "c", Repetition::REQUIRED, Type::INT32, LogicalType::DECIMAL, -1, 3, 2)); + + NodePtr schema = GroupNode::Make("schema", Repetition::REPEATED, fields); + + std::string result = Print(schema); + std::string expected = R"(message schema { + required int32 a; + optional group bag { + repeated group b (LIST) { + optional int64 item1; + required boolean item2; + } + } + required int32 c (DECIMAL(3,2)); +} +)"; + ASSERT_EQ(expected, result); +} + +} // namespace schema +} // namespace parquet
