westonpace commented on code in PR #14024:
URL: https://github.com/apache/arrow/pull/14024#discussion_r961196591


##########
cpp/src/arrow/engine/substrait/util.cc:
##########
@@ -126,6 +131,36 @@ Result<std::shared_ptr<RecordBatchReader>> 
ExecuteSerializedPlan(
   return sink_reader;
 }
 
+Result<std::shared_ptr<RecordBatchReader>> ExecuteSerializedPlan(
+    const Buffer& substrait_buffer, PythonTableProvider& table_provider,
+    const ExtensionIdRegistry* registry, compute::FunctionRegistry* 
func_registry) {
+  // TODO(ARROW-15732)
+  // retrieve input table from table provider
+
+  NamedTableProvider named_table_provider =
+      [table_provider](
+          const std::vector<std::string>& names) -> 
Result<compute::Declaration> {
+    ARROW_ASSIGN_OR_RAISE(std::shared_ptr<Table> input_table, 
table_provider(names));
+    std::shared_ptr<compute::ExecNodeOptions> options =
+        std::make_shared<compute::TableSourceNodeOptions>(input_table);
+    return compute::Declaration("table_source", {}, options,
+                                "substrait_table_provider_source");
+  };
+
+  ConversionOptions conversion_options;
+  conversion_options.named_table_provider = std::move(named_table_provider);
+
+  compute::ExecContext exec_context(arrow::default_memory_pool(),
+                                    ::arrow::internal::GetCpuThreadPool(), 
func_registry);
+  ARROW_ASSIGN_OR_RAISE(auto plan, compute::ExecPlan::Make(&exec_context));
+  SubstraitExecutor executor(std::move(plan), exec_context, 
conversion_options);
+  RETURN_NOT_OK(executor.Init(substrait_buffer, registry));
+  ARROW_ASSIGN_OR_RAISE(auto sink_reader, executor.Execute());
+  // check closing here, not in destructor, to expose error to caller
+  RETURN_NOT_OK(executor.Close());
+  return sink_reader;

Review Comment:
   Rather than copy-pasting, can we call the other method here?



##########
python/pyarrow/_substrait.pyx:
##########
@@ -102,3 +103,40 @@ def get_supported_functions():
     for c_id in c_ids:
         functions_list.append(frombytes(c_id))
     return functions_list
+
+
+cdef shared_ptr[CTable] _process_named_table(dict named_args, const 
std_vector[c_string]& names):
+    cdef c_string c_name
+    py_names = []
+    for i in range(names.size()):
+        c_name = names[i]
+        py_names.append(frombytes(c_name))
+    return pyarrow_unwrap_table(named_args["provider"](py_names))
+
+
+def run_query_with_provider(plan, table_provider):

Review Comment:
   Can we make `table_provider` an optional argument to `run_query` rather 
than adding a new method?  I'm not sure whether Cython allows this (I think it 
does for a `def` method), but ideally the Python API wouldn't need two 
methods.



##########
cpp/src/arrow/engine/substrait/util.h:
##########
@@ -20,18 +20,29 @@
 #include <memory>
 #include "arrow/compute/registry.h"
 #include "arrow/engine/substrait/api.h"
+#include "arrow/engine/substrait/options.h"
 #include "arrow/util/iterator.h"
 #include "arrow/util/optional.h"
 
 namespace arrow {
 
 namespace engine {
 
+using PythonTableProvider =
+    std::function<Result<std::shared_ptr<Table>>(const 
std::vector<std::string>&)>;
+
 /// \brief Retrieve a RecordBatchReader from a Substrait plan.
 ARROW_ENGINE_EXPORT Result<std::shared_ptr<RecordBatchReader>> 
ExecuteSerializedPlan(
     const Buffer& substrait_buffer, const ExtensionIdRegistry* registry = 
NULLPTR,
     compute::FunctionRegistry* func_registry = NULLPTR);
 
+/// \brief Retrieve a RecordBatchReader from a Substrait plan.
+/// Allows to use NamedT

Review Comment:
   Finish this comment?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to