pitrou commented on code in PR #43756:
URL: https://github.com/apache/arrow/pull/43756#discussion_r1722122074
##########
cpp/src/arrow/util/async_generator.h:
##########
@@ -810,10 +810,29 @@ class ReadaheadGenerator {
Future<> final_future = Future<>::Make();
std::atomic<int> num_running{0};
std::atomic<bool> finished{false};
- std::queue<Future<T>> readahead_queue;
+ std::deque<Future<T>> readahead_queue;
+ };
+
+ struct Cleanup {
+ explicit Cleanup(State* state) : state(state) {}
+ ~Cleanup() {
+ // Lose our own consumer reference to the wrapped generator.
+ state->source_generator = {};
+ // If the generator was destroyed before finishing, wait for all running
+ // tasks to end. While it adds latency, it also reduces the risk of
+ // shenanigans happening at process shutdown (e.g. GH-43604).
+ for (const auto& fut : state->readahead_queue) {
+ fut.Wait();
Review Comment:
Unfortunately, this seems to hang when a MergedGenerator is wrapped in a
ReadaheadGenerator and an error is propagated through it
(for example, in
`test_dataset.py::test_checksum_write_dataset_read_dataset_to_table`).
@westonpace Thoughts?