Repository: parquet-cpp Updated Branches: refs/heads/master aacc8b528 -> 257e65b81
PARQUET-843: Impala is thrown off by a REPEATED root schema node. We do not use the repetition level of the root node in the schema, and neither does parquet-mr. However, Impala 2.8.0 increases the max_repetition_level and expects to find repetition levels that are not being written out. With this change, Impala is able to read our files again. Author: Wes McKinney <[email protected]> Closes #227 from wesm/PARQUET-843 and squashes the following commits: dec382a [Wes McKinney] Impala is thrown off by a REPEATED root schema node. Some uint32_t->int32_t changes. Project: http://git-wip-us.apache.org/repos/asf/parquet-cpp/repo Commit: http://git-wip-us.apache.org/repos/asf/parquet-cpp/commit/257e65b8 Tree: http://git-wip-us.apache.org/repos/asf/parquet-cpp/tree/257e65b8 Diff: http://git-wip-us.apache.org/repos/asf/parquet-cpp/diff/257e65b8 Branch: refs/heads/master Commit: 257e65b8136463b716eeeee53836aec9916d0c15 Parents: aacc8b5 Author: Wes McKinney <[email protected]> Authored: Wed Jan 25 21:42:37 2017 -0500 Committer: Wes McKinney <[email protected]> Committed: Wed Jan 25 21:42:37 2017 -0500 ---------------------------------------------------------------------- src/parquet/arrow/schema.cc | 2 +- src/parquet/column/column-writer-test.cc | 9 +++++---- src/parquet/column/level-benchmark.cc | 6 +++--- src/parquet/column/levels-test.cc | 4 ++-- src/parquet/column/levels.cc | 8 ++++---- src/parquet/column/test-util.h | 4 ++-- src/parquet/column/writer.cc | 9 ++++----- src/parquet/file/metadata.cc | 11 ++++++----- 8 files changed, 27 insertions(+), 26 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/257e65b8/src/parquet/arrow/schema.cc ---------------------------------------------------------------------- diff --git a/src/parquet/arrow/schema.cc b/src/parquet/arrow/schema.cc index 4f17f5e..65e3381 100644 --- a/src/parquet/arrow/schema.cc +++ b/src/parquet/arrow/schema.cc @@ -406,7 
+406,7 @@ Status ToParquetSchema(const ::arrow::Schema* arrow_schema, RETURN_NOT_OK(FieldToNode(arrow_schema->field(i), properties, &nodes[i])); } - NodePtr schema = GroupNode::Make("schema", Repetition::REPEATED, nodes); + NodePtr schema = GroupNode::Make("schema", Repetition::REQUIRED, nodes); *out = std::make_shared<::parquet::SchemaDescriptor>(); PARQUET_CATCH_NOT_OK((*out)->Init(schema)); http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/257e65b8/src/parquet/column/column-writer-test.cc ---------------------------------------------------------------------- diff --git a/src/parquet/column/column-writer-test.cc b/src/parquet/column/column-writer-test.cc index fc944ca..dedb2c2 100644 --- a/src/parquet/column/column-writer-test.cc +++ b/src/parquet/column/column-writer-test.cc @@ -431,12 +431,13 @@ TYPED_TEST(TestPrimitiveWriter, RequiredVeryLargeChunk) { // There are 3 encodings (RLE, PLAIN_DICTIONARY, PLAIN) in a fallback case // Dictionary encoding is not allowed for boolean type // There are 2 encodings (RLE, PLAIN) in a non dictionary encoding case - ASSERT_EQ(Encoding::RLE, encodings[0]); if (this->type_num() != Type::BOOLEAN) { - ASSERT_EQ(Encoding::PLAIN_DICTIONARY, encodings[1]); - ASSERT_EQ(Encoding::PLAIN, encodings[2]); - } else { + ASSERT_EQ(Encoding::PLAIN_DICTIONARY, encodings[0]); ASSERT_EQ(Encoding::PLAIN, encodings[1]); + ASSERT_EQ(Encoding::RLE, encodings[2]); + } else { + ASSERT_EQ(Encoding::PLAIN, encodings[0]); + ASSERT_EQ(Encoding::RLE, encodings[1]); } } http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/257e65b8/src/parquet/column/level-benchmark.cc ---------------------------------------------------------------------- diff --git a/src/parquet/column/level-benchmark.cc b/src/parquet/column/level-benchmark.cc index 8ae2fe1..036108f 100644 --- a/src/parquet/column/level-benchmark.cc +++ b/src/parquet/column/level-benchmark.cc @@ -55,11 +55,11 @@ static void BM_RleDecoding(::benchmark::State& state) { int16_t max_level = 1; int64_t 
rle_size = LevelEncoder::MaxBufferSize(Encoding::RLE, max_level, levels.size()); auto buffer_rle = std::make_shared<PoolBuffer>(); - PARQUET_THROW_NOT_OK(buffer_rle->Resize(rle_size + sizeof(uint32_t))); + PARQUET_THROW_NOT_OK(buffer_rle->Resize(rle_size + sizeof(int32_t))); level_encoder.Init(Encoding::RLE, max_level, levels.size(), - buffer_rle->mutable_data() + sizeof(uint32_t), rle_size); + buffer_rle->mutable_data() + sizeof(int32_t), rle_size); level_encoder.Encode(levels.size(), levels.data()); - reinterpret_cast<uint32_t*>(buffer_rle->mutable_data())[0] = level_encoder.len(); + reinterpret_cast<int32_t*>(buffer_rle->mutable_data())[0] = level_encoder.len(); while (state.KeepRunning()) { LevelDecoder level_decoder; http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/257e65b8/src/parquet/column/levels-test.cc ---------------------------------------------------------------------- diff --git a/src/parquet/column/levels-test.cc b/src/parquet/column/levels-test.cc index cee2763..1d29313 100644 --- a/src/parquet/column/levels-test.cc +++ b/src/parquet/column/levels-test.cc @@ -57,10 +57,10 @@ void EncodeLevels(Encoding::type encoding, int max_level, int num_levels, if (encoding == Encoding::RLE) { // leave space to write the rle length value encoder.Init( - encoding, max_level, num_levels, bytes.data() + sizeof(uint32_t), bytes.size()); + encoding, max_level, num_levels, bytes.data() + sizeof(int32_t), bytes.size()); levels_count = encoder.Encode(num_levels, input_levels); - (reinterpret_cast<uint32_t*>(bytes.data()))[0] = encoder.len(); + (reinterpret_cast<int32_t*>(bytes.data()))[0] = encoder.len(); } else { encoder.Init(encoding, max_level, num_levels, bytes.data(), bytes.size()); levels_count = encoder.Encode(num_levels, input_levels); http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/257e65b8/src/parquet/column/levels.cc ---------------------------------------------------------------------- diff --git a/src/parquet/column/levels.cc 
b/src/parquet/column/levels.cc index 3e7b9df..716e08a 100644 --- a/src/parquet/column/levels.cc +++ b/src/parquet/column/levels.cc @@ -96,20 +96,20 @@ LevelDecoder::~LevelDecoder() {} int LevelDecoder::SetData(Encoding::type encoding, int16_t max_level, int num_buffered_values, const uint8_t* data) { - uint32_t num_bytes = 0; + int32_t num_bytes = 0; encoding_ = encoding; num_values_remaining_ = num_buffered_values; bit_width_ = BitUtil::Log2(max_level + 1); switch (encoding) { case Encoding::RLE: { - num_bytes = *reinterpret_cast<const uint32_t*>(data); - const uint8_t* decoder_data = data + sizeof(uint32_t); + num_bytes = *reinterpret_cast<const int32_t*>(data); + const uint8_t* decoder_data = data + sizeof(int32_t); if (!rle_decoder_) { rle_decoder_.reset(new RleDecoder(decoder_data, num_bytes, bit_width_)); } else { rle_decoder_->Reset(decoder_data, num_bytes, bit_width_); } - return sizeof(uint32_t) + num_bytes; + return sizeof(int32_t) + num_bytes; } case Encoding::BIT_PACKED: { num_bytes = BitUtil::Ceil(num_buffered_values * bit_width_, 8); http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/257e65b8/src/parquet/column/test-util.h ---------------------------------------------------------------------- diff --git a/src/parquet/column/test-util.h b/src/parquet/column/test-util.h index 9efa623..f0580f5 100644 --- a/src/parquet/column/test-util.h +++ b/src/parquet/column/test-util.h @@ -182,8 +182,8 @@ class DataPageBuilder { encoder.Encode(levels.size(), levels.data()); - uint32_t rle_bytes = encoder.len(); - sink_->Write(reinterpret_cast<const uint8_t*>(&rle_bytes), sizeof(uint32_t)); + int32_t rle_bytes = encoder.len(); + sink_->Write(reinterpret_cast<const uint8_t*>(&rle_bytes), sizeof(int32_t)); sink_->Write(encode_buffer.data(), rle_bytes); } }; http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/257e65b8/src/parquet/column/writer.cc ---------------------------------------------------------------------- diff --git a/src/parquet/column/writer.cc 
b/src/parquet/column/writer.cc index 406ded1..f06ac30 100644 --- a/src/parquet/column/writer.cc +++ b/src/parquet/column/writer.cc @@ -77,17 +77,16 @@ std::shared_ptr<Buffer> ColumnWriter::RleEncodeLevels( // TODO: This only works with due to some RLE specifics int64_t rle_size = LevelEncoder::MaxBufferSize(Encoding::RLE, max_level, num_buffered_values_) + - sizeof(uint32_t); + sizeof(int32_t); std::shared_ptr<PoolBuffer> buffer_rle = AllocateBuffer(properties_->allocator(), rle_size); level_encoder_.Init(Encoding::RLE, max_level, num_buffered_values_, - buffer_rle->mutable_data() + sizeof(uint32_t), - buffer_rle->size() - sizeof(uint32_t)); + buffer_rle->mutable_data() + sizeof(int32_t), buffer_rle->size() - sizeof(int32_t)); int encoded = level_encoder_.Encode( num_buffered_values_, reinterpret_cast<const int16_t*>(buffer->data())); DCHECK_EQ(encoded, num_buffered_values_); - reinterpret_cast<uint32_t*>(buffer_rle->mutable_data())[0] = level_encoder_.len(); - int64_t encoded_size = level_encoder_.len() + sizeof(uint32_t); + reinterpret_cast<int32_t*>(buffer_rle->mutable_data())[0] = level_encoder_.len(); + int64_t encoded_size = level_encoder_.len() + sizeof(int32_t); DCHECK(rle_size >= encoded_size); PARQUET_THROW_NOT_OK(buffer_rle->Resize(encoded_size)); return std::static_pointer_cast<Buffer>(buffer_rle); http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/257e65b8/src/parquet/file/metadata.cc ---------------------------------------------------------------------- diff --git a/src/parquet/file/metadata.cc b/src/parquet/file/metadata.cc index d9acf41..2f7f9d2 100644 --- a/src/parquet/file/metadata.cc +++ b/src/parquet/file/metadata.cc @@ -485,16 +485,17 @@ class ColumnChunkMetaDataBuilder::ColumnChunkMetaDataBuilderImpl { column_chunk_->meta_data.__set_total_uncompressed_size(uncompressed_size); column_chunk_->meta_data.__set_total_compressed_size(compressed_size); std::vector<format::Encoding::type> thrift_encodings; - 
thrift_encodings.push_back(ToThrift(Encoding::RLE)); if (has_dictionary) { - thrift_encodings.push_back(ToThrift(properties_->dictionary_page_encoding())); - // add the encoding only if it is unique - if (properties_->version() == ParquetVersion::PARQUET_2_0) { - thrift_encodings.push_back(ToThrift(properties_->dictionary_index_encoding())); + thrift_encodings.push_back(ToThrift(properties_->dictionary_index_encoding())); + if (properties_->version() == ParquetVersion::PARQUET_1_0) { + thrift_encodings.push_back(ToThrift(Encoding::PLAIN)); + } else { + thrift_encodings.push_back(ToThrift(properties_->dictionary_page_encoding())); } } else { // Dictionary not enabled thrift_encodings.push_back(ToThrift(properties_->encoding(column_->path()))); } + thrift_encodings.push_back(ToThrift(Encoding::RLE)); // Only PLAIN encoding is supported for fallback in V1 // TODO(majetideepak): Use user specified encoding for V2 if (dictionary_fallback) { thrift_encodings.push_back(ToThrift(Encoding::PLAIN)); }
