westonpace commented on code in PR #36779:
URL: https://github.com/apache/arrow/pull/36779#discussion_r1269585568
##########
cpp/src/parquet/arrow/arrow_reader_writer_test.cc:
##########
@@ -2514,6 +2517,137 @@ TEST(TestArrowReadWrite, GetRecordBatchGenerator) {
}
}
+TEST(TestArrowReadWrite, ReadRowGroupsAsync) {
+ constexpr int kNumRows = 1024;
+ constexpr int kRowGroupSize = 512;
+ constexpr int kNumColumns = 2;
+
+ std::shared_ptr<Table> table;
+ ASSERT_NO_FATAL_FAILURE(MakeDoubleTable(kNumColumns, kNumRows, 1, &table));
+
+ std::shared_ptr<Buffer> buffer;
+ ASSERT_NO_FATAL_FAILURE(WriteTableToBuffer(table, kRowGroupSize,
+
default_arrow_writer_properties(), &buffer));
+
+ for (std::vector<int> row_groups : std::vector<std::vector<int>>{{}, {0},
{0, 1}}) {
+ ARROW_SCOPED_TRACE("# row_groups = ", row_groups.size());
+ int32_t expected_total_rows = static_cast<int32_t>(row_groups.size()) *
kRowGroupSize;
+
+ for (std::vector<int> columns : std::vector<std::vector<int>>{{}, {0}, {0,
1}}) {
+ ARROW_SCOPED_TRACE("# columns = ", columns.size());
+
+ for (int row_group_size : {128, 512, 1024, 2048}) {
+ ARROW_SCOPED_TRACE("row_group_size = ", row_group_size);
+
+ ArrowReaderProperties properties = default_arrow_reader_properties();
+ properties.set_batch_size(row_group_size);
+ std::unique_ptr<FileReader> unique_reader;
+ FileReaderBuilder builder;
+ ASSERT_OK(builder.Open(std::make_shared<BufferReader>(buffer)));
+ ASSERT_OK(builder.properties(properties)->Build(&unique_reader));
+ auto batch_generator = unique_reader->ReadRowGroupsAsync(
+ row_groups, columns, ::arrow::internal::GetCpuThreadPool());
+
+ int64_t num_expected_batches =
Review Comment:
In Acero we currently assign each batch an index. If we read several files
in parallel, then it helps to know how many batches each file will produce
before we start reading it.
```
parallel for file in files:
    metadata[file.id] = inspect(file)
index_offset = 0
for file in files:
    starting_index[file.id] = index_offset
    index_offset += metadata[file.id].num_rows / batch_size
parallel for file in files:
    index = starting_index[file.id]
    for batch in scan_file(file):
        batch.index = index++
        yield batch
```
If we don't know ahead of time how many batches are in a file, then we have
to read the files serially, which can be quite slow. In #35889 I introduce a
user option `allow_jumbo_values`. If true, then we read serially. If false
(the default), then we read in parallel and raise an error if we have to split
a batch.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]