This is an automated email from the ASF dual-hosted git repository.
jshao pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gravitino.git
The following commit(s) were added to refs/heads/main by this push:
new fce1bd9bb [#5760][#5780] fix(catalog): fix drop catalog error (#5761)
fce1bd9bb is described below
commit fce1bd9bba9ac1615d73313f598c9204a1c129a9
Author: mchades <[email protected]>
AuthorDate: Mon Dec 16 18:54:51 2024 +0800
[#5760][#5780] fix(catalog): fix drop catalog error (#5761)
### What changes were proposed in this pull request?
Before dropping the catalog, check whether its schemas are still available in the underlying data source.
### Why are the changes needed?
Some schemas may have been dropped externally while still existing in the entity
store; such stale schemas are invalid and should not prevent the catalog from being dropped.
Fix: #5760
Fix: #5780
### Does this PR introduce _any_ user-facing change?
no
### How was this patch tested?
CI passes; added the `CatalogMysqlIT#testDropCatalog` integration test.
---
.../mysql/integration/test/CatalogMysqlIT.java | 32 ++++++--
.../apache/gravitino/catalog/CatalogManager.java | 87 ++++++++++++++++++----
2 files changed, 99 insertions(+), 20 deletions(-)
diff --git a/catalogs/catalog-jdbc-mysql/src/test/java/org/apache/gravitino/catalog/mysql/integration/test/CatalogMysqlIT.java b/catalogs/catalog-jdbc-mysql/src/test/java/org/apache/gravitino/catalog/mysql/integration/test/CatalogMysqlIT.java
index 4a0fe241e..c6c334766 100644
--- a/catalogs/catalog-jdbc-mysql/src/test/java/org/apache/gravitino/catalog/mysql/integration/test/CatalogMysqlIT.java
+++ b/catalogs/catalog-jdbc-mysql/src/test/java/org/apache/gravitino/catalog/mysql/integration/test/CatalogMysqlIT.java
@@ -47,6 +47,7 @@ import org.apache.gravitino.catalog.mysql.integration.test.service.MysqlService;
import org.apache.gravitino.client.GravitinoMetalake;
import org.apache.gravitino.exceptions.ConnectionFailedException;
import org.apache.gravitino.exceptions.NoSuchSchemaException;
+import org.apache.gravitino.exceptions.NonEmptyCatalogException;
import org.apache.gravitino.exceptions.NotFoundException;
import org.apache.gravitino.exceptions.SchemaAlreadyExistsException;
import org.apache.gravitino.integration.test.container.ContainerSuite;
@@ -136,8 +137,8 @@ public class CatalogMysqlIT extends BaseIT {
mysqlService = new MysqlService(MYSQL_CONTAINER, TEST_DB_NAME);
createMetalake();
- createCatalog();
- createSchema();
+ catalog = createCatalog(catalogName);
+ createSchema(catalog, schemaName);
}
@AfterAll
@@ -153,7 +154,7 @@ public class CatalogMysqlIT extends BaseIT {
@AfterEach
public void resetSchema() {
clearTableAndSchema();
- createSchema();
+ createSchema(catalog, schemaName);
}
private void clearTableAndSchema() {
@@ -176,7 +177,7 @@ public class CatalogMysqlIT extends BaseIT {
metalake = loadMetalake;
}
- private void createCatalog() throws SQLException {
+ private Catalog createCatalog(String catalogName) throws SQLException {
Map<String, String> catalogProperties = Maps.newHashMap();
catalogProperties.put(
@@ -196,10 +197,10 @@ public class CatalogMysqlIT extends BaseIT {
Catalog loadCatalog = metalake.loadCatalog(catalogName);
Assertions.assertEquals(createdCatalog, loadCatalog);
- catalog = loadCatalog;
+ return loadCatalog;
}
- private void createSchema() {
+ private void createSchema(Catalog catalog, String schemaName) {
Map<String, String> prop = Maps.newHashMap();
Schema createdSchema = catalog.asSchemas().createSchema(schemaName,
schema_comment, prop);
@@ -257,6 +258,25 @@ public class CatalogMysqlIT extends BaseIT {
return properties;
}
+ @Test
+ void testDropCatalog() throws SQLException {
+ // test drop catalog with legacy entity
+ String catalogName = GravitinoITUtils.genRandomName("drop_catalog_it");
+ Catalog catalog = createCatalog(catalogName);
+ String schemaName = GravitinoITUtils.genRandomName("drop_catalog_it");
+ createSchema(catalog, schemaName);
+
+ metalake.disableCatalog(catalogName);
+ Assertions.assertThrows(
+ NonEmptyCatalogException.class, () ->
metalake.dropCatalog(catalogName));
+
+ // drop database externally
+ String sql = String.format("DROP DATABASE %s", schemaName);
+ mysqlService.executeQuery(sql);
+
+ Assertions.assertTrue(metalake.dropCatalog(catalogName));
+ }
+
@Test
void testTestConnection() throws SQLException {
Map<String, String> catalogProperties = Maps.newHashMap();
diff --git a/core/src/main/java/org/apache/gravitino/catalog/CatalogManager.java b/core/src/main/java/org/apache/gravitino/catalog/CatalogManager.java
index 6f77bb462..da79ff702 100644
--- a/core/src/main/java/org/apache/gravitino/catalog/CatalogManager.java
+++ b/core/src/main/java/org/apache/gravitino/catalog/CatalogManager.java
@@ -643,24 +643,25 @@ public class CatalogManager implements CatalogDispatcher, Closeable {
"Catalog %s is in use, please disable it first or use force
option", ident);
}
- List<SchemaEntity> schemas =
- store.list(
- Namespace.of(ident.namespace().level(0), ident.name()),
- SchemaEntity.class,
- EntityType.SCHEMA);
+ Namespace schemaNamespace = Namespace.of(ident.namespace().level(0),
ident.name());
+ CatalogWrapper catalogWrapper = loadCatalogAndWrap(ident);
+
+ List<SchemaEntity> schemaEntities =
+ store.list(schemaNamespace, SchemaEntity.class, EntityType.SCHEMA);
CatalogEntity catalogEntity = store.get(ident, EntityType.CATALOG,
CatalogEntity.class);
- if (!schemas.isEmpty() && !force) {
- // the Kafka catalog is special, it includes a default schema
- if (!catalogEntity.getProvider().equals("kafka") || schemas.size() >
1) {
- throw new NonEmptyCatalogException(
- "Catalog %s has schemas, please drop them first or use force
option", ident);
- }
+ if (containsUserCreatedSchemas(schemaEntities, catalogEntity,
catalogWrapper) && !force) {
+ throw new NonEmptyCatalogException(
+ "Catalog %s has schemas, please drop them first or use force
option", ident);
}
- CatalogWrapper catalogWrapper = loadCatalogAndWrap(ident);
if (includeManagedEntities(catalogEntity)) {
- schemas.forEach(
+ // code reach here in two cases:
+ // 1. the catalog does not have available schemas
+ // 2. the catalog has available schemas, and force is true
+ // for case 1, the forEach block can drop them without any side effect
+ // for case 2, the forEach block will drop all managed sub-entities
+ schemaEntities.forEach(
schema -> {
try {
catalogWrapper.doWithSchemaOps(
@@ -677,11 +678,69 @@ public class CatalogManager implements CatalogDispatcher, Closeable {
} catch (NoSuchMetalakeException | NoSuchCatalogException ignored) {
return false;
- } catch (IOException e) {
+ } catch (GravitinoRuntimeException e) {
+ throw e;
+ } catch (Exception e) {
throw new RuntimeException(e);
}
}
+ /**
+ * Check if the given list of schema entities contains any currently
existing user-created
+ * schemas.
+ *
+ * <p>This method determines if there are valid user-created schemas by
comparing the provided
+ * schema entities with the actual schemas currently existing in the
external data source. It
+ * excludes:
+ *
+ * <ul>
+ * <li>1. Automatically generated schemas (such as Kafka catalog's
"default" schema or
+ * JDBC-PostgreSQL catalog's "public" schema).
+ * <li>2. Schemas that have been dropped externally but still exist in the
entity store.
+ * </ul>
+ *
+ * @param schemaEntities The list of schema entities to check.
+ * @param catalogEntity The catalog entity to which the schemas belong.
+ * @param catalogWrapper The catalog wrapper for the catalog.
+ * @return True if the list of schema entities contains any valid
user-created schemas, false
+ * otherwise.
+ * @throws Exception If an error occurs while checking the schemas.
+ */
+ private boolean containsUserCreatedSchemas(
+ List<SchemaEntity> schemaEntities, CatalogEntity catalogEntity,
CatalogWrapper catalogWrapper)
+ throws Exception {
+ if (schemaEntities.isEmpty()) {
+ return false;
+ }
+
+ if (schemaEntities.size() == 1) {
+ if ("kafka".equals(catalogEntity.getProvider())) {
+ return false;
+
+ } else if ("jdbc-postgresql".equals(catalogEntity.getProvider())) {
+ // PostgreSQL catalog includes the "public" schema, see
+ // https://github.com/apache/gravitino/issues/2314
+ return !schemaEntities.get(0).name().equals("public");
+ }
+ }
+
+ NameIdentifier[] allSchemas =
+ catalogWrapper.doWithSchemaOps(
+ schemaOps ->
+ schemaOps.listSchemas(
+ Namespace.of(catalogEntity.namespace().level(0),
catalogEntity.name())));
+ if (allSchemas.length == 0) {
+ return false;
+ }
+
+ Set<String> availableSchemaNames =
+
Arrays.stream(allSchemas).map(NameIdentifier::name).collect(Collectors.toSet());
+
+ // some schemas are dropped externally, but still exist in the entity
store, those schemas are
+ // invalid
+ return
schemaEntities.stream().map(SchemaEntity::name).anyMatch(availableSchemaNames::contains);
+ }
+
private boolean includeManagedEntities(CatalogEntity catalogEntity) {
return catalogEntity.getType().equals(FILESET);
}