westonpace commented on code in PR #35672:
URL: https://github.com/apache/arrow/pull/35672#discussion_r1254020989


##########
cpp/src/arrow/util/thread_pool.cc:
##########
@@ -183,6 +267,110 @@ void SerialExecutor::RunLoop() {
   }
   state_->current_thread = {};
 }
+#else   // ARROW_ENABLE_THREADING
+bool SerialExecutor::RunTasksOnAllExecutors(bool once_only) {

Review Comment:
   I believe `once_only` is set to `true` every time this method is called.
   Does this parameter need to be here?



##########
cpp/src/arrow/util/future.cc:
##########
@@ -149,17 +150,39 @@ class ConcreteFutureImpl : public FutureImpl {
   }
 
   void DoWait() {
+#ifdef ARROW_ENABLE_THREADING
     std::unique_lock<std::mutex> lock(mutex_);
 
     cv_.wait(lock, [this] { return IsFutureFinished(state_); });
+#else
+    while (true) {
+      if (IsFutureFinished(state_)) {
+        return;
+      }
+      arrow::internal::SerialExecutor::RunTasksOnAllExecutors(true);

Review Comment:
   If all the executors were idle, this could turn into a busy loop.
   However, for that to happen, a future would have to be fulfilled by
   something that isn't a task, and I don't think we do that very often.
   The only example I can think of is S3 (since S3 uses its own threads), and
   I expect S3 support is disabled on Emscripten anyway. It's probably not
   something we need to worry too much about now, but maybe log a warning if
   `RunTasksOnAllExecutors` returns false here?



##########
cpp/src/arrow/util/thread_pool.cc:
##########
@@ -183,6 +267,110 @@ void SerialExecutor::RunLoop() {
   }
   state_->current_thread = {};
 }
+#else   // ARROW_ENABLE_THREADING
+bool SerialExecutor::RunTasksOnAllExecutors(bool once_only) {
+  auto globalState = GetSerialExecutorGlobalState();
+  // if the previously called executor was deleted, ignore last_called_executor
+  if (globalState->last_called_executor != NULL &&
+      globalState->all_executors.count(globalState->last_called_executor) == 
0) {
+    globalState->last_called_executor = NULL;
+  }
+  bool run_task = true;
+  bool keep_going = true;
+  while (keep_going) {
+    run_task = false;
+    keep_going = false;
+    for (auto it = globalState->all_executors.begin();
+         it != globalState->all_executors.end(); ++it) {
+      if (globalState->last_called_executor != NULL) {
+        // always rerun loop if we have a last_called_executor, otherwise
+        // we drop out before everything is

Review Comment:
   everything is what?



##########
cpp/src/arrow/util/future.cc:
##########
@@ -149,17 +150,39 @@ class ConcreteFutureImpl : public FutureImpl {
   }
 
   void DoWait() {
+#ifdef ARROW_ENABLE_THREADING
     std::unique_lock<std::mutex> lock(mutex_);
 
     cv_.wait(lock, [this] { return IsFutureFinished(state_); });
+#else
+    while (true) {
+      if (IsFutureFinished(state_)) {
+        return;
+      }
+      arrow::internal::SerialExecutor::RunTasksOnAllExecutors(true);
+    }
+#endif
   }
 
   bool DoWait(double seconds) {
+#ifdef ARROW_ENABLE_THREADING
     std::unique_lock<std::mutex> lock(mutex_);
 
     cv_.wait_for(lock, std::chrono::duration<double>(seconds),
                  [this] { return IsFutureFinished(state_); });
     return IsFutureFinished(state_);
+#else
+    auto start = std::chrono::steady_clock::now();
+    std::chrono::duration<double> fsec = 
std::chrono::duration<double>(seconds);
+    while (std::chrono::steady_clock::now() - start < fsec) {
+      // run one task then check time
+      if (IsFutureFinished(state_)) {
+        return true;
+      }
+      arrow::internal::SerialExecutor::RunTasksOnAllExecutors(true);
+    }
+    return IsFutureFinished(state_);

Review Comment:
   No, I agree we shouldn't worry about it.



##########
cpp/src/arrow/util/thread_pool.cc:
##########
@@ -183,6 +267,110 @@ void SerialExecutor::RunLoop() {
   }
   state_->current_thread = {};
 }
+#else   // ARROW_ENABLE_THREADING
+bool SerialExecutor::RunTasksOnAllExecutors(bool once_only) {
+  auto globalState = GetSerialExecutorGlobalState();
+  // if the previously called executor was deleted, ignore last_called_executor
+  if (globalState->last_called_executor != NULL &&
+      globalState->all_executors.count(globalState->last_called_executor) == 
0) {
+    globalState->last_called_executor = NULL;
+  }
+  bool run_task = true;
+  bool keep_going = true;
+  while (keep_going) {
+    run_task = false;
+    keep_going = false;
+    for (auto it = globalState->all_executors.begin();
+         it != globalState->all_executors.end(); ++it) {
+      if (globalState->last_called_executor != NULL) {
+        // always rerun loop if we have a last_called_executor, otherwise
+        // we drop out before everything is
+        keep_going = true;
+        if 
(globalState->all_executors.count(globalState->last_called_executor) == 0 ||
+            globalState->last_called_executor == *it) {
+          // found the last one (or it doesn't exist ih the set any more)
+          // now we can start running things
+          globalState->last_called_executor = NULL;
+        }
+        // skip until after we have seen the last executor we called
+        // so that we do things nicely in turn
+        continue;
+      }
+      SerialExecutor* exe = *it;
+      // don't make reentrant calls inside a serialexecutor
+      // or more than the number of concurrent tasks set on a threadpool

Review Comment:
   ```suggestion
         // don't make more reentrant calls inside a serialexecutor
         // than the number of concurrent tasks set on a threadpool
   ```
   
   Though maybe I am misunderstanding this comment.



##########
cpp/src/arrow/util/async_generator_test.cc:
##########
@@ -1530,6 +1547,16 @@ TEST(TestAsyncUtil, ReadaheadFailedWaitForInFlight) {
       // These are our in-flight tasks
       return TestInt(0);
     }));
+#else
+    // if threading is disabled, we can't call Task() as we do below because 
it will
+    // never return and will block everything

Review Comment:
   Agreed



##########
cpp/src/arrow/util/async_generator_test.cc:
##########
@@ -1486,13 +1490,24 @@ TEST(TestAsyncUtil, ReadaheadFailed) {
   // should all pass
   auto source = [&]() -> Future<TestInt> {
     auto count = counter++;
+#ifdef ARROW_ENABLE_THREADING
     return DeferNotOk(thread_pool->Submit([&, count]() -> Result<TestInt> {
       gating_task->Task()();
       if (count == 0) {
         return Status::Invalid("X");
       }
       return TestInt(count);
     }));
+#else
+    // if threading is disabled, we can't call Task() as we do below because 
it will
+    // never return and will block everything

Review Comment:
   However, I'm also content to just skip this test.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to