This is an automated email from the ASF dual-hosted git repository.
kou pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/main by this push:
new 0cdbdac413 GH-44808: [C++][Parquet] Add `arrow::Result` version of
`parquet::arrow::FileReader::GetRecordBatchReader()` (#44809)
0cdbdac413 is described below
commit 0cdbdac413fa1e7acaf0362eb8ca8fd1911d2a9e
Author: Sutou Kouhei <[email protected]>
AuthorDate: Sat Nov 23 07:11:04 2024 +0900
GH-44808: [C++][Parquet] Add `arrow::Result` version of
`parquet::arrow::FileReader::GetRecordBatchReader()` (#44809)
### Rationale for this change
We're migrating `arrow::Status` + output variable API to `arrow::Result`
API.
### What changes are included in this PR?
* Add `arrow::Result<std::unique_ptr<arrow::RecordBatchReader>>
parquet::arrow::FileReader::GetRecordBatchReader()`
* Deprecate `arrow::Status
parquet::arrow::FileReader::GetRecordBatchReader()`
* Use the added `arrow::Result` version in our code base
### Are these changes tested?
Yes.
### Are there any user-facing changes?
Yes.
* GitHub Issue: #44808
Authored-by: Sutou Kouhei <[email protected]>
Signed-off-by: Sutou Kouhei <[email protected]>
---
cpp/src/parquet/arrow/arrow_reader_writer_test.cc | 41 ++++++++--------
cpp/src/parquet/arrow/reader.cc | 57 ++++++++++++++---------
cpp/src/parquet/arrow/reader.h | 41 ++++++++++++++--
3 files changed, 90 insertions(+), 49 deletions(-)
diff --git a/cpp/src/parquet/arrow/arrow_reader_writer_test.cc
b/cpp/src/parquet/arrow/arrow_reader_writer_test.cc
index b5fd2bc255..78d272ff24 100644
--- a/cpp/src/parquet/arrow/arrow_reader_writer_test.cc
+++ b/cpp/src/parquet/arrow/arrow_reader_writer_test.cc
@@ -479,14 +479,16 @@ void DoRoundTripWithBatches(
->Build(&reader));
std::unique_ptr<::arrow::RecordBatchReader> batch_reader;
if (column_subset.size() > 0) {
- ASSERT_OK_NO_THROW(reader->GetRecordBatchReader(
- Iota(reader->parquet_reader()->metadata()->num_row_groups()),
column_subset,
- &batch_reader));
+ ASSERT_OK_AND_ASSIGN(
+ batch_reader,
+ reader->GetRecordBatchReader(
+ Iota(reader->parquet_reader()->metadata()->num_row_groups()),
column_subset));
} else {
// Read everything
- ASSERT_OK_NO_THROW(reader->GetRecordBatchReader(
- Iota(reader->parquet_reader()->metadata()->num_row_groups()),
&batch_reader));
+ ASSERT_OK_AND_ASSIGN(
+ batch_reader, reader->GetRecordBatchReader(
+
Iota(reader->parquet_reader()->metadata()->num_row_groups())));
}
ASSERT_OK_AND_ASSIGN(*out, Table::FromRecordBatchReader(batch_reader.get()));
}
@@ -2385,8 +2387,7 @@ void TestGetRecordBatchReader(
ASSERT_OK(builder.properties(properties)->Build(&reader));
// Read the whole file, one batch at a time.
- std::shared_ptr<::arrow::RecordBatchReader> rb_reader;
- ASSERT_OK_NO_THROW(reader->GetRecordBatchReader({0, 1}, &rb_reader));
+ ASSERT_OK_AND_ASSIGN(auto rb_reader, reader->GetRecordBatchReader({0, 1}));
std::shared_ptr<::arrow::RecordBatch> actual_batch, expected_batch;
::arrow::TableBatchReader table_reader(*table);
table_reader.set_chunksize(batch_size);
@@ -2401,7 +2402,7 @@ void TestGetRecordBatchReader(
ASSERT_EQ(nullptr, actual_batch);
// ARROW-6005: Read just the second row group
- ASSERT_OK_NO_THROW(reader->GetRecordBatchReader({1}, &rb_reader));
+ ASSERT_OK_AND_ASSIGN(rb_reader, reader->GetRecordBatchReader({1}));
std::shared_ptr<Table> second_rowgroup = table->Slice(num_rows / 2);
::arrow::TableBatchReader second_table_reader(*second_rowgroup);
second_table_reader.set_chunksize(batch_size);
@@ -2448,8 +2449,8 @@ TEST(TestArrowReadWrite, WaitCoalescedReads) {
::arrow::io::CacheOptions::Defaults());
ASSERT_OK(reader->parquet_reader()->WhenBuffered({0}, {0, 1, 2, 3,
4}).status());
- std::shared_ptr<::arrow::RecordBatchReader> rb_reader;
- ASSERT_OK_NO_THROW(reader->GetRecordBatchReader({0}, {0, 1, 2, 3, 4},
&rb_reader));
+ ASSERT_OK_AND_ASSIGN(auto rb_reader,
+ reader->GetRecordBatchReader({0}, {0, 1, 2, 3, 4}));
std::shared_ptr<::arrow::RecordBatch> actual_batch;
ASSERT_OK(rb_reader->ReadNext(&actual_batch));
@@ -2507,8 +2508,8 @@ TEST(TestArrowReadWrite, GetRecordBatchReaderNoColumns) {
ASSERT_OK(builder.Open(std::make_shared<BufferReader>(buffer)));
ASSERT_OK(builder.properties(properties)->Build(&reader));
- std::shared_ptr<::arrow::RecordBatchReader> rb_reader;
- ASSERT_OK_NO_THROW(reader->GetRecordBatchReader({0}, {}, &rb_reader));
+ ASSERT_OK_AND_ASSIGN(auto rb_reader,
reader->GetRecordBatchReader(std::vector<int>{0},
+
std::vector<int>{}));
std::shared_ptr<::arrow::RecordBatch> actual_batch;
ASSERT_OK(rb_reader->ReadNext(&actual_batch));
@@ -4016,8 +4017,7 @@ TEST(TestArrowReaderAdHoc,
LARGE_MEMORY_TEST(LargeStringColumn)) {
// ARROW-9297: ensure RecordBatchReader also works
reader =
ParquetFileReader::Open(std::make_shared<BufferReader>(tables_buffer));
ASSERT_OK(FileReader::Make(default_memory_pool(), std::move(reader),
&arrow_reader));
- std::shared_ptr<::arrow::RecordBatchReader> batch_reader;
- ASSERT_OK_NO_THROW(arrow_reader->GetRecordBatchReader(&batch_reader));
+ ASSERT_OK_AND_ASSIGN(auto batch_reader,
arrow_reader->GetRecordBatchReader());
ASSERT_OK_AND_ASSIGN(auto batched_table,
::arrow::Table::FromRecordBatchReader(batch_reader.get()));
@@ -4491,9 +4491,8 @@ class TestArrowReadDictionary : public
::testing::TestWithParam<double> {
void CheckStreamReadWholeFile(const Table& expected) {
ASSERT_OK_AND_ASSIGN(auto reader, GetReader());
- std::unique_ptr<::arrow::RecordBatchReader> rb;
- ASSERT_OK(reader->GetRecordBatchReader(
- ::arrow::internal::Iota(options.num_row_groups), &rb));
+ ASSERT_OK_AND_ASSIGN(auto rb, reader->GetRecordBatchReader(
+
::arrow::internal::Iota(options.num_row_groups)));
ASSERT_OK_AND_ASSIGN(auto actual, rb->ToTable());
::arrow::AssertTablesEqual(expected, *actual, /*same_chunk_layout=*/false);
@@ -4773,10 +4772,9 @@ TEST_F(TestArrowReadDeltaEncoding,
IncrementalDecodeDeltaByteArray) {
ArrowReaderProperties properties = default_arrow_reader_properties();
properties.set_batch_size(batch_size);
std::unique_ptr<FileReader> parquet_reader;
- std::shared_ptr<::arrow::RecordBatchReader> rb_reader;
ASSERT_OK(FileReader::Make(pool, ParquetFileReader::OpenFile(file, false),
properties,
&parquet_reader));
- ASSERT_OK(parquet_reader->GetRecordBatchReader(&rb_reader));
+ ASSERT_OK_AND_ASSIGN(auto rb_reader, parquet_reader->GetRecordBatchReader());
auto convert_options = ::arrow::csv::ConvertOptions::Defaults();
std::vector<std::string> column_names = {
@@ -5262,9 +5260,8 @@ TEST(TestArrowReadWrite, WriteAndReadRecordBatch) {
}
// Verify batch data read via RecordBatch
- std::unique_ptr<::arrow::RecordBatchReader> batch_reader;
- ASSERT_OK_NO_THROW(
- arrow_reader->GetRecordBatchReader(Iota(num_row_groups), &batch_reader));
+ ASSERT_OK_AND_ASSIGN(auto batch_reader,
+
arrow_reader->GetRecordBatchReader(Iota(num_row_groups)));
std::shared_ptr<::arrow::RecordBatch> read_record_batch;
ASSERT_OK(batch_reader->ReadNext(&read_record_batch));
EXPECT_TRUE(record_batch->Equals(*read_record_batch));
diff --git a/cpp/src/parquet/arrow/reader.cc b/cpp/src/parquet/arrow/reader.cc
index 3002d90b5f..465ad5844d 100644
--- a/cpp/src/parquet/arrow/reader.cc
+++ b/cpp/src/parquet/arrow/reader.cc
@@ -325,19 +325,19 @@ class FileReaderImpl : public FileReader {
return ReadRowGroup(i, Iota(reader_->metadata()->num_columns()), table);
}
- Status GetRecordBatchReader(const std::vector<int>& row_group_indices,
- const std::vector<int>& column_indices,
- std::unique_ptr<RecordBatchReader>* out)
override;
+ Result<std::unique_ptr<RecordBatchReader>> GetRecordBatchReader(
+ const std::vector<int>& row_group_indices,
+ const std::vector<int>& column_indices) override;
- Status GetRecordBatchReader(const std::vector<int>& row_group_indices,
- std::unique_ptr<RecordBatchReader>* out)
override {
+ Result<std::unique_ptr<RecordBatchReader>> GetRecordBatchReader(
+ const std::vector<int>& row_group_indices) override {
return GetRecordBatchReader(row_group_indices,
- Iota(reader_->metadata()->num_columns()), out);
+ Iota(reader_->metadata()->num_columns()));
}
- Status GetRecordBatchReader(std::unique_ptr<RecordBatchReader>* out)
override {
+ Result<std::unique_ptr<RecordBatchReader>> GetRecordBatchReader() override {
return GetRecordBatchReader(Iota(num_row_groups()),
- Iota(reader_->metadata()->num_columns()), out);
+ Iota(reader_->metadata()->num_columns()));
}
::arrow::Result<::arrow::AsyncGenerator<std::shared_ptr<::arrow::RecordBatch>>>
@@ -972,9 +972,8 @@ Status GetReader(const SchemaField& field, const
std::shared_ptr<ReaderContext>&
} // namespace
-Status FileReaderImpl::GetRecordBatchReader(const std::vector<int>& row_groups,
- const std::vector<int>&
column_indices,
-
std::unique_ptr<RecordBatchReader>* out) {
+Result<std::unique_ptr<RecordBatchReader>>
FileReaderImpl::GetRecordBatchReader(
+ const std::vector<int>& row_groups, const std::vector<int>&
column_indices) {
RETURN_NOT_OK(BoundsCheck(row_groups, column_indices));
if (reader_properties_.pre_buffer()) {
@@ -1008,10 +1007,8 @@ Status FileReaderImpl::GetRecordBatchReader(const
std::vector<int>& row_groups,
}
}
- *out = std::make_unique<RowGroupRecordBatchReader>(
+ return std::make_unique<RowGroupRecordBatchReader>(
::arrow::MakeVectorIterator(std::move(batches)),
std::move(batch_schema));
-
- return Status::OK();
}
int64_t num_rows = 0;
@@ -1062,10 +1059,8 @@ Status FileReaderImpl::GetRecordBatchReader(const
std::vector<int>& row_groups,
[table, table_reader] { return table_reader->Next(); });
});
- *out = std::make_unique<RowGroupRecordBatchReader>(
+ return std::make_unique<RowGroupRecordBatchReader>(
::arrow::MakeFlattenIterator(std::move(batches)),
std::move(batch_schema));
-
- return Status::OK();
}
/// Given a file reader and a list of row groups, this is a generator of record
@@ -1291,17 +1286,33 @@ std::shared_ptr<RowGroupReader>
FileReaderImpl::RowGroup(int row_group_index) {
// ----------------------------------------------------------------------
// Public factory functions
+Status FileReader::GetRecordBatchReader(std::unique_ptr<RecordBatchReader>*
out) {
+ ARROW_ASSIGN_OR_RAISE(*out, GetRecordBatchReader());
+ return Status::OK();
+}
+
+Status FileReader::GetRecordBatchReader(const std::vector<int>&
row_group_indices,
+ std::unique_ptr<RecordBatchReader>*
out) {
+ ARROW_ASSIGN_OR_RAISE(*out, GetRecordBatchReader(row_group_indices));
+ return Status::OK();
+}
+
+Status FileReader::GetRecordBatchReader(const std::vector<int>&
row_group_indices,
+ const std::vector<int>& column_indices,
+ std::unique_ptr<RecordBatchReader>*
out) {
+ ARROW_ASSIGN_OR_RAISE(*out, GetRecordBatchReader(row_group_indices,
column_indices));
+ return Status::OK();
+}
+
Status FileReader::GetRecordBatchReader(std::shared_ptr<RecordBatchReader>*
out) {
- std::unique_ptr<RecordBatchReader> tmp;
- RETURN_NOT_OK(GetRecordBatchReader(&tmp));
+ ARROW_ASSIGN_OR_RAISE(auto tmp, GetRecordBatchReader());
out->reset(tmp.release());
return Status::OK();
}
Status FileReader::GetRecordBatchReader(const std::vector<int>&
row_group_indices,
std::shared_ptr<RecordBatchReader>*
out) {
- std::unique_ptr<RecordBatchReader> tmp;
- RETURN_NOT_OK(GetRecordBatchReader(row_group_indices, &tmp));
+ ARROW_ASSIGN_OR_RAISE(auto tmp, GetRecordBatchReader(row_group_indices));
out->reset(tmp.release());
return Status::OK();
}
@@ -1309,8 +1320,8 @@ Status FileReader::GetRecordBatchReader(const
std::vector<int>& row_group_indice
Status FileReader::GetRecordBatchReader(const std::vector<int>&
row_group_indices,
const std::vector<int>& column_indices,
std::shared_ptr<RecordBatchReader>*
out) {
- std::unique_ptr<RecordBatchReader> tmp;
- RETURN_NOT_OK(GetRecordBatchReader(row_group_indices, column_indices, &tmp));
+ ARROW_ASSIGN_OR_RAISE(auto tmp,
+ GetRecordBatchReader(row_group_indices,
column_indices));
out->reset(tmp.release());
return Status::OK();
}
diff --git a/cpp/src/parquet/arrow/reader.h b/cpp/src/parquet/arrow/reader.h
index ec996a5afa..476a940bf1 100644
--- a/cpp/src/parquet/arrow/reader.h
+++ b/cpp/src/parquet/arrow/reader.h
@@ -155,8 +155,14 @@ class PARQUET_EXPORT FileReader {
std::shared_ptr<::arrow::ChunkedArray>*
out) = 0;
/// \brief Return a RecordBatchReader of all row groups and columns.
- virtual ::arrow::Status GetRecordBatchReader(
- std::unique_ptr<::arrow::RecordBatchReader>* out) = 0;
+ ///
+ /// \deprecated Deprecated in 19.0.0. Use arrow::Result version instead.
+ ARROW_DEPRECATED("Deprecated in 19.0.0. Use arrow::Result version instead.")
+ ::arrow::Status
GetRecordBatchReader(std::unique_ptr<::arrow::RecordBatchReader>* out);
+
+ /// \brief Return a RecordBatchReader of all row groups and columns.
+ virtual ::arrow::Result<std::unique_ptr<::arrow::RecordBatchReader>>
+ GetRecordBatchReader() = 0;
/// \brief Return a RecordBatchReader of row groups selected from
row_group_indices.
///
@@ -164,9 +170,21 @@ class PARQUET_EXPORT FileReader {
/// their RecordBatchReaders.
///
/// \returns error Status if row_group_indices contains an invalid index
+ ///
+ /// \deprecated Deprecated in 19.0.0. Use arrow::Result version instead.
+ ARROW_DEPRECATED("Deprecated in 19.0.0. Use arrow::Result version instead.")
virtual ::arrow::Status GetRecordBatchReader(
const std::vector<int>& row_group_indices,
- std::unique_ptr<::arrow::RecordBatchReader>* out) = 0;
+ std::unique_ptr<::arrow::RecordBatchReader>* out);
+
+ /// \brief Return a RecordBatchReader of row groups selected from
row_group_indices.
+ ///
+ /// Note that the ordering in row_group_indices matters. FileReaders must
outlive
+ /// their RecordBatchReaders.
+ ///
+ /// \returns error Result if row_group_indices contains an invalid index
+ virtual ::arrow::Result<std::unique_ptr<::arrow::RecordBatchReader>>
+ GetRecordBatchReader(const std::vector<int>& row_group_indices) = 0;
/// \brief Return a RecordBatchReader of row groups selected from
/// row_group_indices, whose columns are selected by column_indices.
@@ -176,9 +194,24 @@ class PARQUET_EXPORT FileReader {
///
/// \returns error Status if either row_group_indices or column_indices
/// contains an invalid index
+ ///
+ /// \deprecated Deprecated in 19.0.0. Use arrow::Result version instead.
+ ARROW_DEPRECATED("Deprecated in 19.0.0. Use arrow::Result version instead.")
virtual ::arrow::Status GetRecordBatchReader(
const std::vector<int>& row_group_indices, const std::vector<int>&
column_indices,
- std::unique_ptr<::arrow::RecordBatchReader>* out) = 0;
+ std::unique_ptr<::arrow::RecordBatchReader>* out);
+
+ /// \brief Return a RecordBatchReader of row groups selected from
+ /// row_group_indices, whose columns are selected by column_indices.
+ ///
+ /// Note that the ordering in row_group_indices and column_indices
+ /// matter. FileReaders must outlive their RecordBatchReaders.
+ ///
+ /// \returns error Result if either row_group_indices or column_indices
+ /// contains an invalid index
+ virtual ::arrow::Result<std::unique_ptr<::arrow::RecordBatchReader>>
+ GetRecordBatchReader(const std::vector<int>& row_group_indices,
+ const std::vector<int>& column_indices) = 0;
/// \brief Return a RecordBatchReader of row groups selected from
/// row_group_indices, whose columns are selected by column_indices.