This is an automated email from the ASF dual-hosted git repository.

mchades pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gravitino.git


The following commit(s) were added to refs/heads/main by this push:
     new 1420174bb [#4842] fix(hadoop-catalog): Improve the schema drop logic 
for fileset catalog (#5521)
1420174bb is described below

commit 1420174bb8c35913678db4e39809ddac8ef16b85
Author: Jerry Shao <[email protected]>
AuthorDate: Tue Nov 12 10:57:48 2024 +0800

    [#4842] fix(hadoop-catalog): Improve the schema drop logic for fileset 
catalog (#5521)
    
    ### What changes were proposed in this pull request?
    
    This PR changes the schema drop mechanism to delete the fileset path one
    by one, instead of delete the schema path. This improvement will:
    
    1. Delete the fileset path that is not under the schema's location.
    2. Avoid deleting the external fileset path that happens to under schema
    location.
    
    ### Why are the changes needed?
    
    To make the drop mechanism correct.
    
    Fix: #4842
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Add UT to cover the changes.
---
 .../catalog/hadoop/HadoopCatalogOperations.java    | 78 +++++++++++++++++++---
 .../hadoop/TestHadoopCatalogOperations.java        | 31 ++++++---
 .../test/HadoopUserAuthenticationIT.java           |  3 +-
 docs/manage-fileset-metadata-using-gravitino.md    |  6 +-
 4 files changed, 97 insertions(+), 21 deletions(-)

diff --git 
a/catalogs/catalog-hadoop/src/main/java/org/apache/gravitino/catalog/hadoop/HadoopCatalogOperations.java
 
b/catalogs/catalog-hadoop/src/main/java/org/apache/gravitino/catalog/hadoop/HadoopCatalogOperations.java
index 30e72c041..21775038b 100644
--- 
a/catalogs/catalog-hadoop/src/main/java/org/apache/gravitino/catalog/hadoop/HadoopCatalogOperations.java
+++ 
b/catalogs/catalog-hadoop/src/main/java/org/apache/gravitino/catalog/hadoop/HadoopCatalogOperations.java
@@ -22,6 +22,7 @@ import static 
org.apache.gravitino.connector.BaseCatalog.CATALOG_BYPASS_PREFIX;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Maps;
 import java.io.FileNotFoundException;
 import java.io.IOException;
@@ -64,8 +65,10 @@ import org.apache.gravitino.file.FilesetChange;
 import org.apache.gravitino.meta.AuditInfo;
 import org.apache.gravitino.meta.FilesetEntity;
 import org.apache.gravitino.meta.SchemaEntity;
+import org.apache.gravitino.utils.NamespaceUtil;
 import org.apache.gravitino.utils.PrincipalUtils;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.slf4j.Logger;
@@ -89,7 +92,7 @@ public class HadoopCatalogOperations implements 
CatalogOperations, SupportsSchem
 
   private CatalogInfo catalogInfo;
 
-  private final Map<String, FileSystemProvider> fileSystemProvidersMap = 
Maps.newHashMap();
+  private Map<String, FileSystemProvider> fileSystemProvidersMap;
 
   private FileSystemProvider defaultFileSystemProvider;
 
@@ -133,7 +136,10 @@ public class HadoopCatalogOperations implements 
CatalogOperations, SupportsSchem
             propertiesMetadata
                 .catalogPropertiesMetadata()
                 .getOrDefault(config, 
HadoopCatalogPropertiesMetadata.FILESYSTEM_PROVIDERS);
-    
this.fileSystemProvidersMap.putAll(FileSystemUtils.getFileSystemProviders(fileSystemProviders));
+    this.fileSystemProvidersMap =
+        ImmutableMap.<String, FileSystemProvider>builder()
+            
.putAll(FileSystemUtils.getFileSystemProviders(fileSystemProviders))
+            .build();
 
     String defaultFileSystemProviderName =
         (String)
@@ -581,31 +587,87 @@ public class HadoopCatalogOperations implements 
CatalogOperations, SupportsSchem
   @Override
   public boolean dropSchema(NameIdentifier ident, boolean cascade) throws 
NonEmptySchemaException {
     try {
+      Namespace filesetNs =
+          NamespaceUtil.ofFileset(
+              ident.namespace().level(0), // metalake name
+              ident.namespace().level(1), // catalog name
+              ident.name() // schema name
+              );
+
+      List<FilesetEntity> filesets =
+          store.list(filesetNs, FilesetEntity.class, 
Entity.EntityType.FILESET);
+      if (!filesets.isEmpty() && !cascade) {
+        throw new NonEmptySchemaException("Schema %s is not empty", ident);
+      }
+
+      // Delete all the managed filesets no matter whether the storage 
location is under the
+      // schema path or not.
+      // The reason why we delete the managed fileset's storage location one 
by one is because we
+      // may mis-delete the storage location of the external fileset if it 
happens to be under
+      // the schema path.
+      ClassLoader cl = Thread.currentThread().getContextClassLoader();
+      filesets
+          .parallelStream()
+          .filter(f -> f.filesetType() == Fileset.Type.MANAGED)
+          .forEach(
+              f -> {
+                ClassLoader oldCl = 
Thread.currentThread().getContextClassLoader();
+                try {
+                  // parallelStream uses forkjoin thread pool, which has a 
different classloader
+                  // than the catalog thread. We need to set the context 
classloader to the
+                  // catalog's classloader to avoid classloading issues.
+                  Thread.currentThread().setContextClassLoader(cl);
+                  Path filesetPath = new Path(f.storageLocation());
+                  FileSystem fs = getFileSystem(filesetPath, conf);
+                  if (fs.exists(filesetPath)) {
+                    if (!fs.delete(filesetPath, true)) {
+                      LOG.warn("Failed to delete fileset {} location {}", 
f.name(), filesetPath);
+                    }
+                  }
+                } catch (IOException ioe) {
+                  LOG.warn(
+                      "Failed to delete fileset {} location {}",
+                      f.name(),
+                      f.storageLocation(),
+                      ioe);
+                } finally {
+                  Thread.currentThread().setContextClassLoader(oldCl);
+                }
+              });
+
       SchemaEntity schemaEntity = store.get(ident, Entity.EntityType.SCHEMA, 
SchemaEntity.class);
       Map<String, String> properties =
           
Optional.ofNullable(schemaEntity.properties()).orElse(Collections.emptyMap());
 
+      // Delete the schema path if it exists and is empty.
       Path schemaPath = getSchemaPath(ident.name(), properties);
       // Nothing to delete if the schema path is not set.
       if (schemaPath == null) {
         return false;
       }
+
       FileSystem fs = getFileSystem(schemaPath, conf);
       // Nothing to delete if the schema path does not exist.
       if (!fs.exists(schemaPath)) {
         return false;
       }
 
-      if (fs.listStatus(schemaPath).length > 0 && !cascade) {
-        throw new NonEmptySchemaException(
-            "Schema %s with location %s is not empty", ident, schemaPath);
-      } else {
-        fs.delete(schemaPath, true);
+      FileStatus[] statuses = fs.listStatus(schemaPath);
+      if (statuses.length == 0) {
+        if (fs.delete(schemaPath, true)) {
+          LOG.info("Deleted schema {} location {}", ident, schemaPath);
+        } else {
+          LOG.warn("Failed to delete schema {} location {}", ident, 
schemaPath);
+          return false;
+        }
       }
 
-      LOG.info("Deleted schema {} location {}", ident, schemaPath);
+      LOG.info("Deleted schema {}", ident);
       return true;
 
+    } catch (NoSuchEntityException ne) {
+      LOG.warn("Schema {} does not exist", ident);
+      return false;
     } catch (IOException ioe) {
       throw new RuntimeException("Failed to delete schema " + ident + " 
location", ioe);
     }
diff --git 
a/catalogs/catalog-hadoop/src/test/java/org/apache/gravitino/catalog/hadoop/TestHadoopCatalogOperations.java
 
b/catalogs/catalog-hadoop/src/test/java/org/apache/gravitino/catalog/hadoop/TestHadoopCatalogOperations.java
index 9575a1313..3c1ea4ff0 100644
--- 
a/catalogs/catalog-hadoop/src/test/java/org/apache/gravitino/catalog/hadoop/TestHadoopCatalogOperations.java
+++ 
b/catalogs/catalog-hadoop/src/test/java/org/apache/gravitino/catalog/hadoop/TestHadoopCatalogOperations.java
@@ -443,23 +443,36 @@ public class TestHadoopCatalogOperations {
       Assertions.assertFalse(fs.exists(schemaPath));
 
       // Test drop non-empty schema with cascade = false
-      Path subPath = new Path(schemaPath, "test1");
-      fs.mkdirs(subPath);
-      Assertions.assertTrue(fs.exists(subPath));
+      Fileset fs1 = createFileset("fs1", name, "comment", 
Fileset.Type.MANAGED, catalogPath, null);
+      Path fs1Path = new Path(fs1.storageLocation());
 
       Throwable exception1 =
           Assertions.assertThrows(NonEmptySchemaException.class, () -> 
ops.dropSchema(id, false));
-      Assertions.assertEquals(
-          "Schema m1.c1.schema20 with location " + schemaPath + " is not 
empty",
-          exception1.getMessage());
+      Assertions.assertEquals("Schema m1.c1.schema20 is not empty", 
exception1.getMessage());
 
       // Test drop non-empty schema with cascade = true
       ops.dropSchema(id, true);
       Assertions.assertFalse(fs.exists(schemaPath));
+      Assertions.assertFalse(fs.exists(fs1Path));
+
+      // Test drop both managed and external filesets
+      Fileset fs2 = createFileset("fs2", name, "comment", 
Fileset.Type.MANAGED, catalogPath, null);
+      Path fs2Path = new Path(fs2.storageLocation());
+
+      Path fs3Path = new Path(schemaPath, "fs3");
+      createFileset("fs3", name, "comment", Fileset.Type.EXTERNAL, 
catalogPath, fs3Path.toString());
 
-      // Test drop empty schema
-      Assertions.assertFalse(ops.dropSchema(id, true), "schema should be 
non-existent");
-      Assertions.assertFalse(ops.dropSchema(id, false), "schema should be 
non-existent");
+      ops.dropSchema(id, true);
+      Assertions.assertTrue(fs.exists(schemaPath));
+      Assertions.assertFalse(fs.exists(fs2Path));
+      // The path of external fileset should not be deleted
+      Assertions.assertTrue(fs.exists(fs3Path));
+
+      // Test drop schema with different storage location
+      Path fs4Path = new Path(TEST_ROOT_PATH + "/fs4");
+      createFileset("fs4", name, "comment", Fileset.Type.MANAGED, catalogPath, 
fs4Path.toString());
+      ops.dropSchema(id, true);
+      Assertions.assertFalse(fs.exists(fs4Path));
     }
   }
 
diff --git 
a/catalogs/catalog-hadoop/src/test/java/org/apache/gravitino/catalog/hadoop/integration/test/HadoopUserAuthenticationIT.java
 
b/catalogs/catalog-hadoop/src/test/java/org/apache/gravitino/catalog/hadoop/integration/test/HadoopUserAuthenticationIT.java
index d074709bd..58507ea92 100644
--- 
a/catalogs/catalog-hadoop/src/test/java/org/apache/gravitino/catalog/hadoop/integration/test/HadoopUserAuthenticationIT.java
+++ 
b/catalogs/catalog-hadoop/src/test/java/org/apache/gravitino/catalog/hadoop/integration/test/HadoopUserAuthenticationIT.java
@@ -550,8 +550,7 @@ public class HadoopUserAuthenticationIT extends BaseIT {
     Assertions.assertDoesNotThrow(
         () -> 
catalog.asFilesetCatalog().dropFileset(NameIdentifier.of(SCHEMA_NAME, 
fileset2)));
 
-    Assertions.assertThrows(
-        Exception.class, () -> catalog.asSchemas().dropSchema(SCHEMA_NAME, 
true));
+    Assertions.assertDoesNotThrow(() -> 
catalog.asSchemas().dropSchema(SCHEMA_NAME, true));
     kerberosHiveContainer.executeInContainer(
         "hadoop", "fs", "-chown", "-R", "cli_schema", "/user/hadoop/" + 
catalogName);
     Assertions.assertDoesNotThrow(() -> 
catalog.asSchemas().dropSchema(SCHEMA_NAME, true));
diff --git a/docs/manage-fileset-metadata-using-gravitino.md 
b/docs/manage-fileset-metadata-using-gravitino.md
index c56898ed8..7115e705d 100644
--- a/docs/manage-fileset-metadata-using-gravitino.md
+++ b/docs/manage-fileset-metadata-using-gravitino.md
@@ -275,8 +275,10 @@ Please refer to [Drop a 
schema](./manage-relational-metadata-using-gravitino.md#
 in relational catalog for more details. For a fileset catalog, the schema drop 
operation is the
 same.
 
-Note that the drop operation will also remove all of the filesets as well as 
the managed files
-under this schema path if `cascade` is set to `true`.
+Note that the drop operation will delete all the fileset metadata under this 
schema if `cascade`
+set to `true`. Besides, for `MANAGED` fileset, this drop operation will also 
**remove** all the
+files/directories of this fileset; for `EXTERNAL` fileset, this drop operation 
will only delete
+the metadata of this fileset.
 
 ### List all schemas under a catalog
 

Reply via email to