lidavidm commented on a change in pull request #9808:
URL: https://github.com/apache/arrow/pull/9808#discussion_r603536350
##########
File path: cpp/src/arrow/util/async_generator.h
##########
@@ -947,65 +947,134 @@ AsyncGenerator<T> MakeIteratorGenerator(Iterator<T> it) {
template <typename T>
class BackgroundGenerator {
public:
- explicit BackgroundGenerator(Iterator<T> it, internal::Executor* io_executor)
- : io_executor_(io_executor) {
- task_ = Task{std::make_shared<Iterator<T>>(std::move(it)),
- std::make_shared<std::atomic<bool>>(false)};
- }
-
- ~BackgroundGenerator() {
- // The thread pool will be disposed of automatically. By default it will
not wait
- // so the background thread may outlive this object. That should be ok.
Any task
- // objects in the thread pool are copies of task_ and have their own
shared_ptr to
- // the iterator.
- }
+ explicit BackgroundGenerator(Iterator<T> it, internal::Executor*
io_executor, int max_q,
+ int q_restart)
+ : state_(std::make_shared<State>(io_executor, std::move(it), max_q,
q_restart)) {}
- ARROW_DEFAULT_MOVE_AND_ASSIGN(BackgroundGenerator);
- ARROW_DISALLOW_COPY_AND_ASSIGN(BackgroundGenerator);
+ ~BackgroundGenerator() {}
Future<T> operator()() {
- auto submitted_future = io_executor_->Submit(task_);
- if (!submitted_future.ok()) {
- return Future<T>::MakeFinished(submitted_future.status());
+ auto guard = state_->mutex.Lock();
+ if (state_->queue.empty()) {
+ if (state_->finished) {
+ return AsyncGeneratorEnd<T>();
+ } else {
+ state_->waiting_future = Future<T>::Make();
+ }
+ } else {
+ auto next = Future<T>::MakeFinished(std::move(state_->queue.front()));
+ state_->queue.pop();
+ if (!state_->running &&
+ static_cast<int>(state_->queue.size()) <= state_->q_restart) {
+ state_->RestartTask(state_);
+ }
+ return next;
}
- return std::move(*submitted_future);
+ if (!state_->running) {
+ // This branch should only be needed to start the background thread on
the first
+ // call
+ state_->RestartTask(state_);
+ }
+ return state_->waiting_future;
}
protected:
- struct Task {
- Result<T> operator()() {
- if (*done_) {
- return IterationTraits<T>::End();
+ struct State {
+ State(internal::Executor* io_executor, Iterator<T> it, int max_q, int
q_restart)
+ : io_executor(io_executor),
+ it(std::move(it)),
+ running(false),
+ finished(false),
+ max_q(max_q),
+ q_restart(q_restart) {}
+
+ void RestartTask(std::shared_ptr<State> state) {
+ if (!finished) {
+ running = true;
+ io_executor->Spawn([state]() { Task()(std::move(state)); });
Review comment:
Spawn() returns a Status, so we should check it rather than ignore it.
##########
File path: cpp/src/arrow/util/async_generator.h
##########
@@ -947,65 +947,134 @@ AsyncGenerator<T> MakeIteratorGenerator(Iterator<T> it) {
template <typename T>
class BackgroundGenerator {
public:
- explicit BackgroundGenerator(Iterator<T> it, internal::Executor* io_executor)
- : io_executor_(io_executor) {
- task_ = Task{std::make_shared<Iterator<T>>(std::move(it)),
- std::make_shared<std::atomic<bool>>(false)};
- }
-
- ~BackgroundGenerator() {
- // The thread pool will be disposed of automatically. By default it will
not wait
- // so the background thread may outlive this object. That should be ok.
Any task
- // objects in the thread pool are copies of task_ and have their own
shared_ptr to
- // the iterator.
- }
+ explicit BackgroundGenerator(Iterator<T> it, internal::Executor*
io_executor, int max_q,
+ int q_restart)
+ : state_(std::make_shared<State>(io_executor, std::move(it), max_q,
q_restart)) {}
- ARROW_DEFAULT_MOVE_AND_ASSIGN(BackgroundGenerator);
- ARROW_DISALLOW_COPY_AND_ASSIGN(BackgroundGenerator);
+ ~BackgroundGenerator() {}
Future<T> operator()() {
- auto submitted_future = io_executor_->Submit(task_);
- if (!submitted_future.ok()) {
- return Future<T>::MakeFinished(submitted_future.status());
+ auto guard = state_->mutex.Lock();
+ if (state_->queue.empty()) {
+ if (state_->finished) {
+ return AsyncGeneratorEnd<T>();
+ } else {
+ state_->waiting_future = Future<T>::Make();
+ }
+ } else {
+ auto next = Future<T>::MakeFinished(std::move(state_->queue.front()));
+ state_->queue.pop();
+ if (!state_->running &&
+ static_cast<int>(state_->queue.size()) <= state_->q_restart) {
+ state_->RestartTask(state_);
+ }
+ return next;
}
- return std::move(*submitted_future);
+ if (!state_->running) {
+ // This branch should only be needed to start the background thread on
the first
+ // call
+ state_->RestartTask(state_);
+ }
Review comment:
You could consolidate the above branch into this one, since on the first
call the queue size should be <= q_restart.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]