This is an automated email from the ASF dual-hosted git repository.

JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new c3083a6172 [python] Add end-to-end SQL tests for Paimon system tables 
(#7762)
c3083a6172 is described below

commit c3083a6172c97e20f692fb7bb163fd781dc36365
Author: Jiajia Li <[email protected]>
AuthorDate: Wed May 13 19:19:16 2026 +0800

    [python] Add end-to-end SQL tests for Paimon system tables (#7762)
---
 .../pypaimon/tests/sql_system_table_test.py        | 131 +++++++++++++++++++++
 1 file changed, 131 insertions(+)

diff --git a/paimon-python/pypaimon/tests/sql_system_table_test.py 
b/paimon-python/pypaimon/tests/sql_system_table_test.py
new file mode 100644
index 0000000000..e221d0098f
--- /dev/null
+++ b/paimon-python/pypaimon/tests/sql_system_table_test.py
@@ -0,0 +1,131 @@
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing,
+#  software distributed under the License is distributed on an
+#  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+#  KIND, either express or implied.  See the License for the
+#  specific language governing permissions and limitations
+#  under the License.
+
+"""End-to-end tests for Paimon system tables via pypaimon SQL.
+
+Exercises the `<table>$<system_name>` syntax handled by paimon-rust
+DataFusion integration. A non-partitioned table with one snapshot is
+created in setUpClass and queried by each test.
+"""
+
+import json
+import os
+import tempfile
+import unittest
+
+import pyarrow as pa
+
+from pypaimon_rust.datafusion import SQLContext
+
+
+WAREHOUSE = os.environ.get("PAIMON_TEST_WAREHOUSE")
+TABLE_NAME = "sql_system_test_table"
+ROW_COUNT = 3
+
+
+class SQLSystemTableTest(unittest.TestCase):
+
+    @classmethod
+    def setUpClass(cls):
+        cls._tmpdir = None
+        if WAREHOUSE:
+            cls.warehouse = WAREHOUSE
+        else:
+            cls._tmpdir = 
tempfile.TemporaryDirectory(prefix="paimon-sql-systest-")
+            cls.warehouse = cls._tmpdir.name
+
+        ctx = SQLContext()
+        ctx.register_catalog("paimon", {"warehouse": cls.warehouse})
+        ctx.sql(f"DROP TABLE IF EXISTS {TABLE_NAME}")
+        ctx.sql(f"CREATE TABLE {TABLE_NAME} (id INT, name STRING)")
+        ctx.sql(
+            f"INSERT INTO {TABLE_NAME} VALUES (1, 'alice'), (2, 'bob'), (3, 
'carol')"
+        )
+        cls.ctx = ctx
+
+    @classmethod
+    def tearDownClass(cls):
+        ctx = SQLContext()
+        ctx.register_catalog("paimon", {"warehouse": cls.warehouse})
+        ctx.sql(f"DROP TABLE IF EXISTS {TABLE_NAME}")
+        if cls._tmpdir is not None:
+            cls._tmpdir.cleanup()
+
+    def _query(self, system_name: str) -> pa.Table:
+        batches = self.ctx.sql(f"SELECT * FROM {TABLE_NAME}${system_name}")
+        return pa.Table.from_batches(batches)
+
+    def test_options_system_table(self):
+        table = self._query("options")
+        self.assertListEqual(table.schema.names, ["key", "value"])
+
+    def test_schemas_system_table(self):
+        table = self._query("schemas")
+        self.assertListEqual(
+            table.schema.names,
+            ["schema_id", "fields", "partition_keys", "primary_keys",
+             "options", "comment", "update_time"],
+        )
+        self.assertGreaterEqual(table.num_rows, 1, "should have at least one 
schema")
+        ids = table.column("schema_id").to_pylist()
+        self.assertEqual(sorted(ids), sorted(set(ids)), "schema_id should be 
unique")
+        fields = json.loads(table.column("fields").to_pylist()[0])
+        self.assertEqual([f["name"] for f in fields], ["id", "name"])
+
+    def test_snapshots_system_table(self):
+        table = self._query("snapshots")
+        names = table.schema.names
+        for required in (
+            "snapshot_id", "schema_id", "commit_user", "commit_identifier",
+            "commit_kind", "commit_time", "base_manifest_list",
+            "delta_manifest_list", "total_record_count",
+        ):
+            self.assertIn(required, names)
+        self.assertEqual(table.num_rows, 1, "single batch write should produce 
one snapshot")
+        self.assertEqual(table.column("total_record_count").to_pylist()[0], 
ROW_COUNT)
+
+    def test_tags_system_table_empty(self):
+        table = self._query("tags")
+        self.assertListEqual(
+            table.schema.names,
+            ["tag_name", "snapshot_id", "schema_id", "commit_time",
+             "record_count", "create_time", "time_retained"],
+        )
+        self.assertEqual(table.num_rows, 0, "no tags created")
+
+    def test_branches_system_table_empty(self):
+        table = self._query("branches")
+        self.assertListEqual(table.schema.names, ["branch_name", 
"create_time"])
+        non_main = [b for b in table.column("branch_name").to_pylist() if b != 
"main"]
+        self.assertEqual(non_main, [], "no user-created branches")
+
+    def test_manifests_system_table(self):
+        table = self._query("manifests")
+        for required in (
+            "file_name", "file_size", "num_added_files",
+            "num_deleted_files", "schema_id",
+        ):
+            self.assertIn(required, table.schema.names)
+        self.assertGreaterEqual(table.num_rows, 1, "snapshot should have 
manifests")
+        for size in table.column("file_size").to_pylist():
+            self.assertGreater(size, 0)
+        total_added = sum(table.column("num_added_files").to_pylist())
+        self.assertGreaterEqual(total_added, 1, "single write should add at 
least one file")
+
+
+if __name__ == "__main__":
+    unittest.main()

Reply via email to