This is an automated email from the ASF dual-hosted git repository.

jshao pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gravitino.git


The following commit(s) were added to refs/heads/main by this push:
     new fce1bd9bb [#5760][#5780] fix(catalog): fix drop catalog error (#5761)
fce1bd9bb is described below

commit fce1bd9bba9ac1615d73313f598c9204a1c129a9
Author: mchades <[email protected]>
AuthorDate: Mon Dec 16 18:54:51 2024 +0800

    [#5760][#5780] fix(catalog): fix drop catalog error (#5761)
    
    ### What changes were proposed in this pull request?
    
    Before dropping the catalog, check whether the schemas recorded in the entity store still exist in the underlying data source.
    
    ### Why are the changes needed?
    
    Some schemas may have been dropped externally but still exist in the entity
    store. Such stale schemas are invalid and should not prevent the catalog
    from being dropped.
    
    Fix: #5760
    Fix: #5780
    
    ### Does this PR introduce _any_ user-facing change?
    
    no
    
    ### How was this patch tested?
    
    CI passes. Added the integration test `CatalogMysqlIT#testDropCatalog`.
---
 .../mysql/integration/test/CatalogMysqlIT.java     | 32 ++++++--
 .../apache/gravitino/catalog/CatalogManager.java   | 87 ++++++++++++++++++----
 2 files changed, 99 insertions(+), 20 deletions(-)

diff --git 
a/catalogs/catalog-jdbc-mysql/src/test/java/org/apache/gravitino/catalog/mysql/integration/test/CatalogMysqlIT.java
 
b/catalogs/catalog-jdbc-mysql/src/test/java/org/apache/gravitino/catalog/mysql/integration/test/CatalogMysqlIT.java
index 4a0fe241e..c6c334766 100644
--- 
a/catalogs/catalog-jdbc-mysql/src/test/java/org/apache/gravitino/catalog/mysql/integration/test/CatalogMysqlIT.java
+++ 
b/catalogs/catalog-jdbc-mysql/src/test/java/org/apache/gravitino/catalog/mysql/integration/test/CatalogMysqlIT.java
@@ -47,6 +47,7 @@ import 
org.apache.gravitino.catalog.mysql.integration.test.service.MysqlService;
 import org.apache.gravitino.client.GravitinoMetalake;
 import org.apache.gravitino.exceptions.ConnectionFailedException;
 import org.apache.gravitino.exceptions.NoSuchSchemaException;
+import org.apache.gravitino.exceptions.NonEmptyCatalogException;
 import org.apache.gravitino.exceptions.NotFoundException;
 import org.apache.gravitino.exceptions.SchemaAlreadyExistsException;
 import org.apache.gravitino.integration.test.container.ContainerSuite;
@@ -136,8 +137,8 @@ public class CatalogMysqlIT extends BaseIT {
 
     mysqlService = new MysqlService(MYSQL_CONTAINER, TEST_DB_NAME);
     createMetalake();
-    createCatalog();
-    createSchema();
+    catalog = createCatalog(catalogName);
+    createSchema(catalog, schemaName);
   }
 
   @AfterAll
@@ -153,7 +154,7 @@ public class CatalogMysqlIT extends BaseIT {
   @AfterEach
   public void resetSchema() {
     clearTableAndSchema();
-    createSchema();
+    createSchema(catalog, schemaName);
   }
 
   private void clearTableAndSchema() {
@@ -176,7 +177,7 @@ public class CatalogMysqlIT extends BaseIT {
     metalake = loadMetalake;
   }
 
-  private void createCatalog() throws SQLException {
+  private Catalog createCatalog(String catalogName) throws SQLException {
     Map<String, String> catalogProperties = Maps.newHashMap();
 
     catalogProperties.put(
@@ -196,10 +197,10 @@ public class CatalogMysqlIT extends BaseIT {
     Catalog loadCatalog = metalake.loadCatalog(catalogName);
     Assertions.assertEquals(createdCatalog, loadCatalog);
 
-    catalog = loadCatalog;
+    return loadCatalog;
   }
 
-  private void createSchema() {
+  private void createSchema(Catalog catalog, String schemaName) {
     Map<String, String> prop = Maps.newHashMap();
 
     Schema createdSchema = catalog.asSchemas().createSchema(schemaName, 
schema_comment, prop);
@@ -257,6 +258,25 @@ public class CatalogMysqlIT extends BaseIT {
     return properties;
   }
 
+  @Test
+  void testDropCatalog() throws SQLException {
+    // test drop catalog with legacy entity
+    String catalogName = GravitinoITUtils.genRandomName("drop_catalog_it");
+    Catalog catalog = createCatalog(catalogName);
+    String schemaName = GravitinoITUtils.genRandomName("drop_catalog_it");
+    createSchema(catalog, schemaName);
+
+    metalake.disableCatalog(catalogName);
+    Assertions.assertThrows(
+        NonEmptyCatalogException.class, () -> 
metalake.dropCatalog(catalogName));
+
+    // drop database externally
+    String sql = String.format("DROP DATABASE %s", schemaName);
+    mysqlService.executeQuery(sql);
+
+    Assertions.assertTrue(metalake.dropCatalog(catalogName));
+  }
+
   @Test
   void testTestConnection() throws SQLException {
     Map<String, String> catalogProperties = Maps.newHashMap();
diff --git 
a/core/src/main/java/org/apache/gravitino/catalog/CatalogManager.java 
b/core/src/main/java/org/apache/gravitino/catalog/CatalogManager.java
index 6f77bb462..da79ff702 100644
--- a/core/src/main/java/org/apache/gravitino/catalog/CatalogManager.java
+++ b/core/src/main/java/org/apache/gravitino/catalog/CatalogManager.java
@@ -643,24 +643,25 @@ public class CatalogManager implements CatalogDispatcher, 
Closeable {
             "Catalog %s is in use, please disable it first or use force 
option", ident);
       }
 
-      List<SchemaEntity> schemas =
-          store.list(
-              Namespace.of(ident.namespace().level(0), ident.name()),
-              SchemaEntity.class,
-              EntityType.SCHEMA);
+      Namespace schemaNamespace = Namespace.of(ident.namespace().level(0), 
ident.name());
+      CatalogWrapper catalogWrapper = loadCatalogAndWrap(ident);
+
+      List<SchemaEntity> schemaEntities =
+          store.list(schemaNamespace, SchemaEntity.class, EntityType.SCHEMA);
       CatalogEntity catalogEntity = store.get(ident, EntityType.CATALOG, 
CatalogEntity.class);
 
-      if (!schemas.isEmpty() && !force) {
-        // the Kafka catalog is special, it includes a default schema
-        if (!catalogEntity.getProvider().equals("kafka") || schemas.size() > 
1) {
-          throw new NonEmptyCatalogException(
-              "Catalog %s has schemas, please drop them first or use force 
option", ident);
-        }
+      if (containsUserCreatedSchemas(schemaEntities, catalogEntity, 
catalogWrapper) && !force) {
+        throw new NonEmptyCatalogException(
+            "Catalog %s has schemas, please drop them first or use force 
option", ident);
       }
 
-      CatalogWrapper catalogWrapper = loadCatalogAndWrap(ident);
       if (includeManagedEntities(catalogEntity)) {
-        schemas.forEach(
+        // code reach here in two cases:
+        // 1. the catalog does not have available schemas
+        // 2. the catalog has available schemas, and force is true
+        // for case 1, the forEach block can drop them without any side effect
+        // for case 2, the forEach block will drop all managed sub-entities
+        schemaEntities.forEach(
             schema -> {
               try {
                 catalogWrapper.doWithSchemaOps(
@@ -677,11 +678,69 @@ public class CatalogManager implements CatalogDispatcher, 
Closeable {
     } catch (NoSuchMetalakeException | NoSuchCatalogException ignored) {
       return false;
 
-    } catch (IOException e) {
+    } catch (GravitinoRuntimeException e) {
+      throw e;
+    } catch (Exception e) {
       throw new RuntimeException(e);
     }
   }
 
+  /**
+   * Check if the given list of schema entities contains any currently 
existing user-created
+   * schemas.
+   *
+   * <p>This method determines if there are valid user-created schemas by 
comparing the provided
+   * schema entities with the actual schemas currently existing in the 
external data source. It
+   * excludes:
+   *
+   * <ul>
+   *   <li>1. Automatically generated schemas (such as Kafka catalog's 
"default" schema or
+   *       JDBC-PostgreSQL catalog's "public" schema).
+   *   <li>2. Schemas that have been dropped externally but still exist in the 
entity store.
+   * </ul>
+   *
+   * @param schemaEntities The list of schema entities to check.
+   * @param catalogEntity The catalog entity to which the schemas belong.
+   * @param catalogWrapper The catalog wrapper for the catalog.
+   * @return True if the list of schema entities contains any valid 
user-created schemas, false
+   *     otherwise.
+   * @throws Exception If an error occurs while checking the schemas.
+   */
+  private boolean containsUserCreatedSchemas(
+      List<SchemaEntity> schemaEntities, CatalogEntity catalogEntity, 
CatalogWrapper catalogWrapper)
+      throws Exception {
+    if (schemaEntities.isEmpty()) {
+      return false;
+    }
+
+    if (schemaEntities.size() == 1) {
+      if ("kafka".equals(catalogEntity.getProvider())) {
+        return false;
+
+      } else if ("jdbc-postgresql".equals(catalogEntity.getProvider())) {
+        // PostgreSQL catalog includes the "public" schema, see
+        // https://github.com/apache/gravitino/issues/2314
+        return !schemaEntities.get(0).name().equals("public");
+      }
+    }
+
+    NameIdentifier[] allSchemas =
+        catalogWrapper.doWithSchemaOps(
+            schemaOps ->
+                schemaOps.listSchemas(
+                    Namespace.of(catalogEntity.namespace().level(0), 
catalogEntity.name())));
+    if (allSchemas.length == 0) {
+      return false;
+    }
+
+    Set<String> availableSchemaNames =
+        
Arrays.stream(allSchemas).map(NameIdentifier::name).collect(Collectors.toSet());
+
+    // some schemas are dropped externally, but still exist in the entity 
store, those schemas are
+    // invalid
+    return 
schemaEntities.stream().map(SchemaEntity::name).anyMatch(availableSchemaNames::contains);
+  }
+
   private boolean includeManagedEntities(CatalogEntity catalogEntity) {
     return catalogEntity.getType().equals(FILESET);
   }

Reply via email to