westonpace commented on a change in pull request #11207:
URL: https://github.com/apache/arrow/pull/11207#discussion_r714286753
##########
File path: cpp/src/arrow/util/async_generator.h
##########
@@ -221,6 +221,9 @@ class MappingGenerator {
     bool should_trigger;
     {
       auto guard = state->mutex.Lock();
+      // A MappedCallback may have purged or be purging the queue;
+      // we shouldn't do anything here.
+      if (state->finished) return;
Review comment:
Shoot, I had forgotten about this, but I have the same fix in my CSV
streaming PR, which I half-abandoned because it was too complex and I needed
to work on other things. Can you grab the QueuingMapFailStress test case from
https://github.com/apache/arrow/pull/10795/files#diff-aa924daf1e09b13ad031b70e40eeff3b4d4b817b2a32981bb0c0fd2e5313326cR500 ?
I like your fix better anyway. Instead of returning early, I just added
more flags and ifs, which was a more complex version of the same thing.
##########
File path: cpp/src/arrow/dataset/dataset_internal.h
##########
@@ -132,5 +133,28 @@ class FragmentDataset : public Dataset {
   AsyncGenerator<std::shared_ptr<Fragment>> fragment_gen_;
 };
+// Given a record batch generator, creates a new generator that slices
+// batches so individual batches have at most batch_size rows. The
+// resulting generator is async-reentrant, but does not forward
+// reentrant pulls, so apply readahead before using this helper.
+inline RecordBatchGenerator MakeChunkedBatchGenerator(RecordBatchGenerator gen,
+                                                      int64_t batch_size) {
+  return MakeConcatenatedGenerator(MakeMappedGenerator(
+      std::move(gen),
+      [batch_size](const std::shared_ptr<RecordBatch>& batch)
+          -> ::arrow::AsyncGenerator<std::shared_ptr<::arrow::RecordBatch>> {
+        const int64_t rows = batch->num_rows();
+        if (rows <= batch_size) {
+          return ::arrow::MakeVectorGenerator<std::shared_ptr<RecordBatch>>(
+              {batch});
+        }
+        std::vector<std::shared_ptr<RecordBatch>> slices;
+        slices.reserve(rows / batch_size + (rows % batch_size != 0));
+        for (int64_t i = 0; i < rows; i += batch_size) {
+          slices.push_back(batch->Slice(i, batch_size));
+        }
+        return ::arrow::MakeVectorGenerator(std::move(slices));
Review comment:
I wonder if we want to create a simple helper function, MakeFlattenGenerator,
which just calls MakeConcatenatedGenerator(MakeMappedGenerator(...)) under the
hood. That way, if we ever decide the overhead is too much, we can change the
implementation.
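Something like this, maybe (just a sketch; the helper name and signature
here are made up, not existing API):

    // Hypothetical helper: map each item to a sub-generator, then
    // concatenate the sub-generators so the caller sees one flat stream.
    // Name and signature are assumptions for illustration only.
    template <typename T, typename V>
    AsyncGenerator<V> MakeFlattenGenerator(
        AsyncGenerator<T> source,
        std::function<AsyncGenerator<V>(const T&)> map) {
      return MakeConcatenatedGenerator(
          MakeMappedGenerator(std::move(source), std::move(map)));
    }

MakeChunkedBatchGenerator above would then reduce to a call to this helper
with its slicing lambda.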
##########
File path: cpp/src/arrow/util/async_generator.h
##########
@@ -1001,32 +1004,45 @@ class MergedGenerator {
   };
   struct InnerCallback {
Review comment:
Ok, so let me make sure I understand the change. Previously, if a merged
generator pulled from a child generator that had a whole bunch of
already-completed futures (e.g. a vector generator), we would end up
recursing once for each pull into the child.
Instead, you set up the TryAddCallback loop so that we only recurse when
the future isn't finished. I think this is a great addition. Even in cases
where we weren't overflowing the stack, we'd still have been creating
unnecessarily long stack traces. Thanks for working through this.
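For anyone following along, the shape of the pattern is roughly this (a
simplified sketch only; Pull, Deliver, and child_generator_ are illustrative
names, not the actual MergedGenerator members):

    // Sketch: pull in a loop. TryAddCallback attaches the callback only if
    // the future is not yet finished; for already-finished futures it
    // returns false, so we handle the result inline and keep looping
    // instead of recursing through the callback for every completed future.
    void Pull() {
      while (true) {
        Future<std::shared_ptr<RecordBatch>> next = child_generator_();
        if (next.TryAddCallback([this] { return InnerCallback{this}; })) {
          return;  // not finished yet; the callback resumes the loop later
        }
        Deliver(next.result());  // already finished: no extra stack frame
      }
    }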
##########
File path: cpp/src/arrow/dataset/test_util.h
##########
@@ -589,6 +589,23 @@ class FileFormatScanMixin : public FileFormatFixtureMixin<FormatHelper>,
     }
     ASSERT_EQ(row_count, GetParam().expected_rows());
   }
+  // Ensure batch_size is respected
+  void TestScanBatchSize() {
+    auto reader = GetRecordBatchReader(schema({field("f64", float64())}));
+    auto source = this->GetFileSource(reader.get());
+
+    this->SetSchema(reader->schema()->fields());
+    auto fragment = this->MakeFragment(*source);
+
+    int64_t row_count = 0;
+    opts_->batch_size = 16;
Review comment:
I think all of our test cases have row counts evenly divisible by 16. Maybe
make this 15, or something similarly odd, so that we also cover the case
where the read doesn't split into even batches.
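For example (row counts here are illustrative): a 64-row fragment scanned
with batch_size = 15 should come back as batches of 15, 15, 15, 15, and 4,
exercising the remainder slice in MakeChunkedBatchGenerator, whereas with
batch_size = 16 every batch is full and that path is never hit.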