This is an automated email from the ASF dual-hosted git repository.

kosiew pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-python.git


The following commit(s) were added to refs/heads/main by this push:
     new 709c918e REVERT CHANGES to commit
709c918e is described below

commit 709c918ef810d7207f12c09b82c2e1b1c4ad8290
Author: Siew Kam Onn <[email protected]>
AuthorDate: Thu Oct 2 11:46:21 2025 +0800

    REVERT CHANGES to commit
---
 docs/source/conf.py                                |   7 --
 docs/source/contributor-guide/ffi.rst              |   2 +-
 docs/source/user-guide/data-sources.rst            |   4 +-
 docs/source/user-guide/io/table_provider.rst       |  20 ++--
 .../python/tests/_test_catalog_provider.py         |   6 +-
 .../python/tests/_test_table_function.py           |   2 +-
 .../python/tests/_test_table_provider.py           |   2 +-
 python/datafusion/__init__.py                      |  12 ++-
 python/datafusion/catalog.py                       |  62 ++++--------
 python/datafusion/context.py                       |  42 +++-----
 python/datafusion/dataframe.py                     |  20 +---
 python/datafusion/expr.py                          |  11 +-
 python/datafusion/io.py                            |   2 +-
 python/tests/test_catalog.py                       |  24 +----
 python/tests/test_context.py                       |  35 -------
 python/tests/test_wrapper_coverage.py              |  37 +++----
 src/catalog.rs                                     | 112 +++++++++++++++++++--
 src/context.rs                                     |  36 +++++--
 src/dataframe.rs                                   |  55 ++++++++--
 src/lib.rs                                         |   1 -
 src/table.rs                                       | 106 -------------------
 src/udtf.rs                                        |  26 +++--
 src/utils.rs                                       |  37 ++-----
 23 files changed, 278 insertions(+), 383 deletions(-)

diff --git a/docs/source/conf.py b/docs/source/conf.py
index 01813b03..28db17d3 100644
--- a/docs/source/conf.py
+++ b/docs/source/conf.py
@@ -91,13 +91,6 @@ def autoapi_skip_member_fn(app, what, name, obj, skip, options) -> bool:  # noqa
         ("method", "datafusion.context.SessionContext.tables"),
         ("method", "datafusion.dataframe.DataFrame.unnest_column"),
     ]
-    # Explicitly skip certain members listed above. These are either
-    # re-exports, duplicate module-level documentation, deprecated
-    # API surfaces, or private variables that would otherwise appear
-    # in the generated docs and cause confusing duplication.
-    # Keeping this explicit list avoids surprising entries in the
-    # AutoAPI output and gives us a single place to opt-out items
-    # when we intentionally hide them from the docs.
     if (what, name) in skip_contents:
         skip = True
 
diff --git a/docs/source/contributor-guide/ffi.rst b/docs/source/contributor-guide/ffi.rst
index 72fba8e3..e201db71 100644
--- a/docs/source/contributor-guide/ffi.rst
+++ b/docs/source/contributor-guide/ffi.rst
@@ -34,7 +34,7 @@ as performant as possible and to utilize the features of DataFusion, you may dec
 your source in Rust and then expose it through `PyO3 <https://pyo3.rs>`_ as a Python library.
 
 At first glance, it may appear the best way to do this is to add the ``datafusion-python``
-crate as a dependency, provide a ``PyTable``, and then to register it with the
+crate as a dependency, provide a ``PyTable``, and then to register it with the 
 ``SessionContext``. Unfortunately, this will not work.
 
 When you produce your code as a Python library and it needs to interact with the DataFusion
diff --git a/docs/source/user-guide/data-sources.rst b/docs/source/user-guide/data-sources.rst
index 26f1303c..a9b119b9 100644
--- a/docs/source/user-guide/data-sources.rst
+++ b/docs/source/user-guide/data-sources.rst
@@ -154,11 +154,11 @@ as Delta Lake. This will require a recent version of
     from deltalake import DeltaTable
 
     delta_table = DeltaTable("path_to_table")
-    ctx.register_table("my_delta_table", delta_table)
+    ctx.register_table_provider("my_delta_table", delta_table)
     df = ctx.table("my_delta_table")
     df.show()
 
-On older versions of ``deltalake`` (prior to 0.22) you can use the
+On older versions of ``deltalake`` (prior to 0.22) you can use the 
 `Arrow DataSet <https://arrow.apache.org/docs/python/generated/pyarrow.dataset.Dataset.html>`_
 interface to import to DataFusion, but this does not support features such as filter push down
 which can lead to a significant performance difference.
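
For context, the end-to-end flow the hunk above documents, as a minimal
sketch (assuming deltalake >= 0.22 and a local Delta table at
"path_to_table"):

    from datafusion import SessionContext
    from deltalake import DeltaTable

    ctx = SessionContext()
    # DeltaTable exposes __datafusion_table_provider__, so it can be handed
    # directly to register_table_provider.
    delta_table = DeltaTable("path_to_table")
    ctx.register_table_provider("my_delta_table", delta_table)
    ctx.table("my_delta_table").show()
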
diff --git a/docs/source/user-guide/io/table_provider.rst b/docs/source/user-guide/io/table_provider.rst
index 29e5d988..bd1d6b80 100644
--- a/docs/source/user-guide/io/table_provider.rst
+++ b/docs/source/user-guide/io/table_provider.rst
@@ -37,26 +37,22 @@ A complete example can be found in the `examples folder <https://github.com/apac
             &self,
             py: Python<'py>,
         ) -> PyResult<Bound<'py, PyCapsule>> {
-            let name = cr"datafusion_table_provider".into();
+            let name = CString::new("datafusion_table_provider").unwrap();
 
-            let provider = Arc::new(self.clone());
-            let provider = FFI_TableProvider::new(provider, false, None);
+            let provider = Arc::new(self.clone());
+            let provider = FFI_TableProvider::new(provider, false);
 
             PyCapsule::new_bound(py, provider, Some(name.clone()))
         }
     }
 
-Once you have this library available, you can construct a
-:py:class:`~datafusion.Table` in Python and register it with the
-``SessionContext``.
+Once you have this library available, in Python you can register your table provider
+with the ``SessionContext``.
 
 .. code-block:: python
 
-    from datafusion import SessionContext, Table
-
-    ctx = SessionContext()
     provider = MyTableProvider()
+    ctx.register_table_provider("my_table", provider)
 
-    ctx.register_table("capsule_table", provider)
-
-    ctx.table("capsule_table").show()
+    ctx.table("my_table").show()
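
The reverted snippet assumes an existing ``ctx``; a fuller sketch of the
same registration flow, with the compiled extension assumed importable as
``my_library`` (a hypothetical module name):

    from datafusion import SessionContext
    from my_library import MyTableProvider  # hypothetical module name

    ctx = SessionContext()
    provider = MyTableProvider()

    # register_table_provider consumes the PyCapsule produced by the
    # __datafusion_table_provider__ method shown in the Rust snippet above.
    ctx.register_table_provider("my_table", provider)
    ctx.table("my_table").show()
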
diff --git a/examples/datafusion-ffi-example/python/tests/_test_catalog_provider.py b/examples/datafusion-ffi-example/python/tests/_test_catalog_provider.py
index 1bf1bf13..72aadf64 100644
--- a/examples/datafusion-ffi-example/python/tests/_test_catalog_provider.py
+++ b/examples/datafusion-ffi-example/python/tests/_test_catalog_provider.py
@@ -36,9 +36,9 @@ def test_catalog_provider():
 
     my_catalog_schemas = my_catalog.names()
     assert expected_schema_name in my_catalog_schemas
-    my_schema = my_catalog.schema(expected_schema_name)
-    assert expected_table_name in my_schema.names()
-    my_table = my_schema.table(expected_table_name)
+    my_database = my_catalog.database(expected_schema_name)
+    assert expected_table_name in my_database.names()
+    my_table = my_database.table(expected_table_name)
     assert expected_table_columns == my_table.schema.names
 
     result = ctx.table(
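
For orientation, the catalog navigation this test exercises, sketched with
placeholder schema and table names (the provider class follows the example
package):

    from datafusion import SessionContext
    from datafusion_ffi_example import MyCatalogProvider  # example FFI package

    ctx = SessionContext()
    ctx.register_catalog_provider("my_catalog", MyCatalogProvider())

    my_catalog = ctx.catalog("my_catalog")
    my_database = my_catalog.database("my_schema")  # Database aliases Schema
    my_table = my_database.table("my_table")
    print(my_table.schema.names)
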
diff --git a/examples/datafusion-ffi-example/python/tests/_test_table_function.py b/examples/datafusion-ffi-example/python/tests/_test_table_function.py
index 4b8b2145..f3c56a90 100644
--- a/examples/datafusion-ffi-example/python/tests/_test_table_function.py
+++ b/examples/datafusion-ffi-example/python/tests/_test_table_function.py
@@ -53,7 +53,7 @@ def test_ffi_table_function_call_directly():
     table_udtf = udtf(table_func, "my_table_func")
 
     my_table = table_udtf()
-    ctx.register_table("t", my_table)
+    ctx.register_table_provider("t", my_table)
     result = ctx.table("t").collect()
 
     assert len(result) == 2
diff --git a/examples/datafusion-ffi-example/python/tests/_test_table_provider.py b/examples/datafusion-ffi-example/python/tests/_test_table_provider.py
index 91a77e61..6b24da06 100644
--- a/examples/datafusion-ffi-example/python/tests/_test_table_provider.py
+++ b/examples/datafusion-ffi-example/python/tests/_test_table_provider.py
@@ -25,7 +25,7 @@ from datafusion_ffi_example import MyTableProvider
 def test_table_loading():
     ctx = SessionContext()
     table = MyTableProvider(3, 2, 4)
-    ctx.register_table("t", table)
+    ctx.register_table_provider("t", table)
     result = ctx.table("t").collect()
 
     assert len(result) == 4
diff --git a/python/datafusion/__init__.py b/python/datafusion/__init__.py
index 9ebd58ea..e9d2dba7 100644
--- a/python/datafusion/__init__.py
+++ b/python/datafusion/__init__.py
@@ -28,16 +28,17 @@ from typing import Any
 try:
     import importlib.metadata as importlib_metadata
 except ImportError:
-    import importlib_metadata  # type: ignore[import]
+    import importlib_metadata
 
-# Public submodules
 from . import functions, object_store, substrait, unparser
 
 # The following imports are okay to remain as opaque to the user.
 from ._internal import Config
 from .catalog import Catalog, Database, Table
 from .col import col, column
-from .common import DFSchema
+from .common import (
+    DFSchema,
+)
 from .context import (
     RuntimeEnvBuilder,
     SessionConfig,
@@ -46,7 +47,10 @@ from .context import (
 )
 from .dataframe import DataFrame, ParquetColumnOptions, ParquetWriterOptions
 from .dataframe_formatter import configure_formatter
-from .expr import Expr, WindowFrame
+from .expr import (
+    Expr,
+    WindowFrame,
+)
 from .io import read_avro, read_csv, read_json, read_parquet
 from .plan import ExecutionPlan, LogicalPlan
 from .record_batch import RecordBatch, RecordBatchStream
diff --git a/python/datafusion/catalog.py b/python/datafusion/catalog.py
index da54d233..536b3a79 100644
--- a/python/datafusion/catalog.py
+++ b/python/datafusion/catalog.py
@@ -20,16 +20,13 @@
 from __future__ import annotations
 
 from abc import ABC, abstractmethod
-from typing import TYPE_CHECKING, Any, Protocol
+from typing import TYPE_CHECKING, Protocol
 
 import datafusion._internal as df_internal
 
 if TYPE_CHECKING:
     import pyarrow as pa
 
-    from datafusion import DataFrame
-    from datafusion.context import TableProviderExportable
-
 try:
     from warnings import deprecated  # Python 3.13+
 except ImportError:
@@ -85,11 +82,7 @@ class Catalog:
         """Returns the database with the given ``name`` from this catalog."""
         return self.schema(name)
 
-    def register_schema(
-        self,
-        name: str,
-        schema: Schema | SchemaProvider | SchemaProviderExportable,
-    ) -> Schema | None:
+    def register_schema(self, name, schema) -> Schema | None:
         """Register a schema with this catalog."""
         if isinstance(schema, Schema):
             return self.catalog.register_schema(name, schema._raw_schema)
@@ -129,12 +122,10 @@ class Schema:
         """Return the table with the given ``name`` from this schema."""
         return Table(self._raw_schema.table(name))
 
-    def register_table(
-        self,
-        name: str,
-        table: Table | TableProviderExportable | DataFrame | pa.dataset.Dataset,
-    ) -> None:
-        """Register a table in this schema."""
+    def register_table(self, name, table) -> None:
+        """Register a table provider in this schema."""
+        if isinstance(table, Table):
+            return self._raw_schema.register_table(name, table.table)
         return self._raw_schema.register_table(name, table)
 
     def deregister_table(self, name: str) -> None:
@@ -148,45 +139,30 @@ class Database(Schema):
 
 
 class Table:
-    """A DataFusion table.
-
-    Internally we currently support the following types of tables:
-
-    - Tables created using built-in DataFusion methods, such as
-      reading from CSV or Parquet
-    - pyarrow datasets
-    - DataFusion DataFrames, which will be converted into a view
-    - Externally provided tables implemented with the FFI PyCapsule
-      interface (advanced)
-    """
+    """DataFusion table."""
 
-    __slots__ = ("_inner",)
-
-    def __init__(
-        self, table: Table | TableProviderExportable | DataFrame | pa.dataset.Dataset
-    ) -> None:
-        """Constructor."""
-        self._inner = df_internal.catalog.RawTable(table)
+    def __init__(self, table: df_internal.catalog.RawTable) -> None:
+        """This constructor is not typically called by the end user."""
+        self.table = table
 
     def __repr__(self) -> str:
         """Print a string representation of the table."""
-        return repr(self._inner)
+        return self.table.__repr__()
 
     @staticmethod
-    @deprecated("Use Table() constructor instead.")
     def from_dataset(dataset: pa.dataset.Dataset) -> Table:
-        """Turn a :mod:`pyarrow.dataset` ``Dataset`` into a :class:`Table`."""
-        return Table(dataset)
+        """Turn a pyarrow Dataset into a Table."""
+        return Table(df_internal.catalog.RawTable.from_dataset(dataset))
 
     @property
     def schema(self) -> pa.Schema:
         """Returns the schema associated with this table."""
-        return self._inner.schema
+        return self.table.schema
 
     @property
     def kind(self) -> str:
         """Returns the kind of table."""
-        return self._inner.kind
+        return self.table.kind
 
 
 class CatalogProvider(ABC):
@@ -243,16 +219,14 @@ class SchemaProvider(ABC):
         """Retrieve a specific table from this schema."""
         ...
 
-    def register_table(  # noqa: B027
-        self, name: str, table: Table | TableProviderExportable | Any
-    ) -> None:
-        """Add a table to this schema.
+    def register_table(self, name: str, table: Table) -> None:  # noqa: B027
+        """Add a table from this schema.
 
        This method is optional. If your schema provides a fixed list of tables, you do
         not need to implement this method.
         """
 
-    def deregister_table(self, name: str, cascade: bool) -> None:  # noqa: B027
+    def deregister_table(self, name, cascade: bool) -> None:  # noqa: B027
         """Remove a table from this schema.
 
        This method is optional. If your schema provides a fixed list of tables, you do
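
With the reverted ``Table`` surface, a pyarrow dataset is wrapped via
``Table.from_dataset`` before registration; a minimal sketch against the
default catalog and schema:

    import pyarrow as pa
    import pyarrow.dataset as ds
    from datafusion import SessionContext
    from datafusion.catalog import Table

    ctx = SessionContext()
    batch = pa.record_batch([pa.array([1, 2, 3])], names=["a"])

    # Wrap the dataset in a Table, then register it on the default schema.
    table = Table.from_dataset(ds.dataset([batch]))
    ctx.catalog().schema().register_table("t", table)
    ctx.sql("SELECT a FROM t").collect()
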
diff --git a/python/datafusion/context.py b/python/datafusion/context.py
index 86b9d90e..b6e728b5 100644
--- a/python/datafusion/context.py
+++ b/python/datafusion/context.py
@@ -29,10 +29,11 @@ except ImportError:
 
 import pyarrow as pa
 
-from datafusion.catalog import Catalog
+from datafusion.catalog import Catalog, CatalogProvider, Table
 from datafusion.dataframe import DataFrame
-from datafusion.expr import sort_list_to_raw_sort_list
+from datafusion.expr import SortKey, sort_list_to_raw_sort_list
 from datafusion.record_batch import RecordBatchStream
+from datafusion.user_defined import AggregateUDF, ScalarUDF, TableFunction, WindowUDF
 
 from ._internal import RuntimeEnvBuilder as RuntimeEnvBuilderInternal
 from ._internal import SessionConfig as SessionConfigInternal
@@ -47,15 +48,7 @@ if TYPE_CHECKING:
     import pandas as pd
     import polars as pl  # type: ignore[import]
 
-    from datafusion.catalog import CatalogProvider, Table
-    from datafusion.expr import SortKey
     from datafusion.plan import ExecutionPlan, LogicalPlan
-    from datafusion.user_defined import (
-        AggregateUDF,
-        ScalarUDF,
-        TableFunction,
-        WindowUDF,
-    )
 
 
 class ArrowStreamExportable(Protocol):
@@ -740,7 +733,7 @@ class SessionContext:
    # https://github.com/apache/datafusion-python/pull/1016#discussion_r1983239116
     # is the discussion on how we arrived at adding register_view
     def register_view(self, name: str, df: DataFrame) -> None:
-        """Register a :py:class:`~datafusion.dataframe.DataFrame` as a view.
+        """Register a :py:class: `~datafusion.detaframe.DataFrame` as a view.
 
         Args:
             name (str): The name to register the view under.
@@ -749,21 +742,16 @@ class SessionContext:
         view = df.into_view()
         self.ctx.register_table(name, view)
 
-    def register_table(
-        self,
-        name: str,
-        table: Table | TableProviderExportable | DataFrame | pa.dataset.Dataset,
-    ) -> None:
-        """Register a :py:class:`~datafusion.Table` with this context.
+    def register_table(self, name: str, table: Table) -> None:
+        """Register a :py:class: `~datafusion.catalog.Table` as a table.
 
-        The registered table can be referenced from SQL statements executed against
-        this context.
+        The registered table can be referenced from SQL statements executed against this context.
 
         Args:
             name: Name of the resultant table.
-            table: Any object that can be converted into a :class:`Table`.
+            table: DataFusion table to add to the session context.
         """
-        self.ctx.register_table(name, table)
+        self.ctx.register_table(name, table.table)
 
     def deregister_table(self, name: str) -> None:
         """Remove a table from the session."""
@@ -782,17 +770,15 @@ class SessionContext:
         else:
             self.ctx.register_catalog_provider(name, provider)
 
-    @deprecated("Use register_table() instead.")
     def register_table_provider(
-        self,
-        name: str,
-        provider: Table | TableProviderExportable | DataFrame | pa.dataset.Dataset,
+        self, name: str, provider: TableProviderExportable
     ) -> None:
         """Register a table provider.
 
-        Deprecated: use :meth:`register_table` instead.
+        This table provider must have a method called ``__datafusion_table_provider__``
+        which returns a PyCapsule that exposes a ``FFI_TableProvider``.
         """
-        self.register_table(name, provider)
+        self.ctx.register_table_provider(name, provider)
 
     def register_udtf(self, func: TableFunction) -> None:
         """Register a user defined table function."""
@@ -1184,7 +1170,7 @@ class SessionContext:
         :py:class:`~datafusion.catalog.ListingTable`, create a
         :py:class:`~datafusion.dataframe.DataFrame`.
         """
-        return DataFrame(self.ctx.read_table(table._inner))
+        return DataFrame(self.ctx.read_table(table.table))
 
    def execute(self, plan: ExecutionPlan, partitions: int) -> RecordBatchStream:
         """Execute the ``plan`` and return the results."""
diff --git a/python/datafusion/dataframe.py b/python/datafusion/dataframe.py
index 5a21d773..c1b649e3 100644
--- a/python/datafusion/dataframe.py
+++ b/python/datafusion/dataframe.py
@@ -60,8 +60,6 @@ if TYPE_CHECKING:
     import polars as pl
     import pyarrow as pa
 
-    from datafusion.catalog import Table
-
 from enum import Enum
 
 
@@ -315,21 +313,9 @@ class DataFrame:
         """
         self.df = df
 
-    def into_view(self) -> Table:
-        """Convert ``DataFrame`` into a :class:`~datafusion.Table`.
-
-        Examples:
-            >>> from datafusion import SessionContext
-            >>> ctx = SessionContext()
-            >>> df = ctx.sql("SELECT 1 AS value")
-            >>> view = df.into_view()
-            >>> ctx.register_table("values_view", view)
-            >>> df.collect()  # The DataFrame is still usable
-            >>> ctx.sql("SELECT value FROM values_view").collect()
-        """
-        from datafusion.catalog import Table as _Table
-
-        return _Table(self.df.into_view())
+    def into_view(self) -> pa.Table:
+        """Convert DataFrame as a ViewTable which can be used in 
register_table."""
+        return self.df.into_view()
 
     def __getitem__(self, key: str | list[str]) -> DataFrame:
         """Return a new :py:class`DataFrame` with the specified column or 
columns.
diff --git a/python/datafusion/expr.py b/python/datafusion/expr.py
index 82e30a78..5d1180bd 100644
--- a/python/datafusion/expr.py
+++ b/python/datafusion/expr.py
@@ -25,12 +25,14 @@ from __future__ import annotations
 import typing as _typing
 from typing import TYPE_CHECKING, Any, ClassVar, Iterable, Optional, Sequence
 
+import pyarrow as pa
+
 try:
     from warnings import deprecated  # Python 3.13+
 except ImportError:
     from typing_extensions import deprecated  # Python 3.12
 
-import pyarrow as pa
+from datafusion.common import NullTreatment
 
 from ._internal import expr as expr_internal
 from ._internal import functions as functions_internal
@@ -38,11 +40,8 @@ from ._internal import functions as functions_internal
 if TYPE_CHECKING:
     from collections.abc import Sequence
 
-    from datafusion.common import (  # type: ignore[import]
-        DataTypeMap,
-        NullTreatment,
-        RexType,
-    )
+    # Type-only imports
+    from datafusion.common import DataTypeMap, RexType
     from datafusion.plan import LogicalPlan
 
 
diff --git a/python/datafusion/io.py b/python/datafusion/io.py
index 67dbc730..551e20a6 100644
--- a/python/datafusion/io.py
+++ b/python/datafusion/io.py
@@ -22,13 +22,13 @@ from __future__ import annotations
 from typing import TYPE_CHECKING
 
 from datafusion.context import SessionContext
+from datafusion.dataframe import DataFrame
 
 if TYPE_CHECKING:
     import pathlib
 
     import pyarrow as pa
 
-    from datafusion.dataframe import DataFrame
     from datafusion.expr import Expr
 
 
diff --git a/python/tests/test_catalog.py b/python/tests/test_catalog.py
index f0c492ce..1f9ecbfc 100644
--- a/python/tests/test_catalog.py
+++ b/python/tests/test_catalog.py
@@ -53,7 +53,7 @@ def create_dataset() -> Table:
         names=["a", "b"],
     )
     dataset = ds.dataset([batch])
-    return Table(dataset)
+    return Table.from_dataset(dataset)
 
 
 class CustomSchemaProvider(dfn.catalog.SchemaProvider):
@@ -164,28 +164,6 @@ def test_python_table_provider(ctx: SessionContext):
     assert schema.table_names() == {"table4"}
 
 
-def test_schema_register_table_with_pyarrow_dataset(ctx: SessionContext):
-    schema = ctx.catalog().schema()
-    batch = pa.RecordBatch.from_arrays(
-        [pa.array([1, 2, 3]), pa.array([4, 5, 6])],
-        names=["a", "b"],
-    )
-    dataset = ds.dataset([batch])
-    table_name = "pa_dataset"
-
-    try:
-        schema.register_table(table_name, dataset)
-        assert table_name in schema.table_names()
-
-        result = ctx.sql(f"SELECT a, b FROM {table_name}").collect()
-
-        assert len(result) == 1
-        assert result[0].column(0) == pa.array([1, 2, 3])
-        assert result[0].column(1) == pa.array([4, 5, 6])
-    finally:
-        schema.deregister_table(table_name)
-
-
 def test_in_end_to_end_python_providers(ctx: SessionContext):
     """Test registering all python providers and running a query against 
them."""
 
diff --git a/python/tests/test_context.py b/python/tests/test_context.py
index 50076c9b..6dbcc0d5 100644
--- a/python/tests/test_context.py
+++ b/python/tests/test_context.py
@@ -27,7 +27,6 @@ from datafusion import (
     SessionConfig,
     SessionContext,
     SQLOptions,
-    Table,
     column,
     literal,
 )
@@ -331,40 +330,6 @@ def test_deregister_table(ctx, database):
     assert public.names() == {"csv1", "csv2"}
 
 
-def test_register_table_from_dataframe(ctx):
-    df = ctx.from_pydict({"a": [1, 2]})
-    ctx.register_table("df_tbl", df)
-    result = ctx.sql("SELECT * FROM df_tbl").collect()
-    assert [b.to_pydict() for b in result] == [{"a": [1, 2]}]
-
-
-def test_register_table_from_dataframe_into_view(ctx):
-    df = ctx.from_pydict({"a": [1, 2]})
-    table = df.into_view()
-    assert isinstance(table, Table)
-    ctx.register_table("view_tbl", table)
-    result = ctx.sql("SELECT * FROM view_tbl").collect()
-    assert [b.to_pydict() for b in result] == [{"a": [1, 2]}]
-
-
-def test_table_from_dataframe(ctx):
-    df = ctx.from_pydict({"a": [1, 2]})
-    table = Table(df)
-    assert isinstance(table, Table)
-    ctx.register_table("from_dataframe_tbl", table)
-    result = ctx.sql("SELECT * FROM from_dataframe_tbl").collect()
-    assert [b.to_pydict() for b in result] == [{"a": [1, 2]}]
-
-
-def test_table_from_dataframe_internal(ctx):
-    df = ctx.from_pydict({"a": [1, 2]})
-    table = Table(df.df)
-    assert isinstance(table, Table)
-    ctx.register_table("from_internal_dataframe_tbl", table)
-    result = ctx.sql("SELECT * FROM from_internal_dataframe_tbl").collect()
-    assert [b.to_pydict() for b in result] == [{"a": [1, 2]}]
-
-
 def test_register_dataset(ctx):
     # create a RecordBatch and register it as a pyarrow.dataset.Dataset
     batch = pa.RecordBatch.from_arrays(
diff --git a/python/tests/test_wrapper_coverage.py b/python/tests/test_wrapper_coverage.py
index cf6719ec..f484cb28 100644
--- a/python/tests/test_wrapper_coverage.py
+++ b/python/tests/test_wrapper_coverage.py
@@ -28,27 +28,7 @@ except ImportError:
     from enum import EnumMeta as EnumType
 
 
-def _check_enum_exports(internal_obj, wrapped_obj) -> None:
-    """Check that all enum values are present in wrapped object."""
-    expected_values = [v for v in dir(internal_obj) if not v.startswith("__")]
-    for value in expected_values:
-        assert value in dir(wrapped_obj)
-
-
-def _check_list_attribute(internal_attr, wrapped_attr) -> None:
-    """Check that list attributes match between internal and wrapped 
objects."""
-    assert isinstance(wrapped_attr, list)
-
-    # We have cases like __all__ that are a list and we want to be certain that
-    # every value in the list in the internal object is also in the wrapper list
-    for val in internal_attr:
-        if isinstance(val, str) and val.startswith("Raw"):
-            assert val[3:] in wrapped_attr
-        else:
-            assert val in wrapped_attr
-
-
-def missing_exports(internal_obj, wrapped_obj) -> None:
+def missing_exports(internal_obj, wrapped_obj) -> None:  # noqa: C901
     """
    Identify if any of the rust exposed structs or functions do not have wrappers.
 
@@ -60,7 +40,9 @@ def missing_exports(internal_obj, wrapped_obj) -> None:
    # Special case enums - EnumType overrides some of the internal functions,
     # so check all of the values exist and move on
     if isinstance(wrapped_obj, EnumType):
-        _check_enum_exports(internal_obj, wrapped_obj)
+        expected_values = [v for v in dir(internal_obj) if not v.startswith("__")]
+        for value in expected_values:
+            assert value in dir(wrapped_obj)
         return
 
     if "__repr__" in internal_obj.__dict__ and "__repr__" not in 
wrapped_obj.__dict__:
@@ -68,7 +50,6 @@ def missing_exports(internal_obj, wrapped_obj) -> None:
 
     for internal_attr_name in dir(internal_obj):
         wrapped_attr_name = internal_attr_name.removeprefix("Raw")
-
         assert wrapped_attr_name in dir(wrapped_obj)
 
         internal_attr = getattr(internal_obj, internal_attr_name)
@@ -85,7 +66,15 @@ def missing_exports(internal_obj, wrapped_obj) -> None:
             continue
 
         if isinstance(internal_attr, list):
-            _check_list_attribute(internal_attr, wrapped_attr)
+            assert isinstance(wrapped_attr, list)
+
+            # We have cases like __all__ that are a list and we want to be certain that
+            # every value in the list in the internal object is also in the wrapper list
+            for val in internal_attr:
+                if isinstance(val, str) and val.startswith("Raw"):
+                    assert val[3:] in wrapped_attr
+                else:
+                    assert val in wrapped_attr
         elif hasattr(internal_attr, "__dict__"):
             # Check all submodules recursively
             missing_exports(internal_attr, wrapped_attr)
diff --git a/src/catalog.rs b/src/catalog.rs
index 02ebfb93..17d4ec3b 100644
--- a/src/catalog.rs
+++ b/src/catalog.rs
@@ -17,16 +17,17 @@
 
 use crate::dataset::Dataset;
use crate::errors::{py_datafusion_err, to_datafusion_err, PyDataFusionError, PyDataFusionResult};
-use crate::table::PyTable;
 use crate::utils::{validate_pycapsule, wait_for_future};
 use async_trait::async_trait;
 use datafusion::catalog::{MemoryCatalogProvider, MemorySchemaProvider};
 use datafusion::common::DataFusionError;
 use datafusion::{
+    arrow::pyarrow::ToPyArrow,
     catalog::{CatalogProvider, SchemaProvider},
-    datasource::TableProvider,
+    datasource::{TableProvider, TableType},
 };
use datafusion_ffi::schema_provider::{FFI_SchemaProvider, ForeignSchemaProvider};
+use datafusion_ffi::table_provider::{FFI_TableProvider, ForeignTableProvider};
 use pyo3::exceptions::PyKeyError;
 use pyo3::prelude::*;
 use pyo3::types::PyCapsule;
@@ -47,6 +48,12 @@ pub struct PySchema {
     pub schema: Arc<dyn SchemaProvider>,
 }
 
+#[pyclass(name = "RawTable", module = "datafusion.catalog", subclass)]
+#[derive(Clone)]
+pub struct PyTable {
+    pub table: Arc<dyn TableProvider>,
+}
+
 impl From<Arc<dyn CatalogProvider>> for PyCatalog {
     fn from(catalog: Arc<dyn CatalogProvider>) -> Self {
         Self { catalog }
@@ -59,6 +66,16 @@ impl From<Arc<dyn SchemaProvider>> for PySchema {
     }
 }
 
+impl PyTable {
+    pub fn new(table: Arc<dyn TableProvider>) -> Self {
+        Self { table }
+    }
+
+    pub fn table(&self) -> Arc<dyn TableProvider> {
+        self.table.clone()
+    }
+}
+
 #[pymethods]
 impl PyCatalog {
     #[new]
@@ -164,7 +181,7 @@ impl PySchema {
 
     fn table(&self, name: &str, py: Python) -> PyDataFusionResult<PyTable> {
         if let Some(table) = wait_for_future(py, self.schema.table(name))?? {
-            Ok(PyTable::from(table))
+            Ok(PyTable::new(table))
         } else {
             Err(PyDataFusionError::Common(format!(
                 "Table not found: {name}"
@@ -178,12 +195,31 @@ impl PySchema {
         Ok(format!("Schema(table_names=[{}])", names.join(";")))
     }
 
-    fn register_table(&self, name: &str, table_provider: &Bound<'_, PyAny>) -> PyResult<()> {
-        let table = PyTable::new(table_provider)?;
+    fn register_table(&self, name: &str, table_provider: Bound<'_, PyAny>) -> PyResult<()> {
+        let provider = if table_provider.hasattr("__datafusion_table_provider__")? {
+            let capsule = table_provider
+                .getattr("__datafusion_table_provider__")?
+                .call0()?;
+            let capsule = capsule.downcast::<PyCapsule>().map_err(py_datafusion_err)?;
+            validate_pycapsule(capsule, "datafusion_table_provider")?;
+
+            let provider = unsafe { capsule.reference::<FFI_TableProvider>() };
+            let provider: ForeignTableProvider = provider.into();
+            Arc::new(provider) as Arc<dyn TableProvider>
+        } else {
+            match table_provider.extract::<PyTable>() {
+                Ok(py_table) => py_table.table,
+                Err(_) => {
+                    let py = table_provider.py();
+                    let provider = Dataset::new(&table_provider, py)?;
+                    Arc::new(provider) as Arc<dyn TableProvider>
+                }
+            }
+        };
 
         let _ = self
             .schema
-            .register_table(name.to_string(), table.table)
+            .register_table(name.to_string(), provider)
             .map_err(py_datafusion_err)?;
 
         Ok(())
@@ -199,6 +235,43 @@ impl PySchema {
     }
 }
 
+#[pymethods]
+impl PyTable {
+    /// Get a reference to the schema for this table
+    #[getter]
+    fn schema(&self, py: Python) -> PyResult<PyObject> {
+        self.table.schema().to_pyarrow(py)
+    }
+
+    #[staticmethod]
+    fn from_dataset(py: Python<'_>, dataset: &Bound<'_, PyAny>) -> PyResult<Self> {
+        let ds = Arc::new(Dataset::new(dataset, py).map_err(py_datafusion_err)?)
+            as Arc<dyn TableProvider>;
+
+        Ok(Self::new(ds))
+    }
+
+    /// Get the type of this table for metadata/catalog purposes.
+    #[getter]
+    fn kind(&self) -> &str {
+        match self.table.table_type() {
+            TableType::Base => "physical",
+            TableType::View => "view",
+            TableType::Temporary => "temporary",
+        }
+    }
+
+    fn __repr__(&self) -> PyResult<String> {
+        let kind = self.kind();
+        Ok(format!("Table(kind={kind})"))
+    }
+
+    // fn scan
+    // fn statistics
+    // fn has_exact_statistics
+    // fn supports_filter_pushdown
+}
+
 #[derive(Debug)]
 pub(crate) struct RustWrappedPySchemaProvider {
     schema_provider: PyObject,
@@ -231,9 +304,30 @@ impl RustWrappedPySchemaProvider {
                 return Ok(None);
             }
 
-            let table = PyTable::new(&py_table)?;
+            if py_table.hasattr("__datafusion_table_provider__")? {
+                let capsule = py_table.getattr("__datafusion_table_provider__")?.call0()?;
+                let capsule = capsule.downcast::<PyCapsule>().map_err(py_datafusion_err)?;
+                validate_pycapsule(capsule, "datafusion_table_provider")?;
+
+                let provider = unsafe { capsule.reference::<FFI_TableProvider>() };
+                let provider: ForeignTableProvider = provider.into();
+
+                Ok(Some(Arc::new(provider) as Arc<dyn TableProvider>))
+            } else {
+                if let Ok(inner_table) = py_table.getattr("table") {
+                    if let Ok(inner_table) = inner_table.extract::<PyTable>() {
+                        return Ok(Some(inner_table.table));
+                    }
+                }
 
-            Ok(Some(table.table))
+                match py_table.extract::<PyTable>() {
+                    Ok(py_table) => Ok(Some(py_table.table)),
+                    Err(_) => {
+                        let ds = Dataset::new(&py_table, py).map_err(py_datafusion_err)?;
+                        Ok(Some(Arc::new(ds) as Arc<dyn TableProvider>))
+                    }
+                }
+            }
         })
     }
 }
@@ -274,7 +368,7 @@ impl SchemaProvider for RustWrappedPySchemaProvider {
         name: String,
         table: Arc<dyn TableProvider>,
     ) -> datafusion::common::Result<Option<Arc<dyn TableProvider>>> {
-        let py_table = PyTable::from(table);
+        let py_table = PyTable::new(table);
         Python::with_gil(|py| {
             let provider = self.schema_provider.bind(py);
             let _ = provider
diff --git a/src/context.rs b/src/context.rs
index 23cf7e54..0ccb0326 100644
--- a/src/context.rs
+++ b/src/context.rs
@@ -31,7 +31,7 @@ use uuid::Uuid;
 use pyo3::exceptions::{PyKeyError, PyValueError};
 use pyo3::prelude::*;
 
-use crate::catalog::{PyCatalog, RustWrappedPyCatalogProvider};
+use crate::catalog::{PyCatalog, PyTable, RustWrappedPyCatalogProvider};
 use crate::dataframe::PyDataFrame;
 use crate::dataset::Dataset;
 use crate::errors::{py_datafusion_err, to_datafusion_err, PyDataFusionResult};
@@ -41,7 +41,6 @@ use crate::record_batch::PyRecordBatchStream;
 use crate::sql::exceptions::py_value_err;
 use crate::sql::logical::PyLogicalPlan;
 use crate::store::StorageContexts;
-use crate::table::PyTable;
 use crate::udaf::PyAggregateUDF;
 use crate::udf::PyScalarUDF;
 use crate::udtf::PyTableFunction;
@@ -72,6 +71,7 @@ use datafusion::prelude::{
    AvroReadOptions, CsvReadOptions, DataFrame, NdJsonReadOptions, ParquetReadOptions,
 };
use datafusion_ffi::catalog_provider::{FFI_CatalogProvider, ForeignCatalogProvider};
+use datafusion_ffi::table_provider::{FFI_TableProvider, ForeignTableProvider};
 use pyo3::types::{PyCapsule, PyDict, PyList, PyTuple, PyType};
 use pyo3::IntoPyObjectExt;
 use tokio::task::JoinHandle;
@@ -417,7 +417,12 @@ impl PySessionContext {
             .with_listing_options(options)
             .with_schema(resolved_schema);
         let table = ListingTable::try_new(config)?;
-        self.ctx.register_table(name, Arc::new(table))?;
+        self.register_table(
+            name,
+            &PyTable {
+                table: Arc::new(table),
+            },
+        )?;
         Ok(())
     }
 
@@ -594,10 +599,8 @@ impl PySessionContext {
         Ok(df)
     }
 
-    pub fn register_table(&self, name: &str, table: Bound<'_, PyAny>) -> PyDataFusionResult<()> {
-        let table = PyTable::new(&table)?;
-
-        self.ctx.register_table(name, table.table)?;
+    pub fn register_table(&self, name: &str, table: &PyTable) -> PyDataFusionResult<()> {
+        self.ctx.register_table(name, table.table())?;
         Ok(())
     }
 
@@ -640,8 +643,23 @@ impl PySessionContext {
         name: &str,
         provider: Bound<'_, PyAny>,
     ) -> PyDataFusionResult<()> {
-        // Deprecated: use `register_table` instead
-        self.register_table(name, provider)
+        if provider.hasattr("__datafusion_table_provider__")? {
+            let capsule = provider.getattr("__datafusion_table_provider__")?.call0()?;
+            let capsule = capsule.downcast::<PyCapsule>().map_err(py_datafusion_err)?;
+            validate_pycapsule(capsule, "datafusion_table_provider")?;
+
+            let provider = unsafe { capsule.reference::<FFI_TableProvider>() };
+            let provider: ForeignTableProvider = provider.into();
+
+            let _ = self.ctx.register_table(name, Arc::new(provider))?;
+
+            Ok(())
+        } else {
+            Err(crate::errors::PyDataFusionError::Common(
+                "__datafusion_table_provider__ does not exist on Table 
Provider object."
+                    .to_string(),
+            ))
+        }
     }
 
     pub fn register_record_batches(
diff --git a/src/dataframe.rs b/src/dataframe.rs
index 6aba4947..5882acf7 100644
--- a/src/dataframe.rs
+++ b/src/dataframe.rs
@@ -31,10 +31,12 @@ use datafusion::arrow::util::pretty;
 use datafusion::common::UnnestOptions;
use datafusion::config::{CsvOptions, ParquetColumnOptions, ParquetOptions, TableParquetOptions};
 use datafusion::dataframe::{DataFrame, DataFrameWriteOptions};
+use datafusion::datasource::TableProvider;
 use datafusion::error::DataFusionError;
 use datafusion::execution::SendableRecordBatchStream;
use datafusion::parquet::basic::{BrotliLevel, Compression, GzipLevel, ZstdLevel};
 use datafusion::prelude::*;
+use datafusion_ffi::table_provider::FFI_TableProvider;
 use futures::{StreamExt, TryStreamExt};
 use pyo3::exceptions::PyValueError;
 use pyo3::prelude::*;
@@ -42,12 +44,12 @@ use pyo3::pybacked::PyBackedStr;
 use pyo3::types::{PyCapsule, PyList, PyTuple, PyTupleMethods};
 use tokio::task::JoinHandle;
 
+use crate::catalog::PyTable;
 use crate::errors::{py_datafusion_err, to_datafusion_err, PyDataFusionError};
 use crate::expr::sort_expr::to_sort_expressions;
 use crate::physical_plan::PyExecutionPlan;
 use crate::record_batch::PyRecordBatchStream;
 use crate::sql::logical::PyLogicalPlan;
-use crate::table::PyTable;
 use crate::utils::{
    get_tokio_runtime, is_ipython_env, py_obj_to_scalar_value, validate_pycapsule, wait_for_future,
 };
@@ -56,6 +58,40 @@ use crate::{
     expr::{sort_expr::PySortExpr, PyExpr},
 };
 
+// https://github.com/apache/datafusion-python/pull/1016#discussion_r1983239116
+// - we have not decided on the table_provider approach yet
+// this is an interim implementation
+#[pyclass(name = "TableProvider", module = "datafusion")]
+pub struct PyTableProvider {
+    provider: Arc<dyn TableProvider + Send>,
+}
+
+impl PyTableProvider {
+    pub fn new(provider: Arc<dyn TableProvider>) -> Self {
+        Self { provider }
+    }
+
+    pub fn as_table(&self) -> PyTable {
+        let table_provider: Arc<dyn TableProvider> = self.provider.clone();
+        PyTable::new(table_provider)
+    }
+}
+
+#[pymethods]
+impl PyTableProvider {
+    fn __datafusion_table_provider__<'py>(
+        &self,
+        py: Python<'py>,
+    ) -> PyResult<Bound<'py, PyCapsule>> {
+        let name = CString::new("datafusion_table_provider").unwrap();
+
+        let runtime = get_tokio_runtime().0.handle().clone();
+        let provider = FFI_TableProvider::new(Arc::clone(&self.provider), false, Some(runtime));
+
+        PyCapsule::new(py, provider, Some(name.clone()))
+    }
+}
+
 /// Configuration for DataFrame display formatting
 #[derive(Debug, Clone)]
 pub struct FormatterConfig {
@@ -266,11 +302,6 @@ impl PyDataFrame {
         }
     }
 
-    /// Return a clone of the inner Arc<DataFrame> for crate-local callers.
-    pub(crate) fn inner_df(&self) -> Arc<DataFrame> {
-        Arc::clone(&self.df)
-    }
-
    fn prepare_repr_string(&mut self, py: Python, as_html: bool) -> PyDataFusionResult<String> {
         // Get the Python formatter and config
        let PythonFormatter { formatter, config } = get_python_formatter_with_config(py)?;
@@ -396,18 +427,22 @@ impl PyDataFrame {
         PyArrowType(self.df.schema().into())
     }
 
-    /// Convert this DataFrame into a Table Provider that can be used in register_table
+    /// Convert this DataFrame into a Table that can be used in register_table
     /// By convention, into_... methods consume self and return the new object.
     /// Disabling the clippy lint, so we can use &self
     /// because we're working with Python bindings
     /// where objects are shared
+    /// https://github.com/apache/datafusion-python/pull/1016#discussion_r1983239116
+    /// - we have not decided on the table_provider approach yet
     #[allow(clippy::wrong_self_convention)]
-    pub fn into_view(&self) -> PyDataFusionResult<PyTable> {
+    fn into_view(&self) -> PyDataFusionResult<PyTable> {
         // Call the underlying Rust DataFrame::into_view method.
        // Note that the Rust method consumes self; here we clone the inner Arc<DataFrame>
-        // so that we don't invalidate this PyDataFrame.
+        // so that we don’t invalidate this PyDataFrame.
         let table_provider = self.df.as_ref().clone().into_view();
-        Ok(PyTable::from(table_provider))
+        let table_provider = PyTableProvider::new(table_provider);
+
+        Ok(table_provider.as_table())
     }
 
     #[pyo3(signature = (*args))]
diff --git a/src/lib.rs b/src/lib.rs
index 0361c731..29d3f41d 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -52,7 +52,6 @@ pub mod pyarrow_util;
 mod record_batch;
 pub mod sql;
 pub mod store;
-pub mod table;
 pub mod unparser;
 
 #[cfg(feature = "substrait")]
diff --git a/src/table.rs b/src/table.rs
deleted file mode 100644
index 812581ed..00000000
--- a/src/table.rs
+++ /dev/null
@@ -1,106 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-use arrow::pyarrow::ToPyArrow;
-use datafusion::datasource::{TableProvider, TableType};
-use pyo3::prelude::*;
-use std::sync::Arc;
-
-use crate::dataframe::PyDataFrame;
-use crate::dataset::Dataset;
-use crate::utils::table_provider_from_pycapsule;
-
-/// This struct is used as a common method for all TableProviders,
-/// whether they refer to an FFI provider, an internally known
-/// implementation, a dataset, or a dataframe view.
-#[pyclass(name = "RawTable", module = "datafusion.catalog", subclass)]
-#[derive(Clone)]
-pub struct PyTable {
-    pub table: Arc<dyn TableProvider>,
-}
-
-impl PyTable {
-    pub fn table(&self) -> Arc<dyn TableProvider> {
-        self.table.clone()
-    }
-}
-
-#[pymethods]
-impl PyTable {
-    /// Instantiate from any Python object that supports any of the table
-    /// types. We do not know a priori when using this method if the object
-    /// will be passed a wrapped or raw class. Here we handle all of the
-    /// following object types:
-    ///
-    /// - PyTable (essentially a clone operation), but either raw or wrapped
-    /// - DataFrame, either raw or wrapped
-    /// - FFI Table Providers via PyCapsule
-    /// - PyArrow Dataset objects
-    #[new]
-    pub fn new(obj: &Bound<'_, PyAny>) -> PyResult<Self> {
-        if let Ok(py_table) = obj.extract::<PyTable>() {
-            Ok(py_table)
-        } else if let Ok(py_table) = obj
-            .getattr("_inner")
-            .and_then(|inner| inner.extract::<PyTable>())
-        {
-            Ok(py_table)
-        } else if let Ok(py_df) = obj.extract::<PyDataFrame>() {
-            let provider = py_df.inner_df().as_ref().clone().into_view();
-            Ok(PyTable::from(provider))
-        } else if let Ok(py_df) = obj
-            .getattr("df")
-            .and_then(|inner| inner.extract::<PyDataFrame>())
-        {
-            let provider = py_df.inner_df().as_ref().clone().into_view();
-            Ok(PyTable::from(provider))
-        } else if let Some(provider) = table_provider_from_pycapsule(obj)? {
-            Ok(PyTable::from(provider))
-        } else {
-            let py = obj.py();
-            let provider = Arc::new(Dataset::new(obj, py)?) as Arc<dyn TableProvider>;
-            Ok(PyTable::from(provider))
-        }
-    }
-
-    /// Get a reference to the schema for this table
-    #[getter]
-    fn schema(&self, py: Python) -> PyResult<PyObject> {
-        self.table.schema().to_pyarrow(py)
-    }
-
-    /// Get the type of this table for metadata/catalog purposes.
-    #[getter]
-    fn kind(&self) -> &str {
-        match self.table.table_type() {
-            TableType::Base => "physical",
-            TableType::View => "view",
-            TableType::Temporary => "temporary",
-        }
-    }
-
-    fn __repr__(&self) -> PyResult<String> {
-        let kind = self.kind();
-        Ok(format!("Table(kind={kind})"))
-    }
-}
-
-impl From<Arc<dyn TableProvider>> for PyTable {
-    fn from(table: Arc<dyn TableProvider>) -> Self {
-        Self { table }
-    }
-}
diff --git a/src/udtf.rs b/src/udtf.rs
index 7a1b0aaf..db16d6c0 100644
--- a/src/udtf.rs
+++ b/src/udtf.rs
@@ -18,13 +18,14 @@
 use pyo3::prelude::*;
 use std::sync::Arc;
 
+use crate::dataframe::PyTableProvider;
 use crate::errors::{py_datafusion_err, to_datafusion_err};
 use crate::expr::PyExpr;
-use crate::table::PyTable;
-use crate::utils::{table_provider_from_pycapsule, validate_pycapsule};
+use crate::utils::validate_pycapsule;
 use datafusion::catalog::{TableFunctionImpl, TableProvider};
 use datafusion::error::Result as DataFusionResult;
 use datafusion::logical_expr::Expr;
+use datafusion_ffi::table_provider::{FFI_TableProvider, ForeignTableProvider};
 use datafusion_ffi::udtf::{FFI_TableFunction, ForeignTableFunction};
 use pyo3::exceptions::PyNotImplementedError;
 use pyo3::types::{PyCapsule, PyTuple};
@@ -70,11 +71,11 @@ impl PyTableFunction {
     }
 
     #[pyo3(signature = (*args))]
-    pub fn __call__(&self, args: Vec<PyExpr>) -> PyResult<PyTable> {
+    pub fn __call__(&self, args: Vec<PyExpr>) -> PyResult<PyTableProvider> {
         let args: Vec<Expr> = args.iter().map(|e| e.expr.clone()).collect();
         let table_provider = self.call(&args).map_err(py_datafusion_err)?;
 
-        Ok(PyTable::from(table_provider))
+        Ok(PyTableProvider::new(table_provider))
     }
 
     fn __repr__(&self) -> PyResult<String> {
@@ -98,11 +99,20 @@ fn call_python_table_function(
         let provider_obj = func.call1(py, py_args)?;
         let provider = provider_obj.bind(py);
 
-        table_provider_from_pycapsule(provider)?.ok_or_else(|| {
-            PyNotImplementedError::new_err(
+        if provider.hasattr("__datafusion_table_provider__")? {
+            let capsule = provider.getattr("__datafusion_table_provider__")?.call0()?;
+            let capsule = capsule.downcast::<PyCapsule>().map_err(py_datafusion_err)?;
+            validate_pycapsule(capsule, "datafusion_table_provider")?;
+
+            let provider = unsafe { capsule.reference::<FFI_TableProvider>() };
+            let provider: ForeignTableProvider = provider.into();
+
+            Ok(Arc::new(provider) as Arc<dyn TableProvider>)
+        } else {
+            Err(PyNotImplementedError::new_err(
                 "__datafusion_table_provider__ does not exist on Table 
Provider object.",
-            )
-        })
+            ))
+        }
     })
     .map_err(to_datafusion_err)
 }
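
The Python side of this UDTF path, per the example test earlier in this
patch (``table_func`` is assumed to come from the example FFI package):

    from datafusion import SessionContext, udtf
    from datafusion_ffi_example import table_func  # assumed example import

    ctx = SessionContext()
    table_udtf = udtf(table_func, "my_table_func")

    # Calling the UDTF returns a provider that exposes
    # __datafusion_table_provider__, which register_table_provider accepts.
    my_table = table_udtf()
    ctx.register_table_provider("t", my_table)
    ctx.table("t").collect()
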
diff --git a/src/utils.rs b/src/utils.rs
index 0fcfadce..3b30de5d 100644
--- a/src/utils.rs
+++ b/src/utils.rs
@@ -15,26 +15,18 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use crate::errors::py_datafusion_err;
 use crate::{
     common::data_type::PyScalarValue,
     errors::{PyDataFusionError, PyDataFusionResult},
     TokioRuntime,
 };
 use datafusion::{
-    common::ScalarValue, datasource::TableProvider, execution::context::SessionContext,
-    logical_expr::Volatility,
+    common::ScalarValue, execution::context::SessionContext, logical_expr::Volatility,
 };
-use datafusion_ffi::table_provider::{FFI_TableProvider, ForeignTableProvider};
 use pyo3::prelude::*;
 use pyo3::{exceptions::PyValueError, types::PyCapsule};
-use std::{
-    future::Future,
-    sync::{Arc, OnceLock},
-    time::Duration,
-};
+use std::{future::Future, sync::OnceLock, time::Duration};
 use tokio::{runtime::Runtime, time::sleep};
-
 /// Utility to get the Tokio Runtime from Python
 #[inline]
 pub(crate) fn get_tokio_runtime() -> &'static TokioRuntime {
@@ -99,7 +91,7 @@ pub(crate) fn parse_volatility(value: &str) -> PyDataFusionResult<Volatility> {
         "volatile" => Volatility::Volatile,
         value => {
             return Err(PyDataFusionError::Common(format!(
-                "Unsupported volatility type: `{value}`, supported \
+                "Unsupportad volatility type: `{value}`, supported \
                  values are: immutable, stable and volatile."
             )))
         }
@@ -109,9 +101,9 @@ pub(crate) fn parse_volatility(value: &str) -> PyDataFusionResult<Volatility> {
pub(crate) fn validate_pycapsule(capsule: &Bound<PyCapsule>, name: &str) -> PyResult<()> {
     let capsule_name = capsule.name()?;
     if capsule_name.is_none() {
-        return Err(PyValueError::new_err(format!(
-            "Expected {name} PyCapsule to have name set."
-        )));
+        return Err(PyValueError::new_err(
+            "Expected schema PyCapsule to have name set.",
+        ));
     }
 
     let capsule_name = capsule_name.unwrap().to_str()?;
@@ -124,23 +116,6 @@ pub(crate) fn validate_pycapsule(capsule: &Bound<PyCapsule>, name: &str) -> PyRe
     Ok(())
 }
 
-pub(crate) fn table_provider_from_pycapsule(
-    obj: &Bound<PyAny>,
-) -> PyResult<Option<Arc<dyn TableProvider>>> {
-    if obj.hasattr("__datafusion_table_provider__")? {
-        let capsule = obj.getattr("__datafusion_table_provider__")?.call0()?;
-        let capsule = capsule.downcast::<PyCapsule>().map_err(py_datafusion_err)?;
-        validate_pycapsule(capsule, "datafusion_table_provider")?;
-
-        let provider = unsafe { capsule.reference::<FFI_TableProvider>() };
-        let provider: ForeignTableProvider = provider.into();
-
-        Ok(Some(Arc::new(provider)))
-    } else {
-        Ok(None)
-    }
-}
-
pub(crate) fn py_obj_to_scalar_value(py: Python, obj: PyObject) -> PyResult<ScalarValue> {
     // convert Python object to PyScalarValue to ScalarValue
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
