This is an automated email from the ASF dual-hosted git repository.

uwe pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new 035ce51  ARROW-3564: [C++] Fix dictionary encoding logic for Parquet 2.0
035ce51 is described below

commit 035ce511e3c6ff065191e59e48207a28d8d0a799
Author: Hatem Helal <[email protected]>
AuthorDate: Wed Jan 16 21:46:31 2019 +0100

    ARROW-3564: [C++] Fix dictionary encoding logic for Parquet 2.0
    
    Reworked the logic for encoder selection in the column writer and added C++ tests.
    This is expected to resolve the problem described in ARROW-3564, which was observed
    when using dictionary encoding to write Parquet files with pyarrow.
    
    Author: Hatem Helal <[email protected]>
    
    Closes #3331 from hatemhelal/arrow-3564 and squashes the following commits:
    
    55bed4ca <Hatem Helal> explicitly set dictionary page size when running 
unittests using valgrind to force dictionary fallback
    ee6263e8 <Hatem Helal> speculative fix for ubuntu CI failure which doesn't appear to trigger dictionary fallback
    95e4de2d <Hatem Helal> Make use_dictionary const as per rdmello's advice
    40f26b68 <Hatem Helal> Don't use gmock matcher since that isn't available 
in the CI system
    389ed416 <Hatem Helal> Add use_dictionary argument to benchmark which fixes 
CI build failures
    efb3bd88 <Hatem Helal> Fix dictionary encoding logic for Parquet 2.0
---
 cpp/src/parquet/column-io-benchmark.cc |  4 +-
 cpp/src/parquet/column_writer-test.cc  | 84 +++++++++++++++++++++++-----------
 cpp/src/parquet/column_writer.cc       | 63 ++++++++++++-------------
 cpp/src/parquet/column_writer.h        |  4 +-
 4 files changed, 91 insertions(+), 64 deletions(-)

diff --git a/cpp/src/parquet/column-io-benchmark.cc 
b/cpp/src/parquet/column-io-benchmark.cc
index 8f286f4..bb056c1 100644
--- a/cpp/src/parquet/column-io-benchmark.cc
+++ b/cpp/src/parquet/column-io-benchmark.cc
@@ -35,8 +35,8 @@ std::unique_ptr<Int64Writer> BuildWriter(int64_t output_size, 
OutputStream* dst,
                                          const WriterProperties* properties) {
   std::unique_ptr<PageWriter> pager =
       PageWriter::Open(dst, Compression::UNCOMPRESSED, metadata);
-  return std::unique_ptr<Int64Writer>(
-      new Int64Writer(metadata, std::move(pager), Encoding::PLAIN, 
properties));
+  return std::unique_ptr<Int64Writer>(new Int64Writer(
+      metadata, std::move(pager), false /*use_dictionary*/, Encoding::PLAIN, 
properties));
 }
 
 std::shared_ptr<ColumnDescriptor> Int64Schema(Repetition::type repetition) {
diff --git a/cpp/src/parquet/column_writer-test.cc 
b/cpp/src/parquet/column_writer-test.cc
index 28a18b1..a7671db 100644
--- a/cpp/src/parquet/column_writer-test.cc
+++ b/cpp/src/parquet/column_writer-test.cc
@@ -43,11 +43,15 @@ const int SMALL_SIZE = 100;
 const int LARGE_SIZE = 10000;
 // Very large size to test dictionary fallback.
 const int VERY_LARGE_SIZE = 40000;
+// Reduced dictionary page size to use for testing dictionary fallback with 
valgrind
+const int64_t DICTIONARY_PAGE_SIZE = 1024;
 #else
 // Larger size to test some corner cases, only used in some specific cases.
 const int LARGE_SIZE = 100000;
 // Very large size to test dictionary fallback.
 const int VERY_LARGE_SIZE = 400000;
+// Dictionary page size to use for testing dictionary fallback
+const int64_t DICTIONARY_PAGE_SIZE = 1024 * 1024;
 #endif
 
 template <typename TestType>
@@ -79,12 +83,15 @@ class TestPrimitiveWriter : public 
PrimitiveTypedTest<TestType> {
 
   std::shared_ptr<TypedColumnWriter<TestType>> BuildWriter(
       int64_t output_size = SMALL_SIZE,
-      const ColumnProperties& column_properties = ColumnProperties()) {
+      const ColumnProperties& column_properties = ColumnProperties(),
+      const ParquetVersion::type version = ParquetVersion::PARQUET_1_0) {
     sink_.reset(new InMemoryOutputStream());
     WriterProperties::Builder wp_builder;
+    wp_builder.version(version);
     if (column_properties.encoding() == Encoding::PLAIN_DICTIONARY ||
         column_properties.encoding() == Encoding::RLE_DICTIONARY) {
       wp_builder.enable_dictionary();
+      wp_builder.dictionary_pagesize_limit(DICTIONARY_PAGE_SIZE);
     } else {
       wp_builder.disable_dictionary();
       wp_builder.encoding(column_properties.encoding());
@@ -128,6 +135,50 @@ class TestPrimitiveWriter : public 
PrimitiveTypedTest<TestType> {
     ASSERT_NO_FATAL_FAILURE(this->ReadAndCompare(compression, num_rows));
   }
 
+  void TestDictionaryFallbackEncoding(ParquetVersion::type version) {
+    this->GenerateData(VERY_LARGE_SIZE);
+    ColumnProperties column_properties;
+    column_properties.set_dictionary_enabled(true);
+
+    if (version == ParquetVersion::PARQUET_1_0) {
+      column_properties.set_encoding(Encoding::PLAIN_DICTIONARY);
+    } else {
+      column_properties.set_encoding(Encoding::RLE_DICTIONARY);
+    }
+
+    auto writer = this->BuildWriter(VERY_LARGE_SIZE, column_properties, 
version);
+
+    writer->WriteBatch(this->values_.size(), nullptr, nullptr, 
this->values_ptr_);
+    writer->Close();
+
+    // Read all rows so we are sure that also the non-dictionary pages are 
read correctly
+    this->SetupValuesOut(VERY_LARGE_SIZE);
+    this->ReadColumnFully();
+    ASSERT_EQ(VERY_LARGE_SIZE, this->values_read_);
+    this->values_.resize(VERY_LARGE_SIZE);
+    ASSERT_EQ(this->values_, this->values_out_);
+    std::vector<Encoding::type> encodings = this->metadata_encodings();
+
+    if (this->type_num() == Type::BOOLEAN) {
+      // Dictionary encoding is not allowed for boolean type
+      // There are 2 encodings (PLAIN, RLE) in a non dictionary encoding case
+      std::vector<Encoding::type> expected({Encoding::PLAIN, Encoding::RLE});
+      ASSERT_EQ(encodings, expected);
+    } else if (version == ParquetVersion::PARQUET_1_0) {
+      // There are 4 encodings (PLAIN_DICTIONARY, PLAIN, RLE, PLAIN) in a 
fallback case
+      // for version 1.0
+      std::vector<Encoding::type> expected(
+          {Encoding::PLAIN_DICTIONARY, Encoding::PLAIN, Encoding::RLE, 
Encoding::PLAIN});
+      ASSERT_EQ(encodings, expected);
+    } else {
+      // There are 4 encodings (RLE_DICTIONARY, PLAIN, RLE, PLAIN) in a 
fallback case for
+      // version 2.0
+      std::vector<Encoding::type> expected(
+          {Encoding::RLE_DICTIONARY, Encoding::PLAIN, Encoding::RLE, 
Encoding::PLAIN});
+      ASSERT_EQ(encodings, expected);
+    }
+  }
+
   void WriteRequiredWithSettings(Encoding::type encoding, Compression::type 
compression,
                                  bool enable_dictionary, bool 
enable_statistics,
                                  int64_t num_rows) {
@@ -478,32 +529,13 @@ TYPED_TEST(TestPrimitiveWriter, RequiredLargeChunk) {
   ASSERT_EQ(this->values_, this->values_out_);
 }
 
-// Test case for dictionary fallback encoding
-TYPED_TEST(TestPrimitiveWriter, RequiredVeryLargeChunk) {
-  this->GenerateData(VERY_LARGE_SIZE);
-
-  auto writer = this->BuildWriter(VERY_LARGE_SIZE, Encoding::PLAIN_DICTIONARY);
-  writer->WriteBatch(this->values_.size(), nullptr, nullptr, 
this->values_ptr_);
-  writer->Close();
+// Test cases for dictionary fallback encoding
+TYPED_TEST(TestPrimitiveWriter, DictionaryFallbackVersion1_0) {
+  this->TestDictionaryFallbackEncoding(ParquetVersion::PARQUET_1_0);
+}
 
-  // Read all rows so we are sure that also the non-dictionary pages are read 
correctly
-  this->SetupValuesOut(VERY_LARGE_SIZE);
-  this->ReadColumnFully();
-  ASSERT_EQ(VERY_LARGE_SIZE, this->values_read_);
-  this->values_.resize(VERY_LARGE_SIZE);
-  ASSERT_EQ(this->values_, this->values_out_);
-  std::vector<Encoding::type> encodings = this->metadata_encodings();
-  // There are 3 encodings (RLE, PLAIN_DICTIONARY, PLAIN) in a fallback case
-  // Dictionary encoding is not allowed for boolean type
-  // There are 2 encodings (RLE, PLAIN) in a non dictionary encoding case
-  if (this->type_num() != Type::BOOLEAN) {
-    ASSERT_EQ(Encoding::PLAIN_DICTIONARY, encodings[0]);
-    ASSERT_EQ(Encoding::PLAIN, encodings[1]);
-    ASSERT_EQ(Encoding::RLE, encodings[2]);
-  } else {
-    ASSERT_EQ(Encoding::PLAIN, encodings[0]);
-    ASSERT_EQ(Encoding::RLE, encodings[1]);
-  }
+TYPED_TEST(TestPrimitiveWriter, DictionaryFallbackVersion2_0) {
+  this->TestDictionaryFallbackEncoding(ParquetVersion::PARQUET_2_0);
 }
 
 // PARQUET-719
diff --git a/cpp/src/parquet/column_writer.cc b/cpp/src/parquet/column_writer.cc
index dfb65f1..6996757 100644
--- a/cpp/src/parquet/column_writer.cc
+++ b/cpp/src/parquet/column_writer.cc
@@ -534,22 +534,16 @@ void ColumnWriter::FlushBufferedDataPages() {
 template <typename Type>
 TypedColumnWriter<Type>::TypedColumnWriter(ColumnChunkMetaDataBuilder* 
metadata,
                                            std::unique_ptr<PageWriter> pager,
+                                           const bool use_dictionary,
                                            Encoding::type encoding,
                                            const WriterProperties* properties)
-    : ColumnWriter(metadata, std::move(pager),
-                   (encoding == Encoding::PLAIN_DICTIONARY ||
-                    encoding == Encoding::RLE_DICTIONARY),
-                   encoding, properties) {
-  switch (encoding) {
-    case Encoding::PLAIN:
-      current_encoder_.reset(new PlainEncoder<Type>(descr_, 
properties->memory_pool()));
-      break;
-    case Encoding::PLAIN_DICTIONARY:
-    case Encoding::RLE_DICTIONARY:
-      current_encoder_.reset(new DictEncoder<Type>(descr_, 
properties->memory_pool()));
-      break;
-    default:
-      ParquetException::NYI("Selected encoding is not supported");
+    : ColumnWriter(metadata, std::move(pager), use_dictionary, encoding, 
properties) {
+  if (use_dictionary) {
+    current_encoder_.reset(new DictEncoder<Type>(descr_, 
properties->memory_pool()));
+  } else if (encoding == Encoding::PLAIN) {
+    current_encoder_.reset(new PlainEncoder<Type>(descr_, 
properties->memory_pool()));
+  } else {
+    ParquetException::NYI("Selected encoding is not supported");
   }
 
   if (properties->statistics_enabled(descr_->path()) &&
@@ -583,7 +577,7 @@ void TypedColumnWriter<Type>::WriteDictionaryPage() {
   dict_encoder->WriteDict(buffer->mutable_data());
 
   DictionaryPage page(buffer, dict_encoder->num_entries(),
-                      properties_->dictionary_index_encoding());
+                      properties_->dictionary_page_encoding());
   total_bytes_written_ += pager_->WriteDictionaryPage(page);
 }
 
@@ -616,36 +610,37 @@ std::shared_ptr<ColumnWriter> 
ColumnWriter::Make(ColumnChunkMetaDataBuilder* met
                                                  std::unique_ptr<PageWriter> 
pager,
                                                  const WriterProperties* 
properties) {
   const ColumnDescriptor* descr = metadata->descr();
+  const bool use_dictionary = properties->dictionary_enabled(descr->path()) &&
+                              descr->physical_type() != Type::BOOLEAN;
   Encoding::type encoding = properties->encoding(descr->path());
-  if (properties->dictionary_enabled(descr->path()) &&
-      descr->physical_type() != Type::BOOLEAN) {
-    encoding = properties->dictionary_page_encoding();
+  if (use_dictionary) {
+    encoding = properties->dictionary_index_encoding();
   }
   switch (descr->physical_type()) {
     case Type::BOOLEAN:
-      return std::make_shared<BoolWriter>(metadata, std::move(pager), encoding,
-                                          properties);
+      return std::make_shared<BoolWriter>(metadata, std::move(pager), 
use_dictionary,
+                                          encoding, properties);
     case Type::INT32:
-      return std::make_shared<Int32Writer>(metadata, std::move(pager), 
encoding,
-                                           properties);
+      return std::make_shared<Int32Writer>(metadata, std::move(pager), 
use_dictionary,
+                                           encoding, properties);
     case Type::INT64:
-      return std::make_shared<Int64Writer>(metadata, std::move(pager), 
encoding,
-                                           properties);
+      return std::make_shared<Int64Writer>(metadata, std::move(pager), 
use_dictionary,
+                                           encoding, properties);
     case Type::INT96:
-      return std::make_shared<Int96Writer>(metadata, std::move(pager), 
encoding,
-                                           properties);
+      return std::make_shared<Int96Writer>(metadata, std::move(pager), 
use_dictionary,
+                                           encoding, properties);
     case Type::FLOAT:
-      return std::make_shared<FloatWriter>(metadata, std::move(pager), 
encoding,
-                                           properties);
+      return std::make_shared<FloatWriter>(metadata, std::move(pager), 
use_dictionary,
+                                           encoding, properties);
     case Type::DOUBLE:
-      return std::make_shared<DoubleWriter>(metadata, std::move(pager), 
encoding,
-                                            properties);
+      return std::make_shared<DoubleWriter>(metadata, std::move(pager), 
use_dictionary,
+                                            encoding, properties);
     case Type::BYTE_ARRAY:
-      return std::make_shared<ByteArrayWriter>(metadata, std::move(pager), 
encoding,
-                                               properties);
+      return std::make_shared<ByteArrayWriter>(metadata, std::move(pager), 
use_dictionary,
+                                               encoding, properties);
     case Type::FIXED_LEN_BYTE_ARRAY:
-      return std::make_shared<FixedLenByteArrayWriter>(metadata, 
std::move(pager),
-                                                       encoding, properties);
+      return std::make_shared<FixedLenByteArrayWriter>(
+          metadata, std::move(pager), use_dictionary, encoding, properties);
     default:
       ParquetException::NYI("type reader not implemented");
   }
diff --git a/cpp/src/parquet/column_writer.h b/cpp/src/parquet/column_writer.h
index e665ca7..71e01f8 100644
--- a/cpp/src/parquet/column_writer.h
+++ b/cpp/src/parquet/column_writer.h
@@ -235,8 +235,8 @@ class PARQUET_TEMPLATE_CLASS_EXPORT TypedColumnWriter : 
public ColumnWriter {
   typedef typename DType::c_type T;
 
   TypedColumnWriter(ColumnChunkMetaDataBuilder* metadata,
-                    std::unique_ptr<PageWriter> pager, Encoding::type encoding,
-                    const WriterProperties* properties);
+                    std::unique_ptr<PageWriter> pager, const bool 
use_dictionary,
+                    Encoding::type encoding, const WriterProperties* 
properties);
 
   // Write a batch of repetition levels, definition levels, and values to the
   // column.

Reply via email to