westonpace commented on code in PR #14024:
URL: https://github.com/apache/arrow/pull/14024#discussion_r961978236


##########
python/pyarrow/tests/test_substrait.py:
##########
@@ -165,3 +165,172 @@ def test_get_supported_functions():
                         'functions_arithmetic.yaml', 'add')
     assert has_function(supported_functions,
                         'functions_arithmetic.yaml', 'sum')
+
+
+def test_named_table():
+    test_table_1 = pa.Table.from_pydict({"x": [1, 2, 3]})
+    test_table_2 = pa.Table.from_pydict({"x": [4, 5, 6]})
+
+    def table_provider(names):
+        if not names:
+            raise Exception("No names provided")
+        elif names[0] == "t1":
+            return test_table_1
+        elif names[1] == "t2":
+            return test_table_2
+        else:
+            raise Exception("Unrecognized table name")
+
+    substrait_query = """
+    {
+        "relations": [
+        {"rel": {
+            "read": {
+            "base_schema": {
+                "struct": {
+                "types": [
+                            {"i64": {}}
+                        ]
+                },
+                "names": [
+                        "x"
+                        ]
+            },
+            "namedTable": {
+                    "names": ["TABLE_NAME_PLACEHOLDER"]
+            }
+            }
+        }}
+        ]
+    }
+    """
+    table_name = "t1"
+    query = tobytes(substrait_query.replace(
+        "TABLE_NAME_PLACEHOLDER", table_name))
+    buf = pa._substrait._parse_json_plan(tobytes(query))
+    reader = pa.substrait.run_query(buf, table_provider)
+    res_tb = reader.read_all()
+    assert res_tb == test_table_1
+
+
+def test_named_table_invalid_table_name():
+    test_table_1 = pa.Table.from_pydict({"x": [1, 2, 3]})
+
+    def table_provider(names):
+        if not names:
+            raise Exception("No names provided")
+        elif names[0] == "t1":
+            return test_table_1
+        else:
+            raise Exception("Unrecognized table name")
+
+    substrait_query = """
+    {
+        "relations": [
+        {"rel": {
+            "read": {
+            "base_schema": {
+                "struct": {
+                "types": [
+                            {"i64": {}}
+                        ]
+                },
+                "names": [
+                        "x"
+                        ]
+            },
+            "namedTable": {
+                    "names": ["TABLE_NAME_PLACEHOLDER"]

Review Comment:
   No need to use a placeholder.



##########
cpp/src/arrow/engine/substrait/util.h:
##########
@@ -20,16 +20,22 @@
 #include <memory>
 #include "arrow/compute/registry.h"
 #include "arrow/engine/substrait/api.h"
+#include "arrow/engine/substrait/options.h"
 #include "arrow/util/iterator.h"
 #include "arrow/util/optional.h"
 
 namespace arrow {
 
 namespace engine {
 
+using PythonTableProvider =
+    std::function<Result<std::shared_ptr<Table>>(const 
std::vector<std::string>&)>;
+
 /// \brief Retrieve a RecordBatchReader from a Substrait plan.
+/// Allows to use NamedTableProvider in Substrait plan execution.

Review Comment:
   ```suggestion
   ```
   
   If we are going to document one param, we should document all the params.



##########
python/pyarrow/_substrait.pyx:
##########
@@ -25,14 +26,84 @@ from pyarrow.includes.libarrow cimport *
 from pyarrow.includes.libarrow_substrait cimport *
 
 
-def run_query(plan):
+cdef shared_ptr[CTable] _process_named_table(dict named_args, const 
std_vector[c_string]& names):
+    cdef:
+        c_string c_name
+        shared_ptr[CTable] empty_table
+    py_names = []
+
+    # provider function could raise an exception
+    try:
+        for i in range(names.size()):
+            c_name = names[i]
+            py_names.append(frombytes(c_name))
+        return pyarrow_unwrap_table(named_args["provider"](py_names))
+    except Exception:
+        return empty_table
+
+
+def run_query(plan, table_provider=None):
     """
     Execute a Substrait plan and read the results as a RecordBatchReader.
 
     Parameters
     ----------
     plan : Buffer
         The serialized Substrait plan to execute.
+    table_provider : object (optional)
+        The function determining input table based on names
+
+    Returns
+    -------
+    RecordBatchReader
+        A reader containing the result of the executed query
+
+    Examples
+    --------
+    >>> import pyarrow as pa
+    >>> from pyarrow.lib import tobytes
+    >>> import pyarrow.substrait as substrait
+    >>> test_table_1 = pa.Table.from_pydict({"x": [1, 2, 3]})
+    >>> test_table_2 = pa.Table.from_pydict({"x": [4, 5, 6]})
+    >>> def table_provider(names):
+    ...     if not names:
+    ...        raise Exception("No names provided")
+    ...     elif names[0] == "t1":
+    ...        return test_table_1
+    ...     elif names[1] == "t2":
+    ...        return test_table_2
+    ...     else:
+    ...        raise Exception("Unrecognized table name")
+    ... 
+    >>> substrait_query = '''
+    ...         {
+    ...             "relations": [
+    ...             {"rel": {
+    ...                 "read": {
+    ...                 "base_schema": {
+    ...                     "struct": {
+    ...                     "types": [
+    ...                                 {"i64": {}}
+    ...                             ]
+    ...                     },
+    ...                     "names": [
+    ...                             "x"
+    ...                             ]
+    ...                 },
+    ...                 "namedTable": {
+    ...                         "names": ["TABLE_NAME_PLACEHOLDER"]
+    ...                 }
+    ...                 }
+    ...             }}
+    ...             ]
+    ...         }
+    ... '''
+    >>> table_name = "t1"
+    >>> query = tobytes(substrait_query.replace(
+    ...             "TABLE_NAME_PLACEHOLDER", table_name))

Review Comment:
   ```suggestion
       >>> substrait_query = '''
       ...         {
       ...             "relations": [
       ...             {"rel": {
       ...                 "read": {
       ...                 "base_schema": {
       ...                     "struct": {
       ...                     "types": [
       ...                                 {"i64": {}}
       ...                             ]
       ...                     },
       ...                     "names": [
       ...                             "x"
       ...                             ]
       ...                 },
       ...                 "namedTable": {
       ...                         "names": ["t1"]
       ...                 }
       ...                 }
       ...             }}
       ...             ]
       ...         }
       ... '''
   ```
   There is no need to use the placeholder pattern here.



##########
python/pyarrow/_substrait.pyx:
##########
@@ -25,14 +26,84 @@ from pyarrow.includes.libarrow cimport *
 from pyarrow.includes.libarrow_substrait cimport *
 
 
-def run_query(plan):
+cdef shared_ptr[CTable] _process_named_table(dict named_args, const 
std_vector[c_string]& names):
+    cdef:
+        c_string c_name
+        shared_ptr[CTable] empty_table
+    py_names = []
+
+    # provider function could raise an exception
+    try:
+        for i in range(names.size()):
+            c_name = names[i]
+            py_names.append(frombytes(c_name))
+        return pyarrow_unwrap_table(named_args["provider"](py_names))
+    except Exception:
+        return empty_table
+
+
+def run_query(plan, table_provider=None):
     """
     Execute a Substrait plan and read the results as a RecordBatchReader.
 
     Parameters
     ----------
     plan : Buffer
         The serialized Substrait plan to execute.
+    table_provider : object (optional)
+        The function determining input table based on names

Review Comment:
   ```suggestion
           A function to resolve any NamedTable relation to a table.  The function
           will receive a single argument which will be a list of strings
           representing the table name and should return a pyarrow.Table.
   ```
   
   Presumably users of this API will be familiar with Substrait, so we can
   reference `NamedTable` directly.



##########
cpp/src/arrow/engine/substrait/util.cc:
##########
@@ -107,19 +111,39 @@ class SubstraitExecutor {
   bool plan_started_;
   compute::ExecContext exec_context_;
   std::shared_ptr<SubstraitSinkConsumer> sink_consumer_;
+  const ConversionOptions& conversion_options_;
 };
 
 }  // namespace
 
 Result<std::shared_ptr<RecordBatchReader>> ExecuteSerializedPlan(
-    const Buffer& substrait_buffer, const ExtensionIdRegistry* extid_registry,
-    compute::FunctionRegistry* func_registry) {
+    const Buffer& substrait_buffer, const PythonTableProvider& table_provider,
+    const ExtensionIdRegistry* registry, compute::FunctionRegistry* 
func_registry) {
   // TODO(ARROW-15732)
+  // retrieve input table from table provider
+  ConversionOptions conversion_options;
+  if (table_provider) {
+    NamedTableProvider named_table_provider =
+        [table_provider](
+            const std::vector<std::string>& names) -> 
Result<compute::Declaration> {
+      if (names.empty()) {
+        return Status::Invalid("names for NamedTable not provided");
+      }

Review Comment:
   It wasn't clear to me whether a name was required, so I added
   https://github.com/substrait-io/substrait/pull/308
   
   If that merges then we are fine returning an invalid status here, since this
   would be an invalid plan.  However, we can probably reject it in
   `relation_internal.cc` instead so we don't have to repeat this logic for each
   binding.



##########
python/pyarrow/_substrait.pyx:
##########
@@ -25,14 +26,84 @@ from pyarrow.includes.libarrow cimport *
 from pyarrow.includes.libarrow_substrait cimport *
 
 
-def run_query(plan):
+cdef shared_ptr[CTable] _process_named_table(dict named_args, const 
std_vector[c_string]& names):
+    cdef:
+        c_string c_name
+        shared_ptr[CTable] empty_table
+    py_names = []
+
+    # provider function could raise an exception
+    try:
+        for i in range(names.size()):
+            c_name = names[i]
+            py_names.append(frombytes(c_name))
+        return pyarrow_unwrap_table(named_args["provider"](py_names))
+    except Exception:
+        return empty_table

Review Comment:
   I don't know that an empty table is the correct thing to return here.  Maybe
   we can return `CResult[shared_ptr[CTable]]` so that we can return an invalid
   status?  Ideally we would capture the exception message to put into the
   status too.  I'm not sure if we have an example of that.



##########
python/pyarrow/tests/test_substrait.py:
##########
@@ -165,3 +165,172 @@ def test_get_supported_functions():
                         'functions_arithmetic.yaml', 'add')
     assert has_function(supported_functions,
                         'functions_arithmetic.yaml', 'sum')
+
+
+def test_named_table():
+    test_table_1 = pa.Table.from_pydict({"x": [1, 2, 3]})
+    test_table_2 = pa.Table.from_pydict({"x": [4, 5, 6]})
+
+    def table_provider(names):
+        if not names:
+            raise Exception("No names provided")
+        elif names[0] == "t1":
+            return test_table_1
+        elif names[1] == "t2":
+            return test_table_2
+        else:
+            raise Exception("Unrecognized table name")
+
+    substrait_query = """
+    {
+        "relations": [
+        {"rel": {
+            "read": {
+            "base_schema": {
+                "struct": {
+                "types": [
+                            {"i64": {}}
+                        ]
+                },
+                "names": [
+                        "x"
+                        ]
+            },
+            "namedTable": {
+                    "names": ["TABLE_NAME_PLACEHOLDER"]
+            }
+            }
+        }}
+        ]
+    }
+    """
+    table_name = "t1"
+    query = tobytes(substrait_query.replace(
+        "TABLE_NAME_PLACEHOLDER", table_name))
+    buf = pa._substrait._parse_json_plan(tobytes(query))
+    reader = pa.substrait.run_query(buf, table_provider)
+    res_tb = reader.read_all()
+    assert res_tb == test_table_1
+
+
+def test_named_table_invalid_table_name():
+    test_table_1 = pa.Table.from_pydict({"x": [1, 2, 3]})
+
+    def table_provider(names):
+        if not names:
+            raise Exception("No names provided")
+        elif names[0] == "t1":
+            return test_table_1
+        else:
+            raise Exception("Unrecognized table name")
+
+    substrait_query = """
+    {
+        "relations": [
+        {"rel": {
+            "read": {
+            "base_schema": {
+                "struct": {
+                "types": [
+                            {"i64": {}}
+                        ]
+                },
+                "names": [
+                        "x"
+                        ]
+            },
+            "namedTable": {
+                    "names": ["TABLE_NAME_PLACEHOLDER"]
+            }
+            }
+        }}
+        ]
+    }
+    """
+    table_name = "t3"
+    query = tobytes(substrait_query.replace(
+        "TABLE_NAME_PLACEHOLDER", table_name))
+    buf = pa._substrait._parse_json_plan(tobytes(query))
+
+    exec_message = "TableSourceNode requires table which is not null"
+    with pytest.raises(ArrowInvalid, match=exec_message):
+        substrait.run_query(buf, table_provider)
+
+
+def test_named_table_empty_names():
+    test_table_1 = pa.Table.from_pydict({"x": [1, 2, 3]})
+
+    def table_provider(names):
+        if not names:
+            raise Exception("No names provided")
+        elif names[0] == "t1":
+            return test_table_1
+        else:
+            raise Exception("Unrecognized table name")
+
+    substrait_query = """
+    {
+        "relations": [
+        {"rel": {
+            "read": {
+            "base_schema": {
+                "struct": {
+                "types": [
+                            {"i64": {}}
+                        ]
+                },
+                "names": [
+                        "x"
+                        ]
+            },
+            "namedTable": {
+                    "names": []
+            }
+            }
+        }}
+        ]
+    }
+    """
+    query = tobytes(substrait_query)
+    buf = pa._substrait._parse_json_plan(tobytes(query))
+    exec_message = "names for NamedTable not provided"
+    with pytest.raises(ArrowInvalid, match=exec_message):
+        substrait.run_query(buf, table_provider)
+
+
+def test_named_table_no_named_table():

Review Comment:
   I'm not sure how much value this test is providing.  At the very least, it
   doesn't seem to be related to named tables at all and is more just testing
   the C++ consumer's ability to reject invalid plans.



##########
python/pyarrow/tests/test_substrait.py:
##########
@@ -165,3 +165,172 @@ def test_get_supported_functions():
                         'functions_arithmetic.yaml', 'add')
     assert has_function(supported_functions,
                         'functions_arithmetic.yaml', 'sum')
+
+
+def test_named_table():
+    test_table_1 = pa.Table.from_pydict({"x": [1, 2, 3]})
+    test_table_2 = pa.Table.from_pydict({"x": [4, 5, 6]})
+
+    def table_provider(names):
+        if not names:
+            raise Exception("No names provided")
+        elif names[0] == "t1":
+            return test_table_1
+        elif names[1] == "t2":
+            return test_table_2
+        else:
+            raise Exception("Unrecognized table name")
+
+    substrait_query = """
+    {
+        "relations": [
+        {"rel": {
+            "read": {
+            "base_schema": {
+                "struct": {
+                "types": [
+                            {"i64": {}}
+                        ]
+                },
+                "names": [
+                        "x"
+                        ]
+            },
+            "namedTable": {
+                    "names": ["TABLE_NAME_PLACEHOLDER"]

Review Comment:
   No need to use a placeholder.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to