tespent commented on issue #1103:
URL:
https://github.com/apache/datafusion-python/issues/1103#issuecomment-2799841561
@timsaucer This is wonderful! However, I think an FFI CatalogProvider is not
enough for my needs, since I'm looking for a *pure Python-written*
CatalogProvider and SchemaProvider. In my case, I need to wrap Python-based
data sources into a CatalogProvider, which still requires an extra non-Python
layer, even after we have the FFI interface (since Python basically cannot
expose a C interface, it is excluded from the supported languages for FFI).
The critical point for me is to have some Python logic to describe the catalog,
schema and tables. And I suppose this will require much additional work after
the FFI-based interface. Let me show some conceptual code for this:
```python
class MyCatalogProvider(CatalogProvider):
    """Conceptual catalog provider implemented purely in Python.

    Every lookup is delegated to the client object held in ``self._client``.
    """

    def __init__(self, **client_kwargs):
        # Placeholder: construction of the real client (presumably from
        # ``client_kwargs`` -- TODO confirm) is elided in this sketch.
        self._client = ...

    def schema_names(self) -> list[str]:
        """Return what the client's ``list_databases(name_only=True)`` yields."""
        return self._client.list_databases(name_only=True)

    def schema(self, name: str) -> SchemaProvider | None:
        """Return a ``MySchemaProvider`` for database ``name``.

        Returns ``None`` when the client lookup raises
        ``FileNotFoundException``.
        """
        # NOTE(review): FileNotFoundException is not a Python builtin;
        # presumably it is defined by the client library -- confirm.
        try:
            db = self._client.get_database(name)
        except FileNotFoundException:
            return None
        return MySchemaProvider(self._client, db)
class MySchemaProvider(SchemaProvider):
    """Conceptual schema provider backed by one database of the client."""

    def __init__(self, client: SomeClient, database: MyDatabaseInClient):
        self._client = client
        self._database = database

    def table_names(self) -> list[str]:
        """Return what the client's ``list_tables`` yields for this database."""
        return self._client.list_tables(self._database, name_only=True)

    def table(self, name: str) -> TableProvider | None:
        """Return a ``MyTableProvider`` for table ``name``.

        Returns ``None`` when the client lookup raises
        ``FileNotFoundException``.
        """
        # NOTE(review): FileNotFoundException is not a Python builtin;
        # presumably it is defined by the client library -- confirm.
        try:
            tbl = self._client.get_table(self._database, name)
        except FileNotFoundException:
            return None
        return MyTableProvider(self._client, self._database, tbl)

    def table_exists(self, name: str) -> bool:
        """Return the client's answer for whether ``name`` exists here."""
        return self._client.table_exists(self._database, name)
class MyTableProvider(TableProvider):
    """Conceptual table provider wrapping a single table of the client."""

    def __init__(
        self,
        client: SomeClient,
        schema: MyDatabaseInClient,
        table: MyTableInClient,
    ):
        # NOTE(review): ``schema`` is accepted but never stored or used in
        # this sketch -- confirm whether it should be kept on the instance.
        self._client = client
        self._table = table

    def schema(self) -> pyarrow.Schema:
        """Return the ``schema`` attribute of the wrapped table."""
        return self._table.schema

    def scan(
        self,
        session: SessionInfo,
        projection: list[int] | None,
        filters: list[datafusion.Expr],
        limit: int | None,
    ) -> ExecutionPlan:
        """Build a ``MyPythonExec`` from the client's read plan for the table.

        The actual planning logic is elided in this sketch.
        """
        plan = self._client.plan_read(self._table)
        ...  # logic to plan the execution
        # NOTE(review): ``data_schema`` is not defined in the visible code;
        # presumably the elided planning step produces it -- confirm.
        return MyPythonExec(plan, data_schema)

    def supports_filters_pushdown(
        self, filters: list[datafusion.Expr]
    ) -> list[TableProviderFilterPushDown]:
        """Placeholder; the body is elided in this sketch."""
        ...
class MyPythonExec(ExecutionPlan):
    """Conceptual execution plan written in Python; all bodies are elided."""

    def __init__(self, *args, **kwargs):
        # The original sketch left the parameter list open; accept any
        # arguments so the definition is valid Python.
        ...

    def __repr__(self) -> str:
        ...

    @property
    def schema(self) -> pyarrow.Schema:
        ...

    @property
    def num_partitions(self) -> int:
        ...

    def partition(self, id: int) -> Iterable[pyarrow.RecordBatch]:
        ...
# Register the Python-defined catalog under the name "mine" and query it.
ctx = SessionContext()
ctx.register_catalog("mine", MyCatalogProvider(...))
# Preview the result for SQL.
print(ctx.sql("select a,b from mine.db.tbl where a>10 and b < 20"))
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]