This is an automated email from the ASF dual-hosted git repository.

lidavidm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new 7809c6d7cb ARROW-16417: [C++][Python] Segfault in test_exec_plan.py / 
test_joins
7809c6d7cb is described below

commit 7809c6d7cb9dcf327840b0f9db1bff436d381f29
Author: Weston Pace <[email protected]>
AuthorDate: Mon May 2 08:44:27 2022 -0400

    ARROW-16417: [C++][Python] Segfault in test_exec_plan.py / test_joins
    
    This builds on top of #13035, which is also important for avoiding
    segmentation faults. On top of that, there were a few more problems:
    
     * The Python bindings were using `SourceNodeOptions::FromTable`, which is
       a rather dangerous method (mainly useful for unit testing) because it
       doesn't share ownership of the input table (even worse, it takes a
       const ref). Python was not keeping the table alive, so it may have been
       possible for the table to be deleted out from under the plan (I'm not
       entirely sure this was causing issues, but it seemed risky). I switched
       to TableSourceNode, which shares ownership of the table (and is a bit
       more efficient).
     * Setting use_threads to False did nothing because `_perform_join` was not 
passing the arg on to `execplan`.
     * When fixing the above and running with `use_threads=False`, it was
       creating a single-threaded executor, but the current best practice is
       to pass in nullptr.
     * Finally, the actual bug was my improper fix in #12894. I had still
       left a small window open for `End` to be called between `Submit` and
       `AddTask`, which would allow the task to be submitted but not
       participate in setting `finished` on the node.
    
    Closes #13036 from westonpace/bugfix/ARROW-16417--segfault-in-python-join
    
    Lead-authored-by: Weston Pace <[email protected]>
    Co-authored-by: David Li <[email protected]>
    Signed-off-by: David Li <[email protected]>
---
 cpp/src/arrow/compute/exec/hash_join_node.cc | 21 +++++++++++----------
 python/pyarrow/_exec_plan.pyx                | 22 +++++++++++-----------
 python/pyarrow/includes/libarrow.pxd         |  5 ++---
 3 files changed, 24 insertions(+), 24 deletions(-)

diff --git a/cpp/src/arrow/compute/exec/hash_join_node.cc 
b/cpp/src/arrow/compute/exec/hash_join_node.cc
index d28e3aeda4..0282e387c4 100644
--- a/cpp/src/arrow/compute/exec/hash_join_node.cc
+++ b/cpp/src/arrow/compute/exec/hash_join_node.cc
@@ -634,16 +634,17 @@ class HashJoinNode : public ExecNode {
   Status ScheduleTaskCallback(std::function<Status(size_t)> func) {
     auto executor = plan_->exec_context()->executor();
     if (executor) {
-      ARROW_ASSIGN_OR_RAISE(auto task_fut, executor->Submit([this, func] {
-        size_t thread_index = thread_indexer_();
-        Status status = func(thread_index);
-        if (!status.ok()) {
-          StopProducing();
-          ErrorIfNotOk(status);
-          return;
-        }
-      }));
-      return task_group_.AddTask(task_fut);
+      return task_group_.AddTask([this, executor, func] {
+        return DeferNotOk(executor->Submit([this, func] {
+          size_t thread_index = thread_indexer_();
+          Status status = func(thread_index);
+          if (!status.ok()) {
+            StopProducing();
+            ErrorIfNotOk(status);
+            return;
+          }
+        }));
+      });
     } else {
       // We should not get here in serial execution mode
       ARROW_DCHECK(false);
diff --git a/python/pyarrow/_exec_plan.pyx b/python/pyarrow/_exec_plan.pyx
index 909d12ed2c..7cbce9baa6 100644
--- a/python/pyarrow/_exec_plan.pyx
+++ b/python/pyarrow/_exec_plan.pyx
@@ -56,7 +56,6 @@ cdef execplan(inputs, output_type, vector[CDeclaration] plan, 
c_bool use_threads
     """
     cdef:
         CExecutor *c_executor
-        shared_ptr[CThreadPool] c_executor_sptr
         shared_ptr[CExecContext] c_exec_context
         shared_ptr[CExecPlan] c_exec_plan
         vector[CDeclaration] c_decls
@@ -64,8 +63,9 @@ cdef execplan(inputs, output_type, vector[CDeclaration] plan, 
c_bool use_threads
         vector[CExecNode*] c_final_node_vec
         CExecNode *c_node
         CTable* c_table
+        shared_ptr[CTable] c_in_table
         shared_ptr[CTable] c_out_table
-        shared_ptr[CSourceNodeOptions] c_sourceopts
+        shared_ptr[CTableSourceNodeOptions] c_tablesourceopts
         shared_ptr[CScanNodeOptions] c_scanopts
         shared_ptr[CExecNodeOptions] c_input_node_opts
         shared_ptr[CSinkNodeOptions] c_sinkopts
@@ -78,8 +78,7 @@ cdef execplan(inputs, output_type, vector[CDeclaration] plan, 
c_bool use_threads
     if use_threads:
         c_executor = GetCpuThreadPool()
     else:
-        c_executor_sptr = GetResultValue(CThreadPool.Make(1))
-        c_executor = c_executor_sptr.get()
+        c_executor = NULL
 
     c_exec_context = make_shared[CExecContext](
         c_default_memory_pool(), c_executor)
@@ -90,12 +89,12 @@ cdef execplan(inputs, output_type, vector[CDeclaration] 
plan, c_bool use_threads
     # Create source nodes for each input
     for ipt in inputs:
         if isinstance(ipt, Table):
-            node_factory = "source"
-            c_in_table = pyarrow_unwrap_table(ipt).get()
-            c_sourceopts = GetResultValue(
-                CSourceNodeOptions.FromTable(deref(c_in_table), 
deref(c_exec_context).executor()))
-            c_input_node_opts = static_pointer_cast[CExecNodeOptions, 
CSourceNodeOptions](
-                c_sourceopts)
+            node_factory = "table_source"
+            c_in_table = pyarrow_unwrap_table(ipt)
+            c_tablesourceopts = make_shared[CTableSourceNodeOptions](
+                c_in_table, 1 << 20)
+            c_input_node_opts = static_pointer_cast[CExecNodeOptions, 
CTableSourceNodeOptions](
+                c_tablesourceopts)
         elif isinstance(ipt, Dataset):
             node_factory = "scan"
             c_in_dataset = (<Dataset>ipt).unwrap()
@@ -348,6 +347,7 @@ def _perform_join(join_type, left_operand not None, 
left_keys,
 
     result_table = execplan([left_operand, right_operand],
                             plan=c_decl_plan,
-                            output_type=output_type)
+                            output_type=output_type,
+                            use_threads=use_threads)
 
     return result_table
diff --git a/python/pyarrow/includes/libarrow.pxd 
b/python/pyarrow/includes/libarrow.pxd
index e34bc7a28f..cc52102ef8 100644
--- a/python/pyarrow/includes/libarrow.pxd
+++ b/python/pyarrow/includes/libarrow.pxd
@@ -2447,9 +2447,8 @@ cdef extern from "arrow/compute/exec/options.h" namespace 
"arrow::compute" nogil
     cdef cppclass CExecNodeOptions "arrow::compute::ExecNodeOptions":
         pass
 
-    cdef cppclass CSourceNodeOptions 
"arrow::compute::SourceNodeOptions"(CExecNodeOptions):
-        @staticmethod
-        CResult[shared_ptr[CSourceNodeOptions]] FromTable(const CTable& table, 
CExecutor*)
+    cdef cppclass CTableSourceNodeOptions 
"arrow::compute::TableSourceNodeOptions"(CExecNodeOptions):
+        CTableSourceNodeOptions(shared_ptr[CTable] table, int64_t 
max_batch_size)
 
     cdef cppclass CSinkNodeOptions 
"arrow::compute::SinkNodeOptions"(CExecNodeOptions):
         pass

Reply via email to