This is an automated email from the ASF dual-hosted git repository.
JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new b44d5a2289 [python] Use SQLContext from pypaimon-rust directly (#7787)
b44d5a2289 is described below
commit b44d5a2289ce4f8befb4ec05e7d0476c7b5ed56b
Author: Jingsong Lee <[email protected]>
AuthorDate: Sat May 9 10:58:00 2026 +0800
[python] Use SQLContext from pypaimon-rust directly (#7787)
Remove the Python SQLContext wrapper (sql_context.py) and use the native
Rust-based SQLContext from pypaimon_rust.datafusion instead. Update
imports, CLI, tests, and docs accordingly.
---
docs/content/pypaimon/sql.md | 109 ++++++++---------------
paimon-python/pypaimon/__init__.py | 8 +-
paimon-python/pypaimon/cli/cli_sql.py | 18 ++--
paimon-python/pypaimon/sql/__init__.py | 2 +-
paimon-python/pypaimon/sql/sql_context.py | 81 -----------------
paimon-python/pypaimon/tests/sql_context_test.py | 109 ++++++-----------------
6 files changed, 85 insertions(+), 242 deletions(-)
diff --git a/docs/content/pypaimon/sql.md b/docs/content/pypaimon/sql.md
index 88d08a01d0..d53be2cb9d 100644
--- a/docs/content/pypaimon/sql.md
+++ b/docs/content/pypaimon/sql.md
@@ -27,7 +27,7 @@ under the License.
# SQL Query
-PyPaimon supports executing SQL queries on Paimon tables, powered by [pypaimon-rust](https://github.com/apache/paimon-rust/tree/main/bindings/python) and [DataFusion](https://datafusion.apache.org/python/).
+PyPaimon supports executing SQL queries on Paimon tables, powered by [pypaimon-rust](https://github.com/apache/paimon-rust/tree/main/bindings/python) and [DataFusion](https://datafusion.apache.org/).
## Installation
@@ -37,7 +37,7 @@ SQL query support requires additional dependencies. Install them with:
pip install pypaimon[sql]
```
-This will install `pypaimon-rust` and `datafusion`.
+This will install `pypaimon-rust` (which bundles DataFusion).
## Usage
@@ -46,15 +46,17 @@ Create a `SQLContext`, register one or more catalogs with their options, and run
### Basic Query
```python
-from pypaimon.sql import SQLContext
+from pypaimon_rust.datafusion import SQLContext
+import pyarrow as pa
ctx = SQLContext()
ctx.register_catalog("paimon", {"warehouse": "/path/to/warehouse"})
ctx.set_current_catalog("paimon")
ctx.set_current_database("default")
-# Execute SQL and get PyArrow Table
-table = ctx.sql("SELECT * FROM my_table")
+# Execute SQL and get PyArrow RecordBatches
+batches = ctx.sql("SELECT * FROM my_table")
+table = pa.Table.from_batches(batches)
print(table)
# Convert to Pandas DataFrame
@@ -62,9 +64,15 @@ df = table.to_pandas()
print(df)
```
+`SQLContext` can also be imported from `pypaimon`:
+
+```python
+from pypaimon import SQLContext
+```
+
### Table Reference Format
-The default catalog and default database can be configured via `set_current_catalog()` and `set_current_database()`, so you can reference tables in two ways:
+The default catalog and default database can be configured via `set_current_catalog()` and `set_current_database()`, so you can reference tables in multiple ways:
```python
# Direct table name (uses default database)
@@ -72,61 +80,9 @@ ctx.sql("SELECT * FROM my_table")
# Two-part: database.table
ctx.sql("SELECT * FROM mydb.my_table")
-```
-
-### Filtering
-
-```python
-table = ctx.sql("""
- SELECT id, name, age
- FROM users
- WHERE age > 18 AND city = 'Beijing'
-""")
-```
-
-### Aggregation
-
-```python
-table = ctx.sql("""
- SELECT city, COUNT(*) AS cnt, AVG(age) AS avg_age
- FROM users
- GROUP BY city
- ORDER BY cnt DESC
-""")
-```
-
-### Join
-
-```python
-table = ctx.sql("""
- SELECT u.name, o.order_id, o.amount
- FROM users u
- JOIN orders o ON u.id = o.user_id
- WHERE o.amount > 100
-""")
-```
-### Subquery
-
-```python
-table = ctx.sql("""
- SELECT * FROM users
- WHERE id IN (
- SELECT user_id FROM orders
- WHERE amount > 1000
- )
-""")
-```
-
-### Cross-Database Query
-
-```python
-# Query a table in another database using two-part syntax
-table = ctx.sql("""
- SELECT u.name, o.amount
- FROM default.users u
- JOIN analytics.orders o ON u.id = o.user_id
-""")
+# Three-part: catalog.database.table
+ctx.sql("SELECT * FROM paimon.mydb.my_table")
```
### Multi-Catalog Query
@@ -134,8 +90,6 @@ table = ctx.sql("""
`SQLContext` supports registering multiple catalogs for cross-catalog queries:
```python
-from pypaimon.sql import SQLContext
-
ctx = SQLContext()
ctx.register_catalog("a", {"warehouse": "/path/to/warehouse_a"})
ctx.register_catalog("b", {
@@ -147,22 +101,35 @@ ctx.set_current_catalog("a")
ctx.set_current_database("default")
# Cross-catalog join
-table = ctx.sql("""
+batches = ctx.sql("""
SELECT a_users.name, b_orders.amount
FROM a.default.users AS a_users
JOIN b.default.orders AS b_orders ON a_users.id = b_orders.user_id
""")
```
+### Register Arrow Batches
+
+You can register PyArrow RecordBatches as temporary tables:
+
+```python
+batch = pa.record_batch([[1, 2], ["alice", "bob"]], names=["id", "name"])
+ctx.register_batch("paimon.default.my_temp", batch)
+batches = ctx.sql("SELECT * FROM paimon.default.my_temp")
+```
+
## Supported SQL Syntax
-The SQL engine is powered by Apache DataFusion, which supports a rich set of SQL syntax including:
+The SQL engine is powered by Apache DataFusion, which supports a rich set of SQL syntax. For the full SQL reference, see the [paimon-rust SQL documentation](https://paimon-rust.apache.org/sql.html) which covers:
-- `SELECT`, `WHERE`, `GROUP BY`, `HAVING`, `ORDER BY`, `LIMIT`
-- `JOIN` (INNER, LEFT, RIGHT, FULL, CROSS)
-- Subqueries and CTEs (`WITH`)
-- Aggregate functions (`COUNT`, `SUM`, `AVG`, `MIN`, `MAX`, etc.)
-- Window functions (`ROW_NUMBER`, `RANK`, `LAG`, `LEAD`, etc.)
-- `UNION`, `INTERSECT`, `EXCEPT`
+- **DDL**: `CREATE SCHEMA`, `CREATE TABLE` (with `PARTITIONED BY`, `PRIMARY KEY`, `WITH` options), `DROP TABLE`, `ALTER TABLE`, `CREATE TEMPORARY TABLE/VIEW`
+- **DML**: `INSERT INTO`, `INSERT OVERWRITE` (dynamic/static partitions), `UPDATE`, `DELETE`, `MERGE INTO`, `TRUNCATE TABLE`
+- **Procedures**: `CALL sys.create_tag`, `CALL sys.rollback_to`, etc.
+- **Queries**: `SELECT`, column projection, filter pushdown, `COUNT(*)` pushdown
+- **Time Travel**: `VERSION AS OF`, `TIMESTAMP AS OF`
+- **Vector Search**: `vector_search()` table function
+- **Full-Text Search**: `full_text_search()` table function
+- **Dynamic Options**: `SET` / `RESET`
+- **System Tables**: `$options`, `$schemas`, `$snapshots`, `$tags`, `$manifests`
-For the full SQL reference, see the [DataFusion SQL documentation](https://datafusion.apache.org/user-guide/sql/index.html).
+For the DataFusion query syntax (JOINs, aggregations, subqueries, CTEs, window functions, etc.), see the [DataFusion SQL documentation](https://datafusion.apache.org/user-guide/sql/index.html).
diff --git a/paimon-python/pypaimon/__init__.py b/paimon-python/pypaimon/__init__.py
index e07179fb28..a1b9862752 100644
--- a/paimon-python/pypaimon/__init__.py
+++ b/paimon-python/pypaimon/__init__.py
@@ -28,7 +28,6 @@ from pypaimon.filesystem.pvfs import PaimonVirtualFileSystem
from pypaimon.schema.schema import Schema
from pypaimon.tag.tag import Tag
from pypaimon.tag.tag_manager import TagManager
-from pypaimon.sql.sql_context import SQLContext
__all__ = [
"PaimonVirtualFileSystem",
@@ -38,3 +37,10 @@ __all__ = [
"TagManager",
"SQLContext",
]
+
+
+def __getattr__(name):
+ if name == "SQLContext":
+ from pypaimon_rust.datafusion import SQLContext
+ return SQLContext
+ raise AttributeError("module 'pypaimon' has no attribute {}".format(name))
diff --git a/paimon-python/pypaimon/cli/cli_sql.py b/paimon-python/pypaimon/cli/cli_sql.py
index 170cda8710..dac7a26378 100644
--- a/paimon-python/pypaimon/cli/cli_sql.py
+++ b/paimon-python/pypaimon/cli/cli_sql.py
@@ -26,6 +26,8 @@ import re
import sys
import time
+import pyarrow as pa
+
_PAIMON_BANNER = r"""
____ _
/ __ \____ _(_)___ ___ ____ ____
@@ -131,7 +133,7 @@ def cmd_sql(args):
config = load_catalog_config(config_path)
try:
- from pypaimon.sql.sql_context import SQLContext
+ from pypaimon_rust.datafusion import SQLContext
catalog_options = {str(k): str(v) for k, v in config.items()}
ctx = SQLContext()
ctx.register_catalog("paimon", catalog_options)
@@ -151,12 +153,15 @@ def cmd_sql(args):
def _execute_query(ctx, query, output_format):
"""Execute a single SQL query and print the result."""
try:
- table = ctx.sql(query)
+ batches = ctx.sql(query)
except Exception as e:
print(f"Error: {e}", file=sys.stderr)
sys.exit(1)
- _print_table(table, output_format)
+ if batches:
+ _print_table(pa.Table.from_batches(batches), output_format)
+ else:
+ print("OK")
def _print_table(table, output_format, elapsed=None):
@@ -255,9 +260,12 @@ def _interactive_repl(ctx, output_format):
try:
start = time.time()
- table = ctx.sql(query)
+ batches = ctx.sql(query)
elapsed = time.time() - start
- _print_table(table, output_format, elapsed)
+ if batches:
+ _print_table(pa.Table.from_batches(batches), output_format, elapsed)
+ else:
+ print(f"OK ({elapsed:.2f}s)")
print()
except Exception as e:
print(f"Error: {e}\n", file=sys.stderr)
diff --git a/paimon-python/pypaimon/sql/__init__.py b/paimon-python/pypaimon/sql/__init__.py
index e059c025a6..27ff4a3bdd 100644
--- a/paimon-python/pypaimon/sql/__init__.py
+++ b/paimon-python/pypaimon/sql/__init__.py
@@ -20,6 +20,6 @@ __all__ = ['SQLContext']
def __getattr__(name):
if name == "SQLContext":
- from pypaimon.sql.sql_context import SQLContext
+ from pypaimon_rust.datafusion import SQLContext
return SQLContext
raise AttributeError("module 'pypaimon.sql' has no attribute {}".format(name))
diff --git a/paimon-python/pypaimon/sql/sql_context.py b/paimon-python/pypaimon/sql/sql_context.py
deleted file mode 100644
index 67afa30f34..0000000000
--- a/paimon-python/pypaimon/sql/sql_context.py
+++ /dev/null
@@ -1,81 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-import pyarrow as pa
-
-
-class SQLContext:
- """SQL query context for Paimon tables.
-
- Uses pypaimon-rust and DataFusion to execute SQL queries against Paimon tables.
- Supports registering multiple catalogs for cross-catalog queries.
-
- Example::
-
- from pypaimon.sql import SQLContext
-
- ctx = SQLContext()
- ctx.register_catalog("paimon", {"warehouse": "/path/to/warehouse"})
- ctx.set_current_catalog("paimon")
- ctx.set_current_database("default")
- result = ctx.sql("SELECT * FROM my_table")
- """
-
- def __init__(self):
- try:
- from datafusion import SessionContext
- except ImportError:
- raise ImportError(
- "datafusion is required for SQL query support. "
- "Install it with: pip install pypaimon[sql]"
- )
-
- self._ctx = SessionContext()
-
- def register_catalog(self, name, options):
- """Register a Paimon catalog as a DataFusion catalog provider.
-
- Args:
- name: The catalog name to register under.
- options: A dict of catalog options (e.g. {"warehouse": "/path/to/warehouse"}).
- """
- try:
- from pypaimon_rust.datafusion import PaimonCatalog
- except ImportError:
- raise ImportError(
- "pypaimon-rust is required for SQL query support. "
- "Install it with: pip install pypaimon[sql]"
- )
-
- paimon_catalog = PaimonCatalog(options)
- self._ctx.register_catalog_provider(name, paimon_catalog)
-
- def set_current_catalog(self, catalog_name: str):
- """Set the default catalog for SQL queries."""
- self._ctx.sql(f"SET datafusion.catalog.default_catalog = '{catalog_name}'")
-
- def set_current_database(self, database: str):
- """Set the default database for SQL queries."""
- self._ctx.sql(f"SET datafusion.catalog.default_schema = '{database}'")
-
- def sql(self, query: str) -> pa.Table:
- """Execute a SQL query and return the result as a PyArrow Table."""
- df = self._ctx.sql(query)
- batches = df.collect()
- if not batches:
- return pa.Table.from_batches([], schema=df.schema())
- return pa.Table.from_batches(batches)
diff --git a/paimon-python/pypaimon/tests/sql_context_test.py b/paimon-python/pypaimon/tests/sql_context_test.py
index 931052fae1..bef030caf7 100644
--- a/paimon-python/pypaimon/tests/sql_context_test.py
+++ b/paimon-python/pypaimon/tests/sql_context_test.py
@@ -20,7 +20,7 @@ import unittest
import pyarrow as pa
-from pypaimon import CatalogFactory
+from pypaimon_rust.datafusion import SQLContext
WAREHOUSE = os.environ.get("PAIMON_TEST_WAREHOUSE", "/tmp/paimon-warehouse")
@@ -28,72 +28,31 @@ WAREHOUSE = os.environ.get("PAIMON_TEST_WAREHOUSE", "/tmp/paimon-warehouse")
class SQLContextTest(unittest.TestCase):
- _table_created = False
-
- def _create_catalog(self):
- return CatalogFactory.create({"warehouse": WAREHOUSE})
-
- def _create_sql_context(self):
- from pypaimon.sql.sql_context import SQLContext
- ctx = SQLContext()
- ctx.register_catalog("paimon", {"warehouse": WAREHOUSE})
- ctx.set_current_catalog("paimon")
- ctx.set_current_database("default")
- return ctx
-
@classmethod
def setUpClass(cls):
"""Create the test table once before all tests in this class."""
- from pypaimon import Schema, CatalogFactory
- from pypaimon.schema.data_types import DataField, AtomicType
-
- catalog = CatalogFactory.create({"warehouse": WAREHOUSE})
- try:
- catalog.create_database("default", ignore_if_exists=True)
- except Exception:
- pass
-
- identifier = "default.sql_test_table"
-
- # Drop existing table to ensure clean state
- catalog.drop_table(identifier, ignore_if_not_exists=True)
-
- schema = Schema(
- fields=[
- DataField(0, "id", AtomicType("INT")),
- DataField(1, "name", AtomicType("STRING")),
- ],
- primary_keys=[],
- partition_keys=[],
- options={},
- comment="",
- )
- catalog.create_table(identifier, schema, ignore_if_exists=False)
-
- table = catalog.get_table(identifier)
- write_builder = table.new_batch_write_builder()
- table_write = write_builder.new_write()
- table_commit = write_builder.new_commit()
- try:
- pa_table = pa.table({
- "id": pa.array([1, 2, 3], type=pa.int32()),
- "name": pa.array(["alice", "bob", "carol"], type=pa.string()),
- })
- table_write.write_arrow(pa_table)
- table_commit.commit(table_write.prepare_commit())
- finally:
- table_write.close()
- table_commit.close()
+ ctx = SQLContext()
+ ctx.register_catalog("paimon", {"warehouse": WAREHOUSE})
+ ctx.sql("DROP TABLE IF EXISTS sql_test_table")
+ ctx.sql("CREATE TABLE sql_test_table (id INT, name STRING)")
+ ctx.sql("INSERT INTO sql_test_table VALUES (1, 'alice'), (2, 'bob'), (3, 'carol')")
@classmethod
def tearDownClass(cls):
"""Clean up the test table after all tests."""
- catalog = CatalogFactory.create({"warehouse": WAREHOUSE})
- catalog.drop_table("default.sql_test_table", ignore_if_not_exists=True)
+ ctx = SQLContext()
+ ctx.register_catalog("paimon", {"warehouse": WAREHOUSE})
+ ctx.sql("DROP TABLE IF EXISTS sql_test_table")
+
+ def _create_sql_context(self):
+ ctx = SQLContext()
+ ctx.register_catalog("paimon", {"warehouse": WAREHOUSE})
+ return ctx
def test_sql_returns_table(self):
ctx = self._create_sql_context()
- table = ctx.sql("SELECT id, name FROM sql_test_table ORDER BY id")
+ batches = ctx.sql("SELECT id, name FROM sql_test_table ORDER BY id")
+ table = pa.Table.from_batches(batches)
self.assertIsInstance(table, pa.Table)
self.assertEqual(table.num_rows, 3)
self.assertEqual(table.column("id").to_pylist(), [1, 2, 3])
@@ -101,52 +60,36 @@ class SQLContextTest(unittest.TestCase):
def test_sql_to_pandas(self):
ctx = self._create_sql_context()
- table = ctx.sql("SELECT id, name FROM sql_test_table ORDER BY id")
+ batches = ctx.sql("SELECT id, name FROM sql_test_table ORDER BY id")
+ table = pa.Table.from_batches(batches)
df = table.to_pandas()
self.assertEqual(len(df), 3)
self.assertListEqual(list(df.columns), ["id", "name"])
def test_sql_with_filter(self):
ctx = self._create_sql_context()
- table = ctx.sql("SELECT id, name FROM sql_test_table WHERE id > 1 ORDER BY id")
+ batches = ctx.sql("SELECT id, name FROM sql_test_table WHERE id > 1 ORDER BY id")
+ table = pa.Table.from_batches(batches)
self.assertEqual(table.num_rows, 2)
self.assertEqual(table.column("id").to_pylist(), [2, 3])
def test_sql_with_empty_result(self):
ctx = self._create_sql_context()
- table = ctx.sql("SELECT id, name FROM sql_test_table WHERE id > 4 ORDER BY id")
- self.assertIsInstance(table, pa.Table)
- self.assertEqual(table.num_rows, 0)
- self.assertEqual(table.schema.names, ["id", "name"])
+ batches = ctx.sql("SELECT id, name FROM sql_test_table WHERE id > 4 ORDER BY id")
+ self.assertEqual(len(batches), 0)
def test_sql_with_aggregation(self):
ctx = self._create_sql_context()
- table = ctx.sql("SELECT count(*) AS cnt FROM sql_test_table")
+ batches = ctx.sql("SELECT count(*) AS cnt FROM sql_test_table")
+ table = pa.Table.from_batches(batches)
self.assertEqual(table.column("cnt").to_pylist(), [3])
def test_sql_two_part_reference(self):
ctx = self._create_sql_context()
- table = ctx.sql("SELECT count(*) AS cnt FROM default.sql_test_table")
+ batches = ctx.sql("SELECT count(*) AS cnt FROM default.sql_test_table")
+ table = pa.Table.from_batches(batches)
self.assertEqual(table.column("cnt").to_pylist(), [3])
- def test_import_error_without_pypaimon_rust(self):
- """register_catalog should raise ImportError when pypaimon-rust is missing."""
- import unittest.mock as mock
- import builtins
- original_import = builtins.__import__
-
- def mock_import(name, *args, **kwargs):
- if name == "pypaimon_rust.datafusion" or name == "pypaimon_rust":
- raise ImportError("No module named 'pypaimon_rust'")
- return original_import(name, *args, **kwargs)
-
- from pypaimon.sql.sql_context import SQLContext
- ctx = SQLContext()
- with mock.patch("builtins.__import__", side_effect=mock_import):
- with self.assertRaises(ImportError) as cm:
- ctx.register_catalog("paimon", {"warehouse": WAREHOUSE})
- self.assertIn("pypaimon-rust", str(cm.exception))
-
if __name__ == "__main__":
unittest.main()