This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch release-1.3
in repository https://gitbox.apache.org/repos/asf/paimon.git

commit 0a5004c78dc0a0e561896cb78139f13d8b38f7dc
Author: umi <[email protected]>
AuthorDate: Wed Sep 24 09:48:33 2025 +0800

    [python] Support ignore_if_exists param for database and table (#6314)
---
 .../pypaimon/catalog/rest/rest_catalog.py          | 35 +++++++++---
 paimon-python/pypaimon/tests/predicates_test.py    | 66 +++++++++++-----------
 paimon-python/pypaimon/tests/pvfs_test.py          | 14 -----
 .../pypaimon/tests/py36/ao_simple_test.py          | 55 ++++++++++++++++++
 paimon-python/pypaimon/tests/rest/rest_server.py   | 21 ++++++-
 .../pypaimon/tests/rest/rest_simple_test.py        | 55 ++++++++++++++++++
 paimon-python/pypaimon/tests/serializable_test.py  |  2 +-
 7 files changed, 190 insertions(+), 58 deletions(-)

diff --git a/paimon-python/pypaimon/catalog/rest/rest_catalog.py b/paimon-python/pypaimon/catalog/rest/rest_catalog.py
index f203169fbd..5e36559a7b 100644
--- a/paimon-python/pypaimon/catalog/rest/rest_catalog.py
+++ b/paimon-python/pypaimon/catalog/rest/rest_catalog.py
@@ -22,11 +22,12 @@ from urllib.parse import urlparse
 from pypaimon.api.api_response import GetTableResponse, PagedList
 from pypaimon.api.options import Options
 from pypaimon.api.rest_api import RESTApi
-from pypaimon.api.rest_exception import NoSuchResourceException
+from pypaimon.api.rest_exception import NoSuchResourceException, AlreadyExistsException
 from pypaimon.catalog.catalog import Catalog
 from pypaimon.catalog.catalog_context import CatalogContext
 from pypaimon.catalog.catalog_environment import CatalogEnvironment
-from pypaimon.catalog.catalog_exception import TableNotExistException
+from pypaimon.catalog.catalog_exception import TableNotExistException, DatabaseAlreadyExistException, \
+    TableAlreadyExistException, DatabaseNotExistException
 from pypaimon.catalog.database import Database
 from pypaimon.catalog.rest.property_change import PropertyChange
 from pypaimon.catalog.rest.rest_token_file_io import RESTTokenFileIO
@@ -107,7 +108,12 @@ class RESTCatalog(Catalog):
         return self.rest_api.list_databases_paged(max_results, page_token, database_name_pattern)
 
     def create_database(self, name: str, ignore_if_exists: bool, properties: Dict[str, str] = None):
-        self.rest_api.create_database(name, properties)
+        try:
+            self.rest_api.create_database(name, properties)
+        except AlreadyExistsException as e:
+            if not ignore_if_exists:
+                # Convert REST API exception to catalog exception
+                raise DatabaseAlreadyExistException(name) from e
 
     def get_database(self, name: str) -> Database:
         response = self.rest_api.get_database(name)
@@ -117,8 +123,13 @@ class RESTCatalog(Catalog):
         if response is not None:
             return Database(name, options)
 
-    def drop_database(self, name: str):
-        self.rest_api.drop_database(name)
+    def drop_database(self, name: str, ignore_if_exists: bool = False):
+        try:
+            self.rest_api.drop_database(name)
+        except NoSuchResourceException as e:
+            if not ignore_if_exists:
+                # Convert REST API exception to catalog exception
+                raise DatabaseNotExistException(name) from e
 
     def alter_database(self, name: str, changes: List[PropertyChange]):
         set_properties, remove_keys = 
PropertyChange.get_set_properties_to_remove_keys(changes)
@@ -154,12 +165,20 @@ class RESTCatalog(Catalog):
     def create_table(self, identifier: Union[str, Identifier], schema: Schema, ignore_if_exists: bool):
         if not isinstance(identifier, Identifier):
             identifier = Identifier.from_string(identifier)
-        self.rest_api.create_table(identifier, schema)
+        try:
+            self.rest_api.create_table(identifier, schema)
+        except AlreadyExistsException as e:
+            if not ignore_if_exists:
+                raise TableAlreadyExistException(identifier) from e
 
-    def drop_table(self, identifier: Union[str, Identifier]):
+    def drop_table(self, identifier: Union[str, Identifier], ignore_if_exists: bool = False):
         if not isinstance(identifier, Identifier):
             identifier = Identifier.from_string(identifier)
-        self.rest_api.drop_table(identifier)
+        try:
+            self.rest_api.drop_table(identifier)
+        except NoSuchResourceException as e:
+            if not ignore_if_exists:
+                raise TableNotExistException(identifier) from e
 
     def load_table_metadata(self, identifier: Identifier) -> TableMetadata:
         response = self.rest_api.get_table(identifier)
diff --git a/paimon-python/pypaimon/tests/predicates_test.py 
b/paimon-python/pypaimon/tests/predicates_test.py
index 0ddebc3451..7976094290 100644
--- a/paimon-python/pypaimon/tests/predicates_test.py
+++ b/paimon-python/pypaimon/tests/predicates_test.py
@@ -81,14 +81,14 @@ class PredicateTest(unittest.TestCase):
 
         cls.df = df
 
-    def testWrongFieldName(self):
+    def test_wrong_field_name(self):
         table = self.catalog.get_table('default.test_append')
         predicate_builder = table.new_read_builder().new_predicate_builder()
         with self.assertRaises(ValueError) as e:
             predicate_builder.equal('f2', 'a')
         self.assertEqual(str(e.exception), "The field f2 is not in field list 
['f0', 'f1'].")
 
-    def testAppendWithDuplicate(self):
+    def test_append_with_duplicate(self):
         pa_schema = pa.schema([
             ('f0', pa.int64()),
             ('f1', pa.string()),
@@ -121,7 +121,7 @@ class PredicateTest(unittest.TestCase):
         actual_df = read.to_pandas(scan.plan().splits())
         self.assertEqual(len(actual_df), 0)
 
-    def testAllFieldTypesWithEqual(self):
+    def test_all_field_types_with_equal(self):
         pa_schema = pa.schema([
             # int
             ('_tinyint', pa.int8()),
@@ -194,169 +194,169 @@ class PredicateTest(unittest.TestCase):
         predicate = predicate_builder.equal('_boolean', True)
         
_check_filtered_result(table.new_read_builder().with_filter(predicate), 
df.loc[[0]])
 
-    def testEqualPk(self):
+    def test_equal_pk(self):
         table = self.catalog.get_table('default.test_pk')
         predicate_builder = table.new_read_builder().new_predicate_builder()
         predicate = predicate_builder.equal('f0', 1)
         
_check_filtered_result(table.new_read_builder().with_filter(predicate), 
self.df.loc[[0]])
 
-    def testNotEqualAppend(self):
+    def test_not_equal_append(self):
         table = self.catalog.get_table('default.test_append')
         predicate_builder = table.new_read_builder().new_predicate_builder()
         predicate = predicate_builder.not_equal('f0', 1)
         
_check_filtered_result(table.new_read_builder().with_filter(predicate), 
self.df.loc[1:4])
 
-    def testNotEqualPk(self):
+    def test_not_equal_pk(self):
         table = self.catalog.get_table('default.test_pk')
         predicate_builder = table.new_read_builder().new_predicate_builder()
         predicate = predicate_builder.not_equal('f0', 1)
         
_check_filtered_result(table.new_read_builder().with_filter(predicate), 
self.df.loc[1:4])
 
-    def testLessThanAppend(self):
+    def test_less_than_append(self):
         table = self.catalog.get_table('default.test_append')
         predicate_builder = table.new_read_builder().new_predicate_builder()
         predicate = predicate_builder.less_than('f0', 3)
         
_check_filtered_result(table.new_read_builder().with_filter(predicate), 
self.df.loc[0:1])
 
-    def testLessThanPk(self):
+    def test_less_than_pk(self):
         table = self.catalog.get_table('default.test_pk')
         predicate_builder = table.new_read_builder().new_predicate_builder()
         predicate = predicate_builder.less_than('f0', 3)
         
_check_filtered_result(table.new_read_builder().with_filter(predicate), 
self.df.loc[0:1])
 
-    def testLessOrEqualAppend(self):
+    def test_less_or_equal_append(self):
         table = self.catalog.get_table('default.test_append')
         predicate_builder = table.new_read_builder().new_predicate_builder()
         predicate = predicate_builder.less_or_equal('f0', 3)
         
_check_filtered_result(table.new_read_builder().with_filter(predicate), 
self.df.loc[0:2])
 
-    def testLessOrEqualPk(self):
+    def test_less_or_equal_pk(self):
         table = self.catalog.get_table('default.test_pk')
         predicate_builder = table.new_read_builder().new_predicate_builder()
         predicate = predicate_builder.less_or_equal('f0', 3)
         
_check_filtered_result(table.new_read_builder().with_filter(predicate), 
self.df.loc[0:2])
 
-    def testGreaterThanAppend(self):
+    def test_greater_than_append(self):
         table = self.catalog.get_table('default.test_append')
         predicate_builder = table.new_read_builder().new_predicate_builder()
         predicate = predicate_builder.greater_than('f0', 3)
         
_check_filtered_result(table.new_read_builder().with_filter(predicate), 
self.df.loc[3:4])
 
-    def testGreaterThanPk(self):
+    def test_greater_than_pk(self):
         table = self.catalog.get_table('default.test_pk')
         predicate_builder = table.new_read_builder().new_predicate_builder()
         predicate = predicate_builder.greater_than('f0', 3)
         
_check_filtered_result(table.new_read_builder().with_filter(predicate), 
self.df.loc[3:4])
 
-    def testGreaterOrEqualAppend(self):
+    def test_greater_or_equal_append(self):
         table = self.catalog.get_table('default.test_append')
         predicate_builder = table.new_read_builder().new_predicate_builder()
         predicate = predicate_builder.greater_or_equal('f0', 3)
         
_check_filtered_result(table.new_read_builder().with_filter(predicate), 
self.df.loc[2:4])
 
-    def testGreaterOrEqualPk(self):
+    def test_greater_or_equal_pk(self):
         table = self.catalog.get_table('default.test_pk')
         predicate_builder = table.new_read_builder().new_predicate_builder()
         predicate = predicate_builder.greater_or_equal('f0', 3)
         
_check_filtered_result(table.new_read_builder().with_filter(predicate), 
self.df.loc[2:4])
 
-    def testIsNullAppend(self):
+    def test_is_null_append(self):
         table = self.catalog.get_table('default.test_append')
         predicate_builder = table.new_read_builder().new_predicate_builder()
         predicate = predicate_builder.is_null('f1')
         
_check_filtered_result(table.new_read_builder().with_filter(predicate), 
self.df.loc[[4]])
 
-    def testIsNullPk(self):
+    def test_is_null_pk(self):
         table = self.catalog.get_table('default.test_pk')
         predicate_builder = table.new_read_builder().new_predicate_builder()
         predicate = predicate_builder.is_null('f1')
         
_check_filtered_result(table.new_read_builder().with_filter(predicate), 
self.df.loc[[4]])
 
-    def testIsNotNullAppend(self):
+    def test_is_not_null_append(self):
         table = self.catalog.get_table('default.test_append')
         predicate_builder = table.new_read_builder().new_predicate_builder()
         predicate = predicate_builder.is_not_null('f1')
         
_check_filtered_result(table.new_read_builder().with_filter(predicate), 
self.df.loc[0:3])
 
-    def testIsNotNullPk(self):
+    def test_is_not_null_pk(self):
         table = self.catalog.get_table('default.test_pk')
         predicate_builder = table.new_read_builder().new_predicate_builder()
         predicate = predicate_builder.is_not_null('f1')
         
_check_filtered_result(table.new_read_builder().with_filter(predicate), 
self.df.loc[0:3])
 
-    def testStartswithAppend(self):
+    def test_startswith_append(self):
         table = self.catalog.get_table('default.test_append')
         predicate_builder = table.new_read_builder().new_predicate_builder()
         predicate = predicate_builder.startswith('f1', 'ab')
         
_check_filtered_result(table.new_read_builder().with_filter(predicate), 
self.df.loc[0:1])
 
-    def testStartswithPk(self):
+    def test_startswith_pk(self):
         table = self.catalog.get_table('default.test_pk')
         predicate_builder = table.new_read_builder().new_predicate_builder()
         predicate = predicate_builder.startswith('f1', 'ab')
         
_check_filtered_result(table.new_read_builder().with_filter(predicate), 
self.df.loc[0:1])
 
-    def testEndswithAppend(self):
+    def test_endswith_append(self):
         table = self.catalog.get_table('default.test_append')
         predicate_builder = table.new_read_builder().new_predicate_builder()
         predicate = predicate_builder.endswith('f1', 'bc')
         
_check_filtered_result(table.new_read_builder().with_filter(predicate), 
self.df.loc[0:2])
 
-    def testEndswithPk(self):
+    def test_endswith_pk(self):
         table = self.catalog.get_table('default.test_pk')
         predicate_builder = table.new_read_builder().new_predicate_builder()
         predicate = predicate_builder.endswith('f1', 'bc')
         
_check_filtered_result(table.new_read_builder().with_filter(predicate), 
self.df.loc[0:2])
 
-    def testContainsAppend(self):
+    def test_contains_append(self):
         table = self.catalog.get_table('default.test_append')
         predicate_builder = table.new_read_builder().new_predicate_builder()
         predicate = predicate_builder.contains('f1', 'bb')
         
_check_filtered_result(table.new_read_builder().with_filter(predicate), 
self.df.loc[[1]])
 
-    def testContainsPk(self):
+    def test_contains_pk(self):
         table = self.catalog.get_table('default.test_pk')
         predicate_builder = table.new_read_builder().new_predicate_builder()
         predicate = predicate_builder.contains('f1', 'bb')
         
_check_filtered_result(table.new_read_builder().with_filter(predicate), 
self.df.loc[[1]])
 
-    def testIsInAppend(self):
+    def test_is_in_append(self):
         table = self.catalog.get_table('default.test_append')
         predicate_builder = table.new_read_builder().new_predicate_builder()
         predicate = predicate_builder.is_in('f0', [1, 2])
         
_check_filtered_result(table.new_read_builder().with_filter(predicate), 
self.df.loc[0:1])
 
-    def testIsInPk(self):
+    def test_is_in_pk(self):
         table = self.catalog.get_table('default.test_pk')
         predicate_builder = table.new_read_builder().new_predicate_builder()
         predicate = predicate_builder.is_in('f1', ['abc', 'd'])
         
_check_filtered_result(table.new_read_builder().with_filter(predicate), 
self.df.loc[[0, 3]])
 
-    def testIsNotInAppend(self):
+    def test_is_not_in_append(self):
         table = self.catalog.get_table('default.test_append')
         predicate_builder = table.new_read_builder().new_predicate_builder()
         predicate = predicate_builder.is_not_in('f0', [1, 2])
         
_check_filtered_result(table.new_read_builder().with_filter(predicate), 
self.df.loc[2:4])
 
-    def testIsNotInPk(self):
+    def test_is_not_in_pk(self):
         table = self.catalog.get_table('default.test_pk')
         predicate_builder = table.new_read_builder().new_predicate_builder()
         predicate = predicate_builder.is_not_in('f1', ['abc', 'abbc'])
         
_check_filtered_result(table.new_read_builder().with_filter(predicate), 
self.df.loc[2:4])
 
-    def testBetweenAppend(self):
+    def test_between_append(self):
         table = self.catalog.get_table('default.test_append')
         predicate_builder = table.new_read_builder().new_predicate_builder()
         predicate = predicate_builder.between('f0', 1, 3)
         
_check_filtered_result(table.new_read_builder().with_filter(predicate), 
self.df.loc[0:2])
 
-    def testBetweenPk(self):
+    def test_between_pk(self):
         table = self.catalog.get_table('default.test_pk')
         predicate_builder = table.new_read_builder().new_predicate_builder()
         predicate = predicate_builder.between('f0', 1, 3)
         
_check_filtered_result(table.new_read_builder().with_filter(predicate), 
self.df.loc[0:2])
 
-    def testAndPredicates(self):
+    def test_and_predicates(self):
         table = self.catalog.get_table('default.test_append')
         predicate_builder = table.new_read_builder().new_predicate_builder()
         predicate1 = predicate_builder.greater_than('f0', 1)
@@ -364,7 +364,7 @@ class PredicateTest(unittest.TestCase):
         predicate = predicate_builder.and_predicates([predicate1, predicate2])
         
_check_filtered_result(table.new_read_builder().with_filter(predicate), 
self.df.loc[[1]])
 
-    def testOrPredicates(self):
+    def test_or_predicates(self):
         table = self.catalog.get_table('default.test_append')
         predicate_builder = table.new_read_builder().new_predicate_builder()
         predicate1 = predicate_builder.greater_than('f0', 3)
@@ -373,7 +373,7 @@ class PredicateTest(unittest.TestCase):
         _check_filtered_result(table.new_read_builder().with_filter(predicate),
                                self.df.loc[[0, 3, 4]])
 
-    def testPkReaderWithFilter(self):
+    def test_pk_reader_with_filter(self):
         pa_schema = pa.schema([
             pa.field('key1', pa.int32(), nullable=False),
             pa.field('key2', pa.string(), nullable=False),
diff --git a/paimon-python/pypaimon/tests/pvfs_test.py 
b/paimon-python/pypaimon/tests/pvfs_test.py
index 89c96a6228..29ef979f9e 100644
--- a/paimon-python/pypaimon/tests/pvfs_test.py
+++ b/paimon-python/pypaimon/tests/pvfs_test.py
@@ -26,9 +26,6 @@ import pandas
 from pypaimon import PaimonVirtualFileSystem
 from pypaimon.api.api_response import ConfigResponse
 from pypaimon.api.auth import BearTokenAuthProvider
-from pypaimon.catalog.rest.table_metadata import TableMetadata
-from pypaimon.schema.data_types import AtomicType, DataField
-from pypaimon.schema.table_schema import TableSchema
 from pypaimon.tests.rest.api_test import RESTCatalogServer
 
 
@@ -67,17 +64,6 @@ class PVFSTest(unittest.TestCase):
         self.test_databases = {
             self.database: self.server.mock_database(self.database, {"k1": 
"v1", "k2": "v2"}),
         }
-        data_fields = [
-            DataField(0, "id", AtomicType('INT'), 'id'),
-            DataField(1, "name", AtomicType('STRING'), 'name')
-        ]
-        schema = TableSchema(TableSchema.CURRENT_VERSION, len(data_fields), 
data_fields, len(data_fields),
-                             [], [], {}, "")
-        self.server.database_store.update(self.test_databases)
-        self.test_tables = {
-            f"{self.database}.{self.table}": 
TableMetadata(uuid=str(uuid.uuid4()), is_external=True, schema=schema),
-        }
-        self.server.table_metadata_store.update(self.test_tables)
 
     def tearDown(self):
         if self.temp_path.exists():
diff --git a/paimon-python/pypaimon/tests/py36/ao_simple_test.py 
b/paimon-python/pypaimon/tests/py36/ao_simple_test.py
index 17ebf58be7..584ba87587 100644
--- a/paimon-python/pypaimon/tests/py36/ao_simple_test.py
+++ b/paimon-python/pypaimon/tests/py36/ao_simple_test.py
@@ -18,6 +18,8 @@ limitations under the License.
 import pyarrow as pa
 
 from pypaimon import Schema
+from pypaimon.catalog.catalog_exception import TableNotExistException, TableAlreadyExistException, \
+    DatabaseNotExistException, DatabaseAlreadyExistException
 from pypaimon.tests.py36.pyarrow_compat import table_sort_by
 from pypaimon.tests.rest.rest_base_test import RESTBaseTest
 
@@ -330,3 +332,56 @@ class AOSimpleTest(RESTBaseTest):
         splits = read_builder.new_scan().with_shard(3, 4).plan().splits()
         actual = table_read.to_arrow(splits)
         self.assertEqual(len(actual), 2)
+
+    def test_create_drop_database_table(self):
+        # test create database
+        self.rest_catalog.create_database("db1", False)
+
+        with self.assertRaises(DatabaseAlreadyExistException) as context:
+            self.rest_catalog.create_database("db1", False)
+
+        self.assertEqual("db1", context.exception.database)
+
+        try:
+            self.rest_catalog.create_database("db1", True)
+        except DatabaseAlreadyExistException:
+            self.fail("create_database with ignore_if_exists=True should not 
raise DatabaseAlreadyExistException")
+
+        # test create table
+        self.rest_catalog.create_table("db1.tbl1",
+                                       
Schema.from_pyarrow_schema(self.pa_schema, partition_keys=['dt']),
+                                       False)
+        with self.assertRaises(TableAlreadyExistException) as context:
+            self.rest_catalog.create_table("db1.tbl1",
+                                           
Schema.from_pyarrow_schema(self.pa_schema, partition_keys=['dt']),
+                                           False)
+        self.assertEqual("db1.tbl1", 
context.exception.identifier.get_full_name())
+
+        try:
+            self.rest_catalog.create_table("db1.tbl1",
+                                           
Schema.from_pyarrow_schema(self.pa_schema, partition_keys=['dt']),
+                                           True)
+        except TableAlreadyExistException:
+            self.fail("create_table with ignore_if_exists=True should not 
raise TableAlreadyExistException")
+
+        # test drop table
+        self.rest_catalog.drop_table("db1.tbl1", False)
+        with self.assertRaises(TableNotExistException) as context:
+            self.rest_catalog.drop_table("db1.tbl1", False)
+        self.assertEqual("db1.tbl1", 
context.exception.identifier.get_full_name())
+
+        try:
+            self.rest_catalog.drop_table("db1.tbl1", True)
+        except TableNotExistException:
+            self.fail("drop_table with ignore_if_exists=True should not raise 
TableNotExistException")
+
+        # test drop database
+        self.rest_catalog.drop_database("db1", False)
+        with self.assertRaises(DatabaseNotExistException) as context:
+            self.rest_catalog.drop_database("db1", False)
+        self.assertEqual("db1", context.exception.database)
+
+        try:
+            self.rest_catalog.drop_database("db1", True)
+        except DatabaseNotExistException:
+            self.fail("drop_database with ignore_if_exists=True should not 
raise DatabaseNotExistException")
diff --git a/paimon-python/pypaimon/tests/rest/rest_server.py 
b/paimon-python/pypaimon/tests/rest/rest_server.py
index 7290863eb9..8f7cf23944 100644
--- a/paimon-python/pypaimon/tests/rest/rest_server.py
+++ b/paimon-python/pypaimon/tests/rest/rest_server.py
@@ -38,7 +38,8 @@ from pypaimon.api.rest_util import RESTUtil
 from pypaimon.catalog.catalog_exception import (DatabaseNoPermissionException,
                                                 DatabaseNotExistException,
                                                 TableNoPermissionException,
-                                                TableNotExistException)
+                                                TableNotExistException, DatabaseAlreadyExistException,
+                                                TableAlreadyExistException)
 from pypaimon.catalog.rest.table_metadata import TableMetadata
 from pypaimon.common.identifier import Identifier
 from pypaimon.common.json_util import JSON
@@ -321,6 +322,16 @@ class RESTCatalogServer:
                 ErrorResponse.RESOURCE_TYPE_TABLE, 
e.identifier.get_table_name(), str(e), 403
             )
             return self._mock_response(response, 403)
+        except DatabaseAlreadyExistException as e:
+            response = ErrorResponse(
+                ErrorResponse.RESOURCE_TYPE_DATABASE, e.database, str(e), 409
+            )
+            return self._mock_response(response, 409)
+        except TableAlreadyExistException as e:
+            response = ErrorResponse(
+                ErrorResponse.RESOURCE_TYPE_TABLE, 
e.identifier.get_full_name(), str(e), 409
+            )
+            return self._mock_response(response, 409)
         except Exception as e:
             self.logger.error(f"Unexpected error: {e}")
             response = ErrorResponse(None, None, str(e), 500)
@@ -377,6 +388,8 @@ class RESTCatalogServer:
             return self._generate_final_list_databases_response(parameters, 
databases)
         if method == "POST":
             create_database = JSON.from_json(data, CreateDatabaseRequest)
+            if create_database.name in self.database_store:
+                raise DatabaseAlreadyExistException(create_database.name)
             self.database_store.update({
                 create_database.name: self.mock_database(create_database.name, 
create_database.options)
             })
@@ -412,6 +425,8 @@ class RESTCatalogServer:
                 return self._generate_final_list_tables_response(parameters, 
tables)
             elif method == "POST":
                 create_table = JSON.from_json(data, CreateTableRequest)
+                if create_table.identifier.get_full_name() in 
self.table_metadata_store:
+                    raise TableAlreadyExistException(create_table.identifier)
                 table_metadata = self._create_table_metadata(
                     create_table.identifier, 1, create_table.schema, 
str(uuid.uuid4()), False
                 )
@@ -441,7 +456,9 @@ class RESTCatalogServer:
 
         elif method == "DELETE":
             # Drop table
-            if identifier.get_full_name() in self.table_metadata_store:
+            if identifier.get_full_name() not in self.table_metadata_store:
+                raise TableNotExistException(identifier)
+            else:
                 del self.table_metadata_store[identifier.get_full_name()]
             if identifier.get_full_name() in self.table_latest_snapshot_store:
                 del 
self.table_latest_snapshot_store[identifier.get_full_name()]
diff --git a/paimon-python/pypaimon/tests/rest/rest_simple_test.py 
b/paimon-python/pypaimon/tests/rest/rest_simple_test.py
index 03685c317a..1a53b825d4 100644
--- a/paimon-python/pypaimon/tests/rest/rest_simple_test.py
+++ b/paimon-python/pypaimon/tests/rest/rest_simple_test.py
@@ -21,6 +21,8 @@ import os
 import pyarrow as pa
 
 from pypaimon import Schema
+from pypaimon.catalog.catalog_exception import DatabaseAlreadyExistException, TableAlreadyExistException, \
+    DatabaseNotExistException, TableNotExistException
 from pypaimon.tests.rest.rest_base_test import RESTBaseTest
 from pypaimon.write.row_key_extractor import FixedBucketRowKeyExtractor, 
DynamicBucketRowKeyExtractor
 
@@ -558,3 +560,56 @@ class RESTSimpleTest(RESTBaseTest):
         splits = read_builder.new_scan().plan().splits()
         actual = table_read.to_arrow(splits)
         self.assertTrue(not actual)
+
+    def test_create_drop_database_table(self):
+        # test create database
+        self.rest_catalog.create_database("db1", False)
+
+        with self.assertRaises(DatabaseAlreadyExistException) as context:
+            self.rest_catalog.create_database("db1", False)
+
+        self.assertEqual("db1", context.exception.database)
+
+        try:
+            self.rest_catalog.create_database("db1", True)
+        except DatabaseAlreadyExistException:
+            self.fail("create_database with ignore_if_exists=True should not 
raise DatabaseAlreadyExistException")
+
+        # test create table
+        self.rest_catalog.create_table("db1.tbl1",
+                                       
Schema.from_pyarrow_schema(self.pa_schema, partition_keys=['dt']),
+                                       False)
+        with self.assertRaises(TableAlreadyExistException) as context:
+            self.rest_catalog.create_table("db1.tbl1",
+                                           
Schema.from_pyarrow_schema(self.pa_schema, partition_keys=['dt']),
+                                           False)
+        self.assertEqual("db1.tbl1", 
context.exception.identifier.get_full_name())
+
+        try:
+            self.rest_catalog.create_table("db1.tbl1",
+                                           
Schema.from_pyarrow_schema(self.pa_schema, partition_keys=['dt']),
+                                           True)
+        except TableAlreadyExistException:
+            self.fail("create_table with ignore_if_exists=True should not 
raise TableAlreadyExistException")
+
+        # test drop table
+        self.rest_catalog.drop_table("db1.tbl1", False)
+        with self.assertRaises(TableNotExistException) as context:
+            self.rest_catalog.drop_table("db1.tbl1", False)
+        self.assertEqual("db1.tbl1", 
context.exception.identifier.get_full_name())
+
+        try:
+            self.rest_catalog.drop_table("db1.tbl1", True)
+        except TableNotExistException:
+            self.fail("drop_table with ignore_if_exists=True should not raise 
TableNotExistException")
+
+        # test drop database
+        self.rest_catalog.drop_database("db1", False)
+        with self.assertRaises(DatabaseNotExistException) as context:
+            self.rest_catalog.drop_database("db1", False)
+        self.assertEqual("db1", context.exception.database)
+
+        try:
+            self.rest_catalog.drop_database("db1", True)
+        except DatabaseNotExistException:
+            self.fail("drop_database with ignore_if_exists=True should not 
raise DatabaseNotExistException")
diff --git a/paimon-python/pypaimon/tests/serializable_test.py 
b/paimon-python/pypaimon/tests/serializable_test.py
index eed351e330..2081270120 100644
--- a/paimon-python/pypaimon/tests/serializable_test.py
+++ b/paimon-python/pypaimon/tests/serializable_test.py
@@ -54,7 +54,7 @@ class SerializableTest(unittest.TestCase):
     def tearDownClass(cls):
         shutil.rmtree(cls.tempdir, ignore_errors=True)
 
-    def testPickleSerializable(self):
+    def test_pickle_serializable(self):
         schema = Schema.from_pyarrow_schema(self.pa_schema,
                                             partition_keys=['dt'],
                                             primary_keys=['user_id', 'dt'],

Reply via email to