This is an automated email from the ASF dual-hosted git repository.

maplefu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/main by this push:
     new bec0385679 PARQUET-2411: [C++][Parquet] Allow reading dictionary 
without reading data via ByteArrayDictionaryRecordReader (#39153)
bec0385679 is described below

commit bec03856799a69bf0e6d4419ab7bc565afd070fe
Author: Jinpeng <[email protected]>
AuthorDate: Thu Jan 4 21:41:01 2024 -0500

    PARQUET-2411: [C++][Parquet] Allow reading dictionary without reading data 
via ByteArrayDictionaryRecordReader (#39153)
    
    
    ### Rationale for this change
    This proposes an API to read only the dictionary from 
ByteArrayDictionaryRecordReader, enabling possible use cases where the caller 
just wants to check the dictionary content.
    
    ### What changes are included in this PR?
    
    New APIs to enable reading dictionary with RecordReader.
    
    ### Are these changes tested?
    Unit tests.
    
    ### Are there any user-facing changes?
    
    New APIs without breaking existing workflow.
    
    Authored-by: jp0317 <[email protected]>
    Signed-off-by: mwish <[email protected]>
---
 cpp/src/parquet/column_reader.cc |  20 ++++++
 cpp/src/parquet/column_reader.h  |  10 +++
 cpp/src/parquet/file_reader.cc   |  79 ++++++++++++++----------
 cpp/src/parquet/file_reader.h    |  15 ++++-
 cpp/src/parquet/reader_test.cc   | 127 +++++++++++++++++++++++++++++++++++++++
 5 files changed, 217 insertions(+), 34 deletions(-)

diff --git a/cpp/src/parquet/column_reader.cc b/cpp/src/parquet/column_reader.cc
index a49e58afbd..99978e283b 100644
--- a/cpp/src/parquet/column_reader.cc
+++ b/cpp/src/parquet/column_reader.cc
@@ -1370,6 +1370,26 @@ class TypedRecordReader : public 
TypedColumnReaderImpl<DType>,
     return bytes_for_values;
   }
 
+  const void* ReadDictionary(int32_t* dictionary_length) override {
+    if (this->current_decoder_ == nullptr && !this->HasNextInternal()) {
+      *dictionary_length = 0;  // Write through the out-param: zero entries available.
+      return nullptr;
+    }
+    // Verify the current data page is dictionary encoded. The current_encoding_ should
+    // have been set as RLE_DICTIONARY if the page encoding is RLE_DICTIONARY or
+    // PLAIN_DICTIONARY.
+    if (this->current_encoding_ != Encoding::RLE_DICTIONARY) {
+      std::stringstream ss;
+      ss << "Data page is not dictionary encoded. Encoding: "
+         << EncodingToString(this->current_encoding_);
+      throw ParquetException(ss.str());
+    }
+    auto decoder = dynamic_cast<DictDecoder<DType>*>(this->current_decoder_);
+    const T* dictionary = nullptr;  // Filled in by GetDictionary() below.
+    decoder->GetDictionary(&dictionary, dictionary_length);
+    return reinterpret_cast<const void*>(dictionary);
+  }
+
   int64_t ReadRecords(int64_t num_records) override {
     if (num_records == 0) return 0;
     // Delimit records, then read values at the end
diff --git a/cpp/src/parquet/column_reader.h b/cpp/src/parquet/column_reader.h
index 334b8bcffe..086f6c0e55 100644
--- a/cpp/src/parquet/column_reader.h
+++ b/cpp/src/parquet/column_reader.h
@@ -368,6 +368,16 @@ class PARQUET_EXPORT RecordReader {
 
   virtual void DebugPrintState() = 0;
 
+  /// \brief Returns the dictionary owned by the current decoder. Throws an
+  /// exception if the current decoder is not for dictionary encoding. The 
caller is
+  /// responsible for casting the returned pointer to proper type depending on 
the
+  /// column's physical type. An example:
+  ///   const ByteArray* dict = reinterpret_cast<const 
ByteArray*>(ReadDictionary(&len));
+  /// or:
+  ///   const float* dict = reinterpret_cast<const 
float*>(ReadDictionary(&len));
+  /// \param[out] dictionary_length The number of dictionary entries.
+  virtual const void* ReadDictionary(int32_t* dictionary_length) = 0;
+
   /// \brief Decoded definition levels
   int16_t* def_levels() const {
     return reinterpret_cast<int16_t*>(def_levels_->mutable_data());
diff --git a/cpp/src/parquet/file_reader.cc b/cpp/src/parquet/file_reader.cc
index 1d972b78fb..b3dd1d6054 100644
--- a/cpp/src/parquet/file_reader.cc
+++ b/cpp/src/parquet/file_reader.cc
@@ -54,6 +54,36 @@ using arrow::internal::AddWithOverflow;
 
 namespace parquet {
 
+namespace {
+bool IsColumnChunkFullyDictionaryEncoded(const ColumnChunkMetaData& col) {
+  // Check the encoding_stats to see if all data pages are dictionary encoded.
+  const std::vector<PageEncodingStats>& encoding_stats = col.encoding_stats();
+  if (encoding_stats.empty()) {
+    // Some parquet files may have empty encoding_stats. In this case we are
+    // not sure whether all data pages are dictionary encoded.
+    return false;
+  }
+  // The 1st page should be the dictionary page.
+  if (encoding_stats[0].page_type != PageType::DICTIONARY_PAGE ||
+      (encoding_stats[0].encoding != Encoding::PLAIN &&
+       encoding_stats[0].encoding != Encoding::PLAIN_DICTIONARY)) {
+    return false;
+  }
+  // The following pages should be dictionary encoded data pages.
+  for (size_t idx = 1; idx < encoding_stats.size(); ++idx) {
+    if ((encoding_stats[idx].encoding != Encoding::RLE_DICTIONARY &&
+         encoding_stats[idx].encoding != Encoding::PLAIN_DICTIONARY) ||
+        (encoding_stats[idx].page_type != PageType::DATA_PAGE &&
+         encoding_stats[idx].page_type != PageType::DATA_PAGE_V2)) {
+      // Return false if any following page is not a dictionary encoded data
+      // page.
+      return false;
+    }
+  }
+  return true;
+}
+}  // namespace
+
 // PARQUET-978: Minimize footer reads by reading 64 KB from the end of the file
 static constexpr int64_t kDefaultFooterReadSize = 64 * 1024;
 static constexpr uint32_t kFooterSize = 8;
@@ -82,7 +112,8 @@ std::shared_ptr<ColumnReader> RowGroupReader::Column(int i) {
       const_cast<ReaderProperties*>(contents_->properties())->memory_pool());
 }
 
-std::shared_ptr<internal::RecordReader> RowGroupReader::RecordReader(int i) {
+std::shared_ptr<internal::RecordReader> RowGroupReader::RecordReader(
+    int i, bool read_dictionary) {
   if (i >= metadata()->num_columns()) {
     std::stringstream ss;
     ss << "Trying to read column index " << i << " but row group metadata has 
only "
@@ -96,8 +127,8 @@ std::shared_ptr<internal::RecordReader> 
RowGroupReader::RecordReader(int i) {
   internal::LevelInfo level_info = 
internal::LevelInfo::ComputeLevelInfo(descr);
 
   auto reader = internal::RecordReader::Make(
-      descr, level_info, contents_->properties()->memory_pool(),
-      /* read_dictionary = */ false, 
contents_->properties()->read_dense_for_nullable());
+      descr, level_info, contents_->properties()->memory_pool(), 
read_dictionary,
+      contents_->properties()->read_dense_for_nullable());
   reader->SetPageReader(std::move(page_reader));
   return reader;
 }
@@ -106,41 +137,23 @@ std::shared_ptr<ColumnReader> 
RowGroupReader::ColumnWithExposeEncoding(
     int i, ExposedEncoding encoding_to_expose) {
   std::shared_ptr<ColumnReader> reader = Column(i);
 
-  if (encoding_to_expose == ExposedEncoding::DICTIONARY) {
-    // Check the encoding_stats to see if all data pages are dictionary 
encoded.
-    std::unique_ptr<ColumnChunkMetaData> col = metadata()->ColumnChunk(i);
-    const std::vector<PageEncodingStats>& encoding_stats = 
col->encoding_stats();
-    if (encoding_stats.empty()) {
-      // Some parquet files may have empty encoding_stats. In this case we are
-      // not sure whether all data pages are dictionary encoded. So we do not
-      // enable exposing dictionary.
-      return reader;
-    }
-    // The 1st page should be the dictionary page.
-    if (encoding_stats[0].page_type != PageType::DICTIONARY_PAGE ||
-        (encoding_stats[0].encoding != Encoding::PLAIN &&
-         encoding_stats[0].encoding != Encoding::PLAIN_DICTIONARY)) {
-      return reader;
-    }
-    // The following pages should be dictionary encoded data pages.
-    for (size_t idx = 1; idx < encoding_stats.size(); ++idx) {
-      if ((encoding_stats[idx].encoding != Encoding::RLE_DICTIONARY &&
-           encoding_stats[idx].encoding != Encoding::PLAIN_DICTIONARY) ||
-          (encoding_stats[idx].page_type != PageType::DATA_PAGE &&
-           encoding_stats[idx].page_type != PageType::DATA_PAGE_V2)) {
-        return reader;
-      }
-    }
-  } else {
-    // Exposing other encodings are not supported for now.
-    return reader;
+  if (encoding_to_expose == ExposedEncoding::DICTIONARY &&
+      IsColumnChunkFullyDictionaryEncoded(*metadata()->ColumnChunk(i))) {
+    // Set exposed encoding.
+    reader->SetExposedEncoding(encoding_to_expose);
   }
 
-  // Set exposed encoding.
-  reader->SetExposedEncoding(encoding_to_expose);
   return reader;
 }
 
+std::shared_ptr<internal::RecordReader> 
RowGroupReader::RecordReaderWithExposeEncoding(
+    int i, ExposedEncoding encoding_to_expose) {
+  return RecordReader(
+      i,
+      /*read_dictionary=*/encoding_to_expose == ExposedEncoding::DICTIONARY &&
+          IsColumnChunkFullyDictionaryEncoded(*metadata()->ColumnChunk(i)));
+}
+
 std::unique_ptr<PageReader> RowGroupReader::GetColumnPageReader(int i) {
   if (i >= metadata()->num_columns()) {
     std::stringstream ss;
diff --git a/cpp/src/parquet/file_reader.h b/cpp/src/parquet/file_reader.h
index da85b73fc2..b59b59f95c 100644
--- a/cpp/src/parquet/file_reader.h
+++ b/cpp/src/parquet/file_reader.h
@@ -64,7 +64,8 @@ class PARQUET_EXPORT RowGroupReader {
 
   // EXPERIMENTAL: Construct a RecordReader for the indicated column of the 
row group.
   // Ownership is shared with the RowGroupReader.
-  std::shared_ptr<internal::RecordReader> RecordReader(int i);
+  std::shared_ptr<internal::RecordReader> RecordReader(int i,
+                                                       bool read_dictionary = 
false);
 
   // Construct a ColumnReader, trying to enable exposed encoding.
   //
@@ -80,6 +81,18 @@ class PARQUET_EXPORT RowGroupReader {
   std::shared_ptr<ColumnReader> ColumnWithExposeEncoding(
       int i, ExposedEncoding encoding_to_expose);
 
+  // Construct a RecordReader, trying to enable exposed encoding.
+  //
+  // For dictionary encoding, currently we only support column chunks that are
+  // fully dictionary encoded byte arrays. The caller should verify if the 
reader can read
+  // and expose the dictionary by checking the reader's read_dictionary(). If 
a column
+  // chunk uses dictionary encoding but then falls back to plain encoding, the 
returned
+  // reader will read decoded data without exposing the dictionary.
+  //
+  // \note API EXPERIMENTAL
+  std::shared_ptr<internal::RecordReader> RecordReaderWithExposeEncoding(
+      int i, ExposedEncoding encoding_to_expose);
+
   std::unique_ptr<PageReader> GetColumnPageReader(int i);
 
  private:
diff --git a/cpp/src/parquet/reader_test.cc b/cpp/src/parquet/reader_test.cc
index 5223158e5f..2c2b62f5d1 100644
--- a/cpp/src/parquet/reader_test.cc
+++ b/cpp/src/parquet/reader_test.cc
@@ -542,6 +542,83 @@ TEST(TestFileReader, GetRecordReader) {
   ASSERT_EQ(8, col_record_reader_->levels_written());
 }
 
+TEST(TestFileReader, RecordReaderWithExposingDictionary) {
+  const int num_rows = 1000;
+
+  // Make schema
+  schema::NodeVector fields;
+  fields.push_back(PrimitiveNode::Make("field", Repetition::REQUIRED, 
Type::BYTE_ARRAY,
+                                       ConvertedType::NONE));
+  auto schema = std::static_pointer_cast<GroupNode>(
+      GroupNode::Make("schema", Repetition::REQUIRED, fields));
+
+  // Write small batches and small data pages
+  std::shared_ptr<WriterProperties> writer_props = WriterProperties::Builder()
+                                                       .write_batch_size(64)
+                                                       ->data_pagesize(128)
+                                                       ->enable_dictionary()
+                                                       ->build();
+
+  ASSERT_OK_AND_ASSIGN(auto out_file, 
::arrow::io::BufferOutputStream::Create());
+  std::shared_ptr<ParquetFileWriter> file_writer =
+      ParquetFileWriter::Open(out_file, schema, writer_props);
+
+  RowGroupWriter* rg_writer = file_writer->AppendRowGroup();
+
+  // write one column
+  ::arrow::random::RandomArrayGenerator rag(0);
+  ByteArrayWriter* writer = 
static_cast<ByteArrayWriter*>(rg_writer->NextColumn());
+  std::vector<std::string> raw_unique_data = {"a", "bc", "defg"};
+  std::vector<ByteArray> col_typed;
+  for (int i = 0; i < num_rows; i++) {
+    std::string_view chosed_data = raw_unique_data[i % raw_unique_data.size()];
+    col_typed.emplace_back(chosed_data);
+  }
+  writer->WriteBatch(num_rows, nullptr, nullptr, col_typed.data());
+  rg_writer->Close();
+  file_writer->Close();
+
+  // Open the reader
+  ASSERT_OK_AND_ASSIGN(auto file_buf, out_file->Finish());
+  auto in_file = std::make_shared<::arrow::io::BufferReader>(file_buf);
+
+  ReaderProperties reader_props;
+  reader_props.enable_buffered_stream();
+  reader_props.set_buffer_size(64);
+  std::unique_ptr<ParquetFileReader> file_reader =
+      ParquetFileReader::Open(in_file, reader_props);
+
+  auto row_group = file_reader->RowGroup(0);
+  auto record_reader = 
std::dynamic_pointer_cast<internal::DictionaryRecordReader>(
+      row_group->RecordReaderWithExposeEncoding(0, 
ExposedEncoding::DICTIONARY));
+  ASSERT_NE(record_reader, nullptr);
+  ASSERT_TRUE(record_reader->read_dictionary());
+
+  int32_t dict_len = 0;
+  auto dict =
+      reinterpret_cast<const 
ByteArray*>(record_reader->ReadDictionary(&dict_len));
+  ASSERT_NE(dict, nullptr);
+  ASSERT_EQ(dict_len, raw_unique_data.size());
+  ASSERT_EQ(record_reader->ReadRecords(num_rows), num_rows);
+  std::shared_ptr<::arrow::ChunkedArray> result_array = 
record_reader->GetResult();
+  ASSERT_EQ(result_array->num_chunks(), 1);
+  const std::shared_ptr<::arrow::Array> chunk = result_array->chunk(0);
+  auto dictionary_array = 
std::dynamic_pointer_cast<::arrow::DictionaryArray>(chunk);
+  const int32_t* indices =
+      
(std::dynamic_pointer_cast<::arrow::Int32Array>(dictionary_array->indices()))
+          ->raw_values();
+
+  // Verify values based on the dictionary from ReadDictionary().
+  int64_t indices_read = chunk->length();
+  ASSERT_EQ(indices_read, num_rows);
+  for (int i = 0; i < indices_read; ++i) {
+    ASSERT_LT(indices[i], dict_len);
+    ASSERT_EQ(std::string_view(reinterpret_cast<const char* 
const>(dict[indices[i]].ptr),
+                               dict[indices[i]].len),
+              col_typed[i]);
+  }
+}
+
 class TestLocalFile : public ::testing::Test {
  public:
   void SetUp() {
@@ -1064,6 +1141,56 @@ TEST(TestFileReader, BufferedReadsWithDictionary) {
   }
 }
 
+TEST(TestFileReader, PartiallyDictionaryEncodingNotExposed) {
+  const int num_rows = 1000;
+
+  // Make schema
+  schema::NodeVector fields;
+  fields.push_back(PrimitiveNode::Make("field", Repetition::REQUIRED, 
Type::DOUBLE,
+                                       ConvertedType::NONE));
+  auto schema = std::static_pointer_cast<GroupNode>(
+      GroupNode::Make("schema", Repetition::REQUIRED, fields));
+
+  // Write small batches and small data pages. Explicitly set the dictionary 
page size
+  // limit such that the column chunk will not be fully dictionary encoded.
+  std::shared_ptr<WriterProperties> writer_props = WriterProperties::Builder()
+                                                       .write_batch_size(64)
+                                                       ->data_pagesize(128)
+                                                       ->enable_dictionary()
+                                                       
->dictionary_pagesize_limit(4)
+                                                       ->build();
+
+  ASSERT_OK_AND_ASSIGN(auto out_file, 
::arrow::io::BufferOutputStream::Create());
+  std::shared_ptr<ParquetFileWriter> file_writer =
+      ParquetFileWriter::Open(out_file, schema, writer_props);
+
+  RowGroupWriter* rg_writer = file_writer->AppendRowGroup();
+
+  // write one column
+  ::arrow::random::RandomArrayGenerator rag(0);
+  DoubleWriter* writer = static_cast<DoubleWriter*>(rg_writer->NextColumn());
+  std::shared_ptr<::arrow::Array> col = rag.Float64(num_rows, 0, 100);
+  const auto& col_typed = static_cast<const ::arrow::DoubleArray&>(*col);
+  writer->WriteBatch(num_rows, nullptr, nullptr, col_typed.raw_values());
+  rg_writer->Close();
+  file_writer->Close();
+
+  // Open the reader
+  ASSERT_OK_AND_ASSIGN(auto file_buf, out_file->Finish());
+  auto in_file = std::make_shared<::arrow::io::BufferReader>(file_buf);
+
+  ReaderProperties reader_props;
+  reader_props.enable_buffered_stream();
+  reader_props.set_buffer_size(64);
+  std::unique_ptr<ParquetFileReader> file_reader =
+      ParquetFileReader::Open(in_file, reader_props);
+
+  auto row_group = file_reader->RowGroup(0);
+  auto col_reader = std::static_pointer_cast<DoubleReader>(
+      row_group->ColumnWithExposeEncoding(0, ExposedEncoding::DICTIONARY));
+  EXPECT_NE(col_reader->GetExposedEncoding(), ExposedEncoding::DICTIONARY);
+}
+
 TEST(TestFileReader, BufferedReads) {
   // PARQUET-1636: Buffered reads were broken before introduction of
   // RandomAccessFile::GetStream

Reply via email to