kou commented on code in PR #35672:
URL: https://github.com/apache/arrow/pull/35672#discussion_r1275675227
##########
cpp/src/arrow/testing/gtest_util.cc:
##########
@@ -1036,7 +1051,60 @@ class GatingTask::Impl : public
std::enable_shared_from_this<GatingTask::Impl> {
return unlocked_future_;
}
+ void WaitForEndOrUnlocked(std::chrono::time_point<std::chrono::steady_clock>
end_time,
+ arrow::internal::Executor* executor, Future<>
future) {
+ if (unlocked_) {
+ num_finished_++;
+ future.MarkFinished(Status::OK());
+ return;
Review Comment:
```suggestion
```
##########
cpp/src/arrow/testing/gtest_util.cc:
##########
@@ -1036,7 +1051,60 @@ class GatingTask::Impl : public
std::enable_shared_from_this<GatingTask::Impl> {
return unlocked_future_;
}
+ void WaitForEndOrUnlocked(std::chrono::time_point<std::chrono::steady_clock>
end_time,
+ arrow::internal::Executor* executor, Future<>
future) {
+ if (unlocked_) {
+ num_finished_++;
+ future.MarkFinished(Status::OK());
+ return;
+ } else {
+ if (std::chrono::steady_clock::now() > end_time) {
+ num_finished_++;
+ future.MarkFinished(
+ Status::Invalid("Task unlock never happened - if threads are
disabled you "
+ "can't wait on gatedtask"));
+ return;
+ } else {
+ SleepABit();
+ auto spawn_status = executor->Spawn([this, end_time, executor,
future]() {
+ WaitForEndOrUnlocked(end_time, executor, future);
+ });
+ if (!spawn_status.ok()) {
+ status_ &= Status::Invalid("Couldn't spawn gating task unlock
waiter");
+ }
+ }
+ }
+ }
+
+ Future<> RunTaskFuture() {
+ num_running_++;
+ // post the unlock check as a separate task
+ // otherwise we'll never let anything else run
+ // so nothing can unlock us
+ using Clock = std::chrono::steady_clock;
+ using DurationDouble = std::chrono::duration<double>;
+ using DurationClock = std::chrono::steady_clock::duration;
+
+ auto start_time = Clock::now();
+ std::chrono::duration<double> secs_left =
+ std::chrono::duration<double>(timeout_seconds_);
+ std::chrono::time_point<Clock> end_time =
Review Comment:
Can we use `auto` here?
##########
cpp/src/arrow/testing/gtest_util.cc:
##########
@@ -1036,7 +1051,60 @@ class GatingTask::Impl : public
std::enable_shared_from_this<GatingTask::Impl> {
return unlocked_future_;
}
+ void WaitForEndOrUnlocked(std::chrono::time_point<std::chrono::steady_clock>
end_time,
+ arrow::internal::Executor* executor, Future<>
future) {
+ if (unlocked_) {
+ num_finished_++;
+ future.MarkFinished(Status::OK());
+ return;
+ } else {
+ if (std::chrono::steady_clock::now() > end_time) {
+ num_finished_++;
+ future.MarkFinished(
+ Status::Invalid("Task unlock never happened - if threads are
disabled you "
+ "can't wait on gatedtask"));
+ return;
Review Comment:
```suggestion
```
##########
cpp/src/arrow/testing/gtest_util.cc:
##########
@@ -1036,7 +1051,60 @@ class GatingTask::Impl : public
std::enable_shared_from_this<GatingTask::Impl> {
return unlocked_future_;
}
+ void WaitForEndOrUnlocked(std::chrono::time_point<std::chrono::steady_clock>
end_time,
+ arrow::internal::Executor* executor, Future<>
future) {
+ if (unlocked_) {
+ num_finished_++;
+ future.MarkFinished(Status::OK());
+ return;
+ } else {
+ if (std::chrono::steady_clock::now() > end_time) {
+ num_finished_++;
+ future.MarkFinished(
+ Status::Invalid("Task unlock never happened - if threads are
disabled you "
+ "can't wait on gatedtask"));
+ return;
+ } else {
+ SleepABit();
+ auto spawn_status = executor->Spawn([this, end_time, executor,
future]() {
+ WaitForEndOrUnlocked(end_time, executor, future);
+ });
+ if (!spawn_status.ok()) {
+ status_ &= Status::Invalid("Couldn't spawn gating task unlock
waiter");
+ }
+ }
+ }
+ }
+
+ Future<> RunTaskFuture() {
+ num_running_++;
+ // post the unlock check as a separate task
+ // otherwise we'll never let anything else run
+ // so nothing can unlock us
+ using Clock = std::chrono::steady_clock;
+ using DurationDouble = std::chrono::duration<double>;
+ using DurationClock = std::chrono::steady_clock::duration;
+
+ auto start_time = Clock::now();
+ std::chrono::duration<double> secs_left =
Review Comment:
```suggestion
auto secs_left =
```
##########
cpp/src/arrow/util/future.cc:
##########
@@ -149,17 +150,47 @@ class ConcreteFutureImpl : public FutureImpl {
}
void DoWait() {
+#ifdef ARROW_ENABLE_THREADING
std::unique_lock<std::mutex> lock(mutex_);
cv_.wait(lock, [this] { return IsFutureFinished(state_); });
+#else
+ auto last_processed_time = std::chrono::steady_clock::now();
+ while (true) {
+ if (IsFutureFinished(state_)) {
+ return;
+ }
+ if (arrow::internal::SerialExecutor::RunTasksOnAllExecutors() == false) {
+ auto this_time = std::chrono::steady_clock::now();
+ if (this_time - last_processed_time < std::chrono::seconds(10)) {
+ ARROW_LOG(WARNING) << "Waiting for future, but no executors have had
any tasks "
+ "pending for last 10 seconds";
+ last_processed_time = std::chrono::steady_clock::now();
+ }
+ }
+ }
+#endif
}
bool DoWait(double seconds) {
+#ifdef ARROW_ENABLE_THREADING
std::unique_lock<std::mutex> lock(mutex_);
cv_.wait_for(lock, std::chrono::duration<double>(seconds),
[this] { return IsFutureFinished(state_); });
return IsFutureFinished(state_);
+#else
+ auto start = std::chrono::steady_clock::now();
+ std::chrono::duration<double> fsec =
std::chrono::duration<double>(seconds);
Review Comment:
```suggestion
auto fsec = std::chrono::duration<double>(seconds);
```
##########
cpp/src/arrow/util/thread_pool.cc:
##########
@@ -183,6 +267,110 @@ void SerialExecutor::RunLoop() {
}
state_->current_thread = {};
}
+#else // ARROW_ENABLE_THREADING
+bool SerialExecutor::RunTasksOnAllExecutors() {
+ auto globalState = GetSerialExecutorGlobalState();
+ // if the previously called executor was deleted, ignore last_called_executor
+ if (globalState->last_called_executor != NULL &&
+ globalState->all_executors.count(globalState->last_called_executor) ==
0) {
+ globalState->last_called_executor = NULL;
+ }
+ bool run_task = true;
+ bool keep_going = true;
+ while (keep_going) {
+ run_task = false;
+ keep_going = false;
+ for (auto it = globalState->all_executors.begin();
+ it != globalState->all_executors.end(); ++it) {
+ if (globalState->last_called_executor != NULL) {
+ // always rerun loop if we have a last_called_executor, otherwise
+ // we may drop out before every executor has been checked
+ keep_going = true;
+ if
(globalState->all_executors.count(globalState->last_called_executor) == 0 ||
+ globalState->last_called_executor == *it) {
+ // found the last one (or it doesn't exist ih the set any more)
+ // now we can start running things
+ globalState->last_called_executor = NULL;
+ }
+ // skip until after we have seen the last executor we called
+ // so that we do things nicely in turn
+ continue;
+ }
+ SerialExecutor* exe = *it;
Review Comment:
```suggestion
auto exe = *it;
```
##########
cpp/src/arrow/util/async_generator_test.cc:
##########
@@ -1530,6 +1547,16 @@ TEST(TestAsyncUtil, ReadaheadFailedWaitForInFlight) {
// These are our in-flight tasks
return TestInt(0);
}));
+#else
+ // if threading is disabled, we can't call Task() as we do below because
it will
+ // never return and will block everything
Review Comment:
@joemarshall Could you check this?
##########
cpp/src/arrow/util/async_generator_test.cc:
##########
@@ -1486,13 +1490,24 @@ TEST(TestAsyncUtil, ReadaheadFailed) {
// should all pass
auto source = [&]() -> Future<TestInt> {
auto count = counter++;
+#ifdef ARROW_ENABLE_THREADING
return DeferNotOk(thread_pool->Submit([&, count]() -> Result<TestInt> {
gating_task->Task()();
if (count == 0) {
return Status::Invalid("X");
}
return TestInt(count);
}));
+#else
+ // if threading is disabled, we can't call Task() as we do below because
it will
+ // never return and will block everything
Review Comment:
@joemarshall Could you check this?
##########
cpp/src/arrow/testing/gtest_util.cc:
##########
@@ -1036,7 +1051,60 @@ class GatingTask::Impl : public
std::enable_shared_from_this<GatingTask::Impl> {
return unlocked_future_;
}
+ void WaitForEndOrUnlocked(std::chrono::time_point<std::chrono::steady_clock>
end_time,
+ arrow::internal::Executor* executor, Future<>
future) {
+ if (unlocked_) {
+ num_finished_++;
+ future.MarkFinished(Status::OK());
+ return;
Review Comment:
Or:
```cpp
if (unlocked_) {
...
return;
}
if (std::chrono::steady_clock::now() > end_time) {
...
return;
}
SleepABit();
...
```
##########
cpp/src/arrow/testing/gtest_util.cc:
##########
@@ -1036,7 +1051,60 @@ class GatingTask::Impl : public
std::enable_shared_from_this<GatingTask::Impl> {
return unlocked_future_;
}
+ void WaitForEndOrUnlocked(std::chrono::time_point<std::chrono::steady_clock>
end_time,
+ arrow::internal::Executor* executor, Future<>
future) {
+ if (unlocked_) {
+ num_finished_++;
+ future.MarkFinished(Status::OK());
+ return;
+ } else {
+ if (std::chrono::steady_clock::now() > end_time) {
+ num_finished_++;
+ future.MarkFinished(
+ Status::Invalid("Task unlock never happened - if threads are
disabled you "
+ "can't wait on gatedtask"));
+ return;
+ } else {
+ SleepABit();
+ auto spawn_status = executor->Spawn([this, end_time, executor,
future]() {
+ WaitForEndOrUnlocked(end_time, executor, future);
+ });
+ if (!spawn_status.ok()) {
+ status_ &= Status::Invalid("Couldn't spawn gating task unlock
waiter");
+ }
+ }
+ }
+ }
+
+ Future<> RunTaskFuture() {
+ num_running_++;
+ // post the unlock check as a separate task
+ // otherwise we'll never let anything else run
+ // so nothing can unlock us
+ using Clock = std::chrono::steady_clock;
+ using DurationDouble = std::chrono::duration<double>;
+ using DurationClock = std::chrono::steady_clock::duration;
+
+ auto start_time = Clock::now();
+ std::chrono::duration<double> secs_left =
+ std::chrono::duration<double>(timeout_seconds_);
+ std::chrono::time_point<Clock> end_time =
+ std::chrono::time_point_cast<DurationClock, Clock,
DurationDouble>(start_time +
+
secs_left);
+ arrow::internal::Executor* executor = arrow::internal::GetCpuThreadPool();
Review Comment:
```suggestion
auto executor = arrow::internal::GetCpuThreadPool();
```
##########
cpp/src/arrow/testing/gtest_util.cc:
##########
@@ -1036,7 +1051,60 @@ class GatingTask::Impl : public
std::enable_shared_from_this<GatingTask::Impl> {
return unlocked_future_;
}
+ void WaitForEndOrUnlocked(std::chrono::time_point<std::chrono::steady_clock>
end_time,
+ arrow::internal::Executor* executor, Future<>
future) {
+ if (unlocked_) {
+ num_finished_++;
+ future.MarkFinished(Status::OK());
+ return;
+ } else {
+ if (std::chrono::steady_clock::now() > end_time) {
+ num_finished_++;
+ future.MarkFinished(
+ Status::Invalid("Task unlock never happened - if threads are
disabled you "
+ "can't wait on gatedtask"));
+ return;
+ } else {
+ SleepABit();
+ auto spawn_status = executor->Spawn([this, end_time, executor,
future]() {
+ WaitForEndOrUnlocked(end_time, executor, future);
+ });
+ if (!spawn_status.ok()) {
+ status_ &= Status::Invalid("Couldn't spawn gating task unlock
waiter");
+ }
+ }
+ }
+ }
+
+ Future<> RunTaskFuture() {
+ num_running_++;
+ // post the unlock check as a separate task
+ // otherwise we'll never let anything else run
+ // so nothing can unlock us
+ using Clock = std::chrono::steady_clock;
+ using DurationDouble = std::chrono::duration<double>;
+ using DurationClock = std::chrono::steady_clock::duration;
+
+ auto start_time = Clock::now();
+ std::chrono::duration<double> secs_left =
+ std::chrono::duration<double>(timeout_seconds_);
+ std::chrono::time_point<Clock> end_time =
+ std::chrono::time_point_cast<DurationClock, Clock,
DurationDouble>(start_time +
+
secs_left);
+ arrow::internal::Executor* executor = arrow::internal::GetCpuThreadPool();
+ Future<> future = Future<>::Make();
Review Comment:
```suggestion
auto future = Future<>::Make();
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]