kou commented on code in PR #35672:
URL: https://github.com/apache/arrow/pull/35672#discussion_r1275675227


##########
cpp/src/arrow/testing/gtest_util.cc:
##########
@@ -1036,7 +1051,60 @@ class GatingTask::Impl : public std::enable_shared_from_this<GatingTask::Impl> {
     return unlocked_future_;
   }
 
+  void WaitForEndOrUnlocked(std::chrono::time_point<std::chrono::steady_clock> end_time,
+                            arrow::internal::Executor* executor, Future<> future) {
+    if (unlocked_) {
+      num_finished_++;
+      future.MarkFinished(Status::OK());
+      return;

Review Comment:
   ```suggestion
   ```



##########
cpp/src/arrow/testing/gtest_util.cc:
##########
@@ -1036,7 +1051,60 @@ class GatingTask::Impl : public std::enable_shared_from_this<GatingTask::Impl> {
     return unlocked_future_;
   }
 
+  void WaitForEndOrUnlocked(std::chrono::time_point<std::chrono::steady_clock> end_time,
+                            arrow::internal::Executor* executor, Future<> future) {
+    if (unlocked_) {
+      num_finished_++;
+      future.MarkFinished(Status::OK());
+      return;
+    } else {
+      if (std::chrono::steady_clock::now() > end_time) {
+        num_finished_++;
+        future.MarkFinished(
+            Status::Invalid("Task unlock never happened - if threads are disabled you "
+                            "can't wait on gatedtask"));
+        return;
+      } else {
+        SleepABit();
+        auto spawn_status = executor->Spawn([this, end_time, executor, future]() {
+          WaitForEndOrUnlocked(end_time, executor, future);
+        });
+        if (!spawn_status.ok()) {
+          status_ &= Status::Invalid("Couldn't spawn gating task unlock waiter");
+        }
+      }
+    }
+  }
+
+  Future<> RunTaskFuture() {
+    num_running_++;
+    // post the unlock check as a separate task
+    // otherwise we'll never let anything else run
+    // so nothing can unlock us
+    using Clock = std::chrono::steady_clock;
+    using DurationDouble = std::chrono::duration<double>;
+    using DurationClock = std::chrono::steady_clock::duration;
+
+    auto start_time = Clock::now();
+    std::chrono::duration<double> secs_left =
+        std::chrono::duration<double>(timeout_seconds_);
+    std::chrono::time_point<Clock> end_time =

Review Comment:
   Can we use `auto` here?



##########
cpp/src/arrow/testing/gtest_util.cc:
##########
@@ -1036,7 +1051,60 @@ class GatingTask::Impl : public std::enable_shared_from_this<GatingTask::Impl> {
     return unlocked_future_;
   }
 
+  void WaitForEndOrUnlocked(std::chrono::time_point<std::chrono::steady_clock> end_time,
+                            arrow::internal::Executor* executor, Future<> future) {
+    if (unlocked_) {
+      num_finished_++;
+      future.MarkFinished(Status::OK());
+      return;
+    } else {
+      if (std::chrono::steady_clock::now() > end_time) {
+        num_finished_++;
+        future.MarkFinished(
+            Status::Invalid("Task unlock never happened - if threads are disabled you "
+                            "can't wait on gatedtask"));
+        return;

Review Comment:
   ```suggestion
   ```



##########
cpp/src/arrow/testing/gtest_util.cc:
##########
@@ -1036,7 +1051,60 @@ class GatingTask::Impl : public std::enable_shared_from_this<GatingTask::Impl> {
     return unlocked_future_;
   }
 
+  void WaitForEndOrUnlocked(std::chrono::time_point<std::chrono::steady_clock> end_time,
+                            arrow::internal::Executor* executor, Future<> future) {
+    if (unlocked_) {
+      num_finished_++;
+      future.MarkFinished(Status::OK());
+      return;
+    } else {
+      if (std::chrono::steady_clock::now() > end_time) {
+        num_finished_++;
+        future.MarkFinished(
+            Status::Invalid("Task unlock never happened - if threads are disabled you "
+                            "can't wait on gatedtask"));
+        return;
+      } else {
+        SleepABit();
+        auto spawn_status = executor->Spawn([this, end_time, executor, future]() {
+          WaitForEndOrUnlocked(end_time, executor, future);
+        });
+        if (!spawn_status.ok()) {
+          status_ &= Status::Invalid("Couldn't spawn gating task unlock waiter");
+        }
+      }
+    }
+  }
+
+  Future<> RunTaskFuture() {
+    num_running_++;
+    // post the unlock check as a separate task
+    // otherwise we'll never let anything else run
+    // so nothing can unlock us
+    using Clock = std::chrono::steady_clock;
+    using DurationDouble = std::chrono::duration<double>;
+    using DurationClock = std::chrono::steady_clock::duration;
+
+    auto start_time = Clock::now();
+    std::chrono::duration<double> secs_left =

Review Comment:
   ```suggestion
       auto secs_left =
   ```



##########
cpp/src/arrow/util/future.cc:
##########
@@ -149,17 +150,47 @@ class ConcreteFutureImpl : public FutureImpl {
   }
 
   void DoWait() {
+#ifdef ARROW_ENABLE_THREADING
     std::unique_lock<std::mutex> lock(mutex_);
 
     cv_.wait(lock, [this] { return IsFutureFinished(state_); });
+#else
+    auto last_processed_time = std::chrono::steady_clock::now();
+    while (true) {
+      if (IsFutureFinished(state_)) {
+        return;
+      }
+      if (arrow::internal::SerialExecutor::RunTasksOnAllExecutors() == false) {
+        auto this_time = std::chrono::steady_clock::now();
+        if (this_time - last_processed_time < std::chrono::seconds(10)) {
+          ARROW_LOG(WARNING) << "Waiting for future, but no executors have had any tasks "
+                                "pending for last 10 seconds";
+          last_processed_time = std::chrono::steady_clock::now();
+        }
+      }
+    }
+#endif
   }
 
   bool DoWait(double seconds) {
+#ifdef ARROW_ENABLE_THREADING
     std::unique_lock<std::mutex> lock(mutex_);
 
     cv_.wait_for(lock, std::chrono::duration<double>(seconds),
                  [this] { return IsFutureFinished(state_); });
     return IsFutureFinished(state_);
+#else
+    auto start = std::chrono::steady_clock::now();
+    std::chrono::duration<double> fsec = std::chrono::duration<double>(seconds);

Review Comment:
   ```suggestion
       auto fsec = std::chrono::duration<double>(seconds);
   ```



##########
cpp/src/arrow/util/thread_pool.cc:
##########
@@ -183,6 +267,110 @@ void SerialExecutor::RunLoop() {
   }
   state_->current_thread = {};
 }
+#else   // ARROW_ENABLE_THREADING
+bool SerialExecutor::RunTasksOnAllExecutors() {
+  auto globalState = GetSerialExecutorGlobalState();
+  // if the previously called executor was deleted, ignore last_called_executor
+  if (globalState->last_called_executor != NULL &&
+      globalState->all_executors.count(globalState->last_called_executor) == 0) {
+    globalState->last_called_executor = NULL;
+  }
+  bool run_task = true;
+  bool keep_going = true;
+  while (keep_going) {
+    run_task = false;
+    keep_going = false;
+    for (auto it = globalState->all_executors.begin();
+         it != globalState->all_executors.end(); ++it) {
+      if (globalState->last_called_executor != NULL) {
+        // always rerun loop if we have a last_called_executor, otherwise
+        // we may drop out before every executor has been checked
+        keep_going = true;
+        if (globalState->all_executors.count(globalState->last_called_executor) == 0 ||
+            globalState->last_called_executor == *it) {
+          // found the last one (or it doesn't exist ih the set any more)
+          // now we can start running things
+          globalState->last_called_executor = NULL;
+        }
+        // skip until after we have seen the last executor we called
+        // so that we do things nicely in turn
+        continue;
+      }
+      SerialExecutor* exe = *it;

Review Comment:
   ```suggestion
         auto exe = *it;
   ```



##########
cpp/src/arrow/util/async_generator_test.cc:
##########
@@ -1530,6 +1547,16 @@ TEST(TestAsyncUtil, ReadaheadFailedWaitForInFlight) {
       // These are our in-flight tasks
       return TestInt(0);
     }));
+#else
+    // if threading is disabled, we can't call Task() as we do below because it will
+    // never return and will block everything

Review Comment:
   @joemarshall Could you check this?



##########
cpp/src/arrow/util/async_generator_test.cc:
##########
@@ -1486,13 +1490,24 @@ TEST(TestAsyncUtil, ReadaheadFailed) {
   // should all pass
   auto source = [&]() -> Future<TestInt> {
     auto count = counter++;
+#ifdef ARROW_ENABLE_THREADING
     return DeferNotOk(thread_pool->Submit([&, count]() -> Result<TestInt> {
       gating_task->Task()();
       if (count == 0) {
         return Status::Invalid("X");
       }
       return TestInt(count);
     }));
+#else
+    // if threading is disabled, we can't call Task() as we do below because it will
+    // never return and will block everything

Review Comment:
   @joemarshall Could you check this?



##########
cpp/src/arrow/testing/gtest_util.cc:
##########
@@ -1036,7 +1051,60 @@ class GatingTask::Impl : public std::enable_shared_from_this<GatingTask::Impl> {
     return unlocked_future_;
   }
 
+  void WaitForEndOrUnlocked(std::chrono::time_point<std::chrono::steady_clock> end_time,
+                            arrow::internal::Executor* executor, Future<> future) {
+    if (unlocked_) {
+      num_finished_++;
+      future.MarkFinished(Status::OK());
+      return;

Review Comment:
   Or:
   
   ```cpp
   if (unlocked_) {
     ...
     return;
   }
   
   if (std::chrono::steady_clock::now() > end_time) {
     ...
     return;
   }
   
   SleepABit();
   ...
   ```



##########
cpp/src/arrow/testing/gtest_util.cc:
##########
@@ -1036,7 +1051,60 @@ class GatingTask::Impl : public std::enable_shared_from_this<GatingTask::Impl> {
     return unlocked_future_;
   }
 
+  void WaitForEndOrUnlocked(std::chrono::time_point<std::chrono::steady_clock> end_time,
+                            arrow::internal::Executor* executor, Future<> future) {
+    if (unlocked_) {
+      num_finished_++;
+      future.MarkFinished(Status::OK());
+      return;
+    } else {
+      if (std::chrono::steady_clock::now() > end_time) {
+        num_finished_++;
+        future.MarkFinished(
+            Status::Invalid("Task unlock never happened - if threads are disabled you "
+                            "can't wait on gatedtask"));
+        return;
+      } else {
+        SleepABit();
+        auto spawn_status = executor->Spawn([this, end_time, executor, future]() {
+          WaitForEndOrUnlocked(end_time, executor, future);
+        });
+        if (!spawn_status.ok()) {
+          status_ &= Status::Invalid("Couldn't spawn gating task unlock waiter");
+        }
+      }
+    }
+  }
+
+  Future<> RunTaskFuture() {
+    num_running_++;
+    // post the unlock check as a separate task
+    // otherwise we'll never let anything else run
+    // so nothing can unlock us
+    using Clock = std::chrono::steady_clock;
+    using DurationDouble = std::chrono::duration<double>;
+    using DurationClock = std::chrono::steady_clock::duration;
+
+    auto start_time = Clock::now();
+    std::chrono::duration<double> secs_left =
+        std::chrono::duration<double>(timeout_seconds_);
+    std::chrono::time_point<Clock> end_time =
+        std::chrono::time_point_cast<DurationClock, Clock, DurationDouble>(start_time +
+                                                                           secs_left);
+    arrow::internal::Executor* executor = arrow::internal::GetCpuThreadPool();

Review Comment:
   ```suggestion
       auto executor = arrow::internal::GetCpuThreadPool();
   ```



##########
cpp/src/arrow/testing/gtest_util.cc:
##########
@@ -1036,7 +1051,60 @@ class GatingTask::Impl : public std::enable_shared_from_this<GatingTask::Impl> {
     return unlocked_future_;
   }
 
+  void WaitForEndOrUnlocked(std::chrono::time_point<std::chrono::steady_clock> end_time,
+                            arrow::internal::Executor* executor, Future<> future) {
+    if (unlocked_) {
+      num_finished_++;
+      future.MarkFinished(Status::OK());
+      return;
+    } else {
+      if (std::chrono::steady_clock::now() > end_time) {
+        num_finished_++;
+        future.MarkFinished(
+            Status::Invalid("Task unlock never happened - if threads are disabled you "
+                            "can't wait on gatedtask"));
+        return;
+      } else {
+        SleepABit();
+        auto spawn_status = executor->Spawn([this, end_time, executor, future]() {
+          WaitForEndOrUnlocked(end_time, executor, future);
+        });
+        if (!spawn_status.ok()) {
+          status_ &= Status::Invalid("Couldn't spawn gating task unlock waiter");
+        }
+      }
+    }
+  }
+
+  Future<> RunTaskFuture() {
+    num_running_++;
+    // post the unlock check as a separate task
+    // otherwise we'll never let anything else run
+    // so nothing can unlock us
+    using Clock = std::chrono::steady_clock;
+    using DurationDouble = std::chrono::duration<double>;
+    using DurationClock = std::chrono::steady_clock::duration;
+
+    auto start_time = Clock::now();
+    std::chrono::duration<double> secs_left =
+        std::chrono::duration<double>(timeout_seconds_);
+    std::chrono::time_point<Clock> end_time =
+        std::chrono::time_point_cast<DurationClock, Clock, DurationDouble>(start_time +
+                                                                           secs_left);
+    arrow::internal::Executor* executor = arrow::internal::GetCpuThreadPool();
+    Future<> future = Future<>::Make();

Review Comment:
   ```suggestion
       auto future = Future<>::Make();
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to