This is an automated email from the ASF dual-hosted git repository.
JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 0eb9011fc8 [python] Support JDBC catalog (#7720)
0eb9011fc8 is described below
commit 0eb9011fc824d39b9827f994a561c52707f62161
Author: Colin <[email protected]>
AuthorDate: Thu Jun 4 13:33:07 2026 +0800
[python] Support JDBC catalog (#7720)
Support JDBC catalog in PyPaimon. This adds a Python JDBC catalog
implementation that uses the same catalog metadata tables as Java Paimon
JDBC catalog: `paimon_tables`, `paimon_database_properties`, and
`paimon_table_properties`.
The implementation supports SQLite with the Python standard library and
dynamically supports MySQL/PostgreSQL when a corresponding Python DB-API
driver is installed. Table data and schema files continue to use
existing PyPaimon `FileIO` and `SchemaManager` behavior.
---
docs/docs/pypaimon/python-api.mdx | 42 +-
paimon-python/pypaimon/catalog/catalog_factory.py | 8 +-
paimon-python/pypaimon/catalog/jdbc_catalog.py | 622 +++++++++++++++++++++
.../pypaimon/catalog/jdbc_catalog_loader.py | 32 ++
paimon-python/pypaimon/common/options/config.py | 7 +
paimon-python/pypaimon/tests/jdbc_catalog_test.py | 223 ++++++++
6 files changed, 930 insertions(+), 4 deletions(-)
diff --git a/docs/docs/pypaimon/python-api.mdx b/docs/docs/pypaimon/python-api.mdx
index 52afbc2676..c1887c537c 100644
--- a/docs/docs/pypaimon/python-api.mdx
+++ b/docs/docs/pypaimon/python-api.mdx
@@ -103,6 +103,46 @@ catalog_options = {
</TabItem>
+<TabItem value="jdbc-catalog" label="jdbc catalog">
+
+PyPaimon keeps the catalog type `jdbc` for compatibility with Paimon catalog options,
+but connects with native Python DB-API drivers instead of JVM JDBC drivers.
+
+```python
+from pypaimon import CatalogFactory
+
+# Note that keys and values are all strings
+catalog_options = {
+ 'metastore': 'jdbc',
+ 'warehouse': 'file:///path/to/warehouse',
+ 'uri': 'jdbc:sqlite:/path/to/catalog.db',
+ # Optional. Defaults to 'jdbc'.
+ 'catalog-key': 'jdbc',
+}
+catalog = CatalogFactory.create(catalog_options)
+```
+
+For MySQL or PostgreSQL, install the corresponding Python DB-API driver and use the same Paimon
+JDBC catalog options:
+
+```python
+catalog_options = {
+ 'metastore': 'jdbc',
+ 'warehouse': 's3://bucket/path/to/warehouse',
+ 'uri': 'jdbc:mysql://<host>:<port>/<databaseName>',
+ 'jdbc.user': '...',
+ 'jdbc.password': '...',
+ 'catalog-key': 'jdbc',
+}
+```
+
+Unlike Flink or Spark, PyPaimon does not use JVM JDBC drivers or load JDBC connector jars.
+It keeps the `metastore='jdbc'` and `jdbc:` URI format for compatibility with Paimon's
+JDBC catalog configuration, but the database connection is created through native Python DB-API
+drivers such as `pymysql`, `mysql-connector-python`, `psycopg2`, or `psycopg`.
+
+</TabItem>
+
<TabItem value="rest-catalog" label="rest catalog">
The sample code is as follows. The detailed meaning of option can be found in [REST](../concepts/rest/).
@@ -124,7 +164,7 @@ catalog = CatalogFactory.create(catalog_options)
</Tabs>
-Currently, PyPaimon only support filesystem catalog and rest catalog. See [Catalog](../concepts/catalog).
+Currently, PyPaimon supports filesystem catalog, jdbc catalog and rest catalog. See [Catalog](../concepts/catalog).
You can use the catalog to create table for writing data.
diff --git a/paimon-python/pypaimon/catalog/catalog_factory.py b/paimon-python/pypaimon/catalog/catalog_factory.py
index 741d785d29..117c10b2e9 100644
--- a/paimon-python/pypaimon/catalog/catalog_factory.py
+++ b/paimon-python/pypaimon/catalog/catalog_factory.py
@@ -21,6 +21,7 @@ from pypaimon.common.options import Options
from pypaimon.catalog.catalog import Catalog
from pypaimon.catalog.catalog_context import CatalogContext
from pypaimon.catalog.filesystem_catalog import FileSystemCatalog
+from pypaimon.catalog.jdbc_catalog import JdbcCatalog
from pypaimon.catalog.rest.rest_catalog import RESTCatalog
from pypaimon.common.options.config import CatalogOptions
@@ -29,6 +30,7 @@ class CatalogFactory:
CATALOG_REGISTRY = {
"filesystem": FileSystemCatalog,
+ "jdbc": JdbcCatalog,
"rest": RESTCatalog,
}
@@ -39,6 +41,6 @@ class CatalogFactory:
if catalog_class is None:
raise ValueError("Unknown catalog identifier: {}. "
"Available types: {}".format(identifier,
list(CatalogFactory.CATALOG_REGISTRY.keys())))
- return catalog_class(
- CatalogContext.create_from_options(Options(catalog_options))) if
identifier == "rest" else catalog_class(
- Options(catalog_options))
+ if identifier in ("jdbc", "rest"):
+ return
catalog_class(CatalogContext.create_from_options(Options(catalog_options)))
+ return catalog_class(Options(catalog_options))
diff --git a/paimon-python/pypaimon/catalog/jdbc_catalog.py b/paimon-python/pypaimon/catalog/jdbc_catalog.py
new file mode 100644
index 0000000000..ccf5dbfa1a
--- /dev/null
+++ b/paimon-python/pypaimon/catalog/jdbc_catalog.py
@@ -0,0 +1,622 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#################################################################################
+
+import sqlite3
+from contextlib import closing, contextmanager
+from typing import Dict, List, Optional, Tuple, Union
+from urllib.parse import parse_qs, urlparse
+
+from pypaimon.catalog.catalog import Catalog
+from pypaimon.catalog.catalog_context import CatalogContext
+from pypaimon.catalog.catalog_environment import CatalogEnvironment
+from pypaimon.catalog.catalog_exception import (
+ DatabaseAlreadyExistException,
+ DatabaseNotExistException,
+ TableAlreadyExistException,
+ TableNotExistException
+)
+from pypaimon.catalog.database import Database
+from pypaimon.common.file_io import FileIO
+from pypaimon.common.identifier import Identifier
+from pypaimon.common.options.config import CatalogOptions, JdbcCatalogOptions
+from pypaimon.common.options.core_options import CoreOptions
+from pypaimon.schema.schema import Schema
+from pypaimon.schema.schema_change import SchemaChange
+from pypaimon.schema.schema_manager import SchemaManager
+from pypaimon.snapshot.snapshot import Snapshot
+from pypaimon.snapshot.snapshot_commit import PartitionStatistics
+from pypaimon.table.file_store_table import FileStoreTable
+from pypaimon.table.table import Table
+
+
+def _convert_qmark_placeholders(sql: str, placeholder: str) -> str:
+ if placeholder == "?":
+ return sql
+
+ result = []
+ in_single_quote = False
+ in_double_quote = False
+ index = 0
+ while index < len(sql):
+ char = sql[index]
+ if char == "'" and not in_double_quote:
+ result.append(char)
+ if in_single_quote and index + 1 < len(sql) and sql[index + 1] ==
"'":
+ index += 1
+ result.append(sql[index])
+ else:
+ in_single_quote = not in_single_quote
+ elif char == '"' and not in_single_quote:
+ result.append(char)
+ if in_double_quote and index + 1 < len(sql) and sql[index + 1] ==
'"':
+ index += 1
+ result.append(sql[index])
+ else:
+ in_double_quote = not in_double_quote
+ elif char == "?" and not in_single_quote and not in_double_quote:
+ result.append(placeholder)
+ else:
+ result.append(char)
+ index += 1
+ return "".join(result)
+
+
+class _DbApiConnection:
+ def __init__(self, options: Dict[str, str]):
+ self.options = options
+ self.uri = options.get(CatalogOptions.URI.key())
+ if not self.uri:
+ raise ValueError(f"Paimon '{CatalogOptions.URI.key()}' must be set
for JDBC catalog")
+ self.protocol, self.placeholder, self.connection =
self._connect(self.uri, options)
+
+ def close(self):
+ self.connection.close()
+
+ def execute(self, sql: str, args: Tuple = ()):
+ with closing(self.connection.cursor()) as cursor:
+ cursor.execute(self._sql(sql), args)
+ return cursor.rowcount
+
+ def executemany(self, sql: str, args):
+ with closing(self.connection.cursor()) as cursor:
+ cursor.executemany(self._sql(sql), args)
+ return cursor.rowcount
+
+ def fetch_all(self, sql: str, args: Tuple = ()):
+ with closing(self.connection.cursor()) as cursor:
+ cursor.execute(self._sql(sql), args)
+ return cursor.fetchall()
+
+ def fetch_one(self, sql: str, args: Tuple = ()):
+ with closing(self.connection.cursor()) as cursor:
+ cursor.execute(self._sql(sql), args)
+ return cursor.fetchone()
+
+ def _sql(self, sql: str) -> str:
+ return _convert_qmark_placeholders(sql, self.placeholder)
+
+ @contextmanager
+ def transaction(self):
+ try:
+ yield
+ self.connection.commit()
+ except Exception:
+ self.connection.rollback()
+ raise
+
+ @staticmethod
+ def _jdbc_properties(options: Dict[str, str]) -> Dict[str, str]:
+ result = {}
+ for key, value in options.items():
+ if key.startswith("jdbc."):
+ result[key[len("jdbc."):]] = value
+ return result
+
+ def _connect(self, uri: str, options: Dict[str, str]):
+ if uri.startswith("jdbc:sqlite:"):
+ return self._connect_sqlite(uri)
+ if uri.startswith("jdbc:mysql:"):
+ return self._connect_mysql(uri, options)
+ if uri.startswith("jdbc:postgresql:"):
+ return self._connect_postgresql(uri, options)
+ raise ValueError(f"Unsupported JDBC catalog URI for Python DB-API
connection: {uri}")
+
+ def _connect_sqlite(self, uri: str):
+ sqlite_uri = uri[len("jdbc:sqlite:"):]
+ if sqlite_uri.startswith("file:"):
+ connection = sqlite3.connect(sqlite_uri, uri=True,
check_same_thread=False)
+ else:
+ connection = sqlite3.connect(sqlite_uri, check_same_thread=False)
+ return "sqlite", "?", connection
+
+ def _connect_mysql(self, uri: str, options: Dict[str, str]):
+ try:
+ import pymysql
+ connector = "pymysql"
+ except ImportError:
+ try:
+ import mysql.connector as mysql_connector
+ connector = "mysql-connector"
+ except ImportError as e:
+ raise ImportError(
+ "PyPaimon JDBC catalog uses Python DB-API drivers and
requires "
+ "pymysql or mysql-connector-python to connect to MySQL."
+ ) from e
+
+ parsed = urlparse(uri[len("jdbc:"):])
+ props = self._jdbc_properties(options)
+ query = {k: v[0] for k, v in parse_qs(parsed.query).items()}
+ props.update(query)
+ user = props.pop("user", props.pop("username", None))
+ password = props.pop("password", None)
+ host = props.pop("host", parsed.hostname)
+ database = parsed.path.lstrip("/") or props.pop("database", "")
+ props.pop("database", None)
+ port_value = props.pop("port", None)
+ port = parsed.port or int(port_value or 3306)
+ if connector == "pymysql":
+ connection = pymysql.connect(
+ host=host,
+ port=port,
+ user=user,
+ password=password,
+ database=database,
+ autocommit=False,
+ **props
+ )
+ else:
+ connection = mysql_connector.connect(
+ host=host,
+ port=port,
+ user=user,
+ password=password,
+ database=database,
+ **props
+ )
+ return "mysql", "%s", connection
+
+ def _connect_postgresql(self, uri: str, options: Dict[str, str]):
+ try:
+ import psycopg2
+ connector = "psycopg2"
+ except ImportError:
+ try:
+ import psycopg
+ connector = "psycopg"
+ except ImportError as e:
+ raise ImportError(
+ "PyPaimon JDBC catalog uses Python DB-API drivers and
requires "
+ "psycopg2 or psycopg to connect to PostgreSQL."
+ ) from e
+
+ parsed = urlparse(uri[len("jdbc:"):])
+ props = self._jdbc_properties(options)
+ query = {k: v[0] for k, v in parse_qs(parsed.query).items()}
+ props.update(query)
+ user = props.pop("user", props.pop("username", None))
+ password = props.pop("password", None)
+ host = props.pop("host", parsed.hostname)
+ database = parsed.path.lstrip("/") or props.get("database") or
props.get("dbname") or ""
+ props.pop("database", None)
+ props.pop("dbname", None)
+ port_value = props.pop("port", None)
+ port = parsed.port or int(port_value or 5432)
+ connect_kwargs = {
+ "host": host,
+ "port": port,
+ "user": user,
+ "password": password,
+ "dbname": database,
+ }
+ connect_kwargs.update(props)
+ if connector == "psycopg2":
+ connection = psycopg2.connect(**connect_kwargs)
+ else:
+ connection = psycopg.connect(**connect_kwargs)
+ return "postgresql", "%s", connection
+
+
+class JdbcCatalog(Catalog):
+ CATALOG_TABLE_NAME = "paimon_tables"
+ DATABASE_PROPERTIES_TABLE_NAME = "paimon_database_properties"
+ TABLE_PROPERTIES_TABLE_NAME = "paimon_table_properties"
+ CATALOG_KEY = "catalog_key"
+ TABLE_DATABASE = "database_name"
+ TABLE_NAME = "table_name"
+ PROPERTY_KEY = "property_key"
+ PROPERTY_VALUE = "property_value"
+ DATABASE_EXISTS_PROPERTY = "exists"
+
+ def __init__(self, context: CatalogContext):
+ catalog_options = context.options
+ if not catalog_options.contains(CatalogOptions.WAREHOUSE):
+ raise ValueError(f"Paimon '{CatalogOptions.WAREHOUSE.key()}' path
must be set")
+ self.context = context
+ self.catalog_options = catalog_options
+ self.options = catalog_options.to_map()
+ self.warehouse = catalog_options.get(CatalogOptions.WAREHOUSE)
+ self.catalog_key = catalog_options.get(JdbcCatalogOptions.CATALOG_KEY)
+ self.file_io = FileIO.get(self.warehouse, self.catalog_options)
+ self.connection = _DbApiConnection(self.options)
+ self._initialize_catalog_tables()
+
+ def close(self):
+ self.connection.close()
+
+ def __enter__(self):
+ return self
+
+ def __exit__(self, exc_type, exc_value, traceback):
+ self.close()
+ return False
+
+ def _initialize_catalog_tables(self):
+ with self.connection.transaction():
+ self.connection.execute(
+ "CREATE TABLE IF NOT EXISTS paimon_tables ("
+ "catalog_key VARCHAR(255) NOT NULL, "
+ "database_name VARCHAR(255) NOT NULL, "
+ "table_name VARCHAR(255) NOT NULL, "
+ "PRIMARY KEY (catalog_key, database_name, table_name))"
+ )
+ self.connection.execute(
+ "CREATE TABLE IF NOT EXISTS paimon_database_properties ("
+ "catalog_key VARCHAR(255) NOT NULL, "
+ "database_name VARCHAR(255) NOT NULL, "
+ "property_key VARCHAR(255), "
+ "property_value VARCHAR(1000), "
+ "PRIMARY KEY (catalog_key, database_name, property_key))"
+ )
+ self.connection.execute(
+ "CREATE TABLE IF NOT EXISTS paimon_table_properties ("
+ "catalog_key VARCHAR(255) NOT NULL, "
+ "database_name VARCHAR(255) NOT NULL, "
+ "table_name VARCHAR(255) NOT NULL, "
+ "property_key VARCHAR(255) NOT NULL, "
+ "property_value VARCHAR(1000), "
+ "PRIMARY KEY (catalog_key, database_name, table_name,
property_key))"
+ )
+
+ def list_databases(self) -> List[str]:
+ table_rows = self.connection.fetch_all(
+ "SELECT DISTINCT database_name FROM paimon_tables WHERE
catalog_key = ?",
+ (self.catalog_key,)
+ )
+ property_rows = self.connection.fetch_all(
+ "SELECT DISTINCT database_name FROM paimon_database_properties
WHERE catalog_key = ?",
+ (self.catalog_key,)
+ )
+ databases = {row[0] for row in table_rows}
+ databases.update(row[0] for row in property_rows)
+ return sorted(databases)
+
+ def get_database(self, name: str) -> Database:
+ if not self._database_exists(name):
+ raise DatabaseNotExistException(name)
+ properties = self._fetch_database_properties(name)
+ if Catalog.DB_LOCATION_PROP not in properties:
+ properties[Catalog.DB_LOCATION_PROP] = self.get_database_path(name)
+ properties.pop(self.DATABASE_EXISTS_PROPERTY, None)
+ return Database(name, properties)
+
+ def create_database(self, name: str, ignore_if_exists: bool, properties:
Optional[dict] = None):
+ if self._database_exists(name):
+ if not ignore_if_exists:
+ raise DatabaseAlreadyExistException(name)
+ return
+ create_props = {self.DATABASE_EXISTS_PROPERTY: "true"}
+ if properties:
+ create_props.update(properties)
+ if Catalog.DB_LOCATION_PROP not in create_props:
+ create_props[Catalog.DB_LOCATION_PROP] =
self.get_database_path(name)
+ with self.connection.transaction():
+ self._insert_database_properties(name, create_props)
+
+ def drop_database(self, name: str, ignore_if_not_exists: bool = False,
cascade: bool = False):
+ if not self._database_exists(name):
+ if not ignore_if_not_exists:
+ raise DatabaseNotExistException(name)
+ return
+ tables = self.list_tables(name)
+ if tables and not cascade:
+ raise ValueError(f"Database {name} is not empty. Use cascade=True
to drop all tables first.")
+ if cascade:
+ for table in tables:
+ self.drop_table(Identifier.create(name, table), True)
+ with self.connection.transaction():
+ self.connection.execute(
+ "DELETE FROM paimon_tables WHERE catalog_key = ? AND
database_name = ?",
+ (self.catalog_key, name)
+ )
+ self.connection.execute(
+ "DELETE FROM paimon_database_properties WHERE catalog_key = ?
AND database_name = ?",
+ (self.catalog_key, name)
+ )
+ self.connection.execute(
+ "DELETE FROM paimon_table_properties WHERE catalog_key = ? AND
database_name = ?",
+ (self.catalog_key, name)
+ )
+
+ def alter_database(self, name: str, changes: list):
+ self.get_database(name)
+ from pypaimon.catalog.rest.property_change import PropertyChange
+ set_properties, remove_keys =
PropertyChange.get_set_properties_to_remove_keys(changes)
+ current = self._fetch_database_properties(name)
+ with self.connection.transaction():
+ update_args = [
+ (value, self.catalog_key, name, key)
+ for key, value in set_properties.items()
+ if key in current
+ ]
+ insert_properties = {
+ key: value for key, value in set_properties.items() if key not
in current
+ }
+ if update_args:
+ self.connection.executemany(
+ "UPDATE paimon_database_properties SET property_value = ? "
+ "WHERE catalog_key = ? AND database_name = ? AND
property_key = ?",
+ update_args
+ )
+ if insert_properties:
+ self._insert_database_properties(name, insert_properties)
+ for key in remove_keys:
+ self.connection.execute(
+ "DELETE FROM paimon_database_properties "
+ "WHERE catalog_key = ? AND database_name = ? AND
property_key = ?",
+ (self.catalog_key, name, key)
+ )
+
+ def list_tables(self, database_name: str) -> List[str]:
+ self.get_database(database_name)
+ rows = self.connection.fetch_all(
+ "SELECT table_name FROM paimon_tables WHERE catalog_key = ? AND
database_name = ?",
+ (self.catalog_key, database_name)
+ )
+ return sorted(row[0] for row in rows)
+
+ def get_table(self, identifier: Union[str, Identifier]) -> Table:
+ if not isinstance(identifier, Identifier):
+ identifier = Identifier.from_string(identifier)
+ if self.catalog_options.contains(CoreOptions.SCAN_FALLBACK_BRANCH):
+ raise ValueError(f"Unsupported CoreOption
{CoreOptions.SCAN_FALLBACK_BRANCH}")
+ if not self._table_exists(identifier):
+ raise TableNotExistException(identifier)
+ table_path = self.get_table_path(identifier)
+ table_schema = self.get_table_schema(identifier)
+ from pypaimon.catalog.jdbc_catalog_loader import JdbcCatalogLoader
+ catalog_environment = CatalogEnvironment(
+ identifier=identifier,
+ uuid=None,
+ catalog_loader=JdbcCatalogLoader(self.context),
+ supports_version_management=False
+ )
+ return FileStoreTable(self.file_io, identifier, table_path,
table_schema, catalog_environment)
+
+ def create_table(self, identifier: Union[str, Identifier], schema:
'Schema', ignore_if_exists: bool):
+ if schema.options and
schema.options.get(CoreOptions.AUTO_CREATE.key()):
+ raise ValueError(f"The value of {CoreOptions.AUTO_CREATE.key()}
property should be False.")
+ if not isinstance(identifier, Identifier):
+ identifier = Identifier.from_string(identifier)
+ self.get_database(identifier.get_database_name())
+ if self._table_exists(identifier):
+ if not ignore_if_exists:
+ raise TableAlreadyExistException(identifier)
+ return
+ if schema.options and CoreOptions.TYPE.key() in schema.options and
schema.options.get(
+ CoreOptions.TYPE.key()) != "table":
+ raise ValueError(f"Table Type:
{schema.options.get(CoreOptions.TYPE.key())}")
+
+ table_path = self.get_table_path(identifier)
+ schema_manager = SchemaManager(self.file_io, table_path)
+ table_schema = schema_manager.create_table(schema)
+ try:
+ with self.connection.transaction():
+ self.connection.execute(
+ "INSERT INTO paimon_tables (catalog_key, database_name,
table_name) VALUES (?, ?, ?)",
+ (self.catalog_key, identifier.get_database_name(),
identifier.get_table_name())
+ )
+ if self._sync_all_properties():
+ self._insert_table_properties(identifier,
self._collect_table_properties(table_schema))
+ except Exception:
+ self.file_io.delete_directory_quietly(table_path)
+ raise
+
+ def drop_table(self, identifier: Union[str, Identifier],
ignore_if_not_exists: bool = False):
+ if not isinstance(identifier, Identifier):
+ identifier = Identifier.from_string(identifier)
+ if not self._table_exists(identifier):
+ if not ignore_if_not_exists:
+ raise TableNotExistException(identifier)
+ return
+ table_path = self.get_table_path(identifier)
+ with self.connection.transaction():
+ self.connection.execute(
+ "DELETE FROM paimon_tables WHERE catalog_key = ? AND
database_name = ? AND table_name = ?",
+ (self.catalog_key, identifier.get_database_name(),
identifier.get_table_name())
+ )
+ self.connection.execute(
+ "DELETE FROM paimon_table_properties WHERE catalog_key = ? AND
database_name = ? AND table_name = ?",
+ (self.catalog_key, identifier.get_database_name(),
identifier.get_table_name())
+ )
+ self.file_io.delete_directory_quietly(table_path)
+
+ def rename_table(self, source_identifier: Union[str, Identifier],
target_identifier: Union[str, Identifier]):
+ if not isinstance(source_identifier, Identifier):
+ source_identifier = Identifier.from_string(source_identifier)
+ if not isinstance(target_identifier, Identifier):
+ target_identifier = Identifier.from_string(target_identifier)
+ if not self._table_exists(source_identifier):
+ raise TableNotExistException(source_identifier)
+ self.get_database(target_identifier.get_database_name())
+ if self._table_exists(target_identifier):
+ raise TableAlreadyExistException(target_identifier)
+
+ source_path = self.get_table_path(source_identifier)
+ target_path = self.get_table_path(target_identifier)
+ renamed_path = False
+ if self.file_io.exists(source_path):
+ self.file_io.rename(source_path, target_path)
+ renamed_path = True
+ try:
+ with self.connection.transaction():
+ self.connection.execute(
+ "UPDATE paimon_tables SET database_name = ?, table_name =
? "
+ "WHERE catalog_key = ? AND database_name = ? AND
table_name = ?",
+ (
+ target_identifier.get_database_name(),
+ target_identifier.get_table_name(),
+ self.catalog_key,
+ source_identifier.get_database_name(),
+ source_identifier.get_table_name()
+ )
+ )
+ self.connection.execute(
+ "UPDATE paimon_table_properties SET database_name = ?,
table_name = ? "
+ "WHERE catalog_key = ? AND database_name = ? AND
table_name = ?",
+ (
+ target_identifier.get_database_name(),
+ target_identifier.get_table_name(),
+ self.catalog_key,
+ source_identifier.get_database_name(),
+ source_identifier.get_table_name()
+ )
+ )
+ except Exception:
+ if renamed_path and self.file_io.exists(target_path):
+ self.file_io.rename(target_path, source_path)
+ raise
+
+ def alter_table(
+ self,
+ identifier: Union[str, Identifier],
+ changes: List[SchemaChange],
+ ignore_if_not_exists: bool = False
+ ):
+ if not isinstance(identifier, Identifier):
+ identifier = Identifier.from_string(identifier)
+ if not self._table_exists(identifier):
+ if not ignore_if_not_exists:
+ raise TableNotExistException(identifier)
+ return
+ schema_manager = SchemaManager(self.file_io,
self.get_table_path(identifier))
+ table_schema = schema_manager.commit_changes(changes)
+ if self._sync_all_properties():
+ with self.connection.transaction():
+ self.connection.execute(
+ "DELETE FROM paimon_table_properties "
+ "WHERE catalog_key = ? AND database_name = ? AND
table_name = ?",
+ (self.catalog_key, identifier.get_database_name(),
identifier.get_table_name())
+ )
+ self._insert_table_properties(identifier,
self._collect_table_properties(table_schema))
+
+ def get_table_schema(self, identifier: Identifier):
+ table_schema = SchemaManager(self.file_io,
self.get_table_path(identifier)).latest()
+ if table_schema is None:
+ raise TableNotExistException(identifier)
+ return table_schema
+
+ def get_database_path(self, name: str) -> str:
+ warehouse = self.warehouse.rstrip('/')
+ return f"{warehouse}/{name}{Catalog.DB_SUFFIX}"
+
+ def get_table_path(self, identifier: Identifier) -> str:
+ db_path = self.get_database_path(identifier.get_database_name())
+ return f"{db_path}/{identifier.get_table_name()}"
+
+ def load_snapshot(self, identifier: Identifier):
+ raise NotImplementedError("JDBC catalog does not support
load_snapshot")
+
+ def commit_snapshot(
+ self,
+ identifier: Identifier,
+ table_uuid: Optional[str],
+ snapshot: Snapshot,
+ statistics: List[PartitionStatistics]
+ ) -> bool:
+ raise NotImplementedError("This catalog does not support commit
catalog")
+
+ def _database_exists(self, database_name: str) -> bool:
+ row = self.connection.fetch_one(
+ "SELECT database_name FROM paimon_tables "
+ "WHERE catalog_key = ? AND database_name = ? LIMIT 1",
+ (self.catalog_key, database_name)
+ )
+ if row is not None:
+ return True
+ row = self.connection.fetch_one(
+ "SELECT database_name FROM paimon_database_properties "
+ "WHERE catalog_key = ? AND database_name = ? LIMIT 1",
+ (self.catalog_key, database_name)
+ )
+ return row is not None
+
+ def _table_exists(self, identifier: Identifier) -> bool:
+ row = self.connection.fetch_one(
+ "SELECT table_name FROM paimon_tables "
+ "WHERE catalog_key = ? AND database_name = ? AND table_name = ?
LIMIT 1",
+ (self.catalog_key, identifier.get_database_name(),
identifier.get_table_name())
+ )
+ return row is not None
+
+ def _fetch_database_properties(self, database_name: str) -> Dict[str, str]:
+ rows = self.connection.fetch_all(
+ "SELECT property_key, property_value FROM
paimon_database_properties "
+ "WHERE catalog_key = ? AND database_name = ?",
+ (self.catalog_key, database_name)
+ )
+ return {row[0]: row[1] for row in rows}
+
+ def _insert_database_properties(self, database_name: str, properties:
Dict[str, str]):
+ if properties:
+ self.connection.executemany(
+ "INSERT INTO paimon_database_properties "
+ "(catalog_key, database_name, property_key, property_value)
VALUES (?, ?, ?, ?)",
+ [(self.catalog_key, database_name, key, value) for key, value
in properties.items()]
+ )
+
+ def _insert_table_properties(self, identifier: Identifier, properties:
Dict[str, str]):
+ if properties:
+ self.connection.executemany(
+ "INSERT INTO paimon_table_properties "
+ "(catalog_key, database_name, table_name, property_key,
property_value) "
+ "VALUES (?, ?, ?, ?, ?)",
+ [
+ (
+ self.catalog_key,
+ identifier.get_database_name(),
+ identifier.get_table_name(),
+ key,
+ value
+ )
+ for key, value in properties.items()
+ ]
+ )
+
+ def _sync_all_properties(self) -> bool:
+ from pypaimon.common.options.options_utils import OptionsUtils
+ return OptionsUtils.convert_to_boolean(
+ self.catalog_options.get(CatalogOptions.SYNC_ALL_PROPERTIES))
+
+ @staticmethod
+ def _collect_table_properties(table_schema) -> Dict[str, str]:
+ properties = dict(table_schema.options or {})
+ if table_schema.primary_keys:
+ properties["primary-key"] = ",".join(table_schema.primary_keys)
+ if table_schema.partition_keys:
+ properties["partition"] = ",".join(table_schema.partition_keys)
+ return properties
diff --git a/paimon-python/pypaimon/catalog/jdbc_catalog_loader.py b/paimon-python/pypaimon/catalog/jdbc_catalog_loader.py
new file mode 100644
index 0000000000..801ecb1a1c
--- /dev/null
+++ b/paimon-python/pypaimon/catalog/jdbc_catalog_loader.py
@@ -0,0 +1,32 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#################################################################################
+
+from pypaimon.catalog.catalog_context import CatalogContext
+from pypaimon.catalog.catalog_loader import CatalogLoader
+from pypaimon.catalog.jdbc_catalog import JdbcCatalog
+
+
+class JdbcCatalogLoader(CatalogLoader):
+ def __init__(self, context: CatalogContext):
+ self._context = context
+
+ def context(self) -> CatalogContext:
+ return self._context
+
+ def load(self) -> JdbcCatalog:
+ return JdbcCatalog(self._context)
diff --git a/paimon-python/pypaimon/common/options/config.py b/paimon-python/pypaimon/common/options/config.py
index 6189ebaf89..0004cb896a 100644
--- a/paimon-python/pypaimon/common/options/config.py
+++ b/paimon-python/pypaimon/common/options/config.py
@@ -59,6 +59,11 @@ class GcsOptions:
.with_description("GCP project ID for GCS requests."))
+class JdbcCatalogOptions:
+ CATALOG_KEY =
ConfigOptions.key("catalog-key").string_type().default_value("jdbc").with_description(
+ "Custom JDBC catalog store key.")
+
+
class PVFSOptions:
CACHE_ENABLED =
ConfigOptions.key("cache-enabled").boolean_type().default_value("true").with_description(
"Enable cache")
@@ -101,6 +106,8 @@ class CatalogOptions:
PREFIX =
ConfigOptions.key("prefix").string_type().no_default_value().with_description("Prefix")
HTTP_USER_AGENT_HEADER = ConfigOptions.key(
"header.HTTP_USER_AGENT").string_type().no_default_value().with_description("HTTP
User Agent header")
+ SYNC_ALL_PROPERTIES =
ConfigOptions.key("sync-all-properties").boolean_type().default_value(True).with_description(
+ "Sync all table properties to the catalog metastore")
BLOB_FILE_IO_DEFAULT_CACHE_SIZE = 2 ** 31 - 1
diff --git a/paimon-python/pypaimon/tests/jdbc_catalog_test.py b/paimon-python/pypaimon/tests/jdbc_catalog_test.py
new file mode 100644
index 0000000000..04ca55e250
--- /dev/null
+++ b/paimon-python/pypaimon/tests/jdbc_catalog_test.py
@@ -0,0 +1,223 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#################################################################################
+
+import os
+import shutil
+import sqlite3
+import tempfile
+import unittest
+
+from pypaimon import CatalogFactory, Schema
+from pypaimon.catalog.catalog_exception import (
+ DatabaseAlreadyExistException,
+ DatabaseNotExistException,
+ TableAlreadyExistException,
+ TableNotExistException
+)
+from pypaimon.catalog.jdbc_catalog import JdbcCatalog,
_convert_qmark_placeholders
+from pypaimon.catalog.rest.property_change import PropertyChange
+from pypaimon.schema.data_types import AtomicType, DataField
+from pypaimon.schema.schema_change import SchemaChange
+
+
+class JdbcCatalogTest(unittest.TestCase):
+    """Tests for ``JdbcCatalog`` backed by a temporary SQLite metadata file."""
+
+    def setUp(self):
+        # Every test gets a private warehouse directory and SQLite database file.
+        self.temp_dir = tempfile.mkdtemp(prefix="unittest_")
+        self.warehouse = os.path.join(self.temp_dir, "warehouse")
+        self.jdbc_path = os.path.join(self.temp_dir, "catalog.db")
+        self.options = {
+            "metastore": "jdbc",
+            "warehouse": self.warehouse,
+            "uri": "jdbc:sqlite:" + self.jdbc_path,
+            "catalog-key": "test-jdbc-catalog",
+        }
+
+    def tearDown(self):
+        shutil.rmtree(self.temp_dir, ignore_errors=True)
+
+    def _fetch_all(self, sql, params=()):
+        """Run ``sql`` directly against the SQLite metadata file and return all rows.
+
+        ``with sqlite3.connect(...)`` only scopes the transaction and leaves the
+        connection open, so the connection is closed explicitly here to avoid
+        leaking a handle on a file that ``tearDown`` is about to delete.
+        """
+        conn = sqlite3.connect(self.jdbc_path)
+        try:
+            return conn.execute(sql, params).fetchall()
+        finally:
+            conn.close()
+
+    def test_create_jdbc_catalog(self):
+        # Creating the catalog must also create the three metadata tables.
+        catalog = CatalogFactory.create(self.options)
+        self.assertIsInstance(catalog, JdbcCatalog)
+
+        rows = self._fetch_all("SELECT name FROM sqlite_master WHERE type = 'table'")
+        tables = {row[0] for row in rows}
+        self.assertIn("paimon_tables", tables)
+        self.assertIn("paimon_database_properties", tables)
+        self.assertIn("paimon_table_properties", tables)
+
+    def test_jdbc_catalog_context_manager_closes_connection(self):
+        with CatalogFactory.create(self.options) as catalog:
+            self.assertIsInstance(catalog, JdbcCatalog)
+
+        # Using the catalog after the ``with`` block must fail on the closed connection.
+        with self.assertRaises(sqlite3.ProgrammingError):
+            catalog.list_databases()
+
+    def test_placeholder_conversion_skips_string_literals(self):
+        # Only the bare ``?`` is a placeholder; quoted question marks stay untouched.
+        sql = "SELECT '?' AS q, \"?\" AS quoted, col FROM tbl WHERE a = ? AND b = '?'"
+        self.assertEqual(
+            _convert_qmark_placeholders(sql, "%s"),
+            "SELECT '?' AS q, \"?\" AS quoted, col FROM tbl WHERE a = %s AND b = '?'"
+        )
+
+    def test_database(self):
+        catalog = CatalogFactory.create(self.options)
+        catalog.create_database("test_db", False, {"owner": "owner1"})
+
+        with self.assertRaises(DatabaseAlreadyExistException):
+            catalog.create_database("test_db", False)
+
+        self.assertEqual(catalog.list_databases(), ["test_db"])
+        database = catalog.get_database("test_db")
+        self.assertEqual(database.name, "test_db")
+        self.assertEqual(database.options["owner"], "owner1")
+        self.assertEqual(
+            database.options["location"],
+            os.path.join(self.warehouse, "test_db.db")
+        )
+
+        # A second catalog instance over the same store must see the same metadata.
+        reloaded = CatalogFactory.create(self.options)
+        self.assertEqual(reloaded.list_databases(), ["test_db"])
+        reloaded.alter_database(
+            "test_db",
+            [
+                PropertyChange.set_property("comment", "new comment"),
+                PropertyChange.remove_property("owner"),
+            ]
+        )
+        updated = reloaded.get_database("test_db")
+        self.assertEqual(updated.options["comment"], "new comment")
+        self.assertNotIn("owner", updated.options)
+
+        reloaded.drop_database("test_db")
+        with self.assertRaises(DatabaseNotExistException):
+            reloaded.get_database("test_db")
+
+    def test_table(self):
+        fields = [
+            DataField.from_dict({"id": 1, "name": "f0", "type": "INT"}),
+            DataField.from_dict({"id": 2, "name": "f1", "type": "STRING"}),
+        ]
+        catalog = CatalogFactory.create(self.options)
+        catalog.create_database("test_db", False)
+        catalog.create_table(
+            "test_db.test_table",
+            Schema(fields=fields, partition_keys=["f1"], options={"bucket": "1"}),
+            False
+        )
+
+        with self.assertRaises(TableAlreadyExistException):
+            catalog.create_table("test_db.test_table", Schema(fields=fields), False)
+
+        self.assertEqual(catalog.list_tables("test_db"), ["test_table"])
+        # The schema file is written under the warehouse path.
+        self.assertTrue(
+            os.path.exists(
+                os.path.join(self.warehouse, "test_db.db", "test_table", "schema", "schema-0")
+            )
+        )
+
+        reloaded = CatalogFactory.create(self.options)
+        table = reloaded.get_table("test_db.test_table")
+        self.assertEqual(table.fields[0].name, "f0")
+        self.assertIsInstance(table.fields[0].type, AtomicType)
+        self.assertEqual(table.fields[0].type.type, "INT")
+
+        # Table properties are expected in the metastore under the configured catalog key.
+        properties = dict(
+            self._fetch_all(
+                "SELECT property_key, property_value FROM paimon_table_properties "
+                "WHERE catalog_key = ? AND database_name = ? AND table_name = ?",
+                ("test-jdbc-catalog", "test_db", "test_table")
+            )
+        )
+        self.assertEqual(properties["bucket"], "1")
+        self.assertEqual(properties["partition"], "f1")
+
+        reloaded.alter_table(
+            "test_db.test_table",
+            [SchemaChange.add_column("f2", AtomicType("BIGINT"))]
+        )
+        self.assertEqual(len(reloaded.get_table("test_db.test_table").fields), 3)
+
+        reloaded.rename_table("test_db.test_table", "test_db.renamed_table")
+        self.assertEqual(reloaded.list_tables("test_db"), ["renamed_table"])
+        with self.assertRaises(TableNotExistException):
+            reloaded.get_table("test_db.test_table")
+
+        reloaded.drop_table("test_db.renamed_table")
+        self.assertEqual(reloaded.list_tables("test_db"), [])
+        with self.assertRaises(TableNotExistException):
+            reloaded.get_table("test_db.renamed_table")
+
+    def test_create_table_rolls_back_metadata_on_failure(self):
+        fields = [DataField.from_dict({"id": 1, "name": "f0", "type": "INT"})]
+        catalog = CatalogFactory.create(self.options)
+        catalog.create_database("test_db", False)
+
+        def fail_insert_table_properties(identifier, properties):
+            raise RuntimeError("injected failure")
+
+        # Inject a failure into the property-insert step of table creation.
+        catalog._insert_table_properties = fail_insert_table_properties
+        with self.assertRaises(RuntimeError):
+            catalog.create_table("test_db.test_table", Schema(fields=fields), False)
+
+        # Neither the metastore row nor the table directory may remain afterwards.
+        table_count = self._fetch_all(
+            "SELECT COUNT(*) FROM paimon_tables "
+            "WHERE catalog_key = ? AND database_name = ? AND table_name = ?",
+            ("test-jdbc-catalog", "test_db", "test_table")
+        )[0][0]
+        self.assertEqual(table_count, 0)
+        self.assertFalse(os.path.exists(os.path.join(self.warehouse, "test_db.db", "test_table")))
+
+    def test_rename_table_keeps_metadata_when_file_move_fails(self):
+        fields = [DataField.from_dict({"id": 1, "name": "f0", "type": "INT"})]
+        catalog = CatalogFactory.create(self.options)
+        catalog.create_database("test_db", False)
+        catalog.create_table("test_db.test_table", Schema(fields=fields), False)
+
+        def fail_rename(source, target):
+            raise OSError("injected failure")
+
+        # Make the file move fail; the catalog must still list the old table name.
+        catalog.file_io.rename = fail_rename
+        with self.assertRaises(OSError):
+            catalog.rename_table("test_db.test_table", "test_db.renamed_table")
+
+        self.assertEqual(catalog.list_tables("test_db"), ["test_table"])
+        self.assertTrue(os.path.exists(os.path.join(self.warehouse, "test_db.db", "test_table")))
+
+    def test_drop_database_requires_cascade_for_non_empty_database(self):
+        fields = [DataField.from_dict({"id": 1, "name": "f0", "type": "INT"})]
+        catalog = CatalogFactory.create(self.options)
+        catalog.create_database("test_db", False)
+        catalog.create_table("test_db.test_table", Schema(fields=fields), False)
+
+        # Dropping a database that still has a table raises unless cascade is set.
+        with self.assertRaises(ValueError):
+            catalog.drop_database("test_db")
+
+        catalog.drop_database("test_db", cascade=True)
+        self.assertEqual(catalog.list_databases(), [])
+
+
+if __name__ == '__main__':
+ unittest.main()