This is an automated email from the ASF dual-hosted git repository.

JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 0eb9011fc8 [python] Support JDBC catalog (#7720)
0eb9011fc8 is described below

commit 0eb9011fc824d39b9827f994a561c52707f62161
Author: Colin <[email protected]>
AuthorDate: Thu Jun 4 13:33:07 2026 +0800

    [python] Support JDBC catalog (#7720)
    
    Support JDBC catalog in PyPaimon. This adds a Python JDBC catalog
    implementation that uses the same catalog metadata tables as Java Paimon
    JDBC catalog: `paimon_tables`, `paimon_database_properties`, and
    `paimon_table_properties`.
    
    The implementation supports SQLite with the Python standard library and
    dynamically supports MySQL/PostgreSQL when a corresponding Python DB-API
    driver is installed. Table data and schema files continue to use
    existing PyPaimon `FileIO` and `SchemaManager` behavior.
---
 docs/docs/pypaimon/python-api.mdx                  |  42 +-
 paimon-python/pypaimon/catalog/catalog_factory.py  |   8 +-
 paimon-python/pypaimon/catalog/jdbc_catalog.py     | 622 +++++++++++++++++++++
 .../pypaimon/catalog/jdbc_catalog_loader.py        |  32 ++
 paimon-python/pypaimon/common/options/config.py    |   7 +
 paimon-python/pypaimon/tests/jdbc_catalog_test.py  | 223 ++++++++
 6 files changed, 930 insertions(+), 4 deletions(-)

diff --git a/docs/docs/pypaimon/python-api.mdx 
b/docs/docs/pypaimon/python-api.mdx
index 52afbc2676..c1887c537c 100644
--- a/docs/docs/pypaimon/python-api.mdx
+++ b/docs/docs/pypaimon/python-api.mdx
@@ -103,6 +103,46 @@ catalog_options = {
 
 </TabItem>
 
+<TabItem value="jdbc-catalog" label="jdbc catalog">
+
+PyPaimon keeps the catalog type `jdbc` for compatibility with Paimon catalog options,
+but connects through native Python DB-API drivers instead of JVM JDBC drivers.
+SQLite is supported out of the box through Python's built-in `sqlite3` module:
+
+```python
+from pypaimon import CatalogFactory
+
+# Note that keys and values are all string
+catalog_options = {
+  'metastore': 'jdbc',
+  'warehouse': 'file:///path/to/warehouse',
+  'uri': 'jdbc:sqlite:/path/to/catalog.db',
+  # Optional. Defaults to 'jdbc'.
+  'catalog-key': 'jdbc',
+}
+catalog = CatalogFactory.create(catalog_options)
+```
+
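+For MySQL or PostgreSQL, install the corresponding Python DB-API driver and use the same Paimon
+JDBC catalog options. Options prefixed with `jdbc.` (such as `jdbc.user` and `jdbc.password`)
+are passed to the driver with the prefix removed: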
+
+```python
+catalog_options = {
+  'metastore': 'jdbc',
+  'warehouse': 's3://bucket/path/to/warehouse',
+  'uri': 'jdbc:mysql://<host>:<port>/<databaseName>',
+  'jdbc.user': '...',
+  'jdbc.password': '...',
+  'catalog-key': 'jdbc',
+}
+```
+
+Unlike Flink or Spark, PyPaimon does not use JVM JDBC drivers or load JDBC connector jars.
+It keeps the `metastore='jdbc'` and `jdbc:` URI format for compatibility with Paimon's
+JDBC catalog configuration, but the database connection is created through native Python DB-API
+drivers such as `pymysql`, `mysql-connector-python`, `psycopg2`, or `psycopg`. When both MySQL
+drivers are installed `pymysql` is used; likewise `psycopg2` is preferred over `psycopg`.
+
+</TabItem>
+
 <TabItem value="rest-catalog" label="rest catalog">
 
 The sample code is as follows. The detailed meaning of option can be found in 
[REST](../concepts/rest/).
@@ -124,7 +164,7 @@ catalog = CatalogFactory.create(catalog_options)
 
 </Tabs>
 
-Currently, PyPaimon only support filesystem catalog and rest catalog. See 
[Catalog](../concepts/catalog).
+Currently, PyPaimon supports the filesystem, JDBC and REST catalogs. See [Catalog](../concepts/catalog).
 
 You can use the catalog to create table for writing data.
 
diff --git a/paimon-python/pypaimon/catalog/catalog_factory.py 
b/paimon-python/pypaimon/catalog/catalog_factory.py
index 741d785d29..117c10b2e9 100644
--- a/paimon-python/pypaimon/catalog/catalog_factory.py
+++ b/paimon-python/pypaimon/catalog/catalog_factory.py
@@ -21,6 +21,7 @@ from pypaimon.common.options import Options
 from pypaimon.catalog.catalog import Catalog
 from pypaimon.catalog.catalog_context import CatalogContext
 from pypaimon.catalog.filesystem_catalog import FileSystemCatalog
+from pypaimon.catalog.jdbc_catalog import JdbcCatalog
 from pypaimon.catalog.rest.rest_catalog import RESTCatalog
 from pypaimon.common.options.config import CatalogOptions
 
@@ -29,6 +30,7 @@ class CatalogFactory:
 
     CATALOG_REGISTRY = {
         "filesystem": FileSystemCatalog,
+        "jdbc": JdbcCatalog,
         "rest": RESTCatalog,
     }
 
@@ -39,6 +41,6 @@ class CatalogFactory:
         if catalog_class is None:
             raise ValueError("Unknown catalog identifier: {}. "
                              "Available types: {}".format(identifier, 
list(CatalogFactory.CATALOG_REGISTRY.keys())))
-        return catalog_class(
-            CatalogContext.create_from_options(Options(catalog_options))) if 
identifier == "rest" else catalog_class(
-                Options(catalog_options))
+        if identifier in ("jdbc", "rest"):
+            return 
catalog_class(CatalogContext.create_from_options(Options(catalog_options)))
+        return catalog_class(Options(catalog_options))
diff --git a/paimon-python/pypaimon/catalog/jdbc_catalog.py 
b/paimon-python/pypaimon/catalog/jdbc_catalog.py
new file mode 100644
index 0000000000..ccf5dbfa1a
--- /dev/null
+++ b/paimon-python/pypaimon/catalog/jdbc_catalog.py
@@ -0,0 +1,622 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+#################################################################################
+
+import sqlite3
+from contextlib import closing, contextmanager
+from typing import Dict, List, Optional, Tuple, Union
+from urllib.parse import parse_qs, urlparse
+
+from pypaimon.catalog.catalog import Catalog
+from pypaimon.catalog.catalog_context import CatalogContext
+from pypaimon.catalog.catalog_environment import CatalogEnvironment
+from pypaimon.catalog.catalog_exception import (
+    DatabaseAlreadyExistException,
+    DatabaseNotExistException,
+    TableAlreadyExistException,
+    TableNotExistException
+)
+from pypaimon.catalog.database import Database
+from pypaimon.common.file_io import FileIO
+from pypaimon.common.identifier import Identifier
+from pypaimon.common.options.config import CatalogOptions, JdbcCatalogOptions
+from pypaimon.common.options.core_options import CoreOptions
+from pypaimon.schema.schema import Schema
+from pypaimon.schema.schema_change import SchemaChange
+from pypaimon.schema.schema_manager import SchemaManager
+from pypaimon.snapshot.snapshot import Snapshot
+from pypaimon.snapshot.snapshot_commit import PartitionStatistics
+from pypaimon.table.file_store_table import FileStoreTable
+from pypaimon.table.table import Table
+
+
+def _convert_qmark_placeholders(sql: str, placeholder: str) -> str:
+    """Rewrite ``?`` parameter markers into the driver's placeholder token.
+
+    Catalog SQL is written with qmark markers, which sqlite3 accepts as-is.
+    For other drivers every ``?`` outside a single- or double-quoted section
+    is replaced by ``placeholder``. Inside a quoted section, a doubled quote
+    character is treated as an escaped quote and does not end the section.
+
+    :param sql: SQL text that uses ``?`` markers.
+    :param placeholder: the driver's marker, e.g. ``"?"`` or ``"%s"``.
+    :return: the rewritten SQL (``sql`` unchanged when ``placeholder`` is ``"?"``).
+    """
+    if placeholder == "?":
+        return sql
+
+    pieces = []
+    quote = None  # quote character of the section we are currently inside, if any
+    pos = 0
+    length = len(sql)
+    while pos < length:
+        ch = sql[pos]
+        if ch in ("'", '"') and (quote is None or quote == ch):
+            pieces.append(ch)
+            escaped = quote == ch and pos + 1 < length and sql[pos + 1] == ch
+            if escaped:
+                # Doubled quote inside the section: copy it and stay inside.
+                pos += 1
+                pieces.append(ch)
+            else:
+                quote = ch if quote is None else None
+        elif ch == "?" and quote is None:
+            pieces.append(placeholder)
+        else:
+            pieces.append(ch)
+        pos += 1
+    return "".join(pieces)
+
+
+class _DbApiConnection:
+    """Thin wrapper around a Python DB-API connection chosen from a ``jdbc:`` URI.
+
+    Catalog SQL is written with ``?`` markers and rewritten to the driver's
+    placeholder style before it is executed. Writes are grouped with
+    :meth:`transaction`. Reads issued outside a transaction end the implicit
+    transaction the driver opened for them, so later reads observe changes
+    committed by other sessions.
+    """
+
+    def __init__(self, options: Dict[str, str]):
+        """Open the connection described by the ``uri`` option.
+
+        :param options: flat catalog options; ``jdbc.``-prefixed entries are
+            forwarded to the MySQL/PostgreSQL drivers without the prefix.
+        :raises ValueError: if ``uri`` is missing or uses an unsupported scheme.
+        :raises ImportError: if no suitable driver is installed for the URI.
+        """
+        self.options = options
+        self.uri = options.get(CatalogOptions.URI.key())
+        if not self.uri:
+            raise ValueError(f"Paimon '{CatalogOptions.URI.key()}' must be set for JDBC catalog")
+        # Nesting depth of transaction(); 0 means no explicit transaction is open.
+        self._transaction_depth = 0
+        self.protocol, self.placeholder, self.connection = self._connect(self.uri, options)
+
+    def close(self):
+        """Close the driver connection."""
+        self.connection.close()
+
+    def execute(self, sql: str, args: Tuple = ()):
+        """Execute one statement and return the cursor's ``rowcount``."""
+        with closing(self.connection.cursor()) as cursor:
+            cursor.execute(self._sql(sql), args)
+            return cursor.rowcount
+
+    def executemany(self, sql: str, args):
+        """Execute one statement for each parameter tuple in ``args``; return ``rowcount``."""
+        with closing(self.connection.cursor()) as cursor:
+            cursor.executemany(self._sql(sql), args)
+            return cursor.rowcount
+
+    def fetch_all(self, sql: str, args: Tuple = ()):
+        """Run a query and return every result row."""
+        try:
+            with closing(self.connection.cursor()) as cursor:
+                cursor.execute(self._sql(sql), args)
+                return cursor.fetchall()
+        finally:
+            self._end_implicit_read_transaction()
+
+    def fetch_one(self, sql: str, args: Tuple = ()):
+        """Run a query and return its first row (``None`` when there is no row)."""
+        try:
+            with closing(self.connection.cursor()) as cursor:
+                cursor.execute(self._sql(sql), args)
+                return cursor.fetchone()
+        finally:
+            self._end_implicit_read_transaction()
+
+    def _sql(self, sql: str) -> str:
+        """Translate ``?`` markers into this driver's placeholder style."""
+        return _convert_qmark_placeholders(sql, self.placeholder)
+
+    def _end_implicit_read_transaction(self):
+        # With autocommit off (pymysql is opened with autocommit=False; the
+        # other drivers default to it), the first statement, even a SELECT,
+        # opens a transaction that nothing else would ever end outside
+        # transaction(). On MySQL/InnoDB (REPEATABLE READ) every later read would
+        # reuse that first snapshot and miss tables created by other processes.
+        # On PostgreSQL the session would stay "idle in transaction", and after a
+        # failed query it would reject all further statements. A rollback is
+        # harmless for a read; sqlite3 treats it as a no-op when nothing is open.
+        if self._transaction_depth == 0:
+            self.connection.rollback()
+
+    @contextmanager
+    def transaction(self):
+        """Group statements into one unit of work.
+
+        The outermost ``transaction()`` commits when its body succeeds and rolls
+        back if the body (or the commit itself) raises. Nested uses simply join
+        the enclosing transaction.
+        """
+        outermost = self._transaction_depth == 0
+        self._transaction_depth += 1
+        try:
+            yield
+            if outermost:
+                self.connection.commit()
+        except Exception:
+            if outermost:
+                self.connection.rollback()
+            raise
+        finally:
+            self._transaction_depth -= 1
+
+    @staticmethod
+    def _jdbc_properties(options: Dict[str, str]) -> Dict[str, str]:
+        """Return the ``jdbc.``-prefixed options with the prefix stripped."""
+        result = {}
+        for key, value in options.items():
+            if key.startswith("jdbc."):
+                result[key[len("jdbc."):]] = value
+        return result
+
+    def _connect(self, uri: str, options: Dict[str, str]):
+        """Dispatch on the URI scheme.
+
+        :return: ``(protocol, placeholder, connection)``.
+        :raises ValueError: for schemes other than sqlite, mysql and postgresql.
+        """
+        if uri.startswith("jdbc:sqlite:"):
+            return self._connect_sqlite(uri)
+        if uri.startswith("jdbc:mysql:"):
+            return self._connect_mysql(uri, options)
+        if uri.startswith("jdbc:postgresql:"):
+            return self._connect_postgresql(uri, options)
+        raise ValueError(f"Unsupported JDBC catalog URI for Python DB-API connection: {uri}")
+
+    def _connect_sqlite(self, uri: str):
+        """Open SQLite; ``jdbc:sqlite:file:...`` is passed to sqlite3 as a URI."""
+        sqlite_uri = uri[len("jdbc:sqlite:"):]
+        # check_same_thread=False lets other threads use this connection; this
+        # wrapper adds no locking of its own.
+        if sqlite_uri.startswith("file:"):
+            connection = sqlite3.connect(sqlite_uri, uri=True, check_same_thread=False)
+        else:
+            connection = sqlite3.connect(sqlite_uri, check_same_thread=False)
+        return "sqlite", "?", connection
+
+    def _connect_mysql(self, uri: str, options: Dict[str, str]):
+        """Open MySQL with pymysql, falling back to mysql-connector-python.
+
+        ``jdbc.`` options and URI query parameters (the latter winning) form the
+        connection properties. A ``host`` property overrides the URI host, while
+        the URI's database and port take precedence over properties (port
+        defaults to 3306). Remaining properties go to the driver's ``connect()``.
+        """
+        try:
+            import pymysql
+            connector = "pymysql"
+        except ImportError:
+            try:
+                import mysql.connector as mysql_connector
+                connector = "mysql-connector"
+            except ImportError as e:
+                raise ImportError(
+                    "PyPaimon JDBC catalog uses Python DB-API drivers and requires "
+                    "pymysql or mysql-connector-python to connect to MySQL."
+                ) from e
+
+        parsed = urlparse(uri[len("jdbc:"):])
+        props = self._jdbc_properties(options)
+        query = {k: v[0] for k, v in parse_qs(parsed.query).items()}
+        props.update(query)
+        user = props.pop("user", props.pop("username", None))
+        password = props.pop("password", None)
+        host = props.pop("host", parsed.hostname)
+        database = parsed.path.lstrip("/") or props.pop("database", "")
+        props.pop("database", None)
+        port_value = props.pop("port", None)
+        port = parsed.port or int(port_value or 3306)
+        if connector == "pymysql":
+            connection = pymysql.connect(
+                host=host,
+                port=port,
+                user=user,
+                password=password,
+                database=database,
+                autocommit=False,
+                **props
+            )
+        else:
+            connection = mysql_connector.connect(
+                host=host,
+                port=port,
+                user=user,
+                password=password,
+                database=database,
+                **props
+            )
+        return "mysql", "%s", connection
+
+    def _connect_postgresql(self, uri: str, options: Dict[str, str]):
+        """Open PostgreSQL with psycopg2, falling back to psycopg (v3).
+
+        Properties are gathered as for MySQL. The database name comes from the
+        URI path, else from a ``database`` or ``dbname`` property, and the port
+        defaults to 5432.
+        """
+        try:
+            import psycopg2
+            connector = "psycopg2"
+        except ImportError:
+            try:
+                import psycopg
+                connector = "psycopg"
+            except ImportError as e:
+                raise ImportError(
+                    "PyPaimon JDBC catalog uses Python DB-API drivers and requires "
+                    "psycopg2 or psycopg to connect to PostgreSQL."
+                ) from e
+
+        parsed = urlparse(uri[len("jdbc:"):])
+        props = self._jdbc_properties(options)
+        query = {k: v[0] for k, v in parse_qs(parsed.query).items()}
+        props.update(query)
+        user = props.pop("user", props.pop("username", None))
+        password = props.pop("password", None)
+        host = props.pop("host", parsed.hostname)
+        database = parsed.path.lstrip("/") or props.get("database") or props.get("dbname") or ""
+        props.pop("database", None)
+        props.pop("dbname", None)
+        port_value = props.pop("port", None)
+        port = parsed.port or int(port_value or 5432)
+        connect_kwargs = {
+            "host": host,
+            "port": port,
+            "user": user,
+            "password": password,
+            "dbname": database,
+        }
+        connect_kwargs.update(props)
+        if connector == "psycopg2":
+            connection = psycopg2.connect(**connect_kwargs)
+        else:
+            connection = psycopg.connect(**connect_kwargs)
+        return "postgresql", "%s", connection
+
+
+class JdbcCatalog(Catalog):
+    """Paimon catalog that keeps its metadata in a relational database.
+
+    Database and table names live in the ``paimon_tables``,
+    ``paimon_database_properties`` and ``paimon_table_properties`` tables, and
+    every row is scoped by the ``catalog-key`` option. Table data and schema
+    files stay under ``warehouse`` and are handled through ``FileIO`` and
+    ``SchemaManager``. The metadata connection is opened with a Python DB-API
+    driver chosen from the ``jdbc:`` URI (sqlite, mysql or postgresql).
+    """
+
+    CATALOG_TABLE_NAME = "paimon_tables"
+    DATABASE_PROPERTIES_TABLE_NAME = "paimon_database_properties"
+    TABLE_PROPERTIES_TABLE_NAME = "paimon_table_properties"
+    CATALOG_KEY = "catalog_key"
+    TABLE_DATABASE = "database_name"
+    TABLE_NAME = "table_name"
+    PROPERTY_KEY = "property_key"
+    PROPERTY_VALUE = "property_value"
+    # Marker property written for every created database, so that a database
+    # without tables is still found by _database_exists / list_databases.
+    DATABASE_EXISTS_PROPERTY = "exists"
+
+    def __init__(self, context: CatalogContext):
+        """Connect to the metadata database and create the catalog tables if needed.
+
+        :param context: catalog context whose options must contain ``warehouse``
+            and a ``jdbc:`` ``uri``.
+        :raises ValueError: if ``warehouse`` or ``uri`` is missing, or the URI
+            scheme is not supported.
+        """
+        catalog_options = context.options
+        if not catalog_options.contains(CatalogOptions.WAREHOUSE):
+            raise ValueError(f"Paimon '{CatalogOptions.WAREHOUSE.key()}' path must be set")
+        self.context = context
+        self.catalog_options = catalog_options
+        self.options = catalog_options.to_map()
+        self.warehouse = catalog_options.get(CatalogOptions.WAREHOUSE)
+        self.catalog_key = catalog_options.get(JdbcCatalogOptions.CATALOG_KEY)
+        self.file_io = FileIO.get(self.warehouse, self.catalog_options)
+        self.connection = _DbApiConnection(self.options)
+        try:
+            self._initialize_catalog_tables()
+        except Exception:
+            # The caller never receives this instance, so nobody else could close
+            # the connection; release it before propagating the error.
+            self.connection.close()
+            raise
+
+    def close(self):
+        """Close the metadata database connection."""
+        self.connection.close()
+
+    def __enter__(self):
+        return self
+
+    def __exit__(self, exc_type, exc_value, traceback):
+        self.close()
+        return False
+
+    def _initialize_catalog_tables(self):
+        """Create the three catalog metadata tables if they do not exist yet."""
+        with self.connection.transaction():
+            self.connection.execute(
+                "CREATE TABLE IF NOT EXISTS paimon_tables ("
+                "catalog_key VARCHAR(255) NOT NULL, "
+                "database_name VARCHAR(255) NOT NULL, "
+                "table_name VARCHAR(255) NOT NULL, "
+                "PRIMARY KEY (catalog_key, database_name, table_name))"
+            )
+            self.connection.execute(
+                "CREATE TABLE IF NOT EXISTS paimon_database_properties ("
+                "catalog_key VARCHAR(255) NOT NULL, "
+                "database_name VARCHAR(255) NOT NULL, "
+                "property_key VARCHAR(255), "
+                "property_value VARCHAR(1000), "
+                "PRIMARY KEY (catalog_key, database_name, property_key))"
+            )
+            self.connection.execute(
+                "CREATE TABLE IF NOT EXISTS paimon_table_properties ("
+                "catalog_key VARCHAR(255) NOT NULL, "
+                "database_name VARCHAR(255) NOT NULL, "
+                "table_name VARCHAR(255) NOT NULL, "
+                "property_key VARCHAR(255) NOT NULL, "
+                "property_value VARCHAR(1000), "
+                "PRIMARY KEY (catalog_key, database_name, table_name, property_key))"
+            )
+
+    def list_databases(self) -> List[str]:
+        """Return the sorted names of all databases under this catalog key.
+
+        A database is listed if it has a table row or a property row.
+        """
+        table_rows = self.connection.fetch_all(
+            "SELECT DISTINCT database_name FROM paimon_tables WHERE catalog_key = ?",
+            (self.catalog_key,)
+        )
+        property_rows = self.connection.fetch_all(
+            "SELECT DISTINCT database_name FROM paimon_database_properties WHERE catalog_key = ?",
+            (self.catalog_key,)
+        )
+        databases = {row[0] for row in table_rows}
+        databases.update(row[0] for row in property_rows)
+        return sorted(databases)
+
+    def get_database(self, name: str) -> Database:
+        """Return database ``name`` with its stored properties.
+
+        The location property falls back to the warehouse-derived path, and the
+        internal ``exists`` marker is removed from the returned properties.
+
+        :raises DatabaseNotExistException: if the database is unknown.
+        """
+        if not self._database_exists(name):
+            raise DatabaseNotExistException(name)
+        properties = self._fetch_database_properties(name)
+        if Catalog.DB_LOCATION_PROP not in properties:
+            properties[Catalog.DB_LOCATION_PROP] = self.get_database_path(name)
+        properties.pop(self.DATABASE_EXISTS_PROPERTY, None)
+        return Database(name, properties)
+
+    def create_database(self, name: str, ignore_if_exists: bool, properties: Optional[dict] = None):
+        """Register database ``name`` by writing its property rows.
+
+        The ``exists`` marker and the location (defaulting to the warehouse
+        path) are always stored alongside the given ``properties``.
+
+        :raises DatabaseAlreadyExistException: if it exists and
+            ``ignore_if_exists`` is false.
+        """
+        if self._database_exists(name):
+            if not ignore_if_exists:
+                raise DatabaseAlreadyExistException(name)
+            return
+        create_props = {self.DATABASE_EXISTS_PROPERTY: "true"}
+        if properties:
+            create_props.update(properties)
+        if Catalog.DB_LOCATION_PROP not in create_props:
+            create_props[Catalog.DB_LOCATION_PROP] = self.get_database_path(name)
+        with self.connection.transaction():
+            self._insert_database_properties(name, create_props)
+
+    def drop_database(self, name: str, ignore_if_not_exists: bool = False, cascade: bool = False):
+        """Remove database ``name`` and all of its metadata rows.
+
+        With ``cascade`` every table is dropped first (including its files).
+
+        :raises DatabaseNotExistException: if missing and not ``ignore_if_not_exists``.
+        :raises ValueError: if the database still has tables and ``cascade`` is false.
+        """
+        if not self._database_exists(name):
+            if not ignore_if_not_exists:
+                raise DatabaseNotExistException(name)
+            return
+        tables = self.list_tables(name)
+        if tables and not cascade:
+            raise ValueError(f"Database {name} is not empty. Use cascade=True to drop all tables first.")
+        if cascade:
+            for table in tables:
+                self.drop_table(Identifier.create(name, table), True)
+        with self.connection.transaction():
+            self.connection.execute(
+                "DELETE FROM paimon_tables WHERE catalog_key = ? AND database_name = ?",
+                (self.catalog_key, name)
+            )
+            self.connection.execute(
+                "DELETE FROM paimon_database_properties WHERE catalog_key = ? AND database_name = ?",
+                (self.catalog_key, name)
+            )
+            self.connection.execute(
+                "DELETE FROM paimon_table_properties WHERE catalog_key = ? AND database_name = ?",
+                (self.catalog_key, name)
+            )
+
+    def alter_database(self, name: str, changes: list):
+        """Apply property ``changes`` to database ``name`` in one transaction.
+
+        Keys that already exist are updated, new keys are inserted, and removed
+        keys are deleted.
+
+        :raises DatabaseNotExistException: if the database is unknown.
+        """
+        self.get_database(name)
+        from pypaimon.catalog.rest.property_change import PropertyChange
+        set_properties, remove_keys = PropertyChange.get_set_properties_to_remove_keys(changes)
+        current = self._fetch_database_properties(name)
+        with self.connection.transaction():
+            update_args = [
+                (value, self.catalog_key, name, key)
+                for key, value in set_properties.items()
+                if key in current
+            ]
+            insert_properties = {
+                key: value for key, value in set_properties.items() if key not in current
+            }
+            if update_args:
+                self.connection.executemany(
+                    "UPDATE paimon_database_properties SET property_value = ? "
+                    "WHERE catalog_key = ? AND database_name = ? AND property_key = ?",
+                    update_args
+                )
+            if insert_properties:
+                self._insert_database_properties(name, insert_properties)
+            for key in remove_keys:
+                self.connection.execute(
+                    "DELETE FROM paimon_database_properties "
+                    "WHERE catalog_key = ? AND database_name = ? AND property_key = ?",
+                    (self.catalog_key, name, key)
+                )
+
+    def list_tables(self, database_name: str) -> List[str]:
+        """Return the sorted table names registered in ``database_name``.
+
+        :raises DatabaseNotExistException: if the database is unknown.
+        """
+        self.get_database(database_name)
+        rows = self.connection.fetch_all(
+            "SELECT table_name FROM paimon_tables WHERE catalog_key = ? AND database_name = ?",
+            (self.catalog_key, database_name)
+        )
+        return sorted(row[0] for row in rows)
+
+    def get_table(self, identifier: Union[str, Identifier]) -> Table:
+        """Load a registered table as a :class:`FileStoreTable`.
+
+        :param identifier: an :class:`Identifier` or a string parsed with
+            ``Identifier.from_string``.
+        :raises ValueError: if the catalog options contain the scan fallback
+            branch option, which this catalog does not support.
+        :raises TableNotExistException: if the table is not registered or has
+            no schema file.
+        """
+        if not isinstance(identifier, Identifier):
+            identifier = Identifier.from_string(identifier)
+        if self.catalog_options.contains(CoreOptions.SCAN_FALLBACK_BRANCH):
+            raise ValueError(f"Unsupported CoreOption {CoreOptions.SCAN_FALLBACK_BRANCH}")
+        if not self._table_exists(identifier):
+            raise TableNotExistException(identifier)
+        table_path = self.get_table_path(identifier)
+        table_schema = self.get_table_schema(identifier)
+        # Imported here because jdbc_catalog_loader imports this module.
+        from pypaimon.catalog.jdbc_catalog_loader import JdbcCatalogLoader
+        catalog_environment = CatalogEnvironment(
+            identifier=identifier,
+            uuid=None,
+            catalog_loader=JdbcCatalogLoader(self.context),
+            supports_version_management=False
+        )
+        return FileStoreTable(self.file_io, identifier, table_path, table_schema, catalog_environment)
+
+    def create_table(self, identifier: Union[str, Identifier], schema: 'Schema', ignore_if_exists: bool):
+        """Write the table's schema files, then register the table.
+
+        If registering the table in the database fails, the freshly created
+        table directory is deleted again.
+
+        :raises ValueError: if the schema options request auto-create or a
+            table type other than ``table``.
+        :raises DatabaseNotExistException: if the database is unknown.
+        :raises TableAlreadyExistException: if it exists and not ``ignore_if_exists``.
+        """
+        if schema.options and schema.options.get(CoreOptions.AUTO_CREATE.key()):
+            raise ValueError(f"The value of {CoreOptions.AUTO_CREATE.key()} property should be False.")
+        if not isinstance(identifier, Identifier):
+            identifier = Identifier.from_string(identifier)
+        self.get_database(identifier.get_database_name())
+        if self._table_exists(identifier):
+            if not ignore_if_exists:
+                raise TableAlreadyExistException(identifier)
+            return
+        if schema.options and CoreOptions.TYPE.key() in schema.options and schema.options.get(
+                CoreOptions.TYPE.key()) != "table":
+            raise ValueError(f"Table Type: {schema.options.get(CoreOptions.TYPE.key())}")
+
+        table_path = self.get_table_path(identifier)
+        schema_manager = SchemaManager(self.file_io, table_path)
+        table_schema = schema_manager.create_table(schema)
+        try:
+            with self.connection.transaction():
+                self.connection.execute(
+                    "INSERT INTO paimon_tables (catalog_key, database_name, table_name) VALUES (?, ?, ?)",
+                    (self.catalog_key, identifier.get_database_name(), identifier.get_table_name())
+                )
+                if self._sync_all_properties():
+                    self._insert_table_properties(identifier, self._collect_table_properties(table_schema))
+        except Exception:
+            self.file_io.delete_directory_quietly(table_path)
+            raise
+
+    def drop_table(self, identifier: Union[str, Identifier], ignore_if_not_exists: bool = False):
+        """Unregister a table and then delete its directory.
+
+        :raises TableNotExistException: if missing and not ``ignore_if_not_exists``.
+        """
+        if not isinstance(identifier, Identifier):
+            identifier = Identifier.from_string(identifier)
+        if not self._table_exists(identifier):
+            if not ignore_if_not_exists:
+                raise TableNotExistException(identifier)
+            return
+        table_path = self.get_table_path(identifier)
+        with self.connection.transaction():
+            self.connection.execute(
+                "DELETE FROM paimon_tables WHERE catalog_key = ? AND database_name = ? AND table_name = ?",
+                (self.catalog_key, identifier.get_database_name(), identifier.get_table_name())
+            )
+            self.connection.execute(
+                "DELETE FROM paimon_table_properties WHERE catalog_key = ? AND database_name = ? AND table_name = ?",
+                (self.catalog_key, identifier.get_database_name(), identifier.get_table_name())
+            )
+        self.file_io.delete_directory_quietly(table_path)
+
+    def rename_table(self, source_identifier: Union[str, Identifier], target_identifier: Union[str, Identifier]):
+        """Move a table's directory and metadata rows to a new identifier.
+
+        The directory is renamed first. If the metadata update then fails, the
+        directory is moved back before the error propagates.
+
+        :raises TableNotExistException: if the source table is not registered.
+        :raises DatabaseNotExistException: if the target database is unknown.
+        :raises TableAlreadyExistException: if the target table already exists.
+        """
+        if not isinstance(source_identifier, Identifier):
+            source_identifier = Identifier.from_string(source_identifier)
+        if not isinstance(target_identifier, Identifier):
+            target_identifier = Identifier.from_string(target_identifier)
+        if not self._table_exists(source_identifier):
+            raise TableNotExistException(source_identifier)
+        self.get_database(target_identifier.get_database_name())
+        if self._table_exists(target_identifier):
+            raise TableAlreadyExistException(target_identifier)
+
+        source_path = self.get_table_path(source_identifier)
+        target_path = self.get_table_path(target_identifier)
+        renamed_path = False
+        if self.file_io.exists(source_path):
+            self.file_io.rename(source_path, target_path)
+            renamed_path = True
+        try:
+            with self.connection.transaction():
+                self.connection.execute(
+                    "UPDATE paimon_tables SET database_name = ?, table_name = ? "
+                    "WHERE catalog_key = ? AND database_name = ? AND table_name = ?",
+                    (
+                        target_identifier.get_database_name(),
+                        target_identifier.get_table_name(),
+                        self.catalog_key,
+                        source_identifier.get_database_name(),
+                        source_identifier.get_table_name()
+                    )
+                )
+                self.connection.execute(
+                    "UPDATE paimon_table_properties SET database_name = ?, table_name = ? "
+                    "WHERE catalog_key = ? AND database_name = ? AND table_name = ?",
+                    (
+                        target_identifier.get_database_name(),
+                        target_identifier.get_table_name(),
+                        self.catalog_key,
+                        source_identifier.get_database_name(),
+                        source_identifier.get_table_name()
+                    )
+                )
+        except Exception:
+            if renamed_path and self.file_io.exists(target_path):
+                self.file_io.rename(target_path, source_path)
+            raise
+
+    def alter_table(
+        self,
+        identifier: Union[str, Identifier],
+        changes: List[SchemaChange],
+        ignore_if_not_exists: bool = False
+    ):
+        """Commit schema ``changes`` through the table's SchemaManager.
+
+        When sync-all-properties is enabled, the table's property rows are
+        replaced with the properties of the new schema.
+
+        :raises TableNotExistException: if missing and not ``ignore_if_not_exists``.
+        """
+        if not isinstance(identifier, Identifier):
+            identifier = Identifier.from_string(identifier)
+        if not self._table_exists(identifier):
+            if not ignore_if_not_exists:
+                raise TableNotExistException(identifier)
+            return
+        schema_manager = SchemaManager(self.file_io, self.get_table_path(identifier))
+        table_schema = schema_manager.commit_changes(changes)
+        if self._sync_all_properties():
+            with self.connection.transaction():
+                self.connection.execute(
+                    "DELETE FROM paimon_table_properties "
+                    "WHERE catalog_key = ? AND database_name = ? AND table_name = ?",
+                    (self.catalog_key, identifier.get_database_name(), identifier.get_table_name())
+                )
+                self._insert_table_properties(identifier, self._collect_table_properties(table_schema))
+
+    def get_table_schema(self, identifier: Identifier):
+        """Return the latest schema of the table.
+
+        :raises TableNotExistException: if no schema file exists.
+        """
+        table_schema = SchemaManager(self.file_io, self.get_table_path(identifier)).latest()
+        if table_schema is None:
+            raise TableNotExistException(identifier)
+        return table_schema
+
+    def get_database_path(self, name: str) -> str:
+        """Return ``<warehouse>/<name><DB_SUFFIX>``."""
+        warehouse = self.warehouse.rstrip('/')
+        return f"{warehouse}/{name}{Catalog.DB_SUFFIX}"
+
+    def get_table_path(self, identifier: Identifier) -> str:
+        """Return the table directory inside its database path."""
+        db_path = self.get_database_path(identifier.get_database_name())
+        return f"{db_path}/{identifier.get_table_name()}"
+
+    def load_snapshot(self, identifier: Identifier):
+        """Not supported by this catalog."""
+        raise NotImplementedError("JDBC catalog does not support load_snapshot")
+
+    def commit_snapshot(
+            self,
+            identifier: Identifier,
+            table_uuid: Optional[str],
+            snapshot: Snapshot,
+            statistics: List[PartitionStatistics]
+    ) -> bool:
+        """Not supported by this catalog."""
+        raise NotImplementedError("This catalog does not support commit catalog")
+
+    def _database_exists(self, database_name: str) -> bool:
+        """True if the database has a table row or a property row."""
+        row = self.connection.fetch_one(
+            "SELECT database_name FROM paimon_tables "
+            "WHERE catalog_key = ? AND database_name = ? LIMIT 1",
+            (self.catalog_key, database_name)
+        )
+        if row is not None:
+            return True
+        row = self.connection.fetch_one(
+            "SELECT database_name FROM paimon_database_properties "
+            "WHERE catalog_key = ? AND database_name = ? LIMIT 1",
+            (self.catalog_key, database_name)
+        )
+        return row is not None
+
+    def _table_exists(self, identifier: Identifier) -> bool:
+        """True if the table is registered in ``paimon_tables``."""
+        row = self.connection.fetch_one(
+            "SELECT table_name FROM paimon_tables "
+            "WHERE catalog_key = ? AND database_name = ? AND table_name = ? LIMIT 1",
+            (self.catalog_key, identifier.get_database_name(), identifier.get_table_name())
+        )
+        return row is not None
+
+    def _fetch_database_properties(self, database_name: str) -> Dict[str, str]:
+        """Return all stored properties of the database as a dict."""
+        rows = self.connection.fetch_all(
+            "SELECT property_key, property_value FROM paimon_database_properties "
+            "WHERE catalog_key = ? AND database_name = ?",
+            (self.catalog_key, database_name)
+        )
+        return {row[0]: row[1] for row in rows}
+
+    def _insert_database_properties(self, database_name: str, properties: Dict[str, str]):
+        """Insert one row per property; does nothing for an empty dict."""
+        if properties:
+            self.connection.executemany(
+                "INSERT INTO paimon_database_properties "
+                "(catalog_key, database_name, property_key, property_value) VALUES (?, ?, ?, ?)",
+                [(self.catalog_key, database_name, key, value) for key, value in properties.items()]
+            )
+
+    def _insert_table_properties(self, identifier: Identifier, properties: Dict[str, str]):
+        """Insert one row per table property; does nothing for an empty dict."""
+        if properties:
+            self.connection.executemany(
+                "INSERT INTO paimon_table_properties "
+                "(catalog_key, database_name, table_name, property_key, property_value) "
+                "VALUES (?, ?, ?, ?, ?)",
+                [
+                    (
+                        self.catalog_key,
+                        identifier.get_database_name(),
+                        identifier.get_table_name(),
+                        key,
+                        value
+                    )
+                    for key, value in properties.items()
+                ]
+            )
+
+    def _sync_all_properties(self) -> bool:
+        """Whether table properties should be mirrored into the metadata database."""
+        from pypaimon.common.options.options_utils import OptionsUtils
+        return OptionsUtils.convert_to_boolean(
+            self.catalog_options.get(CatalogOptions.SYNC_ALL_PROPERTIES))
+
+    @staticmethod
+    def _collect_table_properties(table_schema) -> Dict[str, str]:
+        """Return the schema options plus comma-joined primary and partition keys."""
+        properties = dict(table_schema.options or {})
+        if table_schema.primary_keys:
+            properties["primary-key"] = ",".join(table_schema.primary_keys)
+        if table_schema.partition_keys:
+            properties["partition"] = ",".join(table_schema.partition_keys)
+        return properties
diff --git a/paimon-python/pypaimon/catalog/jdbc_catalog_loader.py 
b/paimon-python/pypaimon/catalog/jdbc_catalog_loader.py
new file mode 100644
index 0000000000..801ecb1a1c
--- /dev/null
+++ b/paimon-python/pypaimon/catalog/jdbc_catalog_loader.py
@@ -0,0 +1,32 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+#################################################################################
+
+from pypaimon.catalog.catalog_context import CatalogContext
+from pypaimon.catalog.catalog_loader import CatalogLoader
+from pypaimon.catalog.jdbc_catalog import JdbcCatalog
+
+
class JdbcCatalogLoader(CatalogLoader):
    """CatalogLoader that produces JdbcCatalog instances from a saved context."""

    def __init__(self, context: CatalogContext):
        # The context is retained so that load() can build a catalog later.
        self._context = context

    def context(self) -> CatalogContext:
        """Return the CatalogContext this loader was constructed with."""
        return self._context

    def load(self) -> JdbcCatalog:
        """Construct and return a new JdbcCatalog for the stored context.

        A fresh JdbcCatalog object is created on every call.
        """
        saved_context = self._context
        return JdbcCatalog(saved_context)
diff --git a/paimon-python/pypaimon/common/options/config.py 
b/paimon-python/pypaimon/common/options/config.py
index 6189ebaf89..0004cb896a 100644
--- a/paimon-python/pypaimon/common/options/config.py
+++ b/paimon-python/pypaimon/common/options/config.py
@@ -59,6 +59,11 @@ class GcsOptions:
         .with_description("GCP project ID for GCS requests."))
 
 
class JdbcCatalogOptions:
    """Configuration options specific to the JDBC catalog."""

    # Value written to the catalog_key column of the JDBC metadata tables, so
    # several catalogs can share one metastore database. Defaults to "jdbc".
    CATALOG_KEY = (
        ConfigOptions.key("catalog-key")
        .string_type()
        .default_value("jdbc")
        .with_description("Custom JDBC catalog store key.")
    )
+
+
 class PVFSOptions:
     CACHE_ENABLED = 
ConfigOptions.key("cache-enabled").boolean_type().default_value("true").with_description(
         "Enable cache")
@@ -101,6 +106,8 @@ class CatalogOptions:
     PREFIX = 
ConfigOptions.key("prefix").string_type().no_default_value().with_description("Prefix")
     HTTP_USER_AGENT_HEADER = ConfigOptions.key(
         
"header.HTTP_USER_AGENT").string_type().no_default_value().with_description("HTTP
 User Agent header")
+    SYNC_ALL_PROPERTIES = 
ConfigOptions.key("sync-all-properties").boolean_type().default_value(True).with_description(
+        "Sync all table properties to the catalog metastore")
     BLOB_FILE_IO_DEFAULT_CACHE_SIZE = 2 ** 31 - 1
 
 
diff --git a/paimon-python/pypaimon/tests/jdbc_catalog_test.py 
b/paimon-python/pypaimon/tests/jdbc_catalog_test.py
new file mode 100644
index 0000000000..04ca55e250
--- /dev/null
+++ b/paimon-python/pypaimon/tests/jdbc_catalog_test.py
@@ -0,0 +1,223 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+#################################################################################
+
+import os
+import shutil
+import sqlite3
+import tempfile
+import unittest
+
+from pypaimon import CatalogFactory, Schema
+from pypaimon.catalog.catalog_exception import (
+    DatabaseAlreadyExistException,
+    DatabaseNotExistException,
+    TableAlreadyExistException,
+    TableNotExistException
+)
+from pypaimon.catalog.jdbc_catalog import JdbcCatalog, 
_convert_qmark_placeholders
+from pypaimon.catalog.rest.property_change import PropertyChange
+from pypaimon.schema.data_types import AtomicType, DataField
+from pypaimon.schema.schema_change import SchemaChange
+
+
class JdbcCatalogTest(unittest.TestCase):
    """End-to-end tests of JdbcCatalog against a temporary SQLite metastore."""

    def setUp(self):
        self.temp_dir = tempfile.mkdtemp(prefix="unittest_")
        self.warehouse = os.path.join(self.temp_dir, "warehouse")
        self.jdbc_path = os.path.join(self.temp_dir, "catalog.db")
        self.options = {
            "metastore": "jdbc",
            "warehouse": self.warehouse,
            "uri": "jdbc:sqlite:" + self.jdbc_path,
            "catalog-key": "test-jdbc-catalog",
        }

    def tearDown(self):
        shutil.rmtree(self.temp_dir, ignore_errors=True)

    def _query_catalog_db(self, sql, params=()):
        """Run ``sql`` on a fresh connection to the catalog SQLite file and return all rows.

        Using ``sqlite3.Connection`` as a context manager only commits or rolls
        back; it does NOT close the connection. ``contextlib.closing`` makes
        sure the handle is released, so it cannot leak or keep the temp
        directory locked when tearDown removes it.
        """
        from contextlib import closing

        with closing(sqlite3.connect(self.jdbc_path)) as conn:
            return conn.execute(sql, params).fetchall()

    def test_create_jdbc_catalog(self):
        catalog = CatalogFactory.create(self.options)
        self.assertIsInstance(catalog, JdbcCatalog)

        tables = {
            row[0]
            for row in self._query_catalog_db(
                "SELECT name FROM sqlite_master WHERE type = 'table'"
            )
        }
        self.assertIn("paimon_tables", tables)
        self.assertIn("paimon_database_properties", tables)
        self.assertIn("paimon_table_properties", tables)

    def test_jdbc_catalog_context_manager_closes_connection(self):
        with CatalogFactory.create(self.options) as catalog:
            self.assertIsInstance(catalog, JdbcCatalog)

        # After leaving the with-block the catalog's connection must be closed.
        with self.assertRaises(sqlite3.ProgrammingError):
            catalog.list_databases()

    def test_placeholder_conversion_skips_string_literals(self):
        sql = "SELECT '?' AS q, \"?\" AS quoted, col FROM tbl WHERE a = ? AND b = '?'"
        self.assertEqual(
            _convert_qmark_placeholders(sql, "%s"),
            "SELECT '?' AS q, \"?\" AS quoted, col FROM tbl WHERE a = %s AND b = '?'"
        )

    def test_database(self):
        catalog = CatalogFactory.create(self.options)
        catalog.create_database("test_db", False, {"owner": "owner1"})

        with self.assertRaises(DatabaseAlreadyExistException):
            catalog.create_database("test_db", False)

        self.assertEqual(catalog.list_databases(), ["test_db"])
        database = catalog.get_database("test_db")
        self.assertEqual(database.name, "test_db")
        self.assertEqual(database.options["owner"], "owner1")
        self.assertEqual(
            database.options["location"],
            os.path.join(self.warehouse, "test_db.db")
        )

        # A second catalog instance must see state persisted by the first.
        reloaded = CatalogFactory.create(self.options)
        self.assertEqual(reloaded.list_databases(), ["test_db"])
        reloaded.alter_database(
            "test_db",
            [
                PropertyChange.set_property("comment", "new comment"),
                PropertyChange.remove_property("owner"),
            ]
        )
        updated = reloaded.get_database("test_db")
        self.assertEqual(updated.options["comment"], "new comment")
        self.assertNotIn("owner", updated.options)

        reloaded.drop_database("test_db")
        with self.assertRaises(DatabaseNotExistException):
            reloaded.get_database("test_db")

    def test_table(self):
        fields = [
            DataField.from_dict({"id": 1, "name": "f0", "type": "INT"}),
            DataField.from_dict({"id": 2, "name": "f1", "type": "STRING"}),
        ]
        catalog = CatalogFactory.create(self.options)
        catalog.create_database("test_db", False)
        catalog.create_table(
            "test_db.test_table",
            Schema(fields=fields, partition_keys=["f1"], options={"bucket": "1"}),
            False
        )

        with self.assertRaises(TableAlreadyExistException):
            catalog.create_table("test_db.test_table", Schema(fields=fields), False)

        self.assertEqual(catalog.list_tables("test_db"), ["test_table"])
        self.assertTrue(
            os.path.exists(
                os.path.join(self.warehouse, "test_db.db", "test_table", "schema", "schema-0")
            )
        )

        reloaded = CatalogFactory.create(self.options)
        table = reloaded.get_table("test_db.test_table")
        self.assertEqual(table.fields[0].name, "f0")
        self.assertIsInstance(table.fields[0].type, AtomicType)
        self.assertEqual(table.fields[0].type.type, "INT")

        properties = dict(
            self._query_catalog_db(
                "SELECT property_key, property_value FROM paimon_table_properties "
                "WHERE catalog_key = ? AND database_name = ? AND table_name = ?",
                ("test-jdbc-catalog", "test_db", "test_table")
            )
        )
        self.assertEqual(properties["bucket"], "1")
        self.assertEqual(properties["partition"], "f1")

        reloaded.alter_table(
            "test_db.test_table",
            [SchemaChange.add_column("f2", AtomicType("BIGINT"))]
        )
        self.assertEqual(len(reloaded.get_table("test_db.test_table").fields), 3)

        reloaded.rename_table("test_db.test_table", "test_db.renamed_table")
        self.assertEqual(reloaded.list_tables("test_db"), ["renamed_table"])
        with self.assertRaises(TableNotExistException):
            reloaded.get_table("test_db.test_table")

        reloaded.drop_table("test_db.renamed_table")
        self.assertEqual(reloaded.list_tables("test_db"), [])
        with self.assertRaises(TableNotExistException):
            reloaded.get_table("test_db.renamed_table")

    def test_create_table_rolls_back_metadata_on_failure(self):
        fields = [DataField.from_dict({"id": 1, "name": "f0", "type": "INT"})]
        catalog = CatalogFactory.create(self.options)
        catalog.create_database("test_db", False)

        def fail_insert_table_properties(identifier, properties):
            raise RuntimeError("injected failure")

        # Fail after the paimon_tables row would have been written, to check rollback.
        catalog._insert_table_properties = fail_insert_table_properties
        with self.assertRaises(RuntimeError):
            catalog.create_table("test_db.test_table", Schema(fields=fields), False)

        table_count = self._query_catalog_db(
            "SELECT COUNT(*) FROM paimon_tables "
            "WHERE catalog_key = ? AND database_name = ? AND table_name = ?",
            ("test-jdbc-catalog", "test_db", "test_table")
        )[0][0]
        self.assertEqual(table_count, 0)
        self.assertFalse(os.path.exists(os.path.join(self.warehouse, "test_db.db", "test_table")))

    def test_rename_table_keeps_metadata_when_file_move_fails(self):
        fields = [DataField.from_dict({"id": 1, "name": "f0", "type": "INT"})]
        catalog = CatalogFactory.create(self.options)
        catalog.create_database("test_db", False)
        catalog.create_table("test_db.test_table", Schema(fields=fields), False)

        def fail_rename(source, target):
            raise OSError("injected failure")

        catalog.file_io.rename = fail_rename
        with self.assertRaises(OSError):
            catalog.rename_table("test_db.test_table", "test_db.renamed_table")

        self.assertEqual(catalog.list_tables("test_db"), ["test_table"])
        self.assertTrue(os.path.exists(os.path.join(self.warehouse, "test_db.db", "test_table")))

    def test_drop_database_requires_cascade_for_non_empty_database(self):
        fields = [DataField.from_dict({"id": 1, "name": "f0", "type": "INT"})]
        catalog = CatalogFactory.create(self.options)
        catalog.create_database("test_db", False)
        catalog.create_table("test_db.test_table", Schema(fields=fields), False)

        with self.assertRaises(ValueError):
            catalog.drop_database("test_db")

        catalog.drop_database("test_db", cascade=True)
        self.assertEqual(catalog.list_databases(), [])


if __name__ == '__main__':
    unittest.main()


Reply via email to