yuqi1129 commented on code in PR #5521:
URL: https://github.com/apache/gravitino/pull/5521#discussion_r1836073400


##########
catalogs/catalog-hadoop/src/main/java/org/apache/gravitino/catalog/hadoop/HadoopCatalogOperations.java:
##########
@@ -581,29 +583,63 @@ public Schema alterSchema(NameIdentifier ident, SchemaChange... changes)
   @Override
   public boolean dropSchema(NameIdentifier ident, boolean cascade) throws NonEmptySchemaException {
     try {
+      Namespace filesetNs =
+          Namespace.of(
+              Stream.concat(Stream.of(ident.namespace().levels()), Stream.of(ident.name()))
+                  .toArray(String[]::new));
+      List<FilesetEntity> filesets =
+          store.list(filesetNs, FilesetEntity.class, Entity.EntityType.FILESET);
+      if (filesets.size() > 0 && !cascade) {
+        throw new NonEmptySchemaException("Schema %s is not empty", ident);
+      }
+
+      // Delete all the managed filesets no matter whether the storage location is under the
+      // schema path or not.
+      // The reason why we delete the managed fileset's storage location one by one is because we
+      // may mis-delete the storage location of the external fileset if it happens to be under
+      // the schema path.
+      filesets.stream()
+          .filter(f -> f.filesetType() == Fileset.Type.MANAGED)
+          .forEach(
+              f -> {
+                try {
+                  Path filesetPath = new Path(f.storageLocation());
+                  FileSystem fs = getFileSystem(filesetPath, conf);
+                  if (fs.exists(filesetPath)) {
+                    if (!fs.delete(filesetPath, true)) {
+                      LOG.warn("Failed to delete fileset {} location {}", f.name(), filesetPath);
+                    }
+                  }
+                } catch (IOException ioe) {
+                  LOG.warn(
+                      "Failed to delete fileset {} location {}",
+                      f.name(),
+                      f.storageLocation(),
+                      ioe);
+                }
+              });
+
       SchemaEntity schemaEntity = store.get(ident, Entity.EntityType.SCHEMA, SchemaEntity.class);
       Map<String, String> properties =
           Optional.ofNullable(schemaEntity.properties()).orElse(Collections.emptyMap());
 
+      // Delete the schema path if it exists and is empty.
       Path schemaPath = getSchemaPath(ident.name(), properties);
-      // Nothing to delete if the schema path is not set.
-      if (schemaPath == null) {
-        return false;
-      }
-      FileSystem fs = getFileSystem(schemaPath, conf);
-      // Nothing to delete if the schema path does not exist.
-      if (!fs.exists(schemaPath)) {
-        return false;
-      }
-
-      if (fs.listStatus(schemaPath).length > 0 && !cascade) {
-        throw new NonEmptySchemaException(
-            "Schema %s with location %s is not empty", ident, schemaPath);
-      } else {
-        fs.delete(schemaPath, true);
+      if (schemaPath != null) {
+        FileSystem fs = getFileSystem(schemaPath, conf);
+        if (fs.exists(schemaPath)) {
+          FileStatus[] statuses = fs.listStatus(schemaPath);
+          if (statuses.length == 0) {
+            if (fs.delete(schemaPath, true)) {
+              LOG.info("Deleted schema {} location {}", ident, schemaPath);
+            } else {
+              LOG.warn("Failed to delete schema {} location {}", ident, schemaPath);

Review Comment:
   Why do we only log a warning if deleting the schema location fails?
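   
   A minimal sketch of the alternative I have in mind, assuming we would rather fail the call than report success while the location is still present (the helper name `deleteOrFail` and the choice of `IOException` are mine, not this PR's code):
   
   ```java
   import java.io.IOException;
   import org.apache.hadoop.fs.FileSystem;
   import org.apache.hadoop.fs.Path;
   
   final class DropSchemaSketch {
     // Propagate a failed delete instead of only logging it, so that
     // dropSchema does not return true while the path still exists.
     static void deleteOrFail(FileSystem fs, Path schemaPath) throws IOException {
       if (fs.exists(schemaPath) && !fs.delete(schemaPath, true)) {
         throw new IOException("Failed to delete schema location " + schemaPath);
       }
     }
   }
   ```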



##########
catalogs/catalog-hadoop/src/main/java/org/apache/gravitino/catalog/hadoop/HadoopCatalogOperations.java:
##########
@@ -581,29 +583,63 @@ public Schema alterSchema(NameIdentifier ident, SchemaChange... changes)
   @Override
   public boolean dropSchema(NameIdentifier ident, boolean cascade) throws NonEmptySchemaException {
     try {
+      Namespace filesetNs =
+          Namespace.of(
+              Stream.concat(Stream.of(ident.namespace().levels()), Stream.of(ident.name()))
+                  .toArray(String[]::new));
+      List<FilesetEntity> filesets =
+          store.list(filesetNs, FilesetEntity.class, Entity.EntityType.FILESET);
+      if (filesets.size() > 0 && !cascade) {
+        throw new NonEmptySchemaException("Schema %s is not empty", ident);
+      }
+
+      // Delete all the managed filesets no matter whether the storage location is under the
+      // schema path or not.
+      // The reason why we delete the managed fileset's storage location one by one is because we
+      // may mis-delete the storage location of the external fileset if it happens to be under
+      // the schema path.
+      filesets.stream()
+          .filter(f -> f.filesetType() == Fileset.Type.MANAGED)
+          .forEach(
+              f -> {
+                try {
+                  Path filesetPath = new Path(f.storageLocation());
+                  FileSystem fs = getFileSystem(filesetPath, conf);
+                  if (fs.exists(filesetPath)) {
+                    if (!fs.delete(filesetPath, true)) {
+                      LOG.warn("Failed to delete fileset {} location {}", f.name(), filesetPath);
+                    }
+                  }
+                } catch (IOException ioe) {
+                  LOG.warn(

Review Comment:
   I'm afraid there will be some issues such as the following:
   - For some unknown reason, a few of the files under the schema can't be removed. In this case, should we ignore them?
   - Can we remove the files one by one and then drop the schema in a transaction?
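   
   A minimal sketch of the second point, assuming a collect-failures-first pass (the `deleteAll` helper and its names are illustrative, not this PR's API; HDFS offers no real transaction, so this only orders the work, it does not make it atomic):
   
   ```java
   import java.io.IOException;
   import java.util.ArrayList;
   import java.util.List;
   import org.apache.hadoop.fs.FileSystem;
   import org.apache.hadoop.fs.Path;
   
   final class CascadeDeleteSketch {
     // Try every fileset location first and report the failures, so the
     // caller can abort the schema drop instead of half-completing it.
     static List<Path> deleteAll(FileSystem fs, List<Path> locations) throws IOException {
       List<Path> failed = new ArrayList<>();
       for (Path p : locations) {
         if (fs.exists(p) && !fs.delete(p, true)) {
           failed.add(p); // remember the failure instead of silently continuing
         }
       }
       return failed; // non-empty => do not delete the schema entity
     }
   }
   ```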



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]