lidavidm commented on a change in pull request #11207:
URL: https://github.com/apache/arrow/pull/11207#discussion_r714326412
##########
File path: cpp/src/arrow/dataset/dataset_internal.h
##########
@@ -132,5 +133,28 @@ class FragmentDataset : public Dataset {
AsyncGenerator<std::shared_ptr<Fragment>> fragment_gen_;
};
+// Given a record batch generator, creates a new generator that slices
+// batches so individual batches have at most batch_size rows. The
+// resulting generator is async-reentrant, but does not forward
+// reentrant pulls, so apply readahead before using this helper.
+inline RecordBatchGenerator MakeChunkedBatchGenerator(RecordBatchGenerator gen,
+ int64_t batch_size) {
+ return MakeConcatenatedGenerator(MakeMappedGenerator(
+ std::move(gen),
+ [batch_size](const std::shared_ptr<RecordBatch>& batch)
+ -> ::arrow::AsyncGenerator<std::shared_ptr<::arrow::RecordBatch>> {
+ const int64_t rows = batch->num_rows();
+ if (rows <= batch_size) {
+ return
::arrow::MakeVectorGenerator<std::shared_ptr<RecordBatch>>({batch});
+ }
+ std::vector<std::shared_ptr<RecordBatch>> slices;
+ slices.reserve(rows / batch_size + (rows % batch_size != 0));
+ for (int64_t i = 0; i < rows; i += batch_size) {
+ slices.push_back(batch->Slice(i, batch_size));
+ }
+ return ::arrow::MakeVectorGenerator(std::move(slices));
Review comment:
Makes sense. This is essentially a flat-map, so I've named it accordingly.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]