westonpace commented on a change in pull request #11285:
URL: https://github.com/apache/arrow/pull/11285#discussion_r721792741
##########
File path: cpp/src/arrow/util/async_generator.h
##########
@@ -1645,6 +1667,47 @@ AsyncGenerator<T> MakeCancellable(AsyncGenerator<T>
source, StopToken stop_token
return CancellableGenerator<T>{std::move(source), std::move(stop_token)};
}
+template <typename T>
+struct PauseableGenerator {
+ public:
+ PauseableGenerator(AsyncGenerator<T> source,
std::shared_ptr<util::AsyncToggle> toggle)
+ : state_(std::make_shared<PauseableGeneratorState>(std::move(source),
+ std::move(toggle))) {}
+
+ Future<T> operator()() { return (*state_)(); }
+
+ private:
+ struct PauseableGeneratorState
+ : public std::enable_shared_from_this<PauseableGeneratorState> {
+ PauseableGeneratorState(AsyncGenerator<T> source,
+ std::shared_ptr<util::AsyncToggle> toggle)
+ : source_(std::move(source)), toggle_(std::move(toggle)) {}
+
+ Future<T> operator()() {
+ std::shared_ptr<PauseableGeneratorState> self = this->shared_from_this();
+ return toggle_->WhenOpen().Then([self] {
Review comment:
I cleaned it up a little bit which should eliminate most reordering but
I could not escape the case:
Thread A calls Open
Thread D calls WhenOpen
Thread B calls Close
Thread C calls Open
Thread D calls WhenOpen
It is possible for callbacks on the second call to WhenOpen to finish before
callbacks on the first call.
So what I did was go ahead and change the comments for MakePauseable to
state that it is not async reentrant. This should prevent any possible
reordering because the second call to WhenOpen will not happen until the future
returned from the first call to WhenOpen has completed. The only spot we are
using MakePauseable does not need async reentrancy so this should be ok.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]