This is an automated email from the ASF dual-hosted git repository.

kou pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/main by this push:
     new 0cdbdac413 GH-44808: [C++][Parquet] Add `arrow::Result` version of 
`parquet::arrow::FileReader::GetRecordBatchReader()` (#44809)
0cdbdac413 is described below

commit 0cdbdac413fa1e7acaf0362eb8ca8fd1911d2a9e
Author: Sutou Kouhei <[email protected]>
AuthorDate: Sat Nov 23 07:11:04 2024 +0900

    GH-44808: [C++][Parquet] Add `arrow::Result` version of 
`parquet::arrow::FileReader::GetRecordBatchReader()` (#44809)
    
    ### Rationale for this change
    
    We're migrating APIs that return `arrow::Status` and write their result 
through an output parameter to APIs that return `arrow::Result`.
    
    ### What changes are included in this PR?
    
    * Add `arrow::Result<std::unique_ptr<arrow::RecordBatchReader>> 
parquet::arrow::FileReader::GetRecordBatchReader()`
    * Deprecate the `arrow::Status 
parquet::arrow::FileReader::GetRecordBatchReader()` overloads that take a 
`std::unique_ptr<arrow::RecordBatchReader>*` output parameter
    * Use the added `arrow::Result` version in our code base
    
    ### Are these changes tested?
    
    Yes.
    
    ### Are there any user-facing changes?
    
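    Yes. A new `arrow::Result`-returning `GetRecordBatchReader()` API is added, and the corresponding `arrow::Status` + output-parameter overloads are deprecated.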
    * GitHub Issue: #44808
    
    Authored-by: Sutou Kouhei <[email protected]>
    Signed-off-by: Sutou Kouhei <[email protected]>
---
 cpp/src/parquet/arrow/arrow_reader_writer_test.cc | 41 ++++++++--------
 cpp/src/parquet/arrow/reader.cc                   | 57 ++++++++++++++---------
 cpp/src/parquet/arrow/reader.h                    | 41 ++++++++++++++--
 3 files changed, 90 insertions(+), 49 deletions(-)

diff --git a/cpp/src/parquet/arrow/arrow_reader_writer_test.cc 
b/cpp/src/parquet/arrow/arrow_reader_writer_test.cc
index b5fd2bc255..78d272ff24 100644
--- a/cpp/src/parquet/arrow/arrow_reader_writer_test.cc
+++ b/cpp/src/parquet/arrow/arrow_reader_writer_test.cc
@@ -479,14 +479,16 @@ void DoRoundTripWithBatches(
                          ->Build(&reader));
   std::unique_ptr<::arrow::RecordBatchReader> batch_reader;
   if (column_subset.size() > 0) {
-    ASSERT_OK_NO_THROW(reader->GetRecordBatchReader(
-        Iota(reader->parquet_reader()->metadata()->num_row_groups()), 
column_subset,
-        &batch_reader));
+    ASSERT_OK_AND_ASSIGN(
+        batch_reader,
+        reader->GetRecordBatchReader(
+            Iota(reader->parquet_reader()->metadata()->num_row_groups()), 
column_subset));
   } else {
     // Read everything
 
-    ASSERT_OK_NO_THROW(reader->GetRecordBatchReader(
-        Iota(reader->parquet_reader()->metadata()->num_row_groups()), 
&batch_reader));
+    ASSERT_OK_AND_ASSIGN(
+        batch_reader, reader->GetRecordBatchReader(
+                          
Iota(reader->parquet_reader()->metadata()->num_row_groups())));
   }
   ASSERT_OK_AND_ASSIGN(*out, Table::FromRecordBatchReader(batch_reader.get()));
 }
@@ -2385,8 +2387,7 @@ void TestGetRecordBatchReader(
   ASSERT_OK(builder.properties(properties)->Build(&reader));
 
   // Read the whole file, one batch at a time.
-  std::shared_ptr<::arrow::RecordBatchReader> rb_reader;
-  ASSERT_OK_NO_THROW(reader->GetRecordBatchReader({0, 1}, &rb_reader));
+  ASSERT_OK_AND_ASSIGN(auto rb_reader, reader->GetRecordBatchReader({0, 1}));
   std::shared_ptr<::arrow::RecordBatch> actual_batch, expected_batch;
   ::arrow::TableBatchReader table_reader(*table);
   table_reader.set_chunksize(batch_size);
@@ -2401,7 +2402,7 @@ void TestGetRecordBatchReader(
   ASSERT_EQ(nullptr, actual_batch);
 
   // ARROW-6005: Read just the second row group
-  ASSERT_OK_NO_THROW(reader->GetRecordBatchReader({1}, &rb_reader));
+  ASSERT_OK_AND_ASSIGN(rb_reader, reader->GetRecordBatchReader({1}));
   std::shared_ptr<Table> second_rowgroup = table->Slice(num_rows / 2);
   ::arrow::TableBatchReader second_table_reader(*second_rowgroup);
   second_table_reader.set_chunksize(batch_size);
@@ -2448,8 +2449,8 @@ TEST(TestArrowReadWrite, WaitCoalescedReads) {
                                       ::arrow::io::CacheOptions::Defaults());
   ASSERT_OK(reader->parquet_reader()->WhenBuffered({0}, {0, 1, 2, 3, 
4}).status());
 
-  std::shared_ptr<::arrow::RecordBatchReader> rb_reader;
-  ASSERT_OK_NO_THROW(reader->GetRecordBatchReader({0}, {0, 1, 2, 3, 4}, 
&rb_reader));
+  ASSERT_OK_AND_ASSIGN(auto rb_reader,
+                       reader->GetRecordBatchReader({0}, {0, 1, 2, 3, 4}));
 
   std::shared_ptr<::arrow::RecordBatch> actual_batch;
   ASSERT_OK(rb_reader->ReadNext(&actual_batch));
@@ -2507,8 +2508,8 @@ TEST(TestArrowReadWrite, GetRecordBatchReaderNoColumns) {
   ASSERT_OK(builder.Open(std::make_shared<BufferReader>(buffer)));
   ASSERT_OK(builder.properties(properties)->Build(&reader));
 
-  std::shared_ptr<::arrow::RecordBatchReader> rb_reader;
-  ASSERT_OK_NO_THROW(reader->GetRecordBatchReader({0}, {}, &rb_reader));
+  ASSERT_OK_AND_ASSIGN(auto rb_reader, 
reader->GetRecordBatchReader(std::vector<int>{0},
+                                                                    
std::vector<int>{}));
 
   std::shared_ptr<::arrow::RecordBatch> actual_batch;
   ASSERT_OK(rb_reader->ReadNext(&actual_batch));
@@ -4016,8 +4017,7 @@ TEST(TestArrowReaderAdHoc, 
LARGE_MEMORY_TEST(LargeStringColumn)) {
   // ARROW-9297: ensure RecordBatchReader also works
   reader = 
ParquetFileReader::Open(std::make_shared<BufferReader>(tables_buffer));
   ASSERT_OK(FileReader::Make(default_memory_pool(), std::move(reader), 
&arrow_reader));
-  std::shared_ptr<::arrow::RecordBatchReader> batch_reader;
-  ASSERT_OK_NO_THROW(arrow_reader->GetRecordBatchReader(&batch_reader));
+  ASSERT_OK_AND_ASSIGN(auto batch_reader, 
arrow_reader->GetRecordBatchReader());
   ASSERT_OK_AND_ASSIGN(auto batched_table,
                        
::arrow::Table::FromRecordBatchReader(batch_reader.get()));
 
@@ -4491,9 +4491,8 @@ class TestArrowReadDictionary : public 
::testing::TestWithParam<double> {
   void CheckStreamReadWholeFile(const Table& expected) {
     ASSERT_OK_AND_ASSIGN(auto reader, GetReader());
 
-    std::unique_ptr<::arrow::RecordBatchReader> rb;
-    ASSERT_OK(reader->GetRecordBatchReader(
-        ::arrow::internal::Iota(options.num_row_groups), &rb));
+    ASSERT_OK_AND_ASSIGN(auto rb, reader->GetRecordBatchReader(
+                                      
::arrow::internal::Iota(options.num_row_groups)));
 
     ASSERT_OK_AND_ASSIGN(auto actual, rb->ToTable());
     ::arrow::AssertTablesEqual(expected, *actual, /*same_chunk_layout=*/false);
@@ -4773,10 +4772,9 @@ TEST_F(TestArrowReadDeltaEncoding, 
IncrementalDecodeDeltaByteArray) {
   ArrowReaderProperties properties = default_arrow_reader_properties();
   properties.set_batch_size(batch_size);
   std::unique_ptr<FileReader> parquet_reader;
-  std::shared_ptr<::arrow::RecordBatchReader> rb_reader;
   ASSERT_OK(FileReader::Make(pool, ParquetFileReader::OpenFile(file, false), 
properties,
                              &parquet_reader));
-  ASSERT_OK(parquet_reader->GetRecordBatchReader(&rb_reader));
+  ASSERT_OK_AND_ASSIGN(auto rb_reader, parquet_reader->GetRecordBatchReader());
 
   auto convert_options = ::arrow::csv::ConvertOptions::Defaults();
   std::vector<std::string> column_names = {
@@ -5262,9 +5260,8 @@ TEST(TestArrowReadWrite, WriteAndReadRecordBatch) {
   }
 
   // Verify batch data read via RecordBatch
-  std::unique_ptr<::arrow::RecordBatchReader> batch_reader;
-  ASSERT_OK_NO_THROW(
-      arrow_reader->GetRecordBatchReader(Iota(num_row_groups), &batch_reader));
+  ASSERT_OK_AND_ASSIGN(auto batch_reader,
+                       
arrow_reader->GetRecordBatchReader(Iota(num_row_groups)));
   std::shared_ptr<::arrow::RecordBatch> read_record_batch;
   ASSERT_OK(batch_reader->ReadNext(&read_record_batch));
   EXPECT_TRUE(record_batch->Equals(*read_record_batch));
diff --git a/cpp/src/parquet/arrow/reader.cc b/cpp/src/parquet/arrow/reader.cc
index 3002d90b5f..465ad5844d 100644
--- a/cpp/src/parquet/arrow/reader.cc
+++ b/cpp/src/parquet/arrow/reader.cc
@@ -325,19 +325,19 @@ class FileReaderImpl : public FileReader {
     return ReadRowGroup(i, Iota(reader_->metadata()->num_columns()), table);
   }
 
-  Status GetRecordBatchReader(const std::vector<int>& row_group_indices,
-                              const std::vector<int>& column_indices,
-                              std::unique_ptr<RecordBatchReader>* out) 
override;
+  Result<std::unique_ptr<RecordBatchReader>> GetRecordBatchReader(
+      const std::vector<int>& row_group_indices,
+      const std::vector<int>& column_indices) override;
 
-  Status GetRecordBatchReader(const std::vector<int>& row_group_indices,
-                              std::unique_ptr<RecordBatchReader>* out) 
override {
+  Result<std::unique_ptr<RecordBatchReader>> GetRecordBatchReader(
+      const std::vector<int>& row_group_indices) override {
     return GetRecordBatchReader(row_group_indices,
-                                Iota(reader_->metadata()->num_columns()), out);
+                                Iota(reader_->metadata()->num_columns()));
   }
 
-  Status GetRecordBatchReader(std::unique_ptr<RecordBatchReader>* out) 
override {
+  Result<std::unique_ptr<RecordBatchReader>> GetRecordBatchReader() override {
     return GetRecordBatchReader(Iota(num_row_groups()),
-                                Iota(reader_->metadata()->num_columns()), out);
+                                Iota(reader_->metadata()->num_columns()));
   }
 
   
::arrow::Result<::arrow::AsyncGenerator<std::shared_ptr<::arrow::RecordBatch>>>
@@ -972,9 +972,8 @@ Status GetReader(const SchemaField& field, const 
std::shared_ptr<ReaderContext>&
 
 }  // namespace
 
-Status FileReaderImpl::GetRecordBatchReader(const std::vector<int>& row_groups,
-                                            const std::vector<int>& 
column_indices,
-                                            
std::unique_ptr<RecordBatchReader>* out) {
+Result<std::unique_ptr<RecordBatchReader>> 
FileReaderImpl::GetRecordBatchReader(
+    const std::vector<int>& row_groups, const std::vector<int>& 
column_indices) {
   RETURN_NOT_OK(BoundsCheck(row_groups, column_indices));
 
   if (reader_properties_.pre_buffer()) {
@@ -1008,10 +1007,8 @@ Status FileReaderImpl::GetRecordBatchReader(const 
std::vector<int>& row_groups,
       }
     }
 
-    *out = std::make_unique<RowGroupRecordBatchReader>(
+    return std::make_unique<RowGroupRecordBatchReader>(
         ::arrow::MakeVectorIterator(std::move(batches)), 
std::move(batch_schema));
-
-    return Status::OK();
   }
 
   int64_t num_rows = 0;
@@ -1062,10 +1059,8 @@ Status FileReaderImpl::GetRecordBatchReader(const 
std::vector<int>& row_groups,
             [table, table_reader] { return table_reader->Next(); });
       });
 
-  *out = std::make_unique<RowGroupRecordBatchReader>(
+  return std::make_unique<RowGroupRecordBatchReader>(
       ::arrow::MakeFlattenIterator(std::move(batches)), 
std::move(batch_schema));
-
-  return Status::OK();
 }
 
 /// Given a file reader and a list of row groups, this is a generator of record
@@ -1291,17 +1286,33 @@ std::shared_ptr<RowGroupReader> 
FileReaderImpl::RowGroup(int row_group_index) {
 // ----------------------------------------------------------------------
 // Public factory functions
 
+Status FileReader::GetRecordBatchReader(std::unique_ptr<RecordBatchReader>* 
out) {
+  ARROW_ASSIGN_OR_RAISE(*out, GetRecordBatchReader());
+  return Status::OK();
+}
+
+Status FileReader::GetRecordBatchReader(const std::vector<int>& 
row_group_indices,
+                                        std::unique_ptr<RecordBatchReader>* 
out) {
+  ARROW_ASSIGN_OR_RAISE(*out, GetRecordBatchReader(row_group_indices));
+  return Status::OK();
+}
+
+Status FileReader::GetRecordBatchReader(const std::vector<int>& 
row_group_indices,
+                                        const std::vector<int>& column_indices,
+                                        std::unique_ptr<RecordBatchReader>* 
out) {
+  ARROW_ASSIGN_OR_RAISE(*out, GetRecordBatchReader(row_group_indices, 
column_indices));
+  return Status::OK();
+}
+
 Status FileReader::GetRecordBatchReader(std::shared_ptr<RecordBatchReader>* 
out) {
-  std::unique_ptr<RecordBatchReader> tmp;
-  RETURN_NOT_OK(GetRecordBatchReader(&tmp));
+  ARROW_ASSIGN_OR_RAISE(auto tmp, GetRecordBatchReader());
   out->reset(tmp.release());
   return Status::OK();
 }
 
 Status FileReader::GetRecordBatchReader(const std::vector<int>& 
row_group_indices,
                                         std::shared_ptr<RecordBatchReader>* 
out) {
-  std::unique_ptr<RecordBatchReader> tmp;
-  RETURN_NOT_OK(GetRecordBatchReader(row_group_indices, &tmp));
+  ARROW_ASSIGN_OR_RAISE(auto tmp, GetRecordBatchReader(row_group_indices));
   out->reset(tmp.release());
   return Status::OK();
 }
@@ -1309,8 +1320,8 @@ Status FileReader::GetRecordBatchReader(const 
std::vector<int>& row_group_indice
 Status FileReader::GetRecordBatchReader(const std::vector<int>& 
row_group_indices,
                                         const std::vector<int>& column_indices,
                                         std::shared_ptr<RecordBatchReader>* 
out) {
-  std::unique_ptr<RecordBatchReader> tmp;
-  RETURN_NOT_OK(GetRecordBatchReader(row_group_indices, column_indices, &tmp));
+  ARROW_ASSIGN_OR_RAISE(auto tmp,
+                        GetRecordBatchReader(row_group_indices, 
column_indices));
   out->reset(tmp.release());
   return Status::OK();
 }
diff --git a/cpp/src/parquet/arrow/reader.h b/cpp/src/parquet/arrow/reader.h
index ec996a5afa..476a940bf1 100644
--- a/cpp/src/parquet/arrow/reader.h
+++ b/cpp/src/parquet/arrow/reader.h
@@ -155,8 +155,14 @@ class PARQUET_EXPORT FileReader {
                                      std::shared_ptr<::arrow::ChunkedArray>* 
out) = 0;
 
   /// \brief Return a RecordBatchReader of all row groups and columns.
-  virtual ::arrow::Status GetRecordBatchReader(
-      std::unique_ptr<::arrow::RecordBatchReader>* out) = 0;
+  ///
+  /// \deprecated Deprecated in 19.0.0. Use arrow::Result version instead.
+  ARROW_DEPRECATED("Deprecated in 19.0.0. Use arrow::Result version instead.")
+  ::arrow::Status 
GetRecordBatchReader(std::unique_ptr<::arrow::RecordBatchReader>* out);
+
+  /// \brief Return a RecordBatchReader of all row groups and columns.
+  virtual ::arrow::Result<std::unique_ptr<::arrow::RecordBatchReader>>
+  GetRecordBatchReader() = 0;
 
   /// \brief Return a RecordBatchReader of row groups selected from 
row_group_indices.
   ///
@@ -164,9 +170,21 @@ class PARQUET_EXPORT FileReader {
   /// their RecordBatchReaders.
   ///
   /// \returns error Status if row_group_indices contains an invalid index
+  ///
+  /// \deprecated Deprecated in 19.0.0. Use arrow::Result version instead.
+  ARROW_DEPRECATED("Deprecated in 19.0.0. Use arrow::Result version instead.")
   virtual ::arrow::Status GetRecordBatchReader(
       const std::vector<int>& row_group_indices,
-      std::unique_ptr<::arrow::RecordBatchReader>* out) = 0;
+      std::unique_ptr<::arrow::RecordBatchReader>* out);
+
+  /// \brief Return a RecordBatchReader of row groups selected from 
row_group_indices.
+  ///
+  /// Note that the ordering in row_group_indices matters. FileReaders must 
outlive
+  /// their RecordBatchReaders.
+  ///
+  /// \returns error Result if row_group_indices contains an invalid index
+  virtual ::arrow::Result<std::unique_ptr<::arrow::RecordBatchReader>>
+  GetRecordBatchReader(const std::vector<int>& row_group_indices) = 0;
 
   /// \brief Return a RecordBatchReader of row groups selected from
   /// row_group_indices, whose columns are selected by column_indices.
@@ -176,9 +194,24 @@ class PARQUET_EXPORT FileReader {
   ///
   /// \returns error Status if either row_group_indices or column_indices
   ///     contains an invalid index
+  ///
+  /// \deprecated Deprecated in 19.0.0. Use arrow::Result version instead.
+  ARROW_DEPRECATED("Deprecated in 19.0.0. Use arrow::Result version instead.")
   virtual ::arrow::Status GetRecordBatchReader(
       const std::vector<int>& row_group_indices, const std::vector<int>& 
column_indices,
-      std::unique_ptr<::arrow::RecordBatchReader>* out) = 0;
+      std::unique_ptr<::arrow::RecordBatchReader>* out);
+
+  /// \brief Return a RecordBatchReader of row groups selected from
+  /// row_group_indices, whose columns are selected by column_indices.
+  ///
+  /// Note that the ordering in row_group_indices and column_indices
+  /// matter. FileReaders must outlive their RecordBatchReaders.
+  ///
+  /// \returns error Result if either row_group_indices or column_indices
+  ///     contains an invalid index
+  virtual ::arrow::Result<std::unique_ptr<::arrow::RecordBatchReader>>
+  GetRecordBatchReader(const std::vector<int>& row_group_indices,
+                       const std::vector<int>& column_indices) = 0;
 
   /// \brief Return a RecordBatchReader of row groups selected from
   /// row_group_indices, whose columns are selected by column_indices.

Reply via email to