This is an automated email from the ASF dual-hosted git repository.

pitrou pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/main by this push:
     new e2b44378ed GH-47642: [C++] Catch exceptions from initial_task in AsyncTaskScheduler (#49860)
e2b44378ed is described below

commit e2b44378edf0ccceda07e8a6823890bd4ecc1952
Author: egolearner <[email protected]>
AuthorDate: Mon Jun 1 18:57:20 2026 +0800

    GH-47642: [C++] Catch exceptions from initial_task in AsyncTaskScheduler (#49860)
    
    ### Rationale for this change
    
    If the `initial_task` passed to `AsyncTaskScheduler::Make` throws a C++
    exception (rather than returning a failed `Status`), `OnTaskFinished` is
    never called. This leaves `running_tasks_` permanently at 1, which causes a
    `DCHECK` failure in debug builds and an indefinite hang in release builds,
    because the scheduler's `finished` future is never completed. In Acero,
    this manifests as `DeclarationToTable` (and similar APIs) hanging forever
    when a `SourceNode` generator throws during `StartProducing` [...]
    
    ### What changes are included in this PR?
    
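    Wrap the call to `initial_task` in `AsyncTaskScheduler::Make` in a
    try/catch that converts any thrown exception (a `std::exception` or
    anything else) into an `UnknownError` `Status`. As a result,
    `OnTaskFinished` is always called and the scheduler can complete.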
    
    ### Are these changes tested?
    
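    Yes. A new `AsyncTaskScheduler.InitialTaskThrowsException` test covers two
    cases: an initial task that throws with no other tasks, and an initial task
    that throws after adding another task that is still running.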
    
    ### Are there any user-facing changes?
    
    No API changes.
    * GitHub Issue: #47642
    
    Lead-authored-by: egolearner <[email protected]>
    Co-authored-by: Antoine Pitrou <[email protected]>
    Signed-off-by: Antoine Pitrou <[email protected]>
---
 cpp/src/arrow/util/async_util.cc      | 15 ++++++++++++++-
 cpp/src/arrow/util/async_util_test.cc | 27 +++++++++++++++++++++++++++
 2 files changed, 41 insertions(+), 1 deletion(-)

diff --git a/cpp/src/arrow/util/async_util.cc b/cpp/src/arrow/util/async_util.cc
index f8b979a3f5..76537fbac9 100644
--- a/cpp/src/arrow/util/async_util.cc
+++ b/cpp/src/arrow/util/async_util.cc
@@ -23,6 +23,7 @@
 #include "arrow/util/tracing_internal.h"
 
 #include <condition_variable>
+#include <exception>
 #include <list>
 #include <memory>
 #include <mutex>
@@ -466,7 +467,19 @@ Future<> 
AsyncTaskScheduler::Make(FnOnce<Status(AsyncTaskScheduler*)> initial_ta
   auto scope = START_SCOPED_SPAN_SV(span, "AsyncTaskScheduler::InitialTask"sv);
   auto scheduler = 
std::make_unique<AsyncTaskSchedulerImpl>(std::move(stop_token),
                                                             
std::move(abort_callback));
-  Status initial_task_st = std::move(initial_task)(scheduler.get());
+  Status initial_task_st;
+  // GH-47642: We normally don't catch exceptions in Arrow C++ code, as the 
error
+  // reporting model uses the Status object instead. Usually, an uncaught 
exception
+  // will simply terminate the process, surfacing the programming error.
+  // However, an exception thrown from the initial task would result in a much
+  // harder to diagnose process hang.
+  try {
+    initial_task_st = std::move(initial_task)(scheduler.get());
+  } catch (const std::exception& e) {
+    initial_task_st = Status::UnknownError("Initial task threw an exception: 
", e.what());
+  } catch (...) {
+    initial_task_st = Status::UnknownError("Initial task threw an unknown 
exception");
+  }
   scheduler->OnTaskFinished(std::move(initial_task_st));
   // Keep scheduler alive until finished
   return scheduler->OnFinished().Then([scheduler = std::move(scheduler)] {});
diff --git a/cpp/src/arrow/util/async_util_test.cc 
b/cpp/src/arrow/util/async_util_test.cc
index 1f9aad453e..30b9eafe14 100644
--- a/cpp/src/arrow/util/async_util_test.cc
+++ b/cpp/src/arrow/util/async_util_test.cc
@@ -23,9 +23,11 @@
 #include <memory>
 #include <mutex>
 #include <queue>
+#include <stdexcept>
 #include <thread>
 #include <unordered_set>
 
+#include <gmock/gmock-matchers.h>
 #include <gtest/gtest.h>
 
 #include "arrow/result.h"
@@ -204,6 +206,31 @@ TEST(AsyncTaskScheduler, InitialTaskFails) {
   ASSERT_FINISHES_AND_RAISES(Invalid, finished);
 }
 
+TEST(AsyncTaskScheduler, InitialTaskThrowsException) {
+  // If the initial task throws a C++ exception (not a Status), the scheduler
+  // should catch it, convert to a failed Status, and not hang indefinitely.
+  // See https://github.com/apache/arrow/issues/47642
+
+  // Case 1: initial task throws with no other tasks
+  Future<> finished =
+      AsyncTaskScheduler::Make([&](AsyncTaskScheduler* scheduler) -> Status {
+        throw std::runtime_error("some exception");
+      });
+  EXPECT_FINISHES_AND_RAISES_WITH_MESSAGE_THAT(
+      UnknownError, ::testing::HasSubstr("some exception"), finished);
+
+  // Case 2: initial task throws while another task is still running
+  Future<> task = Future<>::Make();
+  finished = AsyncTaskScheduler::Make([&](AsyncTaskScheduler* scheduler) -> 
Status {
+    EXPECT_TRUE(scheduler->AddSimpleTask([&]() { return task; }, kDummyName));
+    throw std::runtime_error("some exception after adding task");
+  });
+  AssertNotFinished(finished);
+  task.MarkFinished();
+  EXPECT_FINISHES_AND_RAISES_WITH_MESSAGE_THAT(
+      UnknownError, ::testing::HasSubstr("some exception after adding task"), 
finished);
+}
+
 TEST(AsyncTaskScheduler, TaskDestroyedBeforeSchedulerEnds) {
   bool my_task_destroyed = false;
   Future<> task_fut = Future<>::Make();

Reply via email to