mapleFU commented on code in PR #35889:
URL: https://github.com/apache/arrow/pull/35889#discussion_r1268868466
##########
cpp/src/parquet/arrow/reader.cc:
##########
@@ -1243,6 +1285,124 @@ Status FileReaderImpl::ReadRowGroups(const
std::vector<int>& row_groups,
return Status::OK();
}
+struct AsyncBatchGeneratorState {
+ ::arrow::internal::Executor* io_executor;
+ ::arrow::internal::Executor* cpu_executor;
+ std::vector<std::shared_ptr<ColumnReaderImpl>> column_readers;
+ std::queue<std::shared_ptr<RecordBatch>> overflow;
+ std::shared_ptr<::arrow::Schema> schema;
+ int64_t batch_size;
+ int64_t rows_remaining;
+ bool use_threads;
+ bool allow_sliced_batches;
+};
+
+class AsyncBatchGeneratorImpl {
+ public:
+ explicit AsyncBatchGeneratorImpl(std::shared_ptr<AsyncBatchGeneratorState>
state)
+ : state_(std::move(state)) {}
+ Future<std::shared_ptr<RecordBatch>> operator()() {
+ if (!state_->overflow.empty()) {
+ std::shared_ptr<RecordBatch> next = std::move(state_->overflow.front());
+ state_->overflow.pop();
+ return next;
+ }
+
+ if (state_->rows_remaining == 0) {
+ // Exhausted
+ return Future<std::shared_ptr<RecordBatch>>::MakeFinished(
+ ::arrow::IterationEnd<std::shared_ptr<RecordBatch>>());
+ }
+
+ int64_t rows_in_batch = std::min(state_->rows_remaining,
state_->batch_size);
+ state_->rows_remaining -= rows_in_batch;
+
+ // We read the columns in parallel. Each reader returns a chunked array.
This is
+ // probably because we might need to chunk a column if that column is too
large. We
+ // do provide a batch size but perhaps that column has massive strings or
something
+ // like that.
+ Future<std::vector<std::shared_ptr<ChunkedArray>>> chunked_arrays_fut =
+ ::arrow::internal::OptionalParallelForAsync(
+ state_->use_threads, state_->column_readers,
+ [rows_in_batch](std::size_t, std::shared_ptr<ColumnReaderImpl>
column_reader)
+ -> Result<std::shared_ptr<ChunkedArray>> {
+ std::shared_ptr<ChunkedArray> chunked_array;
+ ARROW_RETURN_NOT_OK(
+ column_reader->NextBatch(rows_in_batch, &chunked_array));
+ return chunked_array;
+ });
+
+ // Grab the first batch of data and return it. If there is more than one
batch then
+ // throw the reamining batches into overflow and they will be fetched on
the next call
+ return chunked_arrays_fut.Then(
+ [state = state_,
+ rows_in_batch](const std::vector<std::shared_ptr<ChunkedArray>>&
chunks)
+ -> Result<std::shared_ptr<RecordBatch>> {
+ std::shared_ptr<Table> table =
+ Table::Make(state->schema, chunks, rows_in_batch);
+ ::arrow::TableBatchReader batch_reader(*table);
+ std::shared_ptr<RecordBatch> first;
+ while (true) {
+ ARROW_ASSIGN_OR_RAISE(std::shared_ptr<RecordBatch> next_batch,
+ batch_reader.Next());
+ if (!next_batch) {
+ break;
+ }
+ if (first) {
+ if (!state->allow_sliced_batches) {
+ return Status::Invalid(
+ "The setting allow_sliced_batches is set to false and data
was "
+ "encountered that was too large to fit in a single
batch.");
+ }
+ state->overflow.push(std::move(next_batch));
+ } else {
+ first = std::move(next_batch);
+ }
+ }
+ if (!first) {
+ // TODO(weston): Test this case
+ return Status::Invalid("Unexpected empty row group");
Review Comment:
```c++
Status WriteTable(const Table& table, int64_t chunk_size) override {
RETURN_NOT_OK(table.Validate());
if (chunk_size <= 0 && table.num_rows() > 0) {
return Status::Invalid("chunk size per row_group must be greater than
0");
} else if (!table.schema()->Equals(*schema_, false)) {
return Status::Invalid("table schema does not match this writer's.
table:'",
table.schema()->ToString(), "' this:'",
schema_->ToString(),
"'");
} else if (chunk_size > this->properties().max_row_group_length()) {
chunk_size = this->properties().max_row_group_length();
}
auto WriteRowGroup = [&](int64_t offset, int64_t size) {
RETURN_NOT_OK(NewRowGroup(size));
for (int i = 0; i < table.num_columns(); i++) {
RETURN_NOT_OK(WriteColumnChunk(table.column(i), offset, size));
}
return Status::OK();
};
if (table.num_rows() == 0) {
// Append a row group with 0 rows
RETURN_NOT_OK_ELSE(WriteRowGroup(0, 0),
PARQUET_IGNORE_NOT_OK(Close()));
return Status::OK();
}
for (int chunk = 0; chunk * chunk_size < table.num_rows(); chunk++) {
int64_t offset = chunk * chunk_size;
RETURN_NOT_OK_ELSE(
WriteRowGroup(offset, std::min(chunk_size, table.num_rows() -
offset)),
PARQUET_IGNORE_NOT_OK(Close()));
}
return Status::OK();
}
```
It's from this code. It's easy to flush the rowgroup that row_num == 0
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]