This is an automated email from the ASF dual-hosted git repository.
kosiew pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-python.git
The following commit(s) were added to refs/heads/main by this push:
new 0154c031 CHANGES to review
0154c031 is described below
commit 0154c03129ac901aea9d6cf6b5d1ff25131c486d
Author: Siew Kam Onn <[email protected]>
AuthorDate: Thu Oct 2 11:40:39 2025 +0800
CHANGES to review
---
docs/source/conf.py | 7 ++
docs/source/contributor-guide/ffi.rst | 2 +-
docs/source/user-guide/data-sources.rst | 4 +-
docs/source/user-guide/io/table_provider.rst | 20 ++--
.../python/tests/_test_catalog_provider.py | 6 +-
.../python/tests/_test_table_function.py | 2 +-
.../python/tests/_test_table_provider.py | 2 +-
python/datafusion/__init__.py | 12 +--
python/datafusion/catalog.py | 62 ++++++++----
python/datafusion/context.py | 42 +++++---
python/datafusion/dataframe.py | 20 +++-
python/datafusion/expr.py | 11 +-
python/datafusion/io.py | 2 +-
python/tests/test_catalog.py | 24 ++++-
python/tests/test_context.py | 35 +++++++
python/tests/test_wrapper_coverage.py | 37 ++++---
src/catalog.rs | 112 ++-------------------
src/context.rs | 36 ++-----
src/dataframe.rs | 55 ++--------
src/lib.rs | 1 +
src/table.rs | 106 +++++++++++++++++++
src/udtf.rs | 26 ++---
src/utils.rs | 37 +++++--
23 files changed, 383 insertions(+), 278 deletions(-)
diff --git a/docs/source/conf.py b/docs/source/conf.py
index 28db17d3..01813b03 100644
--- a/docs/source/conf.py
+++ b/docs/source/conf.py
@@ -91,6 +91,13 @@ def autoapi_skip_member_fn(app, what, name, obj, skip, options) -> bool:  # noqa
("method", "datafusion.context.SessionContext.tables"),
("method", "datafusion.dataframe.DataFrame.unnest_column"),
]
+ # Explicitly skip certain members listed above. These are either
+ # re-exports, duplicate module-level documentation, deprecated
+ # API surfaces, or private variables that would otherwise appear
+ # in the generated docs and cause confusing duplication.
+ # Keeping this explicit list avoids surprising entries in the
+ # AutoAPI output and gives us a single place to opt items out
+ # when we intentionally hide them from the docs.
if (what, name) in skip_contents:
skip = True
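For context, the hook above feeds Sphinx AutoAPI's "autoapi-skip-member"
event. A minimal sketch of the wiring, assuming the standard sphinx-autoapi
event signature (the setup() connection is not shown in the hunk above and
is an assumption; the skip_contents entries are the ones from the diff):

    # conf.py -- sketch of an AutoAPI skip hook
    def autoapi_skip_member_fn(app, what, name, obj, skip, options) -> bool:  # noqa
        skip_contents = [
            ("method", "datafusion.context.SessionContext.tables"),
            ("method", "datafusion.dataframe.DataFrame.unnest_column"),
        ]
        # Force-skip anything on the explicit opt-out list.
        if (what, name) in skip_contents:
            skip = True
        return skip

    def setup(app):  # assumed wiring, not part of this diff
        app.connect("autoapi-skip-member", autoapi_skip_member_fn)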
diff --git a/docs/source/contributor-guide/ffi.rst b/docs/source/contributor-guide/ffi.rst
index e201db71..72fba8e3 100644
--- a/docs/source/contributor-guide/ffi.rst
+++ b/docs/source/contributor-guide/ffi.rst
@@ -34,7 +34,7 @@ as performant as possible and to utilize the features of DataFusion, you may dec
your source in Rust and then expose it through `PyO3 <https://pyo3.rs>`_ as a Python library.
At first glance, it may appear the best way to do this is to add the ``datafusion-python``
-crate as a dependency, provide a ``PyTable``, and then to register it with the
+crate as a dependency, provide a ``PyTable``, and then to register it with the
``SessionContext``. Unfortunately, this will not work.
When you produce your code as a Python library and it needs to interact with the DataFusion
diff --git a/docs/source/user-guide/data-sources.rst b/docs/source/user-guide/data-sources.rst
index a9b119b9..26f1303c 100644
--- a/docs/source/user-guide/data-sources.rst
+++ b/docs/source/user-guide/data-sources.rst
@@ -154,11 +154,11 @@ as Delta Lake. This will require a recent version of
from deltalake import DeltaTable
delta_table = DeltaTable("path_to_table")
- ctx.register_table_provider("my_delta_table", delta_table)
+ ctx.register_table("my_delta_table", delta_table)
df = ctx.table("my_delta_table")
df.show()
-On older versions of ``deltalake`` (prior to 0.22) you can use the
+On older versions of ``deltalake`` (prior to 0.22) you can use the
`Arrow DataSet
<https://arrow.apache.org/docs/python/generated/pyarrow.dataset.Dataset.html>`_
interface to import to DataFusion, but this does not support features such as filter push down
which can lead to a significant performance difference.
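A hedged sketch of that older path, assuming a pre-0.22 ``deltalake`` that
exposes ``DeltaTable.to_pyarrow_dataset()`` (the method name is an
assumption here; filters then run on the DataFusion side instead of being
pushed down):

    from datafusion import SessionContext
    from deltalake import DeltaTable

    ctx = SessionContext()
    delta_table = DeltaTable("path_to_table")
    # Import via the pyarrow Dataset interface; no filter push down.
    ctx.register_table("my_delta_table", delta_table.to_pyarrow_dataset())
    ctx.table("my_delta_table").show()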
diff --git a/docs/source/user-guide/io/table_provider.rst b/docs/source/user-guide/io/table_provider.rst
index bd1d6b80..29e5d988 100644
--- a/docs/source/user-guide/io/table_provider.rst
+++ b/docs/source/user-guide/io/table_provider.rst
@@ -37,22 +37,26 @@ A complete example can be found in the `examples folder <https://github.com/apac
&self,
py: Python<'py>,
) -> PyResult<Bound<'py, PyCapsule>> {
- let name = CString::new("datafusion_table_provider").unwrap();
+ let name = cr"datafusion_table_provider".into();
- let provider = Arc::new(self.clone())
- .map_err(|e| PyRuntimeError::new_err(e.to_string()))?;
- let provider = FFI_TableProvider::new(Arc::new(provider), false);
+ let provider = Arc::new(self.clone());
+ let provider = FFI_TableProvider::new(provider, false, None);
PyCapsule::new_bound(py, provider, Some(name.clone()))
}
}
-Once you have this library available, in python you can register your table provider
-to the ``SessionContext``.
+Once you have this library available, you can construct a
+:py:class:`~datafusion.Table` in Python and register it with the
+``SessionContext``.
.. code-block:: python
+ from datafusion import SessionContext, Table
+
+ ctx = SessionContext()
provider = MyTableProvider()
- ctx.register_table_provider("my_table", provider)
- ctx.table("my_table").show()
+ ctx.register_table("capsule_table", provider)
+
+ ctx.table("capsule_table").show()
diff --git a/examples/datafusion-ffi-example/python/tests/_test_catalog_provider.py b/examples/datafusion-ffi-example/python/tests/_test_catalog_provider.py
index 72aadf64..1bf1bf13 100644
--- a/examples/datafusion-ffi-example/python/tests/_test_catalog_provider.py
+++ b/examples/datafusion-ffi-example/python/tests/_test_catalog_provider.py
@@ -36,9 +36,9 @@ def test_catalog_provider():
my_catalog_schemas = my_catalog.names()
assert expected_schema_name in my_catalog_schemas
- my_database = my_catalog.database(expected_schema_name)
- assert expected_table_name in my_database.names()
- my_table = my_database.table(expected_table_name)
+ my_schema = my_catalog.schema(expected_schema_name)
+ assert expected_table_name in my_schema.names()
+ my_table = my_schema.table(expected_table_name)
assert expected_table_columns == my_table.schema.names
result = ctx.table(
diff --git a/examples/datafusion-ffi-example/python/tests/_test_table_function.py b/examples/datafusion-ffi-example/python/tests/_test_table_function.py
index f3c56a90..4b8b2145 100644
--- a/examples/datafusion-ffi-example/python/tests/_test_table_function.py
+++ b/examples/datafusion-ffi-example/python/tests/_test_table_function.py
@@ -53,7 +53,7 @@ def test_ffi_table_function_call_directly():
table_udtf = udtf(table_func, "my_table_func")
my_table = table_udtf()
- ctx.register_table_provider("t", my_table)
+ ctx.register_table("t", my_table)
result = ctx.table("t").collect()
assert len(result) == 2
diff --git a/examples/datafusion-ffi-example/python/tests/_test_table_provider.py b/examples/datafusion-ffi-example/python/tests/_test_table_provider.py
index 6b24da06..91a77e61 100644
--- a/examples/datafusion-ffi-example/python/tests/_test_table_provider.py
+++ b/examples/datafusion-ffi-example/python/tests/_test_table_provider.py
@@ -25,7 +25,7 @@ from datafusion_ffi_example import MyTableProvider
def test_table_loading():
ctx = SessionContext()
table = MyTableProvider(3, 2, 4)
- ctx.register_table_provider("t", table)
+ ctx.register_table("t", table)
result = ctx.table("t").collect()
assert len(result) == 4
diff --git a/python/datafusion/__init__.py b/python/datafusion/__init__.py
index e9d2dba7..9ebd58ea 100644
--- a/python/datafusion/__init__.py
+++ b/python/datafusion/__init__.py
@@ -28,17 +28,16 @@ from typing import Any
try:
import importlib.metadata as importlib_metadata
except ImportError:
- import importlib_metadata
+ import importlib_metadata # type: ignore[import]
+# Public submodules
from . import functions, object_store, substrait, unparser
# The following imports are okay to remain as opaque to the user.
from ._internal import Config
from .catalog import Catalog, Database, Table
from .col import col, column
-from .common import (
- DFSchema,
-)
+from .common import DFSchema
from .context import (
RuntimeEnvBuilder,
SessionConfig,
@@ -47,10 +46,7 @@ from .context import (
)
from .dataframe import DataFrame, ParquetColumnOptions, ParquetWriterOptions
from .dataframe_formatter import configure_formatter
-from .expr import (
- Expr,
- WindowFrame,
-)
+from .expr import Expr, WindowFrame
from .io import read_avro, read_csv, read_json, read_parquet
from .plan import ExecutionPlan, LogicalPlan
from .record_batch import RecordBatch, RecordBatchStream
diff --git a/python/datafusion/catalog.py b/python/datafusion/catalog.py
index 536b3a79..da54d233 100644
--- a/python/datafusion/catalog.py
+++ b/python/datafusion/catalog.py
@@ -20,13 +20,16 @@
from __future__ import annotations
from abc import ABC, abstractmethod
-from typing import TYPE_CHECKING, Protocol
+from typing import TYPE_CHECKING, Any, Protocol
import datafusion._internal as df_internal
if TYPE_CHECKING:
import pyarrow as pa
+ from datafusion import DataFrame
+ from datafusion.context import TableProviderExportable
+
try:
from warnings import deprecated # Python 3.13+
except ImportError:
@@ -82,7 +85,11 @@ class Catalog:
"""Returns the database with the given ``name`` from this catalog."""
return self.schema(name)
- def register_schema(self, name, schema) -> Schema | None:
+ def register_schema(
+ self,
+ name: str,
+ schema: Schema | SchemaProvider | SchemaProviderExportable,
+ ) -> Schema | None:
"""Register a schema with this catalog."""
if isinstance(schema, Schema):
return self.catalog.register_schema(name, schema._raw_schema)
@@ -122,10 +129,12 @@ class Schema:
"""Return the table with the given ``name`` from this schema."""
return Table(self._raw_schema.table(name))
- def register_table(self, name, table) -> None:
- """Register a table provider in this schema."""
- if isinstance(table, Table):
- return self._raw_schema.register_table(name, table.table)
+ def register_table(
+ self,
+ name: str,
+ table: Table | TableProviderExportable | DataFrame | pa.dataset.Dataset,
+ ) -> None:
+ """Register a table in this schema."""
return self._raw_schema.register_table(name, table)
def deregister_table(self, name: str) -> None:
@@ -139,30 +148,45 @@ class Database(Schema):
class Table:
- """DataFusion table."""
+ """A DataFusion table.
- def __init__(self, table: df_internal.catalog.RawTable) -> None:
- """This constructor is not typically called by the end user."""
- self.table = table
+ Internally we currently support the following types of tables:
+
+ - Tables created using built-in DataFusion methods, such as
+ reading from CSV or Parquet
+ - pyarrow datasets
+ - DataFusion DataFrames, which will be converted into a view
+ - Externally provided tables implemented with the FFI PyCapsule
+ interface (advanced)
+ """
+
+ __slots__ = ("_inner",)
+
+ def __init__(
+ self, table: Table | TableProviderExportable | DataFrame | pa.dataset.Dataset
+ ) -> None:
+ """Constructor."""
+ self._inner = df_internal.catalog.RawTable(table)
def __repr__(self) -> str:
"""Print a string representation of the table."""
- return self.table.__repr__()
+ return repr(self._inner)
@staticmethod
+ @deprecated("Use Table() constructor instead.")
def from_dataset(dataset: pa.dataset.Dataset) -> Table:
- """Turn a pyarrow Dataset into a Table."""
- return Table(df_internal.catalog.RawTable.from_dataset(dataset))
+ """Turn a :mod:`pyarrow.dataset` ``Dataset`` into a :class:`Table`."""
+ return Table(dataset)
@property
def schema(self) -> pa.Schema:
"""Returns the schema associated with this table."""
- return self.table.schema
+ return self._inner.schema
@property
def kind(self) -> str:
"""Returns the kind of table."""
- return self.table.kind
+ return self._inner.kind
class CatalogProvider(ABC):
@@ -219,14 +243,16 @@ class SchemaProvider(ABC):
"""Retrieve a specific table from this schema."""
...
- def register_table(self, name: str, table: Table) -> None: # noqa: B027
- """Add a table from this schema.
+ def register_table( # noqa: B027
+ self, name: str, table: Table | TableProviderExportable | Any
+ ) -> None:
+ """Add a table to this schema.
This method is optional. If your schema provides a fixed list of tables, you do
not need to implement this method.
"""
- def deregister_table(self, name, cascade: bool) -> None: # noqa: B027
+ def deregister_table(self, name: str, cascade: bool) -> None: # noqa: B027
"""Remove a table from this schema.
This method is optional. If your schema provides a fixed list of tables, you do
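Taken together, the reworked ``Table`` constructor accepts each supported
source directly. A short sketch mirroring the tests later in this patch
(the table names are illustrative):

    import pyarrow as pa
    import pyarrow.dataset as ds
    from datafusion import SessionContext, Table

    ctx = SessionContext()

    # From a DataFrame: converted into a view under the hood.
    df = ctx.from_pydict({"a": [1, 2]})
    ctx.register_table("df_tbl", Table(df))

    # From a pyarrow dataset.
    batch = pa.RecordBatch.from_arrays([pa.array([1, 2])], names=["a"])
    ctx.register_table("ds_tbl", Table(ds.dataset([batch])))

    ctx.sql("SELECT * FROM df_tbl").show()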
diff --git a/python/datafusion/context.py b/python/datafusion/context.py
index b6e728b5..86b9d90e 100644
--- a/python/datafusion/context.py
+++ b/python/datafusion/context.py
@@ -29,11 +29,10 @@ except ImportError:
import pyarrow as pa
-from datafusion.catalog import Catalog, CatalogProvider, Table
+from datafusion.catalog import Catalog
from datafusion.dataframe import DataFrame
-from datafusion.expr import SortKey, sort_list_to_raw_sort_list
+from datafusion.expr import sort_list_to_raw_sort_list
from datafusion.record_batch import RecordBatchStream
-from datafusion.user_defined import AggregateUDF, ScalarUDF, TableFunction, WindowUDF
from ._internal import RuntimeEnvBuilder as RuntimeEnvBuilderInternal
from ._internal import SessionConfig as SessionConfigInternal
@@ -48,7 +47,15 @@ if TYPE_CHECKING:
import pandas as pd
import polars as pl # type: ignore[import]
+ from datafusion.catalog import CatalogProvider, Table
+ from datafusion.expr import SortKey
from datafusion.plan import ExecutionPlan, LogicalPlan
+ from datafusion.user_defined import (
+ AggregateUDF,
+ ScalarUDF,
+ TableFunction,
+ WindowUDF,
+ )
class ArrowStreamExportable(Protocol):
@@ -733,7 +740,7 @@ class SessionContext:
# https://github.com/apache/datafusion-python/pull/1016#discussion_r1983239116
# is the discussion on how we arrived at adding register_view
def register_view(self, name: str, df: DataFrame) -> None:
- """Register a :py:class: `~datafusion.detaframe.DataFrame` as a view.
+ """Register a :py:class:`~datafusion.dataframe.DataFrame` as a view.
Args:
name (str): The name to register the view under.
@@ -742,16 +749,21 @@ class SessionContext:
view = df.into_view()
self.ctx.register_table(name, view)
- def register_table(self, name: str, table: Table) -> None:
- """Register a :py:class: `~datafusion.catalog.Table` as a table.
+ def register_table(
+ self,
+ name: str,
+ table: Table | TableProviderExportable | DataFrame | pa.dataset.Dataset,
+ ) -> None:
+ """Register a :py:class:`~datafusion.Table` with this context.
- The registered table can be referenced from SQL statement executed against.
+ The registered table can be referenced from SQL statements executed against
+ this context.
Args:
name: Name of the resultant table.
- table: DataFusion table to add to the session context.
+ table: Any object that can be converted into a :class:`Table`.
"""
- self.ctx.register_table(name, table.table)
+ self.ctx.register_table(name, table)
def deregister_table(self, name: str) -> None:
"""Remove a table from the session."""
@@ -770,15 +782,17 @@ class SessionContext:
else:
self.ctx.register_catalog_provider(name, provider)
+ @deprecated("Use register_table() instead.")
def register_table_provider(
- self, name: str, provider: TableProviderExportable
+ self,
+ name: str,
+ provider: Table | TableProviderExportable | DataFrame | pa.dataset.Dataset,
) -> None:
"""Register a table provider.
- This table provider must have a method called ``__datafusion_table_provider__``
- which returns a PyCapsule that exposes a ``FFI_TableProvider``.
+ Deprecated: use :meth:`register_table` instead.
"""
- self.ctx.register_table_provider(name, provider)
+ self.register_table(name, provider)
def register_udtf(self, func: TableFunction) -> None:
"""Register a user defined table function."""
@@ -1170,7 +1184,7 @@ class SessionContext:
:py:class:`~datafusion.catalog.ListingTable`, create a
:py:class:`~datafusion.dataframe.DataFrame`.
"""
- return DataFrame(self.ctx.read_table(table.table))
+ return DataFrame(self.ctx.read_table(table._inner))
def execute(self, plan: ExecutionPlan, partitions: int) -> RecordBatchStream:
"""Execute the ``plan`` and return the results."""
diff --git a/python/datafusion/dataframe.py b/python/datafusion/dataframe.py
index c1b649e3..5a21d773 100644
--- a/python/datafusion/dataframe.py
+++ b/python/datafusion/dataframe.py
@@ -60,6 +60,8 @@ if TYPE_CHECKING:
import polars as pl
import pyarrow as pa
+ from datafusion.catalog import Table
+
from enum import Enum
@@ -313,9 +315,21 @@ class DataFrame:
"""
self.df = df
- def into_view(self) -> pa.Table:
- """Convert DataFrame as a ViewTable which can be used in
register_table."""
- return self.df.into_view()
+ def into_view(self) -> Table:
+ """Convert ``DataFrame`` into a :class:`~datafusion.Table`.
+
+ Examples:
+ >>> from datafusion import SessionContext
+ >>> ctx = SessionContext()
+ >>> df = ctx.sql("SELECT 1 AS value")
+ >>> view = df.into_view()
+ >>> ctx.register_table("values_view", view)
+ >>> df.collect() # The DataFrame is still usable
+ >>> ctx.sql("SELECT value FROM values_view").collect()
+ """
+ from datafusion.catalog import Table as _Table
+
+ return _Table(self.df.into_view())
def __getitem__(self, key: str | list[str]) -> DataFrame:
"""Return a new :py:class`DataFrame` with the specified column or
columns.
diff --git a/python/datafusion/expr.py b/python/datafusion/expr.py
index 5d1180bd..82e30a78 100644
--- a/python/datafusion/expr.py
+++ b/python/datafusion/expr.py
@@ -25,14 +25,12 @@ from __future__ import annotations
import typing as _typing
from typing import TYPE_CHECKING, Any, ClassVar, Iterable, Optional, Sequence
-import pyarrow as pa
-
try:
from warnings import deprecated # Python 3.13+
except ImportError:
from typing_extensions import deprecated # Python 3.12
-from datafusion.common import NullTreatment
+import pyarrow as pa
from ._internal import expr as expr_internal
from ._internal import functions as functions_internal
@@ -40,8 +38,11 @@ from ._internal import functions as functions_internal
if TYPE_CHECKING:
from collections.abc import Sequence
- # Type-only imports
- from datafusion.common import DataTypeMap, RexType
+ from datafusion.common import ( # type: ignore[import]
+ DataTypeMap,
+ NullTreatment,
+ RexType,
+ )
from datafusion.plan import LogicalPlan
diff --git a/python/datafusion/io.py b/python/datafusion/io.py
index 551e20a6..67dbc730 100644
--- a/python/datafusion/io.py
+++ b/python/datafusion/io.py
@@ -22,13 +22,13 @@ from __future__ import annotations
from typing import TYPE_CHECKING
from datafusion.context import SessionContext
-from datafusion.dataframe import DataFrame
if TYPE_CHECKING:
import pathlib
import pyarrow as pa
+ from datafusion.dataframe import DataFrame
from datafusion.expr import Expr
diff --git a/python/tests/test_catalog.py b/python/tests/test_catalog.py
index 1f9ecbfc..f0c492ce 100644
--- a/python/tests/test_catalog.py
+++ b/python/tests/test_catalog.py
@@ -53,7 +53,7 @@ def create_dataset() -> Table:
names=["a", "b"],
)
dataset = ds.dataset([batch])
- return Table.from_dataset(dataset)
+ return Table(dataset)
class CustomSchemaProvider(dfn.catalog.SchemaProvider):
@@ -164,6 +164,28 @@ def test_python_table_provider(ctx: SessionContext):
assert schema.table_names() == {"table4"}
+def test_schema_register_table_with_pyarrow_dataset(ctx: SessionContext):
+ schema = ctx.catalog().schema()
+ batch = pa.RecordBatch.from_arrays(
+ [pa.array([1, 2, 3]), pa.array([4, 5, 6])],
+ names=["a", "b"],
+ )
+ dataset = ds.dataset([batch])
+ table_name = "pa_dataset"
+
+ try:
+ schema.register_table(table_name, dataset)
+ assert table_name in schema.table_names()
+
+ result = ctx.sql(f"SELECT a, b FROM {table_name}").collect()
+
+ assert len(result) == 1
+ assert result[0].column(0) == pa.array([1, 2, 3])
+ assert result[0].column(1) == pa.array([4, 5, 6])
+ finally:
+ schema.deregister_table(table_name)
+
+
def test_in_end_to_end_python_providers(ctx: SessionContext):
"""Test registering all python providers and running a query against
them."""
diff --git a/python/tests/test_context.py b/python/tests/test_context.py
index 6dbcc0d5..50076c9b 100644
--- a/python/tests/test_context.py
+++ b/python/tests/test_context.py
@@ -27,6 +27,7 @@ from datafusion import (
SessionConfig,
SessionContext,
SQLOptions,
+ Table,
column,
literal,
)
@@ -330,6 +331,40 @@ def test_deregister_table(ctx, database):
assert public.names() == {"csv1", "csv2"}
+def test_register_table_from_dataframe(ctx):
+ df = ctx.from_pydict({"a": [1, 2]})
+ ctx.register_table("df_tbl", df)
+ result = ctx.sql("SELECT * FROM df_tbl").collect()
+ assert [b.to_pydict() for b in result] == [{"a": [1, 2]}]
+
+
+def test_register_table_from_dataframe_into_view(ctx):
+ df = ctx.from_pydict({"a": [1, 2]})
+ table = df.into_view()
+ assert isinstance(table, Table)
+ ctx.register_table("view_tbl", table)
+ result = ctx.sql("SELECT * FROM view_tbl").collect()
+ assert [b.to_pydict() for b in result] == [{"a": [1, 2]}]
+
+
+def test_table_from_dataframe(ctx):
+ df = ctx.from_pydict({"a": [1, 2]})
+ table = Table(df)
+ assert isinstance(table, Table)
+ ctx.register_table("from_dataframe_tbl", table)
+ result = ctx.sql("SELECT * FROM from_dataframe_tbl").collect()
+ assert [b.to_pydict() for b in result] == [{"a": [1, 2]}]
+
+
+def test_table_from_dataframe_internal(ctx):
+ df = ctx.from_pydict({"a": [1, 2]})
+ table = Table(df.df)
+ assert isinstance(table, Table)
+ ctx.register_table("from_internal_dataframe_tbl", table)
+ result = ctx.sql("SELECT * FROM from_internal_dataframe_tbl").collect()
+ assert [b.to_pydict() for b in result] == [{"a": [1, 2]}]
+
+
def test_register_dataset(ctx):
# create a RecordBatch and register it as a pyarrow.dataset.Dataset
batch = pa.RecordBatch.from_arrays(
diff --git a/python/tests/test_wrapper_coverage.py b/python/tests/test_wrapper_coverage.py
index f484cb28..cf6719ec 100644
--- a/python/tests/test_wrapper_coverage.py
+++ b/python/tests/test_wrapper_coverage.py
@@ -28,7 +28,27 @@ except ImportError:
from enum import EnumMeta as EnumType
-def missing_exports(internal_obj, wrapped_obj) -> None: # noqa: C901
+def _check_enum_exports(internal_obj, wrapped_obj) -> None:
+ """Check that all enum values are present in wrapped object."""
+ expected_values = [v for v in dir(internal_obj) if not v.startswith("__")]
+ for value in expected_values:
+ assert value in dir(wrapped_obj)
+
+
+def _check_list_attribute(internal_attr, wrapped_attr) -> None:
+ """Check that list attributes match between internal and wrapped
objects."""
+ assert isinstance(wrapped_attr, list)
+
+ # We have cases like __all__ that are a list and we want to be certain that
+ # every value in the list in the internal object is also in the wrapper list
+ for val in internal_attr:
+ if isinstance(val, str) and val.startswith("Raw"):
+ assert val[3:] in wrapped_attr
+ else:
+ assert val in wrapped_attr
+
+
+def missing_exports(internal_obj, wrapped_obj) -> None:
"""
Identify if any of the Rust exposed structs or functions do not have wrappers.
@@ -40,9 +60,7 @@ def missing_exports(internal_obj, wrapped_obj) -> None: # noqa: C901
# Special case enums - EnumType overrides some of the internal functions,
# so check all of the values exist and move on
if isinstance(wrapped_obj, EnumType):
- expected_values = [v for v in dir(internal_obj) if not v.startswith("__")]
- for value in expected_values:
- assert value in dir(wrapped_obj)
+ _check_enum_exports(internal_obj, wrapped_obj)
return
if "__repr__" in internal_obj.__dict__ and "__repr__" not in
wrapped_obj.__dict__:
@@ -50,6 +68,7 @@ def missing_exports(internal_obj, wrapped_obj) -> None: # noqa: C901
for internal_attr_name in dir(internal_obj):
wrapped_attr_name = internal_attr_name.removeprefix("Raw")
+
assert wrapped_attr_name in dir(wrapped_obj)
internal_attr = getattr(internal_obj, internal_attr_name)
@@ -66,15 +85,7 @@ def missing_exports(internal_obj, wrapped_obj) -> None: # noqa: C901
continue
if isinstance(internal_attr, list):
- assert isinstance(wrapped_attr, list)
-
- # We have cases like __all__ that are a list and we want to be certain that
- # every value in the list in the internal object is also in the wrapper list
- for val in internal_attr:
- if isinstance(val, str) and val.startswith("Raw"):
- assert val[3:] in wrapped_attr
- else:
- assert val in wrapped_attr
+ _check_list_attribute(internal_attr, wrapped_attr)
elif hasattr(internal_attr, "__dict__"):
# Check all submodules recursively
missing_exports(internal_attr, wrapped_attr)
diff --git a/src/catalog.rs b/src/catalog.rs
index 17d4ec3b..02ebfb93 100644
--- a/src/catalog.rs
+++ b/src/catalog.rs
@@ -17,17 +17,16 @@
use crate::dataset::Dataset;
use crate::errors::{py_datafusion_err, to_datafusion_err, PyDataFusionError, PyDataFusionResult};
+use crate::table::PyTable;
use crate::utils::{validate_pycapsule, wait_for_future};
use async_trait::async_trait;
use datafusion::catalog::{MemoryCatalogProvider, MemorySchemaProvider};
use datafusion::common::DataFusionError;
use datafusion::{
- arrow::pyarrow::ToPyArrow,
catalog::{CatalogProvider, SchemaProvider},
- datasource::{TableProvider, TableType},
+ datasource::TableProvider,
};
use datafusion_ffi::schema_provider::{FFI_SchemaProvider, ForeignSchemaProvider};
-use datafusion_ffi::table_provider::{FFI_TableProvider, ForeignTableProvider};
use pyo3::exceptions::PyKeyError;
use pyo3::prelude::*;
use pyo3::types::PyCapsule;
@@ -48,12 +47,6 @@ pub struct PySchema {
pub schema: Arc<dyn SchemaProvider>,
}
-#[pyclass(name = "RawTable", module = "datafusion.catalog", subclass)]
-#[derive(Clone)]
-pub struct PyTable {
- pub table: Arc<dyn TableProvider>,
-}
-
impl From<Arc<dyn CatalogProvider>> for PyCatalog {
fn from(catalog: Arc<dyn CatalogProvider>) -> Self {
Self { catalog }
@@ -66,16 +59,6 @@ impl From<Arc<dyn SchemaProvider>> for PySchema {
}
}
-impl PyTable {
- pub fn new(table: Arc<dyn TableProvider>) -> Self {
- Self { table }
- }
-
- pub fn table(&self) -> Arc<dyn TableProvider> {
- self.table.clone()
- }
-}
-
#[pymethods]
impl PyCatalog {
#[new]
@@ -181,7 +164,7 @@ impl PySchema {
fn table(&self, name: &str, py: Python) -> PyDataFusionResult<PyTable> {
if let Some(table) = wait_for_future(py, self.schema.table(name))?? {
- Ok(PyTable::new(table))
+ Ok(PyTable::from(table))
} else {
Err(PyDataFusionError::Common(format!(
"Table not found: {name}"
@@ -195,31 +178,12 @@ impl PySchema {
Ok(format!("Schema(table_names=[{}])", names.join(";")))
}
- fn register_table(&self, name: &str, table_provider: Bound<'_, PyAny>) -> PyResult<()> {
- let provider = if table_provider.hasattr("__datafusion_table_provider__")? {
- let capsule = table_provider
- .getattr("__datafusion_table_provider__")?
- .call0()?;
- let capsule = capsule.downcast::<PyCapsule>().map_err(py_datafusion_err)?;
- validate_pycapsule(capsule, "datafusion_table_provider")?;
-
- let provider = unsafe { capsule.reference::<FFI_TableProvider>() };
- let provider: ForeignTableProvider = provider.into();
- Arc::new(provider) as Arc<dyn TableProvider>
- } else {
- match table_provider.extract::<PyTable>() {
- Ok(py_table) => py_table.table,
- Err(_) => {
- let py = table_provider.py();
- let provider = Dataset::new(&table_provider, py)?;
- Arc::new(provider) as Arc<dyn TableProvider>
- }
- }
- };
+ fn register_table(&self, name: &str, table_provider: &Bound<'_, PyAny>) -> PyResult<()> {
+ let table = PyTable::new(table_provider)?;
let _ = self
.schema
- .register_table(name.to_string(), provider)
+ .register_table(name.to_string(), table.table)
.map_err(py_datafusion_err)?;
Ok(())
@@ -235,43 +199,6 @@ impl PySchema {
}
}
-#[pymethods]
-impl PyTable {
- /// Get a reference to the schema for this table
- #[getter]
- fn schema(&self, py: Python) -> PyResult<PyObject> {
- self.table.schema().to_pyarrow(py)
- }
-
- #[staticmethod]
- fn from_dataset(py: Python<'_>, dataset: &Bound<'_, PyAny>) -> PyResult<Self> {
- let ds = Arc::new(Dataset::new(dataset, py).map_err(py_datafusion_err)?)
- as Arc<dyn TableProvider>;
-
- Ok(Self::new(ds))
- }
-
- /// Get the type of this table for metadata/catalog purposes.
- #[getter]
- fn kind(&self) -> &str {
- match self.table.table_type() {
- TableType::Base => "physical",
- TableType::View => "view",
- TableType::Temporary => "temporary",
- }
- }
-
- fn __repr__(&self) -> PyResult<String> {
- let kind = self.kind();
- Ok(format!("Table(kind={kind})"))
- }
-
- // fn scan
- // fn statistics
- // fn has_exact_statistics
- // fn supports_filter_pushdown
-}
-
#[derive(Debug)]
pub(crate) struct RustWrappedPySchemaProvider {
schema_provider: PyObject,
@@ -304,30 +231,9 @@ impl RustWrappedPySchemaProvider {
return Ok(None);
}
- if py_table.hasattr("__datafusion_table_provider__")? {
- let capsule = provider.getattr("__datafusion_table_provider__")?.call0()?;
- let capsule = capsule.downcast::<PyCapsule>().map_err(py_datafusion_err)?;
- validate_pycapsule(capsule, "datafusion_table_provider")?;
-
- let provider = unsafe { capsule.reference::<FFI_TableProvider>() };
- let provider: ForeignTableProvider = provider.into();
-
- Ok(Some(Arc::new(provider) as Arc<dyn TableProvider>))
- } else {
- if let Ok(inner_table) = py_table.getattr("table") {
- if let Ok(inner_table) = inner_table.extract::<PyTable>() {
- return Ok(Some(inner_table.table));
- }
- }
+ let table = PyTable::new(&py_table)?;
- match py_table.extract::<PyTable>() {
- Ok(py_table) => Ok(Some(py_table.table)),
- Err(_) => {
- let ds = Dataset::new(&py_table, py).map_err(py_datafusion_err)?;
- Ok(Some(Arc::new(ds) as Arc<dyn TableProvider>))
- }
- }
- }
+ Ok(Some(table.table))
})
}
}
@@ -368,7 +274,7 @@ impl SchemaProvider for RustWrappedPySchemaProvider {
name: String,
table: Arc<dyn TableProvider>,
) -> datafusion::common::Result<Option<Arc<dyn TableProvider>>> {
- let py_table = PyTable::new(table);
+ let py_table = PyTable::from(table);
Python::with_gil(|py| {
let provider = self.schema_provider.bind(py);
let _ = provider
diff --git a/src/context.rs b/src/context.rs
index 0ccb0326..23cf7e54 100644
--- a/src/context.rs
+++ b/src/context.rs
@@ -31,7 +31,7 @@ use uuid::Uuid;
use pyo3::exceptions::{PyKeyError, PyValueError};
use pyo3::prelude::*;
-use crate::catalog::{PyCatalog, PyTable, RustWrappedPyCatalogProvider};
+use crate::catalog::{PyCatalog, RustWrappedPyCatalogProvider};
use crate::dataframe::PyDataFrame;
use crate::dataset::Dataset;
use crate::errors::{py_datafusion_err, to_datafusion_err, PyDataFusionResult};
@@ -41,6 +41,7 @@ use crate::record_batch::PyRecordBatchStream;
use crate::sql::exceptions::py_value_err;
use crate::sql::logical::PyLogicalPlan;
use crate::store::StorageContexts;
+use crate::table::PyTable;
use crate::udaf::PyAggregateUDF;
use crate::udf::PyScalarUDF;
use crate::udtf::PyTableFunction;
@@ -71,7 +72,6 @@ use datafusion::prelude::{
AvroReadOptions, CsvReadOptions, DataFrame, NdJsonReadOptions,
ParquetReadOptions,
};
use datafusion_ffi::catalog_provider::{FFI_CatalogProvider, ForeignCatalogProvider};
-use datafusion_ffi::table_provider::{FFI_TableProvider, ForeignTableProvider};
use pyo3::types::{PyCapsule, PyDict, PyList, PyTuple, PyType};
use pyo3::IntoPyObjectExt;
use tokio::task::JoinHandle;
@@ -417,12 +417,7 @@ impl PySessionContext {
.with_listing_options(options)
.with_schema(resolved_schema);
let table = ListingTable::try_new(config)?;
- self.register_table(
- name,
- &PyTable {
- table: Arc::new(table),
- },
- )?;
+ self.ctx.register_table(name, Arc::new(table))?;
Ok(())
}
@@ -599,8 +594,10 @@ impl PySessionContext {
Ok(df)
}
- pub fn register_table(&self, name: &str, table: &PyTable) -> PyDataFusionResult<()> {
- self.ctx.register_table(name, table.table())?;
+ pub fn register_table(&self, name: &str, table: Bound<'_, PyAny>) -> PyDataFusionResult<()> {
+ let table = PyTable::new(&table)?;
+
+ self.ctx.register_table(name, table.table)?;
Ok(())
}
@@ -643,23 +640,8 @@ impl PySessionContext {
name: &str,
provider: Bound<'_, PyAny>,
) -> PyDataFusionResult<()> {
- if provider.hasattr("__datafusion_table_provider__")? {
- let capsule = provider.getattr("__datafusion_table_provider__")?.call0()?;
- let capsule = capsule.downcast::<PyCapsule>().map_err(py_datafusion_err)?;
- validate_pycapsule(capsule, "datafusion_table_provider")?;
-
- let provider = unsafe { capsule.reference::<FFI_TableProvider>() };
- let provider: ForeignTableProvider = provider.into();
-
- let _ = self.ctx.register_table(name, Arc::new(provider))?;
-
- Ok(())
- } else {
- Err(crate::errors::PyDataFusionError::Common(
- "__datafusion_table_provider__ does not exist on Table
Provider object."
- .to_string(),
- ))
- }
+ // Deprecated: use `register_table` instead
+ self.register_table(name, provider)
}
pub fn register_record_batches(
diff --git a/src/dataframe.rs b/src/dataframe.rs
index 5882acf7..6aba4947 100644
--- a/src/dataframe.rs
+++ b/src/dataframe.rs
@@ -31,12 +31,10 @@ use datafusion::arrow::util::pretty;
use datafusion::common::UnnestOptions;
use datafusion::config::{CsvOptions, ParquetColumnOptions, ParquetOptions, TableParquetOptions};
use datafusion::dataframe::{DataFrame, DataFrameWriteOptions};
-use datafusion::datasource::TableProvider;
use datafusion::error::DataFusionError;
use datafusion::execution::SendableRecordBatchStream;
use datafusion::parquet::basic::{BrotliLevel, Compression, GzipLevel, ZstdLevel};
use datafusion::prelude::*;
-use datafusion_ffi::table_provider::FFI_TableProvider;
use futures::{StreamExt, TryStreamExt};
use pyo3::exceptions::PyValueError;
use pyo3::prelude::*;
@@ -44,12 +42,12 @@ use pyo3::pybacked::PyBackedStr;
use pyo3::types::{PyCapsule, PyList, PyTuple, PyTupleMethods};
use tokio::task::JoinHandle;
-use crate::catalog::PyTable;
use crate::errors::{py_datafusion_err, to_datafusion_err, PyDataFusionError};
use crate::expr::sort_expr::to_sort_expressions;
use crate::physical_plan::PyExecutionPlan;
use crate::record_batch::PyRecordBatchStream;
use crate::sql::logical::PyLogicalPlan;
+use crate::table::PyTable;
use crate::utils::{
get_tokio_runtime, is_ipython_env, py_obj_to_scalar_value, validate_pycapsule, wait_for_future,
};
@@ -58,40 +56,6 @@ use crate::{
expr::{sort_expr::PySortExpr, PyExpr},
};
-// https://github.com/apache/datafusion-python/pull/1016#discussion_r1983239116
-// - we have not decided on the table_provider approach yet
-// this is an interim implementation
-#[pyclass(name = "TableProvider", module = "datafusion")]
-pub struct PyTableProvider {
- provider: Arc<dyn TableProvider + Send>,
-}
-
-impl PyTableProvider {
- pub fn new(provider: Arc<dyn TableProvider>) -> Self {
- Self { provider }
- }
-
- pub fn as_table(&self) -> PyTable {
- let table_provider: Arc<dyn TableProvider> = self.provider.clone();
- PyTable::new(table_provider)
- }
-}
-
-#[pymethods]
-impl PyTableProvider {
- fn __datafusion_table_provider__<'py>(
- &self,
- py: Python<'py>,
- ) -> PyResult<Bound<'py, PyCapsule>> {
- let name = CString::new("datafusion_table_provider").unwrap();
-
- let runtime = get_tokio_runtime().0.handle().clone();
- let provider = FFI_TableProvider::new(Arc::clone(&self.provider), false, Some(runtime));
-
- PyCapsule::new(py, provider, Some(name.clone()))
- }
-}
-
/// Configuration for DataFrame display formatting
#[derive(Debug, Clone)]
pub struct FormatterConfig {
@@ -302,6 +266,11 @@ impl PyDataFrame {
}
}
+ /// Return a clone of the inner Arc<DataFrame> for crate-local callers.
+ pub(crate) fn inner_df(&self) -> Arc<DataFrame> {
+ Arc::clone(&self.df)
+ }
+
fn prepare_repr_string(&mut self, py: Python, as_html: bool) -> PyDataFusionResult<String> {
// Get the Python formatter and config
let PythonFormatter { formatter, config } = get_python_formatter_with_config(py)?;
@@ -427,22 +396,18 @@ impl PyDataFrame {
PyArrowType(self.df.schema().into())
}
- /// Convert this DataFrame into a Table that can be used in register_table
+ /// Convert this DataFrame into a Table Provider that can be used in register_table
/// By convention, into_... methods consume self and return the new object.
/// Disabling the clippy lint, so we can use &self
/// because we're working with Python bindings
/// where objects are shared
- ///
https://github.com/apache/datafusion-python/pull/1016#discussion_r1983239116
- /// - we have not decided on the table_provider approach yet
#[allow(clippy::wrong_self_convention)]
- fn into_view(&self) -> PyDataFusionResult<PyTable> {
+ pub fn into_view(&self) -> PyDataFusionResult<PyTable> {
// Call the underlying Rust DataFrame::into_view method.
// Note that the Rust method consumes self; here we clone the inner Arc<DataFrame>
- // so that we don’t invalidate this PyDataFrame.
+ // so that we don't invalidate this PyDataFrame.
let table_provider = self.df.as_ref().clone().into_view();
- let table_provider = PyTableProvider::new(table_provider);
-
- Ok(table_provider.as_table())
+ Ok(PyTable::from(table_provider))
}
#[pyo3(signature = (*args))]
diff --git a/src/lib.rs b/src/lib.rs
index 29d3f41d..0361c731 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -52,6 +52,7 @@ pub mod pyarrow_util;
mod record_batch;
pub mod sql;
pub mod store;
+pub mod table;
pub mod unparser;
#[cfg(feature = "substrait")]
diff --git a/src/table.rs b/src/table.rs
new file mode 100644
index 00000000..812581ed
--- /dev/null
+++ b/src/table.rs
@@ -0,0 +1,106 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use arrow::pyarrow::ToPyArrow;
+use datafusion::datasource::{TableProvider, TableType};
+use pyo3::prelude::*;
+use std::sync::Arc;
+
+use crate::dataframe::PyDataFrame;
+use crate::dataset::Dataset;
+use crate::utils::table_provider_from_pycapsule;
+
+/// This struct is the common wrapper for all TableProviders,
+/// whether they refer to an FFI provider, an internally known
+/// implementation, a dataset, or a dataframe view.
+#[pyclass(name = "RawTable", module = "datafusion.catalog", subclass)]
+#[derive(Clone)]
+pub struct PyTable {
+ pub table: Arc<dyn TableProvider>,
+}
+
+impl PyTable {
+ pub fn table(&self) -> Arc<dyn TableProvider> {
+ self.table.clone()
+ }
+}
+
+#[pymethods]
+impl PyTable {
+ /// Instantiate from any Python object that represents one of the
+ /// supported table types. We do not know a priori whether we will be
+ /// passed a wrapped or a raw class, so we handle all of the
+ /// following object types:
+ ///
+ /// - PyTable (essentially a clone operation), but either raw or wrapped
+ /// - DataFrame, either raw or wrapped
+ /// - FFI Table Providers via PyCapsule
+ /// - PyArrow Dataset objects
+ #[new]
+ pub fn new(obj: &Bound<'_, PyAny>) -> PyResult<Self> {
+ if let Ok(py_table) = obj.extract::<PyTable>() {
+ Ok(py_table)
+ } else if let Ok(py_table) = obj
+ .getattr("_inner")
+ .and_then(|inner| inner.extract::<PyTable>())
+ {
+ Ok(py_table)
+ } else if let Ok(py_df) = obj.extract::<PyDataFrame>() {
+ let provider = py_df.inner_df().as_ref().clone().into_view();
+ Ok(PyTable::from(provider))
+ } else if let Ok(py_df) = obj
+ .getattr("df")
+ .and_then(|inner| inner.extract::<PyDataFrame>())
+ {
+ let provider = py_df.inner_df().as_ref().clone().into_view();
+ Ok(PyTable::from(provider))
+ } else if let Some(provider) = table_provider_from_pycapsule(obj)? {
+ Ok(PyTable::from(provider))
+ } else {
+ let py = obj.py();
+ let provider = Arc::new(Dataset::new(obj, py)?) as Arc<dyn TableProvider>;
+ Ok(PyTable::from(provider))
+ }
+ }
+
+ /// Get a reference to the schema for this table
+ #[getter]
+ fn schema(&self, py: Python) -> PyResult<PyObject> {
+ self.table.schema().to_pyarrow(py)
+ }
+
+ /// Get the type of this table for metadata/catalog purposes.
+ #[getter]
+ fn kind(&self) -> &str {
+ match self.table.table_type() {
+ TableType::Base => "physical",
+ TableType::View => "view",
+ TableType::Temporary => "temporary",
+ }
+ }
+
+ fn __repr__(&self) -> PyResult<String> {
+ let kind = self.kind();
+ Ok(format!("Table(kind={kind})"))
+ }
+}
+
+impl From<Arc<dyn TableProvider>> for PyTable {
+ fn from(table: Arc<dyn TableProvider>) -> Self {
+ Self { table }
+ }
+}
diff --git a/src/udtf.rs b/src/udtf.rs
index db16d6c0..7a1b0aaf 100644
--- a/src/udtf.rs
+++ b/src/udtf.rs
@@ -18,14 +18,13 @@
use pyo3::prelude::*;
use std::sync::Arc;
-use crate::dataframe::PyTableProvider;
use crate::errors::{py_datafusion_err, to_datafusion_err};
use crate::expr::PyExpr;
-use crate::utils::validate_pycapsule;
+use crate::table::PyTable;
+use crate::utils::{table_provider_from_pycapsule, validate_pycapsule};
use datafusion::catalog::{TableFunctionImpl, TableProvider};
use datafusion::error::Result as DataFusionResult;
use datafusion::logical_expr::Expr;
-use datafusion_ffi::table_provider::{FFI_TableProvider, ForeignTableProvider};
use datafusion_ffi::udtf::{FFI_TableFunction, ForeignTableFunction};
use pyo3::exceptions::PyNotImplementedError;
use pyo3::types::{PyCapsule, PyTuple};
@@ -71,11 +70,11 @@ impl PyTableFunction {
}
#[pyo3(signature = (*args))]
- pub fn __call__(&self, args: Vec<PyExpr>) -> PyResult<PyTableProvider> {
+ pub fn __call__(&self, args: Vec<PyExpr>) -> PyResult<PyTable> {
let args: Vec<Expr> = args.iter().map(|e| e.expr.clone()).collect();
let table_provider = self.call(&args).map_err(py_datafusion_err)?;
- Ok(PyTableProvider::new(table_provider))
+ Ok(PyTable::from(table_provider))
}
fn __repr__(&self) -> PyResult<String> {
@@ -99,20 +98,11 @@ fn call_python_table_function(
let provider_obj = func.call1(py, py_args)?;
let provider = provider_obj.bind(py);
- if provider.hasattr("__datafusion_table_provider__")? {
- let capsule = provider.getattr("__datafusion_table_provider__")?.call0()?;
- let capsule = capsule.downcast::<PyCapsule>().map_err(py_datafusion_err)?;
- validate_pycapsule(capsule, "datafusion_table_provider")?;
-
- let provider = unsafe { capsule.reference::<FFI_TableProvider>() };
- let provider: ForeignTableProvider = provider.into();
-
- Ok(Arc::new(provider) as Arc<dyn TableProvider>)
- } else {
- Err(PyNotImplementedError::new_err(
+ table_provider_from_pycapsule(provider)?.ok_or_else(|| {
+ PyNotImplementedError::new_err(
"__datafusion_table_provider__ does not exist on Table
Provider object.",
- ))
- }
+ )
+ })
})
.map_err(to_datafusion_err)
}
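On the Python side, calling a table function now yields a ``Table`` that
can be handed straight to ``register_table``, as the FFI example test
earlier in this patch exercises. A condensed sketch (``table_func`` is
defined in that test; the top-level ``udtf`` import is an assumption):

    from datafusion import SessionContext, udtf

    ctx = SessionContext()
    table_udtf = udtf(table_func, "my_table_func")
    my_table = table_udtf()  # now returns a Table
    ctx.register_table("t", my_table)
    result = ctx.table("t").collect()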
diff --git a/src/utils.rs b/src/utils.rs
index 3b30de5d..0fcfadce 100644
--- a/src/utils.rs
+++ b/src/utils.rs
@@ -15,18 +15,26 @@
// specific language governing permissions and limitations
// under the License.
+use crate::errors::py_datafusion_err;
use crate::{
common::data_type::PyScalarValue,
errors::{PyDataFusionError, PyDataFusionResult},
TokioRuntime,
};
use datafusion::{
- common::ScalarValue, execution::context::SessionContext, logical_expr::Volatility,
+ common::ScalarValue, datasource::TableProvider, execution::context::SessionContext,
+ logical_expr::Volatility,
};
+use datafusion_ffi::table_provider::{FFI_TableProvider, ForeignTableProvider};
use pyo3::prelude::*;
use pyo3::{exceptions::PyValueError, types::PyCapsule};
-use std::{future::Future, sync::OnceLock, time::Duration};
+use std::{
+ future::Future,
+ sync::{Arc, OnceLock},
+ time::Duration,
+};
use tokio::{runtime::Runtime, time::sleep};
+
/// Utility to get the Tokio Runtime from Python
#[inline]
pub(crate) fn get_tokio_runtime() -> &'static TokioRuntime {
@@ -91,7 +99,7 @@ pub(crate) fn parse_volatility(value: &str) -> PyDataFusionResult<Volatility> {
"volatile" => Volatility::Volatile,
value => {
return Err(PyDataFusionError::Common(format!(
- "Unsupportad volatility type: `{value}`, supported \
+ "Unsupported volatility type: `{value}`, supported \
values are: immutable, stable and volatile."
)))
}
@@ -101,9 +109,9 @@ pub(crate) fn parse_volatility(value: &str) -> PyDataFusionResult<Volatility> {
pub(crate) fn validate_pycapsule(capsule: &Bound<PyCapsule>, name: &str) -> PyResult<()> {
let capsule_name = capsule.name()?;
if capsule_name.is_none() {
- return Err(PyValueError::new_err(
- "Expected schema PyCapsule to have name set.",
- ));
+ return Err(PyValueError::new_err(format!(
+ "Expected {name} PyCapsule to have name set."
+ )));
}
let capsule_name = capsule_name.unwrap().to_str()?;
@@ -116,6 +124,23 @@ pub(crate) fn validate_pycapsule(capsule: &Bound<PyCapsule>, name: &str) -> PyResult<()> {
Ok(())
}
+pub(crate) fn table_provider_from_pycapsule(
+ obj: &Bound<PyAny>,
+) -> PyResult<Option<Arc<dyn TableProvider>>> {
+ if obj.hasattr("__datafusion_table_provider__")? {
+ let capsule = obj.getattr("__datafusion_table_provider__")?.call0()?;
+ let capsule = capsule.downcast::<PyCapsule>().map_err(py_datafusion_err)?;
+ validate_pycapsule(capsule, "datafusion_table_provider")?;
+
+ let provider = unsafe { capsule.reference::<FFI_TableProvider>() };
+ let provider: ForeignTableProvider = provider.into();
+
+ Ok(Some(Arc::new(provider)))
+ } else {
+ Ok(None)
+ }
+}
+
pub(crate) fn py_obj_to_scalar_value(py: Python, obj: PyObject) -> PyResult<ScalarValue> {
// convert Python object to PyScalarValue to ScalarValue
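For reference, the Python-side contract that table_provider_from_pycapsule
probes for is the duck-typed protocol ``register_table`` already documents.
A sketch of the corresponding Protocol, mirroring what
``datafusion.context.TableProviderExportable`` is assumed to look like:

    from typing import Any, Protocol

    class TableProviderExportable(Protocol):
        """Objects exposing a PyCapsule named 'datafusion_table_provider'."""

        def __datafusion_table_provider__(self) -> Any:  # returns a PyCapsule
            ...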
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]