pitrou commented on code in PR #36779:
URL: https://github.com/apache/arrow/pull/36779#discussion_r1293707887
##########
cpp/src/parquet/arrow/reader.h:
##########
@@ -249,6 +249,90 @@ class PARQUET_EXPORT FileReader {
virtual ::arrow::Status ReadRowGroups(const std::vector<int>& row_groups,
std::shared_ptr<::arrow::Table>* out)
= 0;
+ using AsyncBatchGenerator =
+ std::function<::arrow::Future<std::shared_ptr<::arrow::RecordBatch>>()>;
+
+ /// \brief Read a single row group from the file
+ ///
+ /// \see ReadRowGroupsAsync for operation details
+ ///
+ /// \param i the index of the row group to read
+ /// \param cpu_executor an executor to use to run CPU tasks
+ /// \param allow_sliced_batches if false, an error is raised if a batch has
too much
+ /// data for the given batch size. If true,
smaller
+ /// batches will be returned instead.
+ virtual AsyncBatchGenerator ReadRowGroupAsync(int i,
Review Comment:
Is it necessary to expose `ReadRowGroupAsync` in addition to
`ReadRowGroupsAsync`? One is a trivial call to the other...
##########
cpp/src/parquet/arrow/reader.h:
##########
@@ -249,6 +249,90 @@ class PARQUET_EXPORT FileReader {
virtual ::arrow::Status ReadRowGroups(const std::vector<int>& row_groups,
std::shared_ptr<::arrow::Table>* out)
= 0;
+ using AsyncBatchGenerator =
+ std::function<::arrow::Future<std::shared_ptr<::arrow::RecordBatch>>()>;
+
+ /// \brief Read a single row group from the file
+ ///
+ /// \see ReadRowGroupsAsync for operation details
+ ///
+ /// \param i the index of the row group to read
+ /// \param cpu_executor an executor to use to run CPU tasks
+ /// \param allow_sliced_batches if false, an error is raised if a batch has
too much
+ /// data for the given batch size. If true,
smaller
+ /// batches will be returned instead.
+ virtual AsyncBatchGenerator ReadRowGroupAsync(int i,
+ ::arrow::internal::Executor*
cpu_executor,
+ bool allow_sliced_batches =
false) = 0;
+ /// \brief Read some columns from a single row group from the file
+ ///
+ /// \see ReadRowGroupsAsync for operation details
+ /// \see ReadTable for details on how column indices are resolved
+ ///
+ /// \param i the index of the row group to read
+ /// \param column_indices leaf-indices of the columns to read
+ /// \param cpu_executor an executor to use to run CPU tasks
+ /// \param allow_sliced_batches if false, an error is raised if a batch has
too much
+ /// data for the given batch size. If true,
smaller
+ /// batches will be returned instead.
+ virtual AsyncBatchGenerator ReadRowGroupAsync(int i,
+ const std::vector<int>&
column_indices,
+ ::arrow::internal::Executor*
cpu_executor,
+ bool allow_sliced_batches =
false) = 0;
+
+ /// \brief Read row groups from the file
+ ///
+ /// \see ReadRowGroupsAsync for operation details
+ ///
+ /// \param row_groups indices of the row groups to read
+ /// \param cpu_executor an executor to use to run CPU tasks
+ /// \param allow_sliced_batches if false, an error is raised if a batch has
too much
+ /// data for the given batch size. If true,
smaller
Review Comment:
I don't understand what "a batch has too much data for the given batch size"
means exactly. Do you mean "a row group has too much data for the given batch
size"?
Also, why is it false by default? It seems allowing it should be the default
behaviour.
##########
cpp/src/parquet/arrow/reader.h:
##########
@@ -249,6 +249,90 @@ class PARQUET_EXPORT FileReader {
virtual ::arrow::Status ReadRowGroups(const std::vector<int>& row_groups,
std::shared_ptr<::arrow::Table>* out)
= 0;
+ using AsyncBatchGenerator =
+ std::function<::arrow::Future<std::shared_ptr<::arrow::RecordBatch>>()>;
+
+ /// \brief Read a single row group from the file
+ ///
+ /// \see ReadRowGroupsAsync for operation details
+ ///
+ /// \param i the index of the row group to read
+ /// \param cpu_executor an executor to use to run CPU tasks
+ /// \param allow_sliced_batches if false, an error is raised if a batch has
too much
+ /// data for the given batch size. If true,
smaller
+ /// batches will be returned instead.
+ virtual AsyncBatchGenerator ReadRowGroupAsync(int i,
+ ::arrow::internal::Executor*
cpu_executor,
+ bool allow_sliced_batches =
false) = 0;
+ /// \brief Read some columns from a single row group from the file
+ ///
+ /// \see ReadRowGroupsAsync for operation details
+ /// \see ReadTable for details on how column indices are resolved
+ ///
+ /// \param i the index of the row group to read
+ /// \param column_indices leaf-indices of the columns to read
+ /// \param cpu_executor an executor to use to run CPU tasks
+ /// \param allow_sliced_batches if false, an error is raised if a batch has
too much
+ /// data for the given batch size. If true,
smaller
+ /// batches will be returned instead.
+ virtual AsyncBatchGenerator ReadRowGroupAsync(int i,
+ const std::vector<int>&
column_indices,
+ ::arrow::internal::Executor*
cpu_executor,
+ bool allow_sliced_batches =
false) = 0;
+
+ /// \brief Read row groups from the file
+ ///
+ /// \see ReadRowGroupsAsync for operation details
+ ///
+ /// \param row_groups indices of the row groups to read
+ /// \param cpu_executor an executor to use to run CPU tasks
+ /// \param allow_sliced_batches if false, an error is raised if a batch has
too much
+ /// data for the given batch size. If true,
smaller
+ /// batches will be returned instead.
+ virtual AsyncBatchGenerator ReadRowGroupsAsync(
+ const std::vector<int>& row_groups, ::arrow::internal::Executor*
cpu_executor,
+ bool allow_sliced_batches = false) = 0;
+
+ /// \brief Read some columns from the given rows groups from the file
+ ///
+ /// If pre-buffering is enabled then all of the data will be read using the
pre-buffer
+ /// cache. See ParquetFileReader::PreBuffer for details on how this affects
memory and
+ /// performance.
+ ///
+ /// This operation is not perfectly async. The read from disk will be done
on an I/O
+ /// thread, which is correct. However, compression and column decoding is
also done on
+ /// the I/O thread which may not be ideal. The stage after that
(transferring the
+ /// decoded data into Arrow structures and fulfilling the future) should be
done as a
+ /// new task on the cpu_executor.
+ ///
+ /// The returned generator will respect the batch size set in the
ArrowReaderProperties.
+ /// Batches will not be larger than the given batch size. However, batches
may be
+ /// smaller. This can happen, for example, when there is not enough data or
when a
+ /// string column is too large to fit into a single batch. The parameter
+ /// `allow_sliced_batches` can be set to false to disallow this later case.
This can be
+ /// useful when you need to know exactly how many batches you will get from
the
+ /// operation before you start.
+ ///
+ /// Note: When reading multiple row groups there is no guarantee you will
get one
+ /// record batch per row group. Data from multiple row groups could get
combined into
+ /// a single batch.
Review Comment:
Interesting, and I agree it's probably desirable. Is it a deviation from
other APIs?
##########
cpp/src/parquet/arrow/reader.cc:
##########
@@ -1233,6 +1274,124 @@ Status FileReaderImpl::ReadRowGroups(const
std::vector<int>& row_groups,
return Status::OK();
}
+struct AsyncBatchGeneratorState {
+ ::arrow::internal::Executor* io_executor;
+ ::arrow::internal::Executor* cpu_executor;
+ std::vector<std::shared_ptr<ColumnReaderImpl>> column_readers;
+ std::queue<std::shared_ptr<RecordBatch>> overflow;
+ std::shared_ptr<::arrow::Schema> schema;
+ int64_t batch_size;
+ int64_t rows_remaining;
+ bool use_threads;
+ bool allow_sliced_batches;
+};
+
+class AsyncBatchGeneratorImpl {
+ public:
+ explicit AsyncBatchGeneratorImpl(std::shared_ptr<AsyncBatchGeneratorState>
state)
+ : state_(std::move(state)) {}
+ Future<std::shared_ptr<RecordBatch>> operator()() {
+ if (!state_->overflow.empty()) {
+ std::shared_ptr<RecordBatch> next = std::move(state_->overflow.front());
+ state_->overflow.pop();
+ return next;
+ }
+
+ if (state_->rows_remaining == 0) {
+ // Exhausted
+ return Future<std::shared_ptr<RecordBatch>>::MakeFinished(
+ ::arrow::IterationEnd<std::shared_ptr<RecordBatch>>());
+ }
+
+ int64_t rows_in_batch = std::min(state_->rows_remaining,
state_->batch_size);
+ state_->rows_remaining -= rows_in_batch;
+
+ // We read the columns in parallel. Each reader returns a chunked array.
This is
+ // because we might need to chunk a column if that column is too large. We
+ // do provide a batch size but even for a small batch size it is possible
that a
+ // column has extremely large strings which don't fit in a single batch.
+ Future<std::vector<std::shared_ptr<ChunkedArray>>> chunked_arrays_fut =
+ ::arrow::internal::OptionalParallelForAsync(
+ state_->use_threads, state_->column_readers,
+ [rows_in_batch](std::size_t, std::shared_ptr<ColumnReaderImpl>
column_reader)
+ -> Result<std::shared_ptr<ChunkedArray>> {
+ std::shared_ptr<ChunkedArray> chunked_array;
+ ARROW_RETURN_NOT_OK(
+ column_reader->NextBatch(rows_in_batch, &chunked_array));
+ return chunked_array;
Review Comment:
Not necessary for this PR, but we'd probably like a `Result` returning
variant of `NextBatch`.
##########
cpp/src/parquet/arrow/reader.cc:
##########
@@ -1233,6 +1274,124 @@ Status FileReaderImpl::ReadRowGroups(const
std::vector<int>& row_groups,
return Status::OK();
}
+struct AsyncBatchGeneratorState {
+ ::arrow::internal::Executor* io_executor;
+ ::arrow::internal::Executor* cpu_executor;
+ std::vector<std::shared_ptr<ColumnReaderImpl>> column_readers;
+ std::queue<std::shared_ptr<RecordBatch>> overflow;
+ std::shared_ptr<::arrow::Schema> schema;
+ int64_t batch_size;
+ int64_t rows_remaining;
+ bool use_threads;
+ bool allow_sliced_batches;
+};
+
+class AsyncBatchGeneratorImpl {
+ public:
+ explicit AsyncBatchGeneratorImpl(std::shared_ptr<AsyncBatchGeneratorState>
state)
+ : state_(std::move(state)) {}
+ Future<std::shared_ptr<RecordBatch>> operator()() {
+ if (!state_->overflow.empty()) {
+ std::shared_ptr<RecordBatch> next = std::move(state_->overflow.front());
+ state_->overflow.pop();
+ return next;
+ }
+
+ if (state_->rows_remaining == 0) {
+ // Exhausted
+ return Future<std::shared_ptr<RecordBatch>>::MakeFinished(
+ ::arrow::IterationEnd<std::shared_ptr<RecordBatch>>());
+ }
+
+ int64_t rows_in_batch = std::min(state_->rows_remaining,
state_->batch_size);
+ state_->rows_remaining -= rows_in_batch;
+
+ // We read the columns in parallel. Each reader returns a chunked array.
This is
+ // because we might need to chunk a column if that column is too large. We
+ // do provide a batch size but even for a small batch size it is possible
that a
+ // column has extremely large strings which don't fit in a single batch.
+ Future<std::vector<std::shared_ptr<ChunkedArray>>> chunked_arrays_fut =
+ ::arrow::internal::OptionalParallelForAsync(
+ state_->use_threads, state_->column_readers,
+ [rows_in_batch](std::size_t, std::shared_ptr<ColumnReaderImpl>
column_reader)
+ -> Result<std::shared_ptr<ChunkedArray>> {
+ std::shared_ptr<ChunkedArray> chunked_array;
+ ARROW_RETURN_NOT_OK(
+ column_reader->NextBatch(rows_in_batch, &chunked_array));
+ return chunked_array;
+ },
+ state_->cpu_executor);
+
+ // Grab the first batch of data and return it. If there is more than one
batch then
+ // throw the reamining batches into overflow and they will be fetched on
the next call
+ return chunked_arrays_fut.Then(
+ [state = state_,
+ rows_in_batch](const std::vector<std::shared_ptr<ChunkedArray>>&
chunks)
+ -> Result<std::shared_ptr<RecordBatch>> {
+ std::shared_ptr<Table> table =
+ Table::Make(state->schema, chunks, rows_in_batch);
+ ::arrow::TableBatchReader batch_reader(*table);
+ std::shared_ptr<RecordBatch> first;
+ while (true) {
+ ARROW_ASSIGN_OR_RAISE(std::shared_ptr<RecordBatch> next_batch,
+ batch_reader.Next());
+ if (!next_batch) {
+ break;
+ }
+ if (first) {
+ if (!state->allow_sliced_batches) {
+ return Status::Invalid(
+ "The setting allow_sliced_batches is set to false and data
was "
+ "encountered that was too large to fit in a single
batch.");
+ }
+ state->overflow.push(std::move(next_batch));
Review Comment:
I suppose this makes the generator not async-reentrant, since `operator()`
might be called from one thread while this callback runs on another thread?
##########
cpp/src/parquet/arrow/arrow_reader_writer_test.cc:
##########
@@ -3963,6 +4096,88 @@ TEST(TestArrowReaderAdHoc,
LARGE_MEMORY_TEST(LargeStringColumn)) {
AssertTablesEqual(*table, *batched_table, /*same_chunk_layout=*/false);
}
+TEST(TestArrowReaderAdHoc, LARGE_MEMORY_TEST(LargeStringValue)) {
+ // ARROW-3762
+ ::arrow::StringBuilder builder;
+ // 16 rows of 256MiB bytes each is 4GiB. This will get put into
+ // 3 chunks of 7 rows, 7 rows, and 2 rows.
+ constexpr std::int32_t kValueSize = 1 << 28;
+ constexpr std::int32_t kNumRows = 4;
+ constexpr std::int32_t kNumChunks = 4;
+ std::vector<std::shared_ptr<Array>> chunks;
+ std::vector<uint8_t> value(kValueSize, '0');
+ for (int chunk_idx = 0; chunk_idx < kNumChunks; chunk_idx++) {
+ ASSERT_OK(builder.Resize(kNumRows));
+ ASSERT_OK(builder.ReserveData(kNumRows * kValueSize));
+ for (int64_t i = 0; i < kNumRows; ++i) {
+ builder.UnsafeAppend(value.data(), kValueSize);
+ }
+ std::shared_ptr<Array> array;
+ ASSERT_OK(builder.Finish(&array));
+ chunks.push_back(std::move(array));
+ }
+
+ // Eaglerly free up memory
+ value.clear();
Review Comment:
`clear` unfortunately leaves the vector capacity unchanged. Perhaps `value =
{}` would work...
##########
cpp/src/parquet/arrow/arrow_reader_writer_test.cc:
##########
@@ -3963,6 +4096,88 @@ TEST(TestArrowReaderAdHoc,
LARGE_MEMORY_TEST(LargeStringColumn)) {
AssertTablesEqual(*table, *batched_table, /*same_chunk_layout=*/false);
}
+TEST(TestArrowReaderAdHoc, LARGE_MEMORY_TEST(LargeStringValue)) {
+ // ARROW-3762
+ ::arrow::StringBuilder builder;
+ // 16 rows of 256MiB bytes each is 4GiB. This will get put into
+ // 3 chunks of 7 rows, 7 rows, and 2 rows.
+ constexpr std::int32_t kValueSize = 1 << 28;
+ constexpr std::int32_t kNumRows = 4;
+ constexpr std::int32_t kNumChunks = 4;
+ std::vector<std::shared_ptr<Array>> chunks;
+ std::vector<uint8_t> value(kValueSize, '0');
+ for (int chunk_idx = 0; chunk_idx < kNumChunks; chunk_idx++) {
+ ASSERT_OK(builder.Resize(kNumRows));
+ ASSERT_OK(builder.ReserveData(kNumRows * kValueSize));
+ for (int64_t i = 0; i < kNumRows; ++i) {
+ builder.UnsafeAppend(value.data(), kValueSize);
+ }
+ std::shared_ptr<Array> array;
+ ASSERT_OK(builder.Finish(&array));
Review Comment:
This seems to be building the same array for each chunk, am I mistaken? You
could simply build the array once.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]