This is an automated email from the ASF dual-hosted git repository.

gangwu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/main by this push:
     new d3a576d03a GH-37002: [C++][Parquet] Add api to get RecordReader from RowGroupReader (#37003)
d3a576d03a is described below

commit d3a576d03ad943773d9c55974f9c797a5042b4a8
Author: Fatemah Panahi <[email protected]>
AuthorDate: Mon Oct 16 18:16:52 2023 -0700

    GH-37002: [C++][Parquet] Add api to get RecordReader from RowGroupReader (#37003)
    
    Currently we can only get a ColumnReader for a column from RowGroupReader. We need an API to return a RecordReader for the column.
    
    Moved ComputeLevelInfo from column_writer to level_conversion so that it can be shared between column_writer and file_reader.
    * Closes: #37002
    
    Lead-authored-by: Fatemah Panahi <[email protected]>
    Co-authored-by: Fatemah Panahi <[email protected]>
    Signed-off-by: Gang Wu <[email protected]>
---
 cpp/src/parquet/column_writer.cc   | 19 +------------------
 cpp/src/parquet/file_reader.cc     | 20 ++++++++++++++++++++
 cpp/src/parquet/file_reader.h      |  8 ++++++++
 cpp/src/parquet/level_conversion.h | 18 ++++++++++++++++++
 cpp/src/parquet/properties.h       |  6 ++++++
 cpp/src/parquet/reader_test.cc     | 36 ++++++++++++++++++++++++++++++++++++
 6 files changed, 89 insertions(+), 18 deletions(-)

diff --git a/cpp/src/parquet/column_writer.cc b/cpp/src/parquet/column_writer.cc
index 88829ef5d9..72e984d773 100644
--- a/cpp/src/parquet/column_writer.cc
+++ b/cpp/src/parquet/column_writer.cc
@@ -140,23 +140,6 @@ struct ValueBufferSlicer {
   MemoryPool* pool_;
 };
 
-internal::LevelInfo ComputeLevelInfo(const ColumnDescriptor* descr) {
-  internal::LevelInfo level_info;
-  level_info.def_level = descr->max_definition_level();
-  level_info.rep_level = descr->max_repetition_level();
-
-  int16_t min_spaced_def_level = descr->max_definition_level();
-  const ::parquet::schema::Node* node = descr->schema_node().get();
-  while (node != nullptr && !node->is_repeated()) {
-    if (node->is_optional()) {
-      min_spaced_def_level--;
-    }
-    node = node->parent();
-  }
-  level_info.repeated_ancestor_def_level = min_spaced_def_level;
-  return level_info;
-}
-
 template <class T>
 inline const T* AddIfNotNull(const T* base, int64_t offset) {
   if (base != nullptr) {
@@ -738,7 +721,7 @@ class ColumnWriterImpl {
                    Encoding::type encoding, const WriterProperties* properties)
       : metadata_(metadata),
         descr_(metadata->descr()),
-        level_info_(ComputeLevelInfo(metadata->descr())),
+        level_info_(internal::LevelInfo::ComputeLevelInfo(metadata->descr())),
         pager_(std::move(pager)),
         has_dictionary_(use_dictionary),
         encoding_(encoding),
diff --git a/cpp/src/parquet/file_reader.cc b/cpp/src/parquet/file_reader.cc
index 7318b5640c..1d972b78fb 100644
--- a/cpp/src/parquet/file_reader.cc
+++ b/cpp/src/parquet/file_reader.cc
@@ -82,6 +82,26 @@ std::shared_ptr<ColumnReader> RowGroupReader::Column(int i) {
       const_cast<ReaderProperties*>(contents_->properties())->memory_pool());
 }
 
+std::shared_ptr<internal::RecordReader> RowGroupReader::RecordReader(int i) {
+  if (i >= metadata()->num_columns()) {
+    std::stringstream ss;
+    ss << "Trying to read column index " << i << " but row group metadata has only "
+       << metadata()->num_columns() << " columns";
+    throw ParquetException(ss.str());
+  }
+  const ColumnDescriptor* descr = metadata()->schema()->Column(i);
+
+  std::unique_ptr<PageReader> page_reader = contents_->GetColumnPageReader(i);
+
+  internal::LevelInfo level_info = internal::LevelInfo::ComputeLevelInfo(descr);
+
+  auto reader = internal::RecordReader::Make(
+      descr, level_info, contents_->properties()->memory_pool(),
+      /* read_dictionary = */ false, contents_->properties()->read_dense_for_nullable());
+  reader->SetPageReader(std::move(page_reader));
+  return reader;
+}
+
 std::shared_ptr<ColumnReader> RowGroupReader::ColumnWithExposeEncoding(
     int i, ExposedEncoding encoding_to_expose) {
   std::shared_ptr<ColumnReader> reader = Column(i);
diff --git a/cpp/src/parquet/file_reader.h b/cpp/src/parquet/file_reader.h
index 13fcd196c4..da85b73fc2 100644
--- a/cpp/src/parquet/file_reader.h
+++ b/cpp/src/parquet/file_reader.h
@@ -37,6 +37,10 @@ class BloomFilterReader;
 class PageReader;
 class RowGroupMetaData;
 
+namespace internal {
+class RecordReader;
+}
+
 class PARQUET_EXPORT RowGroupReader {
  public:
   // Forward declare a virtual class 'Contents' to aid dependency injection and more
@@ -58,6 +62,10 @@ class PARQUET_EXPORT RowGroupReader {
   // column. Ownership is shared with the RowGroupReader.
   std::shared_ptr<ColumnReader> Column(int i);
 
+  // EXPERIMENTAL: Construct a RecordReader for the indicated column of the row group.
+  // Ownership is shared with the RowGroupReader.
+  std::shared_ptr<internal::RecordReader> RecordReader(int i);
+
   // Construct a ColumnReader, trying to enable exposed encoding.
   //
   // For dictionary encoding, currently we only support column chunks that are fully
diff --git a/cpp/src/parquet/level_conversion.h b/cpp/src/parquet/level_conversion.h
index 3f56b2de36..2c6f628319 100644
--- a/cpp/src/parquet/level_conversion.h
+++ b/cpp/src/parquet/level_conversion.h
@@ -121,6 +121,24 @@ struct PARQUET_EXPORT LevelInfo {
     return last_repeated_ancestor;
   }
 
+  // Calculates and returns LevelInfo for a column descriptor.
+  static LevelInfo ComputeLevelInfo(const ColumnDescriptor* descr) {
+    LevelInfo level_info;
+    level_info.def_level = descr->max_definition_level();
+    level_info.rep_level = descr->max_repetition_level();
+
+    int16_t min_spaced_def_level = descr->max_definition_level();
+    const ::parquet::schema::Node* node = descr->schema_node().get();
+    while (node && !node->is_repeated()) {
+      if (node->is_optional()) {
+        min_spaced_def_level--;
+      }
+      node = node->parent();
+    }
+    level_info.repeated_ancestor_def_level = min_spaced_def_level;
+    return level_info;
+  }
+
   friend std::ostream& operator<<(std::ostream& os, const LevelInfo& levels) {
     // This print method is to silence valgrind issues.  What's printed
     // is not important because all asserts happen directly on
diff --git a/cpp/src/parquet/properties.h b/cpp/src/parquet/properties.h
index 6d3d5aa4f4..4d3acb491e 100644
--- a/cpp/src/parquet/properties.h
+++ b/cpp/src/parquet/properties.h
@@ -79,6 +79,10 @@ class PARQUET_EXPORT ReaderProperties {
   /// Disable buffered stream reading.
   void disable_buffered_stream() { buffered_stream_enabled_ = false; }
 
+  bool read_dense_for_nullable() const { return read_dense_for_nullable_; }
+  void enable_read_dense_for_nullable() { read_dense_for_nullable_ = true; }
+  void disable_read_dense_for_nullable() { read_dense_for_nullable_ = false; }
+
   /// Return the size of the buffered stream buffer.
   int64_t buffer_size() const { return buffer_size_; }
   /// Set the size of the buffered stream buffer in bytes.
@@ -123,6 +127,8 @@ class PARQUET_EXPORT ReaderProperties {
   int32_t thrift_container_size_limit_ = kDefaultThriftContainerSizeLimit;
   bool buffered_stream_enabled_ = false;
   bool page_checksum_verification_ = false;
+  // Used with a RecordReader.
+  bool read_dense_for_nullable_ = false;
   std::shared_ptr<FileDecryptionProperties> file_decryption_properties_;
 };
 
diff --git a/cpp/src/parquet/reader_test.cc b/cpp/src/parquet/reader_test.cc
index 0a73002846..8fe12d3de0 100644
--- a/cpp/src/parquet/reader_test.cc
+++ b/cpp/src/parquet/reader_test.cc
@@ -502,6 +502,42 @@ TEST_F(TestAllTypesPlain, ColumnSelectionOutOfRange) {
   ASSERT_THROW(printer2.DebugPrint(ss, columns), ParquetException);
 }
 
+// Tests that read_dense_for_nullable is passed down to the record
+// reader. The functionality of read_dense_for_nullable is tested
+// elsewhere.
+TEST(TestFileReader, RecordReaderReadDenseForNullable) {
+  // We test the default which is false, and also test enabling and disabling
+  // read_dense_for_nullable.
+  std::vector<ReaderProperties> reader_properties(3);
+  reader_properties[1].enable_read_dense_for_nullable();
+  reader_properties[2].disable_read_dense_for_nullable();
+  for (const auto& reader_props : reader_properties) {
+    std::unique_ptr<ParquetFileReader> file_reader = ParquetFileReader::OpenFile(
+        alltypes_plain(), /* memory_map = */ false, reader_props);
+    std::shared_ptr<RowGroupReader> group = file_reader->RowGroup(0);
+    std::shared_ptr<internal::RecordReader> col_record_reader = group->RecordReader(0);
+    ASSERT_EQ(reader_props.read_dense_for_nullable(),
+              col_record_reader->read_dense_for_nullable());
+  }
+}
+
+// Tests getting a record reader from a row group reader.
+TEST(TestFileReader, GetRecordReader) {
+  ReaderProperties reader_props;
+  std::unique_ptr<ParquetFileReader> file_reader = ParquetFileReader::OpenFile(
+      alltypes_plain(), /* memory_map = */ false, reader_props);
+  std::shared_ptr<RowGroupReader> group = file_reader->RowGroup(0);
+
+  std::shared_ptr<internal::RecordReader> col_record_reader_ = group->RecordReader(0);
+
+  ASSERT_TRUE(col_record_reader_->HasMoreData());
+  auto records_read = col_record_reader_->ReadRecords(4);
+  ASSERT_EQ(records_read, 4);
+  ASSERT_EQ(4, col_record_reader_->values_written());
+  ASSERT_EQ(4, col_record_reader_->levels_position());
+  ASSERT_EQ(8, col_record_reader_->levels_written());
+}
+
 class TestLocalFile : public ::testing::Test {
  public:
   void SetUp() {

Reply via email to