This is an automated email from the ASF dual-hosted git repository.

jshao pushed a commit to branch branch-1.1
in repository https://gitbox.apache.org/repos/asf/gravitino.git


The following commit(s) were added to refs/heads/branch-1.1 by this push:
     new fe65ee85cd [#9490] improvement(core): Include lakehouse-generic 
catalogs in managed entities for proper drop behavior (#9582)
fe65ee85cd is described below

commit fe65ee85cdf7404ee10116deb7b6ccbf0bca6a3e
Author: github-actions[bot] 
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Tue Dec 30 13:54:43 2025 +0800

    [#9490] improvement(core): Include lakehouse-generic catalogs in managed 
entities for proper drop behavior (#9582)
    
    ### What changes were proposed in this pull request?
    
    This PR improves the drop behavior for lakehouse-generic catalogs to
    properly handle managed entities (Lance tables).
    
    **Changes:**
    - Replaced `CatalogManager.includeManagedEntities()` with a capability-based
    `isManagedStorageCatalog()` check, so that RELATIONAL catalogs with provider
    'lakehouse-generic' are now treated as managed catalogs
    - Added comprehensive integration test to verify drop behavior for both
    managed and external tables
    
    ### Why are the changes needed?
    
    Fix #9490
    
    Currently, when dropping a lakehouse-generic catalog, only metadata is
    deleted but the physical data (Lance tables managed by Gravitino)
    remains. This PR ensures that:
    - When dropping a managed catalog, both metadata and physical data are
    properly cleaned up
    - Managed tables' physical directories are deleted when the catalog is
    dropped
    - External tables' physical directories are preserved (not deleted)
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes. After this change, dropping a lakehouse-generic catalog with
    `force=true` will now also delete the physical data for managed Lance
    tables, not just the metadata.
    
    ### How was this patch tested?
    
    - Added integration test
    `testDropCatalogWithManagedAndExternalEntities()` that verifies:
      - Managed tables are properly deleted (both metadata and physical data)
      - External tables' physical data is preserved
    - Test passed successfully in multiple runs
    
    Co-authored-by: Jerry Shao <[email protected]>
---
 .../catalog/fileset/FilesetCatalogCapability.java  |   5 +-
 .../generic/GenericCatalogCapability.java          |   6 +-
 .../test/CatalogGenericCatalogLanceIT.java         | 117 ++++++++++++++++++++-
 .../catalog/model/ModelCatalogCapability.java      |   5 +-
 .../apache/gravitino/catalog/CatalogManager.java   |  68 +++++++-----
 .../gravitino/connector/capability/Capability.java |   5 +-
 .../gravitino/catalog/TestCatalogManager.java      |   6 ++
 7 files changed, 175 insertions(+), 37 deletions(-)

diff --git 
a/catalogs/catalog-fileset/src/main/java/org/apache/gravitino/catalog/fileset/FilesetCatalogCapability.java
 
b/catalogs/catalog-fileset/src/main/java/org/apache/gravitino/catalog/fileset/FilesetCatalogCapability.java
index ef9e18a514..6c76b0f1a7 100644
--- 
a/catalogs/catalog-fileset/src/main/java/org/apache/gravitino/catalog/fileset/FilesetCatalogCapability.java
+++ 
b/catalogs/catalog-fileset/src/main/java/org/apache/gravitino/catalog/fileset/FilesetCatalogCapability.java
@@ -18,14 +18,15 @@
  */
 package org.apache.gravitino.catalog.fileset;
 
-import java.util.Objects;
+import com.google.common.base.Preconditions;
 import org.apache.gravitino.connector.capability.Capability;
 import org.apache.gravitino.connector.capability.CapabilityResult;
 
 public class FilesetCatalogCapability implements Capability {
   @Override
   public CapabilityResult managedStorage(Scope scope) {
-    if (Objects.requireNonNull(scope) == Scope.SCHEMA) {
+    Preconditions.checkArgument(scope != null, "Scope cannot be null.");
+    if (scope == Scope.SCHEMA || scope == Scope.FILESET) {
       return CapabilityResult.SUPPORTED;
     }
     return CapabilityResult.unsupported(
diff --git 
a/catalogs/catalog-lakehouse-generic/src/main/java/org/apache/gravitino/catalog/lakehouse/generic/GenericCatalogCapability.java
 
b/catalogs/catalog-lakehouse-generic/src/main/java/org/apache/gravitino/catalog/lakehouse/generic/GenericCatalogCapability.java
index 5900d0b51e..2a6bce6b79 100644
--- 
a/catalogs/catalog-lakehouse-generic/src/main/java/org/apache/gravitino/catalog/lakehouse/generic/GenericCatalogCapability.java
+++ 
b/catalogs/catalog-lakehouse-generic/src/main/java/org/apache/gravitino/catalog/lakehouse/generic/GenericCatalogCapability.java
@@ -18,7 +18,7 @@
  */
 package org.apache.gravitino.catalog.lakehouse.generic;
 
-import java.util.Objects;
+import com.google.common.base.Preconditions;
 import org.apache.gravitino.connector.capability.Capability;
 import org.apache.gravitino.connector.capability.CapabilityResult;
 
@@ -26,8 +26,8 @@ public class GenericCatalogCapability implements Capability {
 
   @Override
   public CapabilityResult managedStorage(Scope scope) {
-    if (Objects.requireNonNull(scope) == Scope.TABLE
-        || Objects.requireNonNull(scope) == Scope.SCHEMA) {
+    Preconditions.checkArgument(scope != null, "Scope cannot be null.");
+    if (scope == Scope.TABLE || scope == Scope.SCHEMA) {
       return CapabilityResult.SUPPORTED;
     }
 
diff --git 
a/catalogs/catalog-lakehouse-generic/src/test/java/org/apache/gravitino/catalog/lakehouse/lance/integration/test/CatalogGenericCatalogLanceIT.java
 
b/catalogs/catalog-lakehouse-generic/src/test/java/org/apache/gravitino/catalog/lakehouse/lance/integration/test/CatalogGenericCatalogLanceIT.java
index fae235818b..d3a34193a7 100644
--- 
a/catalogs/catalog-lakehouse-generic/src/test/java/org/apache/gravitino/catalog/lakehouse/lance/integration/test/CatalogGenericCatalogLanceIT.java
+++ 
b/catalogs/catalog-lakehouse-generic/src/test/java/org/apache/gravitino/catalog/lakehouse/lance/integration/test/CatalogGenericCatalogLanceIT.java
@@ -53,10 +53,12 @@ import org.apache.arrow.vector.VectorSchemaRoot;
 import org.apache.arrow.vector.ipc.ArrowReader;
 import org.apache.arrow.vector.types.pojo.ArrowType;
 import org.apache.arrow.vector.types.pojo.Field;
+import org.apache.commons.io.FileUtils;
 import org.apache.gravitino.Catalog;
 import org.apache.gravitino.NameIdentifier;
 import org.apache.gravitino.Schema;
 import org.apache.gravitino.client.GravitinoMetalake;
+import org.apache.gravitino.exceptions.NoSuchCatalogException;
 import org.apache.gravitino.integration.test.util.BaseIT;
 import org.apache.gravitino.integration.test.util.GravitinoITUtils;
 import org.apache.gravitino.rel.Column;
@@ -114,8 +116,6 @@ public class CatalogGenericCatalogLanceIT extends BaseIT {
     // Create a temp directory for test use
     Path tempDir = Files.createTempDirectory("myTempDir");
     tempDirectory = tempDir.toString();
-    File file = new File(tempDirectory);
-    file.deleteOnExit();
   }
 
   @AfterAll
@@ -141,6 +141,8 @@ public class CatalogGenericCatalogLanceIT extends BaseIT {
     }
 
     client = null;
+
+    FileUtils.deleteDirectory(new File(tempDirectory));
   }
 
   @AfterEach
@@ -934,4 +936,115 @@ public class CatalogGenericCatalogLanceIT extends BaseIT {
     boolean dropSuccess = 
catalog.asTableCatalog().dropTable(newTableIdentifier);
     Assertions.assertTrue(dropSuccess);
   }
+
+  @Test
+  void testDropCatalogWithManagedAndExternalEntities() {
+    // Create a new catalog for this test to avoid interfering with other tests
+    String testCatalogName = 
GravitinoITUtils.genRandomName("drop_catalog_test");
+    Map<String, String> catalogProperties = Maps.newHashMap();
+    metalake.createCatalog(
+        testCatalogName,
+        Catalog.Type.RELATIONAL,
+        provider,
+        "Test catalog for drop",
+        catalogProperties);
+
+    Catalog testCatalog = metalake.loadCatalog(testCatalogName);
+
+    // Create a schema
+    String testSchemaName = GravitinoITUtils.genRandomName(SCHEMA_PREFIX);
+    Map<String, String> schemaProperties = createSchemaProperties();
+    testCatalog.asSchemas().createSchema(testSchemaName, "Test schema", 
schemaProperties);
+
+    Column[] columns = createColumns();
+
+    // Create a managed (non-external) Lance table
+    String managedTableName = GravitinoITUtils.genRandomName(TABLE_PREFIX + 
"_managed");
+    NameIdentifier managedTableIdentifier = NameIdentifier.of(testSchemaName, 
managedTableName);
+    String managedTableLocation =
+        String.format("%s/%s/%s", tempDirectory, testSchemaName, 
managedTableName);
+
+    Map<String, String> managedTableProperties = createProperties();
+    managedTableProperties.put(Table.PROPERTY_TABLE_FORMAT, "lance");
+    managedTableProperties.put(Table.PROPERTY_LOCATION, managedTableLocation);
+
+    Table managedTable =
+        testCatalog
+            .asTableCatalog()
+            .createTable(
+                managedTableIdentifier,
+                columns,
+                "Managed table",
+                managedTableProperties,
+                Transforms.EMPTY_TRANSFORM,
+                null,
+                null);
+
+    Assertions.assertNotNull(managedTable);
+    File managedTableDir = new File(managedTableLocation);
+    Assertions.assertTrue(
+        managedTableDir.exists(), "Managed table directory should exist after 
creation");
+
+    // Create an external Lance table
+    String externalTableName = GravitinoITUtils.genRandomName(TABLE_PREFIX + 
"_external");
+    NameIdentifier externalTableIdentifier = NameIdentifier.of(testSchemaName, 
externalTableName);
+    String externalTableLocation =
+        String.format("%s/%s/%s", tempDirectory, testSchemaName, 
externalTableName);
+
+    Map<String, String> externalTableProperties = createProperties();
+    externalTableProperties.put(Table.PROPERTY_TABLE_FORMAT, "lance");
+    externalTableProperties.put(Table.PROPERTY_LOCATION, 
externalTableLocation);
+    externalTableProperties.put(Table.PROPERTY_EXTERNAL, "true");
+
+    Table externalTable =
+        testCatalog
+            .asTableCatalog()
+            .createTable(
+                externalTableIdentifier,
+                columns,
+                "External table",
+                externalTableProperties,
+                Transforms.EMPTY_TRANSFORM,
+                null,
+                null);
+
+    Assertions.assertNotNull(externalTable);
+    File externalTableDir = new File(externalTableLocation);
+    Assertions.assertTrue(
+        externalTableDir.exists(), "External table directory should exist 
after creation");
+
+    // Verify both tables exist in catalog
+    Table loadedManagedTable = 
testCatalog.asTableCatalog().loadTable(managedTableIdentifier);
+    Assertions.assertNotNull(loadedManagedTable);
+
+    Table loadedExternalTable = 
testCatalog.asTableCatalog().loadTable(externalTableIdentifier);
+    Assertions.assertNotNull(loadedExternalTable);
+
+    // Drop the catalog with force=true
+    boolean catalogDropped = metalake.dropCatalog(testCatalogName, true);
+    Assertions.assertTrue(catalogDropped, "Catalog should be dropped 
successfully");
+
+    // Verify the catalog is dropped
+    Assertions.assertThrows(
+        NoSuchCatalogException.class,
+        () -> metalake.loadCatalog(testCatalogName),
+        "Catalog should not exist after drop");
+
+    // Verify the managed table's physical directory is removed
+    Assertions.assertFalse(
+        managedTableDir.exists(),
+        "Managed table directory should be removed after dropping catalog");
+
+    // Verify the external table's physical directory is preserved
+    Assertions.assertTrue(
+        externalTableDir.exists(),
+        "External table directory should be preserved after dropping catalog");
+
+    // Clean up external table directory manually
+    try {
+      FileUtils.deleteDirectory(externalTableDir);
+    } catch (IOException e) {
+      LOG.warn("Failed to delete external table directory: {}", 
externalTableLocation, e);
+    }
+  }
 }
diff --git 
a/catalogs/catalog-model/src/main/java/org/apache/gravitino/catalog/model/ModelCatalogCapability.java
 
b/catalogs/catalog-model/src/main/java/org/apache/gravitino/catalog/model/ModelCatalogCapability.java
index b19c9ba2e6..4a930e33bd 100644
--- 
a/catalogs/catalog-model/src/main/java/org/apache/gravitino/catalog/model/ModelCatalogCapability.java
+++ 
b/catalogs/catalog-model/src/main/java/org/apache/gravitino/catalog/model/ModelCatalogCapability.java
@@ -18,14 +18,15 @@
  */
 package org.apache.gravitino.catalog.model;
 
-import java.util.Objects;
+import com.google.common.base.Preconditions;
 import org.apache.gravitino.connector.capability.Capability;
 import org.apache.gravitino.connector.capability.CapabilityResult;
 
 public class ModelCatalogCapability implements Capability {
   @Override
   public CapabilityResult managedStorage(Scope scope) {
-    if (Objects.requireNonNull(scope) == Scope.SCHEMA) {
+    Preconditions.checkArgument(scope != null, "Scope cannot be null.");
+    if (scope == Scope.SCHEMA || scope == Scope.MODEL) {
       return CapabilityResult.SUPPORTED;
     }
     return CapabilityResult.unsupported(
diff --git 
a/core/src/main/java/org/apache/gravitino/catalog/CatalogManager.java 
b/core/src/main/java/org/apache/gravitino/catalog/CatalogManager.java
index 17c0e97701..6835d5ac28 100644
--- a/core/src/main/java/org/apache/gravitino/catalog/CatalogManager.java
+++ b/core/src/main/java/org/apache/gravitino/catalog/CatalogManager.java
@@ -103,6 +103,7 @@ import org.apache.gravitino.rel.Table;
 import org.apache.gravitino.rel.TableCatalog;
 import org.apache.gravitino.storage.IdGenerator;
 import org.apache.gravitino.utils.IsolatedClassLoader;
+import org.apache.gravitino.utils.NamespaceUtil;
 import org.apache.gravitino.utils.PrincipalUtils;
 import org.apache.gravitino.utils.ThrowableFunction;
 import org.slf4j.Logger;
@@ -740,36 +741,31 @@ public class CatalogManager implements CatalogDispatcher, 
Closeable {
         () -> {
           checkMetalake(metalakeIdent, store);
           try {
-            boolean catalogInUse = catalogInUse(store, ident);
+            boolean catalogInUse = getCatalogInUseValue(store, ident);
             if (catalogInUse && !force) {
               throw new CatalogInUseException(
                   "Catalog %s is in use, please disable it first or use force 
option", ident);
             }
 
-            Namespace schemaNamespace = 
Namespace.of(ident.namespace().level(0), ident.name());
-            CatalogWrapper catalogWrapper = loadCatalogAndWrap(ident);
-
+            Namespace schemaNs = Namespace.of(ident.namespace().level(0), 
ident.name());
             List<SchemaEntity> schemaEntities =
-                store.list(schemaNamespace, SchemaEntity.class, 
EntityType.SCHEMA);
-            CatalogEntity catalogEntity = store.get(ident, EntityType.CATALOG, 
CatalogEntity.class);
+                store.list(schemaNs, SchemaEntity.class, EntityType.SCHEMA);
 
-            if (!force
-                && containsUserCreatedSchemas(schemaEntities, catalogEntity, 
catalogWrapper)) {
+            CatalogWrapper catalogWrapper = loadCatalogAndWrap(ident);
+            if (!force && containsUserCreatedSchemas(schemaEntities, 
catalogWrapper)) {
               throw new NonEmptyCatalogException(
                   "Catalog %s has schemas, please drop them first or use force 
option", ident);
             }
 
-            if (includeManagedEntities(catalogEntity)) {
-              // code reach here in two cases:
-              // 1. the catalog does not have available schemas
-              // 2. the catalog has available schemas, and force is true
-              // for case 1, the forEach block can drop them without any side 
effect
-              // for case 2, the forEach block will drop all managed 
sub-entities
+            if (isManagedStorageCatalog(catalogWrapper)) {
+              // For managed catalog, we need to call drop schema API to drop 
the underlying
+              // entities as well as the related resource first. Directly 
deleting the metadata from
+              // the store is not enough.
               schemaEntities.forEach(
                   schema -> {
                     try {
                       catalogWrapper.doWithSchemaOps(
-                          schemaOps -> 
schemaOps.dropSchema(schema.nameIdentifier(), true));
+                          ops -> ops.dropSchema(schema.nameIdentifier(), 
true));
                     } catch (Exception e) {
                       LOG.warn("Failed to drop schema {}", 
schema.nameIdentifier());
                       throw new RuntimeException(
@@ -777,6 +773,8 @@ public class CatalogManager implements CatalogDispatcher, 
Closeable {
                     }
                   });
             }
+
+            // Finally, delete the catalog entity as well as all its 
sub-entities from the store.
             catalogCache.invalidate(ident);
             return store.delete(ident, EntityType.CATALOG, true);
 
@@ -805,28 +803,33 @@ public class CatalogManager implements CatalogDispatcher, 
Closeable {
    * </ul>
    *
    * @param schemaEntities The list of schema entities to check.
-   * @param catalogEntity The catalog entity to which the schemas belong.
    * @param catalogWrapper The catalog wrapper for the catalog.
    * @return True if the list of schema entities contains any valid 
user-created schemas, false
    *     otherwise.
    * @throws Exception If an error occurs while checking the schemas.
    */
   private boolean containsUserCreatedSchemas(
-      List<SchemaEntity> schemaEntities, CatalogEntity catalogEntity, 
CatalogWrapper catalogWrapper)
-      throws Exception {
+      List<SchemaEntity> schemaEntities, CatalogWrapper catalogWrapper) throws 
Exception {
     if (schemaEntities.isEmpty()) {
       return false;
     }
 
+    if (isManagedStorageCatalog(catalogWrapper)) {
+      // For managed storage catalog, any existing schema entities are 
considered user-created. At
+      // this point we already know schemaEntities is not empty, so we can 
return true directly
+      // without further checks.
+      return true;
+    }
+
     if (schemaEntities.size() == 1) {
-      if ("kafka".equals(catalogEntity.getProvider())) {
+      String provider = catalogWrapper.catalog().provider();
+      if ("kafka".equalsIgnoreCase(provider)) {
         return false;
-
-      } else if ("jdbc-postgresql".equals(catalogEntity.getProvider())) {
+      } else if ("jdbc-postgresql".equalsIgnoreCase(provider)) {
         // PostgreSQL catalog includes the "public" schema, see
         // https://github.com/apache/gravitino/issues/2314
         return !schemaEntities.get(0).name().equals("public");
-      } else if ("hive".equals(catalogEntity.getProvider())) {
+      } else if ("hive".equalsIgnoreCase(provider)) {
         return !schemaEntities.get(0).name().equals("default");
       }
     }
@@ -835,7 +838,9 @@ public class CatalogManager implements CatalogDispatcher, 
Closeable {
         catalogWrapper.doWithSchemaOps(
             schemaOps ->
                 schemaOps.listSchemas(
-                    Namespace.of(catalogEntity.namespace().level(0), 
catalogEntity.name())));
+                    NamespaceUtil.ofSchema(
+                        catalogWrapper.catalog().entity().namespace().level(0),
+                        catalogWrapper.catalog().name())));
     if (allSchemas.length == 0) {
       return false;
     }
@@ -848,10 +853,6 @@ public class CatalogManager implements CatalogDispatcher, 
Closeable {
     return 
schemaEntities.stream().map(SchemaEntity::name).anyMatch(availableSchemaNames::contains);
   }
 
-  private boolean includeManagedEntities(CatalogEntity catalogEntity) {
-    return catalogEntity.getType().equals(FILESET);
-  }
-
   /**
    * Loads the catalog with the specified identifier, wraps it in a 
CatalogWrapper, and caches the
    * wrapper for reuse.
@@ -893,6 +894,19 @@ public class CatalogManager implements CatalogDispatcher, 
Closeable {
     }
   }
 
+  private boolean isManagedStorageCatalog(CatalogWrapper catalogWrapper) {
+    try {
+      Capability capability = catalogWrapper.capabilities();
+      return capability.managedStorage(Capability.Scope.SCHEMA).supported()
+          && (capability.managedStorage(Capability.Scope.TABLE).supported()
+              || 
capability.managedStorage(Capability.Scope.FILESET).supported()
+              || 
capability.managedStorage(Capability.Scope.MODEL).supported());
+    } catch (Exception e) {
+      // This should not be happened, because capabilities() will never throw 
an exception here.
+      throw new RuntimeException(e);
+    }
+  }
+
   private CatalogEntity.Builder newCatalogBuilder(Namespace namespace, 
CatalogEntity catalog) {
     CatalogEntity.Builder builder =
         CatalogEntity.builder()
diff --git 
a/core/src/main/java/org/apache/gravitino/connector/capability/Capability.java 
b/core/src/main/java/org/apache/gravitino/connector/capability/Capability.java
index 37d517f2d2..3b33b8f3e2 100644
--- 
a/core/src/main/java/org/apache/gravitino/connector/capability/Capability.java
+++ 
b/core/src/main/java/org/apache/gravitino/connector/capability/Capability.java
@@ -83,7 +83,10 @@ public interface Capability {
   }
 
   /**
-   * Check if the entity is fully managed by Gravitino in the scope.
+   * Check if the metadata entity is fully managed by Gravitino for the 
passed-in scope. This is
+   * used to determine whether Gravitino should manage the lifecycle of the 
metadata and related
+   * data. For example, if a catalog storage is managed, when the catalog is 
dropped, Gravitino will
+   * also delete all underlying data associated with the catalog.
    *
    * @param scope The scope of the capability.
    * @return The capability of the managed storage.
diff --git 
a/core/src/test/java/org/apache/gravitino/catalog/TestCatalogManager.java 
b/core/src/test/java/org/apache/gravitino/catalog/TestCatalogManager.java
index b9cede7473..8a4f728af2 100644
--- a/core/src/test/java/org/apache/gravitino/catalog/TestCatalogManager.java
+++ b/core/src/test/java/org/apache/gravitino/catalog/TestCatalogManager.java
@@ -43,6 +43,8 @@ import org.apache.gravitino.EntityStore;
 import org.apache.gravitino.GravitinoEnv;
 import org.apache.gravitino.NameIdentifier;
 import org.apache.gravitino.Namespace;
+import org.apache.gravitino.connector.capability.Capability;
+import org.apache.gravitino.connector.capability.CapabilityResult;
 import org.apache.gravitino.exceptions.CatalogAlreadyExistsException;
 import org.apache.gravitino.exceptions.CatalogInUseException;
 import org.apache.gravitino.exceptions.NoSuchCatalogException;
@@ -598,7 +600,11 @@ public class TestCatalogManager {
     entityStore.put(schemaEntity);
     CatalogManager.CatalogWrapper catalogWrapper =
         Mockito.mock(CatalogManager.CatalogWrapper.class);
+    Capability capability = Mockito.mock(Capability.class);
+    CapabilityResult unsupportedResult = CapabilityResult.unsupported("Not 
managed");
     
Mockito.doReturn(catalogWrapper).when(catalogManager).loadCatalogAndWrap(ident);
+    Mockito.doReturn(capability).when(catalogWrapper).capabilities();
+    Mockito.doReturn(unsupportedResult).when(capability).managedStorage(any());
     Mockito.doThrow(new RuntimeException("Failed connect"))
         .when(catalogWrapper)
         .doWithSchemaOps(any());

Reply via email to