westonpace commented on a change in pull request #10258:
URL: https://github.com/apache/arrow/pull/10258#discussion_r643628821
##########
File path: cpp/src/arrow/util/future.cc
##########
@@ -231,26 +232,68 @@ class ConcreteFutureImpl : public FutureImpl {
void DoMarkFailed() { DoMarkFinishedOrFailed(FutureState::FAILURE); }
- void AddCallback(Callback callback) {
+ void CheckOptions(const CallbackOptions& opts) {
+ if (opts.should_schedule != ShouldSchedule::NEVER) {
+ DCHECK_NE(opts.executor, NULL)
+ << "An executor must be specified when adding a callback that might
schedule";
+ }
+ }
+
+ void AddCallback(Callback callback, CallbackOptions opts) {
+ CheckOptions(opts);
std::unique_lock<std::mutex> lock(mutex_);
+ CallbackRecord callback_record{std::move(callback), opts};
if (IsFutureFinished(state_)) {
lock.unlock();
- std::move(callback)();
+ RunOrScheduleCallback(callback_record, /*from_unfinished=*/false);
} else {
- callbacks_.push_back(std::move(callback));
+ callbacks_.push_back(std::move(callback_record));
}
}
- bool TryAddCallback(const std::function<Callback()>& callback_factory) {
+ bool TryAddCallback(const std::function<Callback()>& callback_factory,
+ CallbackOptions opts) {
+ CheckOptions(opts);
std::unique_lock<std::mutex> lock(mutex_);
if (IsFutureFinished(state_)) {
return false;
} else {
- callbacks_.push_back(callback_factory());
+ callbacks_.push_back({callback_factory(), opts});
return true;
}
}
+ bool ShouldSchedule(const CallbackRecord& callback_record, bool
from_unfinished) {
+ switch (callback_record.options.should_schedule) {
+ case ShouldSchedule::NEVER:
+ return false;
+ case ShouldSchedule::ALWAYS:
+ return true;
+ case ShouldSchedule::IF_UNFINISHED:
+ return from_unfinished;
+ default:
+ DCHECK(false) << "Unrecognized ShouldSchedule option";
+ return false;
+ }
+ }
+
+ void RunOrScheduleCallback(CallbackRecord& callback_record, bool
from_unfinished) {
+ if (ShouldSchedule(callback_record, from_unfinished)) {
+ // Need to make a copy of this to keep it alive until the callback has a
chance
+ // to be scheduled.
+ struct CallbackTask {
+ void operator()() { std::move(callback)(*self); }
+
+ Callback callback;
+ std::shared_ptr<FutureImpl> self;
Review comment:
The weak pointer in the old implementation was used to prevent futures
from creating a circular reference to themselves (the callback references the
future, which references the callback).
Unfortunately, the weak pointer relied on the future remaining valid until
all callbacks had run. If all callbacks run synchronously, this is easy
(whoever is calling `MarkFinished` must hold a valid reference until all
callbacks finish). Once we start scheduling callbacks, we run into a problem:
`MarkFinished` can return before some callbacks have run, and by the time
those callbacks are scheduled the future may already have been deleted.
This fix isn't just a change to a strong pointer, though (that would
reintroduce the circular reference problem). Instead of the callback itself
holding a reference to the future, I changed it so that the callback takes the
`FutureImpl` as an argument (note: this is the internal `FutureImpl` callback,
not the publicly exposed `Future` callback). This avoids the circular reference
because the strong pointer is created when the callback is triggered, not when
the callback is added.
Also, the strong pointer is only created for scheduled callbacks. Existing
performance should remain the same, since for non-scheduled callbacks no strong
pointer or `shared_from_this` call is made.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]