This is an automated email from the ASF dual-hosted git repository.
uwe pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new 035ce51 ARROW-3564: [C++] Fix dictionary encoding logic for Parquet
2.0
035ce51 is described below
commit 035ce511e3c6ff065191e59e48207a28d8d0a799
Author: Hatem Helal <[email protected]>
AuthorDate: Wed Jan 16 21:46:31 2019 +0100
ARROW-3564: [C++] Fix dictionary encoding logic for Parquet 2.0
Reworked the logic for encoder selection in the column writer. Added C++ tests;
this is expected to resolve the problem described in ARROW-3564, which was
observed when using dictionary encoding to write Parquet files with pyarrow.
Author: Hatem Helal <[email protected]>
Closes #3331 from hatemhelal/arrow-3564 and squashes the following commits:
55bed4ca <Hatem Helal> explicitly set dictionary page size when running
unittests using valgrind to force dictionary fallback
ee6263e8 <Hatem Helal> speculative fix for ubuntu CI failure which doesn't
appear to trigger dictionary fallback
95e4de2d <Hatem Helal> Make use_dictionary const as per rdmello's advice
40f26b68 <Hatem Helal> Don't use gmock matcher since that isn't available
in the CI system
389ed416 <Hatem Helal> Add use_dictionary argument to benchmark which fixes
CI build failures
efb3bd88 <Hatem Helal> Fix dictionary encoding logic for Parquet 2.0
---
cpp/src/parquet/column-io-benchmark.cc | 4 +-
cpp/src/parquet/column_writer-test.cc | 84 +++++++++++++++++++++++-----------
cpp/src/parquet/column_writer.cc | 63 ++++++++++++-------------
cpp/src/parquet/column_writer.h | 4 +-
4 files changed, 91 insertions(+), 64 deletions(-)
diff --git a/cpp/src/parquet/column-io-benchmark.cc
b/cpp/src/parquet/column-io-benchmark.cc
index 8f286f4..bb056c1 100644
--- a/cpp/src/parquet/column-io-benchmark.cc
+++ b/cpp/src/parquet/column-io-benchmark.cc
@@ -35,8 +35,8 @@ std::unique_ptr<Int64Writer> BuildWriter(int64_t output_size,
OutputStream* dst,
const WriterProperties* properties) {
std::unique_ptr<PageWriter> pager =
PageWriter::Open(dst, Compression::UNCOMPRESSED, metadata);
- return std::unique_ptr<Int64Writer>(
- new Int64Writer(metadata, std::move(pager), Encoding::PLAIN,
properties));
+ return std::unique_ptr<Int64Writer>(new Int64Writer(
+ metadata, std::move(pager), false /*use_dictionary*/, Encoding::PLAIN,
properties));
}
std::shared_ptr<ColumnDescriptor> Int64Schema(Repetition::type repetition) {
diff --git a/cpp/src/parquet/column_writer-test.cc
b/cpp/src/parquet/column_writer-test.cc
index 28a18b1..a7671db 100644
--- a/cpp/src/parquet/column_writer-test.cc
+++ b/cpp/src/parquet/column_writer-test.cc
@@ -43,11 +43,15 @@ const int SMALL_SIZE = 100;
const int LARGE_SIZE = 10000;
// Very large size to test dictionary fallback.
const int VERY_LARGE_SIZE = 40000;
+// Reduced dictionary page size to use for testing dictionary fallback with
valgrind
+const int64_t DICTIONARY_PAGE_SIZE = 1024;
#else
// Larger size to test some corner cases, only used in some specific cases.
const int LARGE_SIZE = 100000;
// Very large size to test dictionary fallback.
const int VERY_LARGE_SIZE = 400000;
+// Dictionary page size to use for testing dictionary fallback
+const int64_t DICTIONARY_PAGE_SIZE = 1024 * 1024;
#endif
template <typename TestType>
@@ -79,12 +83,15 @@ class TestPrimitiveWriter : public
PrimitiveTypedTest<TestType> {
std::shared_ptr<TypedColumnWriter<TestType>> BuildWriter(
int64_t output_size = SMALL_SIZE,
- const ColumnProperties& column_properties = ColumnProperties()) {
+ const ColumnProperties& column_properties = ColumnProperties(),
+ const ParquetVersion::type version = ParquetVersion::PARQUET_1_0) {
sink_.reset(new InMemoryOutputStream());
WriterProperties::Builder wp_builder;
+ wp_builder.version(version);
if (column_properties.encoding() == Encoding::PLAIN_DICTIONARY ||
column_properties.encoding() == Encoding::RLE_DICTIONARY) {
wp_builder.enable_dictionary();
+ wp_builder.dictionary_pagesize_limit(DICTIONARY_PAGE_SIZE);
} else {
wp_builder.disable_dictionary();
wp_builder.encoding(column_properties.encoding());
@@ -128,6 +135,50 @@ class TestPrimitiveWriter : public
PrimitiveTypedTest<TestType> {
ASSERT_NO_FATAL_FAILURE(this->ReadAndCompare(compression, num_rows));
}
+ void TestDictionaryFallbackEncoding(ParquetVersion::type version) {
+ this->GenerateData(VERY_LARGE_SIZE);
+ ColumnProperties column_properties;
+ column_properties.set_dictionary_enabled(true);
+
+ if (version == ParquetVersion::PARQUET_1_0) {
+ column_properties.set_encoding(Encoding::PLAIN_DICTIONARY);
+ } else {
+ column_properties.set_encoding(Encoding::RLE_DICTIONARY);
+ }
+
+ auto writer = this->BuildWriter(VERY_LARGE_SIZE, column_properties,
version);
+
+ writer->WriteBatch(this->values_.size(), nullptr, nullptr,
this->values_ptr_);
+ writer->Close();
+
+ // Read all rows so we are sure that also the non-dictionary pages are
read correctly
+ this->SetupValuesOut(VERY_LARGE_SIZE);
+ this->ReadColumnFully();
+ ASSERT_EQ(VERY_LARGE_SIZE, this->values_read_);
+ this->values_.resize(VERY_LARGE_SIZE);
+ ASSERT_EQ(this->values_, this->values_out_);
+ std::vector<Encoding::type> encodings = this->metadata_encodings();
+
+ if (this->type_num() == Type::BOOLEAN) {
+ // Dictionary encoding is not allowed for boolean type
+ // There are 2 encodings (PLAIN, RLE) in a non dictionary encoding case
+ std::vector<Encoding::type> expected({Encoding::PLAIN, Encoding::RLE});
+ ASSERT_EQ(encodings, expected);
+ } else if (version == ParquetVersion::PARQUET_1_0) {
+ // There are 4 encodings (PLAIN_DICTIONARY, PLAIN, RLE, PLAIN) in a
fallback case
+ // for version 1.0
+ std::vector<Encoding::type> expected(
+ {Encoding::PLAIN_DICTIONARY, Encoding::PLAIN, Encoding::RLE,
Encoding::PLAIN});
+ ASSERT_EQ(encodings, expected);
+ } else {
+ // There are 4 encodings (RLE_DICTIONARY, PLAIN, RLE, PLAIN) in a
fallback case for
+ // version 2.0
+ std::vector<Encoding::type> expected(
+ {Encoding::RLE_DICTIONARY, Encoding::PLAIN, Encoding::RLE,
Encoding::PLAIN});
+ ASSERT_EQ(encodings, expected);
+ }
+ }
+
void WriteRequiredWithSettings(Encoding::type encoding, Compression::type
compression,
bool enable_dictionary, bool
enable_statistics,
int64_t num_rows) {
@@ -478,32 +529,13 @@ TYPED_TEST(TestPrimitiveWriter, RequiredLargeChunk) {
ASSERT_EQ(this->values_, this->values_out_);
}
-// Test case for dictionary fallback encoding
-TYPED_TEST(TestPrimitiveWriter, RequiredVeryLargeChunk) {
- this->GenerateData(VERY_LARGE_SIZE);
-
- auto writer = this->BuildWriter(VERY_LARGE_SIZE, Encoding::PLAIN_DICTIONARY);
- writer->WriteBatch(this->values_.size(), nullptr, nullptr,
this->values_ptr_);
- writer->Close();
+// Test cases for dictionary fallback encoding
+TYPED_TEST(TestPrimitiveWriter, DictionaryFallbackVersion1_0) {
+ this->TestDictionaryFallbackEncoding(ParquetVersion::PARQUET_1_0);
+}
- // Read all rows so we are sure that also the non-dictionary pages are read
correctly
- this->SetupValuesOut(VERY_LARGE_SIZE);
- this->ReadColumnFully();
- ASSERT_EQ(VERY_LARGE_SIZE, this->values_read_);
- this->values_.resize(VERY_LARGE_SIZE);
- ASSERT_EQ(this->values_, this->values_out_);
- std::vector<Encoding::type> encodings = this->metadata_encodings();
- // There are 3 encodings (RLE, PLAIN_DICTIONARY, PLAIN) in a fallback case
- // Dictionary encoding is not allowed for boolean type
- // There are 2 encodings (RLE, PLAIN) in a non dictionary encoding case
- if (this->type_num() != Type::BOOLEAN) {
- ASSERT_EQ(Encoding::PLAIN_DICTIONARY, encodings[0]);
- ASSERT_EQ(Encoding::PLAIN, encodings[1]);
- ASSERT_EQ(Encoding::RLE, encodings[2]);
- } else {
- ASSERT_EQ(Encoding::PLAIN, encodings[0]);
- ASSERT_EQ(Encoding::RLE, encodings[1]);
- }
+TYPED_TEST(TestPrimitiveWriter, DictionaryFallbackVersion2_0) {
+ this->TestDictionaryFallbackEncoding(ParquetVersion::PARQUET_2_0);
}
// PARQUET-719
diff --git a/cpp/src/parquet/column_writer.cc b/cpp/src/parquet/column_writer.cc
index dfb65f1..6996757 100644
--- a/cpp/src/parquet/column_writer.cc
+++ b/cpp/src/parquet/column_writer.cc
@@ -534,22 +534,16 @@ void ColumnWriter::FlushBufferedDataPages() {
template <typename Type>
TypedColumnWriter<Type>::TypedColumnWriter(ColumnChunkMetaDataBuilder*
metadata,
std::unique_ptr<PageWriter> pager,
+ const bool use_dictionary,
Encoding::type encoding,
const WriterProperties* properties)
- : ColumnWriter(metadata, std::move(pager),
- (encoding == Encoding::PLAIN_DICTIONARY ||
- encoding == Encoding::RLE_DICTIONARY),
- encoding, properties) {
- switch (encoding) {
- case Encoding::PLAIN:
- current_encoder_.reset(new PlainEncoder<Type>(descr_,
properties->memory_pool()));
- break;
- case Encoding::PLAIN_DICTIONARY:
- case Encoding::RLE_DICTIONARY:
- current_encoder_.reset(new DictEncoder<Type>(descr_,
properties->memory_pool()));
- break;
- default:
- ParquetException::NYI("Selected encoding is not supported");
+ : ColumnWriter(metadata, std::move(pager), use_dictionary, encoding,
properties) {
+ if (use_dictionary) {
+ current_encoder_.reset(new DictEncoder<Type>(descr_,
properties->memory_pool()));
+ } else if (encoding == Encoding::PLAIN) {
+ current_encoder_.reset(new PlainEncoder<Type>(descr_,
properties->memory_pool()));
+ } else {
+ ParquetException::NYI("Selected encoding is not supported");
}
if (properties->statistics_enabled(descr_->path()) &&
@@ -583,7 +577,7 @@ void TypedColumnWriter<Type>::WriteDictionaryPage() {
dict_encoder->WriteDict(buffer->mutable_data());
DictionaryPage page(buffer, dict_encoder->num_entries(),
- properties_->dictionary_index_encoding());
+ properties_->dictionary_page_encoding());
total_bytes_written_ += pager_->WriteDictionaryPage(page);
}
@@ -616,36 +610,37 @@ std::shared_ptr<ColumnWriter>
ColumnWriter::Make(ColumnChunkMetaDataBuilder* met
std::unique_ptr<PageWriter>
pager,
const WriterProperties*
properties) {
const ColumnDescriptor* descr = metadata->descr();
+ const bool use_dictionary = properties->dictionary_enabled(descr->path()) &&
+ descr->physical_type() != Type::BOOLEAN;
Encoding::type encoding = properties->encoding(descr->path());
- if (properties->dictionary_enabled(descr->path()) &&
- descr->physical_type() != Type::BOOLEAN) {
- encoding = properties->dictionary_page_encoding();
+ if (use_dictionary) {
+ encoding = properties->dictionary_index_encoding();
}
switch (descr->physical_type()) {
case Type::BOOLEAN:
- return std::make_shared<BoolWriter>(metadata, std::move(pager), encoding,
- properties);
+ return std::make_shared<BoolWriter>(metadata, std::move(pager),
use_dictionary,
+ encoding, properties);
case Type::INT32:
- return std::make_shared<Int32Writer>(metadata, std::move(pager),
encoding,
- properties);
+ return std::make_shared<Int32Writer>(metadata, std::move(pager),
use_dictionary,
+ encoding, properties);
case Type::INT64:
- return std::make_shared<Int64Writer>(metadata, std::move(pager),
encoding,
- properties);
+ return std::make_shared<Int64Writer>(metadata, std::move(pager),
use_dictionary,
+ encoding, properties);
case Type::INT96:
- return std::make_shared<Int96Writer>(metadata, std::move(pager),
encoding,
- properties);
+ return std::make_shared<Int96Writer>(metadata, std::move(pager),
use_dictionary,
+ encoding, properties);
case Type::FLOAT:
- return std::make_shared<FloatWriter>(metadata, std::move(pager),
encoding,
- properties);
+ return std::make_shared<FloatWriter>(metadata, std::move(pager),
use_dictionary,
+ encoding, properties);
case Type::DOUBLE:
- return std::make_shared<DoubleWriter>(metadata, std::move(pager),
encoding,
- properties);
+ return std::make_shared<DoubleWriter>(metadata, std::move(pager),
use_dictionary,
+ encoding, properties);
case Type::BYTE_ARRAY:
- return std::make_shared<ByteArrayWriter>(metadata, std::move(pager),
encoding,
- properties);
+ return std::make_shared<ByteArrayWriter>(metadata, std::move(pager),
use_dictionary,
+ encoding, properties);
case Type::FIXED_LEN_BYTE_ARRAY:
- return std::make_shared<FixedLenByteArrayWriter>(metadata,
std::move(pager),
- encoding, properties);
+ return std::make_shared<FixedLenByteArrayWriter>(
+ metadata, std::move(pager), use_dictionary, encoding, properties);
default:
ParquetException::NYI("type reader not implemented");
}
diff --git a/cpp/src/parquet/column_writer.h b/cpp/src/parquet/column_writer.h
index e665ca7..71e01f8 100644
--- a/cpp/src/parquet/column_writer.h
+++ b/cpp/src/parquet/column_writer.h
@@ -235,8 +235,8 @@ class PARQUET_TEMPLATE_CLASS_EXPORT TypedColumnWriter :
public ColumnWriter {
typedef typename DType::c_type T;
TypedColumnWriter(ColumnChunkMetaDataBuilder* metadata,
- std::unique_ptr<PageWriter> pager, Encoding::type encoding,
- const WriterProperties* properties);
+ std::unique_ptr<PageWriter> pager, const bool
use_dictionary,
+ Encoding::type encoding, const WriterProperties*
properties);
// Write a batch of repetition levels, definition levels, and values to the
// column.