This is an automated email from the ASF dual-hosted git repository.
mchades pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gravitino.git
The following commit(s) were added to refs/heads/main by this push:
new 1420174bb [#4842] fix(hadoop-catalog): Improve the schema drop logic
for fileset catalog (#5521)
1420174bb is described below
commit 1420174bb8c35913678db4e39809ddac8ef16b85
Author: Jerry Shao <[email protected]>
AuthorDate: Tue Nov 12 10:57:48 2024 +0800
[#4842] fix(hadoop-catalog): Improve the schema drop logic for fileset
catalog (#5521)
### What changes were proposed in this pull request?
This PR changes the schema drop mechanism to delete the fileset paths one
by one, instead of deleting the schema path. This improvement will:
1. Delete fileset paths that are not under the schema's location.
2. Avoid deleting an external fileset path that happens to be under the schema
location.
### Why are the changes needed?
To make the drop mechanism correct.
Fix: #4842
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Add UT to cover the changes.
---
.../catalog/hadoop/HadoopCatalogOperations.java | 78 +++++++++++++++++++---
.../hadoop/TestHadoopCatalogOperations.java | 31 ++++++---
.../test/HadoopUserAuthenticationIT.java | 3 +-
docs/manage-fileset-metadata-using-gravitino.md | 6 +-
4 files changed, 97 insertions(+), 21 deletions(-)
diff --git
a/catalogs/catalog-hadoop/src/main/java/org/apache/gravitino/catalog/hadoop/HadoopCatalogOperations.java
b/catalogs/catalog-hadoop/src/main/java/org/apache/gravitino/catalog/hadoop/HadoopCatalogOperations.java
index 30e72c041..21775038b 100644
---
a/catalogs/catalog-hadoop/src/main/java/org/apache/gravitino/catalog/hadoop/HadoopCatalogOperations.java
+++
b/catalogs/catalog-hadoop/src/main/java/org/apache/gravitino/catalog/hadoop/HadoopCatalogOperations.java
@@ -22,6 +22,7 @@ import static
org.apache.gravitino.connector.BaseCatalog.CATALOG_BYPASS_PREFIX;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import java.io.FileNotFoundException;
import java.io.IOException;
@@ -64,8 +65,10 @@ import org.apache.gravitino.file.FilesetChange;
import org.apache.gravitino.meta.AuditInfo;
import org.apache.gravitino.meta.FilesetEntity;
import org.apache.gravitino.meta.SchemaEntity;
+import org.apache.gravitino.utils.NamespaceUtil;
import org.apache.gravitino.utils.PrincipalUtils;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
@@ -89,7 +92,7 @@ public class HadoopCatalogOperations implements
CatalogOperations, SupportsSchem
private CatalogInfo catalogInfo;
- private final Map<String, FileSystemProvider> fileSystemProvidersMap =
Maps.newHashMap();
+ private Map<String, FileSystemProvider> fileSystemProvidersMap;
private FileSystemProvider defaultFileSystemProvider;
@@ -133,7 +136,10 @@ public class HadoopCatalogOperations implements
CatalogOperations, SupportsSchem
propertiesMetadata
.catalogPropertiesMetadata()
.getOrDefault(config,
HadoopCatalogPropertiesMetadata.FILESYSTEM_PROVIDERS);
-
this.fileSystemProvidersMap.putAll(FileSystemUtils.getFileSystemProviders(fileSystemProviders));
+ this.fileSystemProvidersMap =
+ ImmutableMap.<String, FileSystemProvider>builder()
+
.putAll(FileSystemUtils.getFileSystemProviders(fileSystemProviders))
+ .build();
String defaultFileSystemProviderName =
(String)
@@ -581,31 +587,87 @@ public class HadoopCatalogOperations implements
CatalogOperations, SupportsSchem
@Override
public boolean dropSchema(NameIdentifier ident, boolean cascade) throws
NonEmptySchemaException {
try {
+ Namespace filesetNs =
+ NamespaceUtil.ofFileset(
+ ident.namespace().level(0), // metalake name
+ ident.namespace().level(1), // catalog name
+ ident.name() // schema name
+ );
+
+ List<FilesetEntity> filesets =
+ store.list(filesetNs, FilesetEntity.class,
Entity.EntityType.FILESET);
+ if (!filesets.isEmpty() && !cascade) {
+ throw new NonEmptySchemaException("Schema %s is not empty", ident);
+ }
+
+ // Delete all the managed filesets no matter whether the storage
location is under the
+ // schema path or not.
+ // The reason why we delete the managed fileset's storage location one
by one is because we
+ // may mis-delete the storage location of the external fileset if it
happens to be under
+ // the schema path.
+ ClassLoader cl = Thread.currentThread().getContextClassLoader();
+ filesets
+ .parallelStream()
+ .filter(f -> f.filesetType() == Fileset.Type.MANAGED)
+ .forEach(
+ f -> {
+ ClassLoader oldCl =
Thread.currentThread().getContextClassLoader();
+ try {
+ // parallelStream uses forkjoin thread pool, which has a
different classloader
+ // than the catalog thread. We need to set the context
classloader to the
+ // catalog's classloader to avoid classloading issues.
+ Thread.currentThread().setContextClassLoader(cl);
+ Path filesetPath = new Path(f.storageLocation());
+ FileSystem fs = getFileSystem(filesetPath, conf);
+ if (fs.exists(filesetPath)) {
+ if (!fs.delete(filesetPath, true)) {
+ LOG.warn("Failed to delete fileset {} location {}",
f.name(), filesetPath);
+ }
+ }
+ } catch (IOException ioe) {
+ LOG.warn(
+ "Failed to delete fileset {} location {}",
+ f.name(),
+ f.storageLocation(),
+ ioe);
+ } finally {
+ Thread.currentThread().setContextClassLoader(oldCl);
+ }
+ });
+
SchemaEntity schemaEntity = store.get(ident, Entity.EntityType.SCHEMA,
SchemaEntity.class);
Map<String, String> properties =
Optional.ofNullable(schemaEntity.properties()).orElse(Collections.emptyMap());
+ // Delete the schema path if it exists and is empty.
Path schemaPath = getSchemaPath(ident.name(), properties);
// Nothing to delete if the schema path is not set.
if (schemaPath == null) {
return false;
}
+
FileSystem fs = getFileSystem(schemaPath, conf);
// Nothing to delete if the schema path does not exist.
if (!fs.exists(schemaPath)) {
return false;
}
- if (fs.listStatus(schemaPath).length > 0 && !cascade) {
- throw new NonEmptySchemaException(
- "Schema %s with location %s is not empty", ident, schemaPath);
- } else {
- fs.delete(schemaPath, true);
+ FileStatus[] statuses = fs.listStatus(schemaPath);
+ if (statuses.length == 0) {
+ if (fs.delete(schemaPath, true)) {
+ LOG.info("Deleted schema {} location {}", ident, schemaPath);
+ } else {
+ LOG.warn("Failed to delete schema {} location {}", ident,
schemaPath);
+ return false;
+ }
}
- LOG.info("Deleted schema {} location {}", ident, schemaPath);
+ LOG.info("Deleted schema {}", ident);
return true;
+ } catch (NoSuchEntityException ne) {
+ LOG.warn("Schema {} does not exist", ident);
+ return false;
} catch (IOException ioe) {
throw new RuntimeException("Failed to delete schema " + ident + "
location", ioe);
}
diff --git
a/catalogs/catalog-hadoop/src/test/java/org/apache/gravitino/catalog/hadoop/TestHadoopCatalogOperations.java
b/catalogs/catalog-hadoop/src/test/java/org/apache/gravitino/catalog/hadoop/TestHadoopCatalogOperations.java
index 9575a1313..3c1ea4ff0 100644
---
a/catalogs/catalog-hadoop/src/test/java/org/apache/gravitino/catalog/hadoop/TestHadoopCatalogOperations.java
+++
b/catalogs/catalog-hadoop/src/test/java/org/apache/gravitino/catalog/hadoop/TestHadoopCatalogOperations.java
@@ -443,23 +443,36 @@ public class TestHadoopCatalogOperations {
Assertions.assertFalse(fs.exists(schemaPath));
// Test drop non-empty schema with cascade = false
- Path subPath = new Path(schemaPath, "test1");
- fs.mkdirs(subPath);
- Assertions.assertTrue(fs.exists(subPath));
+ Fileset fs1 = createFileset("fs1", name, "comment",
Fileset.Type.MANAGED, catalogPath, null);
+ Path fs1Path = new Path(fs1.storageLocation());
Throwable exception1 =
Assertions.assertThrows(NonEmptySchemaException.class, () ->
ops.dropSchema(id, false));
- Assertions.assertEquals(
- "Schema m1.c1.schema20 with location " + schemaPath + " is not
empty",
- exception1.getMessage());
+ Assertions.assertEquals("Schema m1.c1.schema20 is not empty",
exception1.getMessage());
// Test drop non-empty schema with cascade = true
ops.dropSchema(id, true);
Assertions.assertFalse(fs.exists(schemaPath));
+ Assertions.assertFalse(fs.exists(fs1Path));
+
+ // Test drop both managed and external filesets
+ Fileset fs2 = createFileset("fs2", name, "comment",
Fileset.Type.MANAGED, catalogPath, null);
+ Path fs2Path = new Path(fs2.storageLocation());
+
+ Path fs3Path = new Path(schemaPath, "fs3");
+ createFileset("fs3", name, "comment", Fileset.Type.EXTERNAL,
catalogPath, fs3Path.toString());
- // Test drop empty schema
- Assertions.assertFalse(ops.dropSchema(id, true), "schema should be
non-existent");
- Assertions.assertFalse(ops.dropSchema(id, false), "schema should be
non-existent");
+ ops.dropSchema(id, true);
+ Assertions.assertTrue(fs.exists(schemaPath));
+ Assertions.assertFalse(fs.exists(fs2Path));
+ // The path of external fileset should not be deleted
+ Assertions.assertTrue(fs.exists(fs3Path));
+
+ // Test drop schema with different storage location
+ Path fs4Path = new Path(TEST_ROOT_PATH + "/fs4");
+ createFileset("fs4", name, "comment", Fileset.Type.MANAGED, catalogPath,
fs4Path.toString());
+ ops.dropSchema(id, true);
+ Assertions.assertFalse(fs.exists(fs4Path));
}
}
diff --git
a/catalogs/catalog-hadoop/src/test/java/org/apache/gravitino/catalog/hadoop/integration/test/HadoopUserAuthenticationIT.java
b/catalogs/catalog-hadoop/src/test/java/org/apache/gravitino/catalog/hadoop/integration/test/HadoopUserAuthenticationIT.java
index d074709bd..58507ea92 100644
---
a/catalogs/catalog-hadoop/src/test/java/org/apache/gravitino/catalog/hadoop/integration/test/HadoopUserAuthenticationIT.java
+++
b/catalogs/catalog-hadoop/src/test/java/org/apache/gravitino/catalog/hadoop/integration/test/HadoopUserAuthenticationIT.java
@@ -550,8 +550,7 @@ public class HadoopUserAuthenticationIT extends BaseIT {
Assertions.assertDoesNotThrow(
() ->
catalog.asFilesetCatalog().dropFileset(NameIdentifier.of(SCHEMA_NAME,
fileset2)));
- Assertions.assertThrows(
- Exception.class, () -> catalog.asSchemas().dropSchema(SCHEMA_NAME,
true));
+ Assertions.assertDoesNotThrow(() ->
catalog.asSchemas().dropSchema(SCHEMA_NAME, true));
kerberosHiveContainer.executeInContainer(
"hadoop", "fs", "-chown", "-R", "cli_schema", "/user/hadoop/" +
catalogName);
Assertions.assertDoesNotThrow(() ->
catalog.asSchemas().dropSchema(SCHEMA_NAME, true));
diff --git a/docs/manage-fileset-metadata-using-gravitino.md
b/docs/manage-fileset-metadata-using-gravitino.md
index c56898ed8..7115e705d 100644
--- a/docs/manage-fileset-metadata-using-gravitino.md
+++ b/docs/manage-fileset-metadata-using-gravitino.md
@@ -275,8 +275,10 @@ Please refer to [Drop a
schema](./manage-relational-metadata-using-gravitino.md#
in relational catalog for more details. For a fileset catalog, the schema drop
operation is the
same.
-Note that the drop operation will also remove all of the filesets as well as
the managed files
-under this schema path if `cascade` is set to `true`.
+Note that the drop operation will delete all the fileset metadata under this
schema if `cascade`
+is set to `true`. Besides, for a `MANAGED` fileset, this drop operation will also
**remove** all the
+files/directories of this fileset; for an `EXTERNAL` fileset, this drop operation
will only delete
+the metadata of this fileset.
### List all schemas under a catalog