This is an automated email from the ASF dual-hosted git repository.

JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new b44d5a2289 [python] Use SQLContext from pypaimon-rust directly (#7787)
b44d5a2289 is described below

commit b44d5a2289ce4f8befb4ec05e7d0476c7b5ed56b
Author: Jingsong Lee <[email protected]>
AuthorDate: Sat May 9 10:58:00 2026 +0800

    [python] Use SQLContext from pypaimon-rust directly (#7787)
    
    Remove the Python SQLContext wrapper (sql_context.py) and use the native
    Rust-based SQLContext from pypaimon_rust.datafusion instead. Update
    imports, CLI, tests, and docs accordingly.
---
 docs/content/pypaimon/sql.md                     | 109 ++++++++---------------
 paimon-python/pypaimon/__init__.py               |   8 +-
 paimon-python/pypaimon/cli/cli_sql.py            |  18 ++--
 paimon-python/pypaimon/sql/__init__.py           |   2 +-
 paimon-python/pypaimon/sql/sql_context.py        |  81 -----------------
 paimon-python/pypaimon/tests/sql_context_test.py | 109 ++++++-----------------
 6 files changed, 85 insertions(+), 242 deletions(-)

diff --git a/docs/content/pypaimon/sql.md b/docs/content/pypaimon/sql.md
index 88d08a01d0..d53be2cb9d 100644
--- a/docs/content/pypaimon/sql.md
+++ b/docs/content/pypaimon/sql.md
@@ -27,7 +27,7 @@ under the License.
 
 # SQL Query
 
-PyPaimon supports executing SQL queries on Paimon tables, powered by [pypaimon-rust](https://github.com/apache/paimon-rust/tree/main/bindings/python) and [DataFusion](https://datafusion.apache.org/python/).
+PyPaimon supports executing SQL queries on Paimon tables, powered by [pypaimon-rust](https://github.com/apache/paimon-rust/tree/main/bindings/python) and [DataFusion](https://datafusion.apache.org/).
 
 ## Installation
 
@@ -37,7 +37,7 @@ SQL query support requires additional dependencies. Install them with:
 pip install pypaimon[sql]
 ```
 
-This will install `pypaimon-rust` and `datafusion`.
+This will install `pypaimon-rust` (which bundles DataFusion).
 
 ## Usage
 
@@ -46,15 +46,17 @@ Create a `SQLContext`, register one or more catalogs with their options, and run
 ### Basic Query
 
 ```python
-from pypaimon.sql import SQLContext
+from pypaimon_rust.datafusion import SQLContext
+import pyarrow as pa
 
 ctx = SQLContext()
 ctx.register_catalog("paimon", {"warehouse": "/path/to/warehouse"})
 ctx.set_current_catalog("paimon")
 ctx.set_current_database("default")
 
-# Execute SQL and get PyArrow Table
-table = ctx.sql("SELECT * FROM my_table")
+# Execute SQL and get PyArrow RecordBatches
+batches = ctx.sql("SELECT * FROM my_table")
+table = pa.Table.from_batches(batches)
 print(table)
 
 # Convert to Pandas DataFrame
@@ -62,9 +64,15 @@ df = table.to_pandas()
 print(df)
 ```
 
+`SQLContext` can also be imported from `pypaimon`:
+
+```python
+from pypaimon import SQLContext
+```
+
 ### Table Reference Format
 
-The default catalog and default database can be configured via `set_current_catalog()` and `set_current_database()`, so you can reference tables in two ways:
+The default catalog and default database can be configured via `set_current_catalog()` and `set_current_database()`, so you can reference tables in multiple ways:
 
 ```python
 # Direct table name (uses default database)
@@ -72,61 +80,9 @@ ctx.sql("SELECT * FROM my_table")
 
 # Two-part: database.table
 ctx.sql("SELECT * FROM mydb.my_table")
-```
-
-### Filtering
-
-```python
-table = ctx.sql("""
-    SELECT id, name, age 
-    FROM users 
-    WHERE age > 18 AND city = 'Beijing'
-""")
-```
-
-### Aggregation
-
-```python
-table = ctx.sql("""
-    SELECT city, COUNT(*) AS cnt, AVG(age) AS avg_age
-    FROM users
-    GROUP BY city
-    ORDER BY cnt DESC
-""")
-```
-
-### Join
-
-```python
-table = ctx.sql("""
-    SELECT u.name, o.order_id, o.amount
-    FROM users u
-    JOIN orders o ON u.id = o.user_id
-    WHERE o.amount > 100
-""")
-```
 
-### Subquery
-
-```python
-table = ctx.sql("""
-    SELECT * FROM users
-    WHERE id IN (
-        SELECT user_id FROM orders
-        WHERE amount > 1000
-    )
-""")
-```
-
-### Cross-Database Query
-
-```python
-# Query a table in another database using two-part syntax
-table = ctx.sql("""
-    SELECT u.name, o.amount
-    FROM default.users u
-    JOIN analytics.orders o ON u.id = o.user_id
-""")
+# Three-part: catalog.database.table
+ctx.sql("SELECT * FROM paimon.mydb.my_table")
 ```
 
 ### Multi-Catalog Query
@@ -134,8 +90,6 @@ table = ctx.sql("""
 `SQLContext` supports registering multiple catalogs for cross-catalog queries:
 
 ```python
-from pypaimon.sql import SQLContext
-
 ctx = SQLContext()
 ctx.register_catalog("a", {"warehouse": "/path/to/warehouse_a"})
 ctx.register_catalog("b", {
@@ -147,22 +101,35 @@ ctx.set_current_catalog("a")
 ctx.set_current_database("default")
 
 # Cross-catalog join
-table = ctx.sql("""
+batches = ctx.sql("""
     SELECT a_users.name, b_orders.amount
     FROM a.default.users AS a_users
     JOIN b.default.orders AS b_orders ON a_users.id = b_orders.user_id
 """)
 ```
 
+### Register Arrow Batches
+
+You can register PyArrow RecordBatches as temporary tables:
+
+```python
+batch = pa.record_batch([[1, 2], ["alice", "bob"]], names=["id", "name"])
+ctx.register_batch("paimon.default.my_temp", batch)
+batches = ctx.sql("SELECT * FROM paimon.default.my_temp")
+```
+
 ## Supported SQL Syntax
 
-The SQL engine is powered by Apache DataFusion, which supports a rich set of SQL syntax including:
+The SQL engine is powered by Apache DataFusion, which supports a rich set of SQL syntax. For the full SQL reference, see the [paimon-rust SQL documentation](https://paimon-rust.apache.org/sql.html) which covers:
 
-- `SELECT`, `WHERE`, `GROUP BY`, `HAVING`, `ORDER BY`, `LIMIT`
-- `JOIN` (INNER, LEFT, RIGHT, FULL, CROSS)
-- Subqueries and CTEs (`WITH`)
-- Aggregate functions (`COUNT`, `SUM`, `AVG`, `MIN`, `MAX`, etc.)
-- Window functions (`ROW_NUMBER`, `RANK`, `LAG`, `LEAD`, etc.)
-- `UNION`, `INTERSECT`, `EXCEPT`
+- **DDL**: `CREATE SCHEMA`, `CREATE TABLE` (with `PARTITIONED BY`, `PRIMARY KEY`, `WITH` options), `DROP TABLE`, `ALTER TABLE`, `CREATE TEMPORARY TABLE/VIEW`
+- **DML**: `INSERT INTO`, `INSERT OVERWRITE` (dynamic/static partitions), `UPDATE`, `DELETE`, `MERGE INTO`, `TRUNCATE TABLE`
+- **Procedures**: `CALL sys.create_tag`, `CALL sys.rollback_to`, etc.
+- **Queries**: `SELECT`, column projection, filter pushdown, `COUNT(*)` pushdown
+- **Time Travel**: `VERSION AS OF`, `TIMESTAMP AS OF`
+- **Vector Search**: `vector_search()` table function
+- **Full-Text Search**: `full_text_search()` table function
+- **Dynamic Options**: `SET` / `RESET`
+- **System Tables**: `$options`, `$schemas`, `$snapshots`, `$tags`, `$manifests`
 
-For the full SQL reference, see the [DataFusion SQL documentation](https://datafusion.apache.org/user-guide/sql/index.html).
+For the DataFusion query syntax (JOINs, aggregations, subqueries, CTEs, window functions, etc.), see the [DataFusion SQL documentation](https://datafusion.apache.org/user-guide/sql/index.html).
diff --git a/paimon-python/pypaimon/__init__.py b/paimon-python/pypaimon/__init__.py
index e07179fb28..a1b9862752 100644
--- a/paimon-python/pypaimon/__init__.py
+++ b/paimon-python/pypaimon/__init__.py
@@ -28,7 +28,6 @@ from pypaimon.filesystem.pvfs import PaimonVirtualFileSystem
 from pypaimon.schema.schema import Schema
 from pypaimon.tag.tag import Tag
 from pypaimon.tag.tag_manager import TagManager
-from pypaimon.sql.sql_context import SQLContext
 
 __all__ = [
     "PaimonVirtualFileSystem",
@@ -38,3 +37,10 @@ __all__ = [
     "TagManager",
     "SQLContext",
 ]
+
+
+def __getattr__(name):
+    if name == "SQLContext":
+        from pypaimon_rust.datafusion import SQLContext
+        return SQLContext
+    raise AttributeError("module 'pypaimon' has no attribute {}".format(name))
diff --git a/paimon-python/pypaimon/cli/cli_sql.py b/paimon-python/pypaimon/cli/cli_sql.py
index 170cda8710..dac7a26378 100644
--- a/paimon-python/pypaimon/cli/cli_sql.py
+++ b/paimon-python/pypaimon/cli/cli_sql.py
@@ -26,6 +26,8 @@ import re
 import sys
 import time
 
+import pyarrow as pa
+
 _PAIMON_BANNER = r"""
     ____        _
    / __ \____ _(_)___ ___  ____  ____
@@ -131,7 +133,7 @@ def cmd_sql(args):
     config = load_catalog_config(config_path)
 
     try:
-        from pypaimon.sql.sql_context import SQLContext
+        from pypaimon_rust.datafusion import SQLContext
         catalog_options = {str(k): str(v) for k, v in config.items()}
         ctx = SQLContext()
         ctx.register_catalog("paimon", catalog_options)
@@ -151,12 +153,15 @@ def cmd_sql(args):
 def _execute_query(ctx, query, output_format):
     """Execute a single SQL query and print the result."""
     try:
-        table = ctx.sql(query)
+        batches = ctx.sql(query)
     except Exception as e:
         print(f"Error: {e}", file=sys.stderr)
         sys.exit(1)
 
-    _print_table(table, output_format)
+    if batches:
+        _print_table(pa.Table.from_batches(batches), output_format)
+    else:
+        print("OK")
 
 
 def _print_table(table, output_format, elapsed=None):
@@ -255,9 +260,12 @@ def _interactive_repl(ctx, output_format):
 
             try:
                 start = time.time()
-                table = ctx.sql(query)
+                batches = ctx.sql(query)
                 elapsed = time.time() - start
-                _print_table(table, output_format, elapsed)
+                if batches:
+                    _print_table(pa.Table.from_batches(batches), output_format, elapsed)
+                else:
+                    print(f"OK ({elapsed:.2f}s)")
                 print()
             except Exception as e:
                 print(f"Error: {e}\n", file=sys.stderr)
diff --git a/paimon-python/pypaimon/sql/__init__.py b/paimon-python/pypaimon/sql/__init__.py
index e059c025a6..27ff4a3bdd 100644
--- a/paimon-python/pypaimon/sql/__init__.py
+++ b/paimon-python/pypaimon/sql/__init__.py
@@ -20,6 +20,6 @@ __all__ = ['SQLContext']
 
 def __getattr__(name):
     if name == "SQLContext":
-        from pypaimon.sql.sql_context import SQLContext
+        from pypaimon_rust.datafusion import SQLContext
         return SQLContext
     raise AttributeError("module 'pypaimon.sql' has no attribute {}".format(name))
diff --git a/paimon-python/pypaimon/sql/sql_context.py b/paimon-python/pypaimon/sql/sql_context.py
deleted file mode 100644
index 67afa30f34..0000000000
--- a/paimon-python/pypaimon/sql/sql_context.py
+++ /dev/null
@@ -1,81 +0,0 @@
-#  Licensed to the Apache Software Foundation (ASF) under one
-#  or more contributor license agreements.  See the NOTICE file
-#  distributed with this work for additional information
-#  regarding copyright ownership.  The ASF licenses this file
-#  to you under the Apache License, Version 2.0 (the
-#  "License"); you may not use this file except in compliance
-#  with the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-#  Unless required by applicable law or agreed to in writing,
-#  software distributed under the License is distributed on an
-#  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-#  KIND, either express or implied.  See the License for the
-#  specific language governing permissions and limitations
-#  under the License.
-
-import pyarrow as pa
-
-
-class SQLContext:
-    """SQL query context for Paimon tables.
-
-    Uses pypaimon-rust and DataFusion to execute SQL queries against Paimon tables.
-    Supports registering multiple catalogs for cross-catalog queries.
-
-    Example::
-
-        from pypaimon.sql import SQLContext
-
-        ctx = SQLContext()
-        ctx.register_catalog("paimon", {"warehouse": "/path/to/warehouse"})
-        ctx.set_current_catalog("paimon")
-        ctx.set_current_database("default")
-        result = ctx.sql("SELECT * FROM my_table")
-    """
-
-    def __init__(self):
-        try:
-            from datafusion import SessionContext
-        except ImportError:
-            raise ImportError(
-                "datafusion is required for SQL query support. "
-                "Install it with: pip install pypaimon[sql]"
-            )
-
-        self._ctx = SessionContext()
-
-    def register_catalog(self, name, options):
-        """Register a Paimon catalog as a DataFusion catalog provider.
-
-        Args:
-            name: The catalog name to register under.
-            options: A dict of catalog options (e.g. {"warehouse": "/path/to/warehouse"}).
-        """
-        try:
-            from pypaimon_rust.datafusion import PaimonCatalog
-        except ImportError:
-            raise ImportError(
-                "pypaimon-rust is required for SQL query support. "
-                "Install it with: pip install pypaimon[sql]"
-            )
-
-        paimon_catalog = PaimonCatalog(options)
-        self._ctx.register_catalog_provider(name, paimon_catalog)
-
-    def set_current_catalog(self, catalog_name: str):
-        """Set the default catalog for SQL queries."""
-        self._ctx.sql(f"SET datafusion.catalog.default_catalog = '{catalog_name}'")
-
-    def set_current_database(self, database: str):
-        """Set the default database for SQL queries."""
-        self._ctx.sql(f"SET datafusion.catalog.default_schema = '{database}'")
-
-    def sql(self, query: str) -> pa.Table:
-        """Execute a SQL query and return the result as a PyArrow Table."""
-        df = self._ctx.sql(query)
-        batches = df.collect()
-        if not batches:
-            return pa.Table.from_batches([], schema=df.schema())
-        return pa.Table.from_batches(batches)
diff --git a/paimon-python/pypaimon/tests/sql_context_test.py b/paimon-python/pypaimon/tests/sql_context_test.py
index 931052fae1..bef030caf7 100644
--- a/paimon-python/pypaimon/tests/sql_context_test.py
+++ b/paimon-python/pypaimon/tests/sql_context_test.py
@@ -20,7 +20,7 @@ import unittest
 
 import pyarrow as pa
 
-from pypaimon import CatalogFactory
+from pypaimon_rust.datafusion import SQLContext
 
 
 WAREHOUSE = os.environ.get("PAIMON_TEST_WAREHOUSE", "/tmp/paimon-warehouse")
@@ -28,72 +28,31 @@ WAREHOUSE = os.environ.get("PAIMON_TEST_WAREHOUSE", "/tmp/paimon-warehouse")
 
 class SQLContextTest(unittest.TestCase):
 
-    _table_created = False
-
-    def _create_catalog(self):
-        return CatalogFactory.create({"warehouse": WAREHOUSE})
-
-    def _create_sql_context(self):
-        from pypaimon.sql.sql_context import SQLContext
-        ctx = SQLContext()
-        ctx.register_catalog("paimon", {"warehouse": WAREHOUSE})
-        ctx.set_current_catalog("paimon")
-        ctx.set_current_database("default")
-        return ctx
-
     @classmethod
     def setUpClass(cls):
         """Create the test table once before all tests in this class."""
-        from pypaimon import Schema, CatalogFactory
-        from pypaimon.schema.data_types import DataField, AtomicType
-
-        catalog = CatalogFactory.create({"warehouse": WAREHOUSE})
-        try:
-            catalog.create_database("default", ignore_if_exists=True)
-        except Exception:
-            pass
-
-        identifier = "default.sql_test_table"
-
-        # Drop existing table to ensure clean state
-        catalog.drop_table(identifier, ignore_if_not_exists=True)
-
-        schema = Schema(
-            fields=[
-                DataField(0, "id", AtomicType("INT")),
-                DataField(1, "name", AtomicType("STRING")),
-            ],
-            primary_keys=[],
-            partition_keys=[],
-            options={},
-            comment="",
-        )
-        catalog.create_table(identifier, schema, ignore_if_exists=False)
-
-        table = catalog.get_table(identifier)
-        write_builder = table.new_batch_write_builder()
-        table_write = write_builder.new_write()
-        table_commit = write_builder.new_commit()
-        try:
-            pa_table = pa.table({
-                "id": pa.array([1, 2, 3], type=pa.int32()),
-                "name": pa.array(["alice", "bob", "carol"], type=pa.string()),
-            })
-            table_write.write_arrow(pa_table)
-            table_commit.commit(table_write.prepare_commit())
-        finally:
-            table_write.close()
-            table_commit.close()
+        ctx = SQLContext()
+        ctx.register_catalog("paimon", {"warehouse": WAREHOUSE})
+        ctx.sql("DROP TABLE IF EXISTS sql_test_table")
+        ctx.sql("CREATE TABLE sql_test_table (id INT, name STRING)")
+        ctx.sql("INSERT INTO sql_test_table VALUES (1, 'alice'), (2, 'bob'), (3, 'carol')")
 
     @classmethod
     def tearDownClass(cls):
         """Clean up the test table after all tests."""
-        catalog = CatalogFactory.create({"warehouse": WAREHOUSE})
-        catalog.drop_table("default.sql_test_table", ignore_if_not_exists=True)
+        ctx = SQLContext()
+        ctx.register_catalog("paimon", {"warehouse": WAREHOUSE})
+        ctx.sql("DROP TABLE IF EXISTS sql_test_table")
+
+    def _create_sql_context(self):
+        ctx = SQLContext()
+        ctx.register_catalog("paimon", {"warehouse": WAREHOUSE})
+        return ctx
 
     def test_sql_returns_table(self):
         ctx = self._create_sql_context()
-        table = ctx.sql("SELECT id, name FROM sql_test_table ORDER BY id")
+        batches = ctx.sql("SELECT id, name FROM sql_test_table ORDER BY id")
+        table = pa.Table.from_batches(batches)
         self.assertIsInstance(table, pa.Table)
         self.assertEqual(table.num_rows, 3)
         self.assertEqual(table.column("id").to_pylist(), [1, 2, 3])
@@ -101,52 +60,36 @@ class SQLContextTest(unittest.TestCase):
 
     def test_sql_to_pandas(self):
         ctx = self._create_sql_context()
-        table = ctx.sql("SELECT id, name FROM sql_test_table ORDER BY id")
+        batches = ctx.sql("SELECT id, name FROM sql_test_table ORDER BY id")
+        table = pa.Table.from_batches(batches)
         df = table.to_pandas()
         self.assertEqual(len(df), 3)
         self.assertListEqual(list(df.columns), ["id", "name"])
 
     def test_sql_with_filter(self):
         ctx = self._create_sql_context()
-        table = ctx.sql("SELECT id, name FROM sql_test_table WHERE id > 1 ORDER BY id")
+        batches = ctx.sql("SELECT id, name FROM sql_test_table WHERE id > 1 ORDER BY id")
+        table = pa.Table.from_batches(batches)
         self.assertEqual(table.num_rows, 2)
         self.assertEqual(table.column("id").to_pylist(), [2, 3])
 
     def test_sql_with_empty_result(self):
         ctx = self._create_sql_context()
-        table = ctx.sql("SELECT id, name FROM sql_test_table WHERE id > 4 ORDER BY id")
-        self.assertIsInstance(table, pa.Table)
-        self.assertEqual(table.num_rows, 0)
-        self.assertEqual(table.schema.names, ["id", "name"])
+        batches = ctx.sql("SELECT id, name FROM sql_test_table WHERE id > 4 ORDER BY id")
+        self.assertEqual(len(batches), 0)
 
     def test_sql_with_aggregation(self):
         ctx = self._create_sql_context()
-        table = ctx.sql("SELECT count(*) AS cnt FROM sql_test_table")
+        batches = ctx.sql("SELECT count(*) AS cnt FROM sql_test_table")
+        table = pa.Table.from_batches(batches)
         self.assertEqual(table.column("cnt").to_pylist(), [3])
 
     def test_sql_two_part_reference(self):
         ctx = self._create_sql_context()
-        table = ctx.sql("SELECT count(*) AS cnt FROM default.sql_test_table")
+        batches = ctx.sql("SELECT count(*) AS cnt FROM default.sql_test_table")
+        table = pa.Table.from_batches(batches)
         self.assertEqual(table.column("cnt").to_pylist(), [3])
 
-    def test_import_error_without_pypaimon_rust(self):
-        """register_catalog should raise ImportError when pypaimon-rust is missing."""
-        import unittest.mock as mock
-        import builtins
-        original_import = builtins.__import__
-
-        def mock_import(name, *args, **kwargs):
-            if name == "pypaimon_rust.datafusion" or name == "pypaimon_rust":
-                raise ImportError("No module named 'pypaimon_rust'")
-            return original_import(name, *args, **kwargs)
-
-        from pypaimon.sql.sql_context import SQLContext
-        ctx = SQLContext()
-        with mock.patch("builtins.__import__", side_effect=mock_import):
-            with self.assertRaises(ImportError) as cm:
-                ctx.register_catalog("paimon", {"warehouse": WAREHOUSE})
-            self.assertIn("pypaimon-rust", str(cm.exception))
-
 
 if __name__ == "__main__":
     unittest.main()

Reply via email to