This is an automated email from the ASF dual-hosted git repository.
jshao pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gravitino.git
The following commit(s) were added to refs/heads/main by this push:
new dbe7fa9132 [#6892] feat(core): support fileset multiple locations in
core module (#6922)
dbe7fa9132 is described below
commit dbe7fa913259567fcf6bed87d4c14d5d3addb69c
Author: mchades <[email protected]>
AuthorDate: Tue Apr 15 10:09:49 2025 +0800
[#6892] feat(core): support fileset multiple locations in core module
(#6922)
### What changes were proposed in this pull request?
Support multiple fileset locations in the core and catalog modules.
### Why are the changes needed?
Fix: #6892
### Does this PR introduce _any_ user-facing change?
no
### How was this patch tested?
Tests were added covering the new multiple-location behavior.
---
.../catalog/hadoop/HadoopCatalogOperations.java | 518 ++++++++++----
.../gravitino/catalog/hadoop/HadoopFileset.java | 2 +-
.../hadoop/HadoopFilesetPropertiesMetadata.java | 9 +
.../hadoop/SecureHadoopCatalogOperations.java | 26 +-
.../hadoop/TestHadoopCatalogOperations.java | 766 ++++++++++++++++++++-
.../hadoop/integration/test/HadoopCatalogIT.java | 42 +-
.../test/HadoopUserImpersonationIT.java | 9 +-
.../tests/integration/test_fileset_catalog.py | 17 +-
.../gravitino/catalog/EntityCombinedFileset.java | 12 +-
.../catalog/FilesetNormalizeDispatcher.java | 14 +-
.../catalog/FilesetOperationDispatcher.java | 26 +-
.../apache/gravitino/connector/BaseFileset.java | 21 +-
.../gravitino/hook/FilesetHookDispatcher.java | 15 +-
.../gravitino/listener/FilesetEventDispatcher.java | 26 +-
.../listener/api/event/GetFileLocationEvent.java | 31 +
.../api/event/GetFileLocationFailureEvent.java | 31 +
.../api/event/GetFileLocationPreEvent.java | 24 +
.../gravitino/listener/api/info/FilesetInfo.java | 44 +-
.../relational/service/SchemaMetaService.java | 1 +
.../java/org/apache/gravitino/TestFileset.java | 2 +-
.../gravitino/connector/TestCatalogOperations.java | 45 +-
.../listener/api/event/TestFilesetEvent.java | 8 +-
.../gravitino/meta/TestEntityCombinedObject.java | 2 +-
23 files changed, 1459 insertions(+), 232 deletions(-)
diff --git
a/catalogs/catalog-hadoop/src/main/java/org/apache/gravitino/catalog/hadoop/HadoopCatalogOperations.java
b/catalogs/catalog-hadoop/src/main/java/org/apache/gravitino/catalog/hadoop/HadoopCatalogOperations.java
index 0cf67f6ce4..74f7a0ebe9 100644
---
a/catalogs/catalog-hadoop/src/main/java/org/apache/gravitino/catalog/hadoop/HadoopCatalogOperations.java
+++
b/catalogs/catalog-hadoop/src/main/java/org/apache/gravitino/catalog/hadoop/HadoopCatalogOperations.java
@@ -21,8 +21,10 @@ package org.apache.gravitino.catalog.hadoop;
import static org.apache.gravitino.connector.BaseCatalog.CATALOG_BYPASS_PREFIX;
import static org.apache.gravitino.file.Fileset.LOCATION_NAME_UNKNOWN;
import static org.apache.gravitino.file.Fileset.PROPERTY_CATALOG_PLACEHOLDER;
+import static org.apache.gravitino.file.Fileset.PROPERTY_DEFAULT_LOCATION_NAME;
import static org.apache.gravitino.file.Fileset.PROPERTY_FILESET_PLACEHOLDER;
import static
org.apache.gravitino.file.Fileset.PROPERTY_LOCATION_PLACEHOLDER_PREFIX;
+import static
org.apache.gravitino.file.Fileset.PROPERTY_MULTIPLE_LOCATIONS_PREFIX;
import static org.apache.gravitino.file.Fileset.PROPERTY_SCHEMA_PLACEHOLDER;
import com.google.common.annotations.VisibleForTesting;
@@ -34,9 +36,11 @@ import java.io.IOException;
import java.time.Instant;
import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
+import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Matcher;
@@ -66,6 +70,7 @@ import
org.apache.gravitino.exceptions.GravitinoRuntimeException;
import org.apache.gravitino.exceptions.NoSuchCatalogException;
import org.apache.gravitino.exceptions.NoSuchEntityException;
import org.apache.gravitino.exceptions.NoSuchFilesetException;
+import org.apache.gravitino.exceptions.NoSuchLocationNameException;
import org.apache.gravitino.exceptions.NoSuchSchemaException;
import org.apache.gravitino.exceptions.NonEmptySchemaException;
import org.apache.gravitino.exceptions.SchemaAlreadyExistsException;
@@ -102,7 +107,7 @@ public class HadoopCatalogOperations extends
ManagedSchemaOperations
@VisibleForTesting Configuration hadoopConf;
- @VisibleForTesting Optional<Path> catalogStorageLocation;
+ @VisibleForTesting Map<String, Path> catalogStorageLocations;
private Map<String, String> conf;
@@ -174,12 +179,7 @@ public class HadoopCatalogOperations extends
ManagedSchemaOperations
.getOrDefault(config,
HadoopCatalogPropertiesMetadata.LOCATION);
checkPlaceholderValue(catalogLocation);
- this.catalogStorageLocation =
- StringUtils.isNotBlank(catalogLocation)
- ? Optional.of(catalogLocation)
- .map(s -> s.endsWith(SLASH) ? s : s + SLASH)
- .map(Path::new)
- : Optional.empty();
+ this.catalogStorageLocations = getAndCheckCatalogStorageLocations(config);
}
@Override
@@ -210,7 +210,7 @@ public class HadoopCatalogOperations extends
ManagedSchemaOperations
.withName(ident.name())
.withType(filesetEntity.filesetType())
.withComment(filesetEntity.comment())
- .withStorageLocation(filesetEntity.storageLocation())
+ .withStorageLocations(filesetEntity.storageLocations())
.withProperties(filesetEntity.properties())
.withAuditInfo(filesetEntity.auditInfo())
.build();
@@ -223,13 +223,20 @@ public class HadoopCatalogOperations extends
ManagedSchemaOperations
}
@Override
- public Fileset createFileset(
+ public Fileset createMultipleLocationFileset(
NameIdentifier ident,
String comment,
Fileset.Type type,
- String storageLocation,
+ Map<String, String> storageLocations,
Map<String, String> properties)
throws NoSuchSchemaException, FilesetAlreadyExistsException {
+ storageLocations.forEach(
+ (name, path) -> {
+ if (StringUtils.isBlank(name)) {
+ throw new IllegalArgumentException("Location name must not be
blank");
+ }
+ });
+
try {
if (store.exists(ident, Entity.EntityType.FILESET)) {
throw new FilesetAlreadyExistsException("Fileset %s already exists",
ident);
@@ -249,46 +256,82 @@ public class HadoopCatalogOperations extends
ManagedSchemaOperations
}
// For external fileset, the storageLocation must be set.
- if (type == Fileset.Type.EXTERNAL && StringUtils.isBlank(storageLocation))
{
- throw new IllegalArgumentException(
- "Storage location must be set for external fileset " + ident);
+ if (type == Fileset.Type.EXTERNAL) {
+ if (storageLocations.isEmpty()) {
+ throw new IllegalArgumentException(
+ "Storage location must be set for external fileset " + ident);
+ }
+ storageLocations.forEach(
+ (locationName, location) -> {
+ if (StringUtils.isBlank(location)) {
+ throw new IllegalArgumentException(
+ "Storage location must be set for external fileset "
+ + ident
+ + " with location name "
+ + locationName);
+ }
+ });
}
// Either catalog property "location", or schema property "location", or
storageLocation must be
// set for managed fileset.
- Path schemaPath = getSchemaPath(schemaIdent.name(),
schemaEntity.properties());
- if (schemaPath == null && StringUtils.isBlank(storageLocation)) {
+ Map<String, Path> schemaPaths =
+ getAndCheckSchemaPaths(schemaIdent.name(), schemaEntity.properties());
+ if (schemaPaths.isEmpty() && storageLocations.isEmpty()) {
throw new IllegalArgumentException(
"Storage location must be set for fileset "
+ ident
+ " when it's catalog and schema location are not set");
}
- checkPlaceholderValue(storageLocation);
+ storageLocations.forEach((k, location) -> checkPlaceholderValue(location));
- Path filesetPath =
- caculateFilesetPath(
- schemaIdent.name(), ident.name(), storageLocation, schemaPath,
properties);
+ Map<String, Path> filesetPaths =
+ calculateFilesetPaths(
+ schemaIdent.name(), ident.name(), storageLocations, schemaPaths,
properties);
+ properties = setDefaultLocationIfAbsent(properties, filesetPaths);
+ ImmutableMap.Builder<String, Path> filesetPathsBuilder =
ImmutableMap.builder();
try {
// formalize the path to avoid path without scheme, uri, authority, etc.
- FileSystem fs = getFileSystem(filesetPath, conf);
- filesetPath = filesetPath.makeQualified(fs.getUri(),
fs.getWorkingDirectory());
- if (!fs.exists(filesetPath)) {
- if (!fs.mkdirs(filesetPath)) {
- throw new RuntimeException(
- "Failed to create fileset " + ident + " location " +
filesetPath);
- }
+ for (Map.Entry<String, Path> entry : filesetPaths.entrySet()) {
+ Path formalizePath = formalizePath(entry.getValue(), conf);
+ filesetPathsBuilder.put(entry.getKey(), formalizePath);
- LOG.info("Created fileset {} location {}", ident, filesetPath);
- } else {
- LOG.info("Fileset {} manages the existing location {}", ident,
filesetPath);
+ FileSystem fs = getFileSystem(formalizePath, conf);
+ if (!fs.exists(formalizePath)) {
+ if (!fs.mkdirs(formalizePath)) {
+ throw new RuntimeException(
+ "Failed to create fileset "
+ + ident
+ + " location "
+ + formalizePath
+ + " with location name "
+ + entry.getKey());
+ }
+
+ LOG.info(
+ "Created fileset {} location {} with location name {}",
+ ident,
+ formalizePath,
+ entry.getKey());
+ } else {
+ LOG.info(
+ "Fileset {} manages the existing location {} with location name
{}",
+ ident,
+ formalizePath,
+ entry.getKey());
+ }
}
} catch (IOException ioe) {
- throw new RuntimeException(
- "Failed to create fileset " + ident + " location " + filesetPath,
ioe);
+ throw new RuntimeException("Failed to create fileset " + ident, ioe);
}
+ Map<String, String> formattedStorageLocations =
+ Maps.transformValues(filesetPathsBuilder.build(), Path::toString);
+ validateLocationHierarchy(
+ Maps.transformValues(schemaPaths, Path::toString),
formattedStorageLocations);
+
StringIdentifier stringId = StringIdentifier.fromProperties(properties);
Preconditions.checkArgument(stringId != null, "Property String identifier
should not be null");
@@ -302,7 +345,7 @@ public class HadoopCatalogOperations extends
ManagedSchemaOperations
// Store the storageLocation to the store. If the
"storageLocation" is null for managed
// fileset, Gravitino will get and store the location based on the
catalog/schema's
// location and store it to the store.
- .withStorageLocations(ImmutableMap.of(LOCATION_NAME_UNKNOWN,
filesetPath.toString()))
+ .withStorageLocations(formattedStorageLocations)
.withProperties(properties)
.withAuditInfo(
AuditInfo.builder()
@@ -321,12 +364,52 @@ public class HadoopCatalogOperations extends
ManagedSchemaOperations
.withName(ident.name())
.withComment(comment)
.withType(type)
- .withStorageLocation(filesetPath.toString())
+ .withStorageLocations(formattedStorageLocations)
.withProperties(filesetEntity.properties())
.withAuditInfo(filesetEntity.auditInfo())
.build();
}
+ private Map<String, String> setDefaultLocationIfAbsent(
+ Map<String, String> properties, Map<String, Path> filesetPaths) {
+ Preconditions.checkArgument(
+ filesetPaths != null && !filesetPaths.isEmpty(), "Fileset paths must
not be null or empty");
+
+ if (filesetPaths.size() == 1) {
+ // If the fileset has only one location, it is the default location.
+ String defaultLocationName = filesetPaths.keySet().iterator().next();
+ if (properties == null || properties.isEmpty()) {
+ return Collections.singletonMap(PROPERTY_DEFAULT_LOCATION_NAME,
defaultLocationName);
+ }
+ if (!properties.containsKey(PROPERTY_DEFAULT_LOCATION_NAME)) {
+ return ImmutableMap.<String, String>builder()
+ .putAll(properties)
+ .put(PROPERTY_DEFAULT_LOCATION_NAME, defaultLocationName)
+ .build();
+ }
+
+ Preconditions.checkArgument(
+
defaultLocationName.equals(properties.get(PROPERTY_DEFAULT_LOCATION_NAME)),
+ "Default location name must be the same as the fileset location
name");
+ return ImmutableMap.copyOf(properties);
+ }
+
+ // multiple locations
+ Preconditions.checkArgument(
+ properties != null
+ && !properties.isEmpty()
+ && properties.containsKey(PROPERTY_DEFAULT_LOCATION_NAME)
+ &&
filesetPaths.containsKey(properties.get(PROPERTY_DEFAULT_LOCATION_NAME)),
+ "Default location name must be set and must be one of the fileset
locations, "
+ + "location names: "
+ + filesetPaths.keySet()
+ + ", default location name: "
+ + Optional.ofNullable(properties)
+ .map(p -> p.get(PROPERTY_DEFAULT_LOCATION_NAME))
+ .orElse(null));
+ return ImmutableMap.copyOf(properties);
+ }
+
@Override
public Fileset alterFileset(NameIdentifier ident, FilesetChange... changes)
throws NoSuchFilesetException, IllegalArgumentException {
@@ -350,7 +433,7 @@ public class HadoopCatalogOperations extends
ManagedSchemaOperations
.withName(updatedFilesetEntity.name())
.withComment(updatedFilesetEntity.comment())
.withType(updatedFilesetEntity.filesetType())
- .withStorageLocation(updatedFilesetEntity.storageLocation())
+ .withStorageLocations(updatedFilesetEntity.storageLocations())
.withProperties(updatedFilesetEntity.properties())
.withAuditInfo(updatedFilesetEntity.auditInfo())
.build();
@@ -371,19 +454,43 @@ public class HadoopCatalogOperations extends
ManagedSchemaOperations
try {
FilesetEntity filesetEntity =
store.get(ident, Entity.EntityType.FILESET, FilesetEntity.class);
- Path filesetPath = new Path(filesetEntity.storageLocation());
// For managed fileset, we should delete the related files.
if (filesetEntity.filesetType() == Fileset.Type.MANAGED) {
- FileSystem fs = getFileSystem(filesetPath, conf);
- if (fs.exists(filesetPath)) {
- if (!fs.delete(filesetPath, true)) {
- LOG.warn("Failed to delete fileset {} location {}", ident,
filesetPath);
- return false;
- }
-
- } else {
- LOG.warn("Fileset {} location {} does not exist", ident,
filesetPath);
+ AtomicReference<IOException> exception = new AtomicReference<>();
+ Map<String, Path> storageLocations =
+ Maps.transformValues(filesetEntity.storageLocations(), Path::new);
+ storageLocations.forEach(
+ (locationName, location) -> {
+ try {
+ FileSystem fs = getFileSystem(location, conf);
+ if (fs.exists(location)) {
+ if (!fs.delete(location, true)) {
+ LOG.warn(
+ "Failed to delete fileset {} location {} with location
name {}",
+ ident,
+ location,
+ locationName);
+ }
+ } else {
+ LOG.warn(
+ "Fileset {} location {} with location name {} does not
exist",
+ ident,
+ location,
+ locationName);
+ }
+ } catch (IOException ioe) {
+ LOG.warn(
+ "Failed to delete fileset {} location {} with location
name {}",
+ ident,
+ location,
+ locationName,
+ ioe);
+ exception.set(ioe);
+ }
+ });
+ if (exception.get() != null) {
+ throw exception.get();
}
}
@@ -397,8 +504,8 @@ public class HadoopCatalogOperations extends
ManagedSchemaOperations
}
@Override
- public String getFileLocation(NameIdentifier ident, String subPath)
- throws NoSuchFilesetException {
+ public String getFileLocation(NameIdentifier ident, String subPath, String
locationName)
+ throws NoSuchFilesetException, NoSuchLocationNameException {
Preconditions.checkArgument(subPath != null, "subPath must not be null");
String processedSubPath;
if (!subPath.trim().isEmpty() && !subPath.trim().startsWith(SLASH)) {
@@ -408,10 +515,18 @@ public class HadoopCatalogOperations extends
ManagedSchemaOperations
}
Fileset fileset = loadFileset(ident);
+ locationName =
+ locationName == null
+ ? fileset.properties().get(PROPERTY_DEFAULT_LOCATION_NAME)
+ : locationName;
+ if (!fileset.storageLocations().containsKey(locationName)) {
+ throw new NoSuchLocationNameException(
+ "Location name %s does not exist in fileset %s", locationName,
ident);
+ }
- boolean isSingleFile = checkSingleFile(fileset);
+ boolean isSingleFile = checkSingleFile(fileset, locationName);
// if the storage location is a single file, it cannot have sub path to
access.
- if (isSingleFile && StringUtils.isBlank(processedSubPath)) {
+ if (isSingleFile && StringUtils.isNotBlank(processedSubPath)) {
throw new GravitinoRuntimeException(
"Sub path should always be blank, because the fileset only mounts a
single file.");
}
@@ -455,14 +570,11 @@ public class HadoopCatalogOperations extends
ManagedSchemaOperations
// 1. if the storage location is a single file, we pass the storage
location directly
// 2. if the processed sub path is blank, we pass the storage location
directly
if (isSingleFile || StringUtils.isBlank(processedSubPath)) {
- fileLocation = fileset.storageLocation();
+ fileLocation = fileset.storageLocations().get(locationName);
} else {
// the processed sub path always starts with "/" if it is not blank,
// so we can safely remove the tailing slash if storage location ends
with "/".
- String storageLocation =
- fileset.storageLocation().endsWith(SLASH)
- ? fileset.storageLocation().substring(0,
fileset.storageLocation().length() - 1)
- : fileset.storageLocation();
+ String storageLocation =
removeTrailingSlash(fileset.storageLocations().get(locationName));
fileLocation = String.format("%s%s", storageLocation, processedSubPath);
}
return fileLocation;
@@ -479,26 +591,42 @@ public class HadoopCatalogOperations extends
ManagedSchemaOperations
throw new RuntimeException("Failed to check if schema " + ident + "
exists", ioe);
}
- Path schemaPath = getSchemaPath(ident.name(), properties);
- if (schemaPath != null && !containsPlaceholder(schemaPath.toString())) {
- try {
- FileSystem fs = getFileSystem(schemaPath, conf);
- if (!fs.exists(schemaPath)) {
- if (!fs.mkdirs(schemaPath)) {
- // Fail the operation when failed to create the schema path.
- throw new RuntimeException(
- "Failed to create schema " + ident + " location " +
schemaPath);
+ Map<String, Path> schemaPaths = getAndCheckSchemaPaths(ident.name(),
properties);
+ schemaPaths.forEach(
+ (locationName, schemaPath) -> {
+ if (schemaPath != null &&
!containsPlaceholder(schemaPath.toString())) {
+ try {
+ FileSystem fs = getFileSystem(schemaPath, conf);
+ if (!fs.exists(schemaPath)) {
+ if (!fs.mkdirs(schemaPath)) {
+ // Fail the operation when failed to create the schema path.
+ throw new RuntimeException(
+ "Failed to create schema "
+ + ident
+ + " location: "
+ + schemaPath
+ + " with location name: "
+ + locationName);
+ }
+ LOG.info(
+ "Created schema {} location: {} with location name: {}",
+ ident,
+ schemaPath,
+ locationName);
+ } else {
+ LOG.info(
+ "Schema {} manages the existing location: {} with location
name: {}",
+ ident,
+ schemaPath,
+ locationName);
+ }
+
+ } catch (IOException ioe) {
+ throw new RuntimeException(
+ "Failed to create schema " + ident + " location " +
schemaPath, ioe);
+ }
}
- LOG.info("Created schema {} location {}", ident, schemaPath);
- } else {
- LOG.info("Schema {} manages the existing location {}", ident,
schemaPath);
- }
-
- } catch (IOException ioe) {
- throw new RuntimeException(
- "Failed to create schema " + ident + " location " + schemaPath,
ioe);
- }
- }
+ });
return super.createSchema(ident, comment, properties);
}
@@ -536,9 +664,11 @@ public class HadoopCatalogOperations extends
ManagedSchemaOperations
SchemaEntity schemaEntity = store.get(ident, Entity.EntityType.SCHEMA,
SchemaEntity.class);
Map<String, String> properties =
Optional.ofNullable(schemaEntity.properties()).orElse(Collections.emptyMap());
- Path schemaPath = getSchemaPath(ident.name(), properties);
+ Map<String, Path> schemaPaths = getAndCheckSchemaPaths(ident.name(),
properties);
boolean dropped = super.dropSchema(ident, cascade);
+ // If the schema entity is failed to be deleted, we should not delete
the storage location
+ // and return false immediately.
if (!dropped) {
return false;
}
@@ -560,39 +690,73 @@ public class HadoopCatalogOperations extends
ManagedSchemaOperations
// than the catalog thread. We need to set the context
classloader to the
// catalog's classloader to avoid classloading issues.
Thread.currentThread().setContextClassLoader(cl);
- Path filesetPath = new Path(f.storageLocation());
- FileSystem fs = getFileSystem(filesetPath, conf);
- if (fs.exists(filesetPath)) {
- if (!fs.delete(filesetPath, true)) {
- LOG.warn("Failed to delete fileset {} location {}",
f.name(), filesetPath);
- }
- }
- } catch (IOException ioe) {
- LOG.warn(
- "Failed to delete fileset {} location {}",
- f.name(),
- f.storageLocation(),
- ioe);
+ f.storageLocations()
+ .forEach(
+ (locationName, location) -> {
+ try {
+ Path filesetPath = new Path(location);
+ FileSystem fs = getFileSystem(filesetPath, conf);
+ if (fs.exists(filesetPath)) {
+ if (!fs.delete(filesetPath, true)) {
+ LOG.warn(
+ "Failed to delete fileset {} location:
{} with location name: {}",
+ f.name(),
+ filesetPath,
+ locationName);
+ }
+ }
+ } catch (IOException ioe) {
+ LOG.warn(
+ "Failed to delete fileset {} location: {}
with location name: {}",
+ f.name(),
+ location,
+ locationName,
+ ioe);
+ }
+ });
} finally {
Thread.currentThread().setContextClassLoader(oldCl);
}
});
// Delete the schema path if it exists and is empty.
- if (schemaPath != null) {
- FileSystem fs = getFileSystem(schemaPath, conf);
- if (fs.exists(schemaPath)) {
- FileStatus[] statuses = fs.listStatus(schemaPath);
- if (statuses.length == 0) {
- if (fs.delete(schemaPath, true)) {
- LOG.info("Deleted schema {} location {}", ident, schemaPath);
- } else {
- LOG.warn(
- "Failed to delete schema {} because it has files/folders
under location {}",
- ident,
- schemaPath);
- }
- }
+ if (!schemaPaths.isEmpty()) {
+ AtomicReference<RuntimeException> exception = new AtomicReference<>();
+ schemaPaths.forEach(
+ (locationName, schemaPath) -> {
+ try {
+ FileSystem fs = getFileSystem(schemaPath, conf);
+ if (fs.exists(schemaPath)) {
+ FileStatus[] statuses = fs.listStatus(schemaPath);
+ if (statuses.length == 0) {
+ if (fs.delete(schemaPath, true)) {
+ LOG.info(
+ "Deleted schema {} location {} with location name
{}",
+ ident,
+ schemaPath,
+ locationName);
+ } else {
+ LOG.warn(
+ "Failed to delete schema {} because it has
files/folders under location {} with location name {}",
+ ident,
+ schemaPath,
+ locationName);
+ }
+ }
+ }
+ } catch (IOException ioe) {
+ LOG.warn(
+ "Failed to delete schema {} location {} with location name
{}",
+ ident,
+ schemaPath,
+ locationName,
+ ioe);
+ exception.set(
+ new RuntimeException("Failed to delete schema " + ident +
" location", ioe));
+ }
+ });
+ if (exception.get() != null) {
+ throw exception.get();
}
}
@@ -666,6 +830,108 @@ public class HadoopCatalogOperations extends
ManagedSchemaOperations
.build();
}
+ private void validateLocationHierarchy(
+ Map<String, String> schemaLocations, Map<String, String>
filesetLocations) {
+ if (schemaLocations == null
+ || filesetLocations == null
+ || schemaLocations.isEmpty()
+ || filesetLocations.isEmpty()) {
+ return;
+ }
+
+ filesetLocations.forEach(
+ (filesetLocationName, filesetLocation) ->
+ schemaLocations.forEach(
+ (schemaLocationName, schemaLocation) -> {
+ if (ensureTrailingSlash(schemaLocation)
+ .startsWith(ensureTrailingSlash(filesetLocation))) {
+ throw new IllegalArgumentException(
+ String.format(
+ "The fileset location %s with location name %s is
not allowed "
+ + "to be the parent of the schema location %s
with location name %s",
+ filesetLocation,
+ filesetLocationName,
+ schemaLocation,
+ schemaLocationName));
+ }
+ }));
+ }
+
+ private Map<String, Path> getAndCheckCatalogStorageLocations(Map<String,
String> properties) {
+ ImmutableMap.Builder<String, Path> catalogStorageLocations =
ImmutableMap.builder();
+ String unnamedLocation =
+ (String)
+ propertiesMetadata
+ .catalogPropertiesMetadata()
+ .getOrDefault(properties,
HadoopCatalogPropertiesMetadata.LOCATION);
+ if (StringUtils.isNotBlank(unnamedLocation)) {
+ checkPlaceholderValue(unnamedLocation);
+ catalogStorageLocations.put(
+ LOCATION_NAME_UNKNOWN, new
Path(ensureTrailingSlash(unnamedLocation)));
+ }
+
+ properties.forEach(
+ (k, v) -> {
+ if (k.startsWith(PROPERTY_MULTIPLE_LOCATIONS_PREFIX) &&
StringUtils.isNotBlank(v)) {
+ String locationName =
k.substring(PROPERTY_MULTIPLE_LOCATIONS_PREFIX.length());
+ if (StringUtils.isBlank(locationName)) {
+ throw new IllegalArgumentException("Location name must not be
blank");
+ }
+ checkPlaceholderValue(v);
+ catalogStorageLocations.put(locationName, new
Path((ensureTrailingSlash(v))));
+ }
+ });
+ return catalogStorageLocations.build();
+ }
+
+ private Map<String, Path> getAndCheckSchemaPaths(
+ String schemaName, Map<String, String> schemaProps) {
+ Map<String, Path> schemaPaths = new HashMap<>();
+ catalogStorageLocations.forEach(
+ (name, path) -> {
+ if (containsPlaceholder(path.toString())) {
+ schemaPaths.put(name, path);
+ } else {
+ schemaPaths.put(name, new Path(path, schemaName));
+ }
+ });
+
+ String unnamedSchemaLocation =
+ (String)
+ propertiesMetadata
+ .schemaPropertiesMetadata()
+ .getOrDefault(schemaProps,
HadoopSchemaPropertiesMetadata.LOCATION);
+ checkPlaceholderValue(unnamedSchemaLocation);
+ Optional.ofNullable(unnamedSchemaLocation)
+ .map(this::ensureTrailingSlash)
+ .map(Path::new)
+ .ifPresent(p -> schemaPaths.put(LOCATION_NAME_UNKNOWN, p));
+
+ schemaProps.forEach(
+ (k, path) -> {
+ if (k.startsWith(PROPERTY_MULTIPLE_LOCATIONS_PREFIX)) {
+ checkPlaceholderValue(path);
+ String locationName =
k.substring(PROPERTY_MULTIPLE_LOCATIONS_PREFIX.length());
+ if (StringUtils.isBlank(locationName)) {
+ throw new IllegalArgumentException("Location name must not be
blank");
+ }
+ Optional.ofNullable(path)
+ .map(this::ensureTrailingSlash)
+ .map(Path::new)
+ .ifPresent(p -> schemaPaths.put(locationName, p));
+ }
+ });
+ return ImmutableMap.copyOf(schemaPaths);
+ }
+
+ private String ensureTrailingSlash(String path) {
+ return path.endsWith(SLASH) ? path : path + SLASH;
+ }
+
+ private String removeTrailingSlash(String path) {
+ return path.endsWith(SLASH) ? path.substring(0, path.length() - 1) : path;
+ }
+
private FilesetEntity updateFilesetEntity(
NameIdentifier ident, FilesetEntity filesetEntity, FilesetChange...
changes) {
Map<String, String> props =
@@ -700,8 +966,7 @@ public class HadoopCatalogOperations extends
ManagedSchemaOperations
.withId(filesetEntity.id())
.withComment(newComment)
.withFilesetType(filesetEntity.filesetType())
- .withStorageLocations(
- ImmutableMap.of(LOCATION_NAME_UNKNOWN,
filesetEntity.storageLocation()))
+ .withStorageLocations(filesetEntity.storageLocations())
.withProperties(props)
.withAuditInfo(
AuditInfo.builder()
@@ -713,23 +978,6 @@ public class HadoopCatalogOperations extends
ManagedSchemaOperations
.build();
}
- private Path getSchemaPath(String name, Map<String, String> properties) {
- String schemaLocation =
- (String)
- propertiesMetadata
- .schemaPropertiesMetadata()
- .getOrDefault(properties,
HadoopSchemaPropertiesMetadata.LOCATION);
- checkPlaceholderValue(schemaLocation);
-
- return Optional.ofNullable(schemaLocation)
- .map(s -> s.endsWith(SLASH) ? s : s + SLASH)
- .map(Path::new)
- .orElse(
- catalogStorageLocation
- .map(p -> containsPlaceholder(p.toString()) ? p : new Path(p,
name))
- .orElse(null));
- }
-
/**
* Check whether the placeholder in the location is valid. Throw an
exception if the location
* contains a placeholder with an empty value.
@@ -763,6 +1011,28 @@ public class HadoopCatalogOperations extends
ManagedSchemaOperations
&& LOCATION_PLACEHOLDER_PATTERN.matcher(location).find();
}
+ private Map<String, Path> calculateFilesetPaths(
+ String schemaName,
+ String filesetName,
+ Map<String, String> storageLocations,
+ Map<String, Path> schemaPaths,
+ Map<String, String> properties) {
+ ImmutableMap.Builder<String, Path> filesetPaths = ImmutableMap.builder();
+ Set<String> locationNames = new HashSet<>(schemaPaths.keySet());
+ locationNames.addAll(storageLocations.keySet());
+
+ locationNames.forEach(
+ locationName -> {
+ String storageLocation = storageLocations.get(locationName);
+ Path schemaPath = schemaPaths.get(locationName);
+ filesetPaths.put(
+ locationName,
+ caculateFilesetPath(
+ schemaName, filesetName, storageLocation, schemaPath,
properties));
+ });
+ return filesetPaths.build();
+ }
+
private Path caculateFilesetPath(
String schemaName,
String filesetName,
@@ -844,9 +1114,9 @@ public class HadoopCatalogOperations extends
ManagedSchemaOperations
&& !CallerContext.CallerContextHolder.get().context().isEmpty();
}
- private boolean checkSingleFile(Fileset fileset) {
+ private boolean checkSingleFile(Fileset fileset, String locationName) {
try {
- Path locationPath = new Path(fileset.storageLocation());
+ Path locationPath = new
Path(fileset.storageLocations().get(locationName));
return getFileSystem(locationPath,
conf).getFileStatus(locationPath).isFile();
} catch (FileNotFoundException e) {
// We should always return false here, same with the logic in
`FileSystem.isFile(Path f)`.
@@ -854,8 +1124,10 @@ public class HadoopCatalogOperations extends
ManagedSchemaOperations
} catch (IOException e) {
throw new GravitinoRuntimeException(
e,
- "Exception occurs when checking whether fileset: %s mounts a single
file",
- fileset.name());
+ "Exception occurs when checking whether fileset: %s "
+ + "mounts a single file with location name: %s",
+ fileset.name(),
+ locationName);
}
}
diff --git
a/catalogs/catalog-hadoop/src/main/java/org/apache/gravitino/catalog/hadoop/HadoopFileset.java
b/catalogs/catalog-hadoop/src/main/java/org/apache/gravitino/catalog/hadoop/HadoopFileset.java
index b292d29f15..6d622cf1d0 100644
---
a/catalogs/catalog-hadoop/src/main/java/org/apache/gravitino/catalog/hadoop/HadoopFileset.java
+++
b/catalogs/catalog-hadoop/src/main/java/org/apache/gravitino/catalog/hadoop/HadoopFileset.java
@@ -31,7 +31,7 @@ public class HadoopFileset extends BaseFileset {
HadoopFileset fileset = new HadoopFileset();
fileset.name = name;
fileset.comment = comment;
- fileset.storageLocation = storageLocation;
+ fileset.storageLocations = storageLocations;
fileset.type = type;
fileset.properties = properties;
fileset.auditInfo = auditInfo;
diff --git
a/catalogs/catalog-hadoop/src/main/java/org/apache/gravitino/catalog/hadoop/HadoopFilesetPropertiesMetadata.java
b/catalogs/catalog-hadoop/src/main/java/org/apache/gravitino/catalog/hadoop/HadoopFilesetPropertiesMetadata.java
index 6ff885ac83..1f6f578f76 100644
---
a/catalogs/catalog-hadoop/src/main/java/org/apache/gravitino/catalog/hadoop/HadoopFilesetPropertiesMetadata.java
+++
b/catalogs/catalog-hadoop/src/main/java/org/apache/gravitino/catalog/hadoop/HadoopFilesetPropertiesMetadata.java
@@ -19,6 +19,7 @@
package org.apache.gravitino.catalog.hadoop;
import static org.apache.gravitino.file.Fileset.PROPERTY_CATALOG_PLACEHOLDER;
+import static org.apache.gravitino.file.Fileset.PROPERTY_DEFAULT_LOCATION_NAME;
import static org.apache.gravitino.file.Fileset.PROPERTY_FILESET_PLACEHOLDER;
import static
org.apache.gravitino.file.Fileset.PROPERTY_LOCATION_PLACEHOLDER_PREFIX;
import static org.apache.gravitino.file.Fileset.PROPERTY_SCHEMA_PLACEHOLDER;
@@ -64,6 +65,14 @@ public class HadoopFilesetPropertiesMetadata extends
BasePropertiesMetadata {
null /* default value */,
false /* hidden */,
false /* reserved */))
+ .put(
+ PROPERTY_DEFAULT_LOCATION_NAME,
+ PropertyEntry.stringOptionalPropertyEntry(
+ PROPERTY_DEFAULT_LOCATION_NAME,
+ "The default location name for the fileset",
+ true /* immutable */,
+ null,
+ false /* hidden */))
.putAll(KerberosConfig.KERBEROS_PROPERTY_ENTRIES)
.putAll(AuthenticationConfig.AUTHENTICATION_PROPERTY_ENTRIES)
.putAll(CredentialConfig.CREDENTIAL_PROPERTY_ENTRIES);
diff --git
a/catalogs/catalog-hadoop/src/main/java/org/apache/gravitino/catalog/hadoop/SecureHadoopCatalogOperations.java
b/catalogs/catalog-hadoop/src/main/java/org/apache/gravitino/catalog/hadoop/SecureHadoopCatalogOperations.java
index 7ae10805b5..2982080306 100644
---
a/catalogs/catalog-hadoop/src/main/java/org/apache/gravitino/catalog/hadoop/SecureHadoopCatalogOperations.java
+++
b/catalogs/catalog-hadoop/src/main/java/org/apache/gravitino/catalog/hadoop/SecureHadoopCatalogOperations.java
@@ -50,6 +50,7 @@ import
org.apache.gravitino.exceptions.FilesetAlreadyExistsException;
import org.apache.gravitino.exceptions.NoSuchCatalogException;
import org.apache.gravitino.exceptions.NoSuchEntityException;
import org.apache.gravitino.exceptions.NoSuchFilesetException;
+import org.apache.gravitino.exceptions.NoSuchLocationNameException;
import org.apache.gravitino.exceptions.NoSuchSchemaException;
import org.apache.gravitino.exceptions.NonEmptySchemaException;
import org.apache.gravitino.exceptions.SchemaAlreadyExistsException;
@@ -105,11 +106,11 @@ public class SecureHadoopCatalogOperations
}
@Override
- public Fileset createFileset(
+ public Fileset createMultipleLocationFileset(
NameIdentifier ident,
String comment,
Fileset.Type type,
- String storageLocation,
+ Map<String, String> storageLocations,
Map<String, String> properties)
throws NoSuchSchemaException, FilesetAlreadyExistsException {
String apiUser = PrincipalUtils.getCurrentUserName();
@@ -120,8 +121,8 @@ public class SecureHadoopCatalogOperations
return userContext.doAs(
() -> {
setUser(apiUser);
- return hadoopCatalogOperations.createFileset(
- ident, comment, type, storageLocation, properties);
+ return hadoopCatalogOperations.createMultipleLocationFileset(
+ ident, comment, type, storageLocations, properties);
},
ident);
}
@@ -233,9 +234,9 @@ public class SecureHadoopCatalogOperations
}
@Override
public String getFileLocation(NameIdentifier ident, String subPath, String locationName)
    throws NoSuchFilesetException, NoSuchLocationNameException {
  // Pure delegation: resolving a file location performs no writes, so no
  // user-impersonation context (doAs) is needed here, unlike the mutating
  // operations in this class.
  return hadoopCatalogOperations.getFileLocation(ident, subPath, locationName);
}
@Override
@@ -260,7 +261,16 @@ public class SecureHadoopCatalogOperations
@Override
public List<PathContext> getPathContext(NameIdentifier filesetIdentifier) {
Fileset fileset = loadFileset(filesetIdentifier);
- String path = fileset.storageLocation();
+ Map<String, String> locations = fileset.storageLocations();
+ Preconditions.checkArgument(
+ locations != null && !locations.isEmpty(),
+ "No storage locations found for fileset: " + filesetIdentifier);
+
+ // todo: support multiple storage locations
+ Preconditions.checkArgument(
+ locations.size() == 1, "Only one storage location is supported for
fileset now");
+
+ String path = locations.values().iterator().next();
Preconditions.checkState(
StringUtils.isNotBlank(path), "The location of fileset should not be
empty.");
diff --git
a/catalogs/catalog-hadoop/src/test/java/org/apache/gravitino/catalog/hadoop/TestHadoopCatalogOperations.java
b/catalogs/catalog-hadoop/src/test/java/org/apache/gravitino/catalog/hadoop/TestHadoopCatalogOperations.java
index deff740dd2..97c01704ed 100644
---
a/catalogs/catalog-hadoop/src/test/java/org/apache/gravitino/catalog/hadoop/TestHadoopCatalogOperations.java
+++
b/catalogs/catalog-hadoop/src/test/java/org/apache/gravitino/catalog/hadoop/TestHadoopCatalogOperations.java
@@ -35,6 +35,10 @@ import static
org.apache.gravitino.Configs.VERSION_RETENTION_COUNT;
import static
org.apache.gravitino.catalog.hadoop.HadoopCatalog.CATALOG_PROPERTIES_META;
import static
org.apache.gravitino.catalog.hadoop.HadoopCatalog.FILESET_PROPERTIES_META;
import static
org.apache.gravitino.catalog.hadoop.HadoopCatalog.SCHEMA_PROPERTIES_META;
+import static
org.apache.gravitino.catalog.hadoop.HadoopCatalogPropertiesMetadata.LOCATION;
+import static org.apache.gravitino.file.Fileset.LOCATION_NAME_UNKNOWN;
+import static org.apache.gravitino.file.Fileset.PROPERTY_DEFAULT_LOCATION_NAME;
+import static
org.apache.gravitino.file.Fileset.PROPERTY_MULTIPLE_LOCATIONS_PREFIX;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.when;
@@ -281,6 +285,17 @@ public class TestHadoopCatalogOperations {
Mockito.anyString(), Mockito.anyString(),
Mockito.eq("s1_" + name));
});
+ multipleLocationsArguments()
+ .forEach(
+ arguments -> {
+ String name = (String) arguments.get()[0];
+ long schemaId = idGenerator.nextId();
+ doReturn(new SchemaIds(1L, 1L, schemaId))
+ .when(spySchemaMetaService)
+ .getSchemaIdByMetalakeNameAndCatalogNameAndSchemaName(
+ Mockito.anyString(), Mockito.anyString(),
Mockito.eq("s1_" + name));
+ });
+
MockedStatic<MetalakeMetaService> metalakeMetaServiceMockedStatic =
Mockito.mockStatic(MetalakeMetaService.class);
MockedStatic<CatalogMetaService> catalogMetaServiceMockedStatic =
@@ -323,21 +338,21 @@ public class TestHadoopCatalogOperations {
String value = conf.get("fs.defaultFS");
Assertions.assertEquals("file:///", value);
- emptyProps.put(HadoopCatalogPropertiesMetadata.LOCATION,
"file:///tmp/catalog");
+ emptyProps.put(LOCATION, "file:///tmp/catalog");
ops.initialize(emptyProps, catalogInfo, HADOOP_PROPERTIES_METADATA);
- Assertions.assertTrue(ops.catalogStorageLocation.isPresent());
+ Assertions.assertEquals(1, ops.catalogStorageLocations.size());
Path expectedPath = new Path("file:///tmp/catalog");
- Assertions.assertEquals(expectedPath, ops.catalogStorageLocation.get());
+ Assertions.assertEquals(expectedPath,
ops.catalogStorageLocations.get(LOCATION_NAME_UNKNOWN));
// test placeholder in location
- emptyProps.put(HadoopCatalogPropertiesMetadata.LOCATION,
"file:///tmp/{{catalog}}");
+ emptyProps.put(LOCATION, "file:///tmp/{{catalog}}");
ops.initialize(emptyProps, catalogInfo, HADOOP_PROPERTIES_METADATA);
- Assertions.assertTrue(ops.catalogStorageLocation.isPresent());
+ Assertions.assertEquals(1, ops.catalogStorageLocations.size());
expectedPath = new Path("file:///tmp/{{catalog}}");
- Assertions.assertEquals(expectedPath, ops.catalogStorageLocation.get());
+ Assertions.assertEquals(expectedPath,
ops.catalogStorageLocations.get(LOCATION_NAME_UNKNOWN));
// test illegal placeholder in location
- emptyProps.put(HadoopCatalogPropertiesMetadata.LOCATION,
"file:///tmp/{{}}");
+ emptyProps.put(LOCATION, "file:///tmp/{{}}");
Throwable exception =
Assertions.assertThrows(
IllegalArgumentException.class,
@@ -572,7 +587,7 @@ public class TestHadoopCatalogOperations {
try (SecureHadoopCatalogOperations ops = new
SecureHadoopCatalogOperations(store)) {
ops.initialize(
- ImmutableMap.of(HadoopCatalogPropertiesMetadata.LOCATION,
catalogPath),
+ ImmutableMap.of(LOCATION, catalogPath),
randomCatalogInfo("m1", "c1"),
HADOOP_PROPERTIES_METADATA);
Schema schema1 = ops.loadSchema(id);
@@ -639,7 +654,7 @@ public class TestHadoopCatalogOperations {
String comment = "comment_s1";
Map<String, String> catalogProps = Maps.newHashMap();
if (catalogPath != null) {
- catalogProps.put(HadoopCatalogPropertiesMetadata.LOCATION, catalogPath);
+ catalogProps.put(LOCATION, catalogPath);
}
NameIdentifier schemaIdent = NameIdentifierUtil.ofSchema("m1", "c1",
schemaName);
@@ -763,7 +778,7 @@ public class TestHadoopCatalogOperations {
String comment = "comment_s24";
Map<String, String> catalogProps = Maps.newHashMap();
if (catalogPath != null) {
- catalogProps.put(HadoopCatalogPropertiesMetadata.LOCATION, catalogPath);
+ catalogProps.put(LOCATION, catalogPath);
}
NameIdentifier schemaIdent = NameIdentifierUtil.ofSchema("m1", "c1",
schemaName);
@@ -972,7 +987,7 @@ public class TestHadoopCatalogOperations {
String location = "hdfs://localhost:9000";
Map<String, String> catalogProperties = Maps.newHashMap();
- catalogProperties.put(HadoopCatalogPropertiesMetadata.LOCATION,
location);
+ catalogProperties.put(LOCATION, location);
ops.initialize(catalogProperties, randomCatalogInfo(),
HADOOP_PROPERTIES_METADATA);
@@ -980,7 +995,7 @@ public class TestHadoopCatalogOperations {
NameIdentifier nameIdentifier = NameIdentifierUtil.ofSchema("m1", "c1",
schemaName);
Map<String, String> schemaProperties = Maps.newHashMap();
- schemaProperties.put(HadoopCatalogPropertiesMetadata.LOCATION,
"hdfs://localhost:9000/user1");
+ schemaProperties.put(LOCATION, "hdfs://localhost:9000/user1");
StringIdentifier stringId =
StringIdentifier.fromId(idGenerator.nextId());
schemaProperties =
Maps.newHashMap(StringIdentifier.newPropertiesWithId(stringId,
schemaProperties));
@@ -1106,6 +1121,10 @@ public class TestHadoopCatalogOperations {
Fileset mockFileset = Mockito.mock(Fileset.class);
when(mockFileset.name()).thenReturn(filesetName4);
when(mockFileset.storageLocation()).thenReturn(filesetLocation4);
+ when(mockFileset.storageLocations())
+ .thenReturn(ImmutableMap.of(LOCATION_NAME_UNKNOWN, filesetLocation4));
+ when(mockFileset.properties())
+ .thenReturn(ImmutableMap.of(PROPERTY_DEFAULT_LOCATION_NAME,
LOCATION_NAME_UNKNOWN));
try (HadoopCatalogOperations mockOps =
Mockito.mock(HadoopCatalogOperations.class)) {
mockOps.hadoopConf = new Configuration();
@@ -1113,6 +1132,7 @@ public class TestHadoopCatalogOperations {
when(mockOps.getConf()).thenReturn(Maps.newHashMap());
String subPath = "/test/test.parquet";
when(mockOps.getFileLocation(filesetIdent,
subPath)).thenCallRealMethod();
+ when(mockOps.getFileLocation(filesetIdent, subPath,
null)).thenCallRealMethod();
when(mockOps.getFileSystem(Mockito.any(), Mockito.any()))
.thenReturn(FileSystem.getLocal(new Configuration()));
String fileLocation = mockOps.getFileLocation(filesetIdent, subPath);
@@ -1177,7 +1197,7 @@ public class TestHadoopCatalogOperations {
String comment = "comment_s1";
Map<String, String> catalogProps = Maps.newHashMap();
if (catalogPath != null) {
- catalogProps.put(HadoopCatalogPropertiesMetadata.LOCATION, catalogPath);
+ catalogProps.put(LOCATION, catalogPath);
}
NameIdentifier schemaIdent = NameIdentifierUtil.ofSchema("m1", "c1",
schemaName);
@@ -1221,6 +1241,679 @@ public class TestHadoopCatalogOperations {
}
}
+ @ParameterizedTest
+ @MethodSource("multipleLocationsArguments")
+ public void testMultipleLocations(
+ String name,
+ Fileset.Type type,
+ Map<String, String> catalogPaths,
+ Map<String, String> schemaPaths,
+ Map<String, String> storageLocations,
+ Map<String, String> filesetProps,
+ Map<String, String> expect)
+ throws IOException {
+ String schemaName = "s1_" + name;
+ String comment = "comment_s1";
+
+ NameIdentifier schemaIdent = NameIdentifierUtil.ofSchema("m1", "c1",
schemaName);
+ try (SecureHadoopCatalogOperations ops = new
SecureHadoopCatalogOperations(store)) {
+ ops.initialize(catalogPaths, randomCatalogInfo("m1", "c1"),
HADOOP_PROPERTIES_METADATA);
+ if (!ops.schemaExists(schemaIdent)) {
+ createMultiLocationSchema(schemaName, comment, catalogPaths,
schemaPaths);
+ }
+ Fileset fileset =
+ createMultiLocationFileset(
+ name, schemaName, "comment", type, catalogPaths,
storageLocations, filesetProps);
+
+ Assertions.assertEquals(name, fileset.name());
+ Assertions.assertEquals(type, fileset.type());
+ Assertions.assertEquals("comment", fileset.comment());
+ Assertions.assertEquals(expect, fileset.storageLocations());
+ Assertions.assertEquals(
+ fileset.storageLocation(),
fileset.storageLocations().get(LOCATION_NAME_UNKNOWN));
+
Assertions.assertNotNull(fileset.properties().get(PROPERTY_DEFAULT_LOCATION_NAME));
+ if (filesetProps != null &&
filesetProps.containsKey(PROPERTY_DEFAULT_LOCATION_NAME)) {
+ Assertions.assertEquals(
+ filesetProps.get(PROPERTY_DEFAULT_LOCATION_NAME),
+ fileset.properties().get(PROPERTY_DEFAULT_LOCATION_NAME));
+ }
+ if (filesetProps == null ||
!filesetProps.containsKey(PROPERTY_DEFAULT_LOCATION_NAME)) {}
+
+ // Test load
+ NameIdentifier filesetIdent = NameIdentifier.of("m1", "c1", schemaName,
name);
+ Fileset loadedFileset = ops.loadFileset(filesetIdent);
+ Assertions.assertEquals(name, loadedFileset.name());
+ Assertions.assertEquals(type, loadedFileset.type());
+ Assertions.assertEquals("comment", loadedFileset.comment());
+ Assertions.assertEquals(expect, loadedFileset.storageLocations());
+ Assertions.assertEquals(
+ loadedFileset.storageLocation(),
+ loadedFileset.storageLocations().get(LOCATION_NAME_UNKNOWN));
+
+ // Test drop
+ ops.dropFileset(filesetIdent);
+ for (Map.Entry<String, String> location : expect.entrySet()) {
+ Path expectedPath = new Path(location.getValue());
+ FileSystem fs = expectedPath.getFileSystem(new Configuration());
+ if (type == Fileset.Type.MANAGED) {
+ Assertions.assertFalse(fs.exists(expectedPath));
+ } else {
+ Assertions.assertTrue(
+ fs.exists(expectedPath),
+ "location with name "
+ + location.getKey()
+ + " should exist, path: "
+ + location.getValue());
+ }
+ }
+
+ // clean expected path if exist
+ try (FileSystem fs = FileSystem.newInstance(new Configuration())) {
+ for (String location : expect.values()) {
+ fs.delete(new Path(location), true);
+ }
+ }
+
+ // Test drop non-existent fileset
+ Assertions.assertFalse(ops.dropFileset(filesetIdent), "fileset should be
non-existent");
+ }
+ }
+
+ @Test
+ public void testCreateMultipleLocationsWithExceptions() throws IOException {
+ // empty location name in catalog location
+ Map<String, String> illegalLocations =
+ ImmutableMap.of(PROPERTY_MULTIPLE_LOCATIONS_PREFIX + "",
TEST_ROOT_PATH + "/catalog31_1");
+ try (SecureHadoopCatalogOperations ops = new
SecureHadoopCatalogOperations(store)) {
+ Exception exception =
+ Assertions.assertThrows(
+ IllegalArgumentException.class,
+ () ->
+ ops.initialize(
+ illegalLocations, randomCatalogInfo("m1", "c1"),
HADOOP_PROPERTIES_METADATA));
+ Assertions.assertEquals("Location name must not be blank",
exception.getMessage());
+
+ // empty location name in schema location
+ ops.initialize(ImmutableMap.of(), randomCatalogInfo("m1", "c1"),
HADOOP_PROPERTIES_METADATA);
+ exception =
+ Assertions.assertThrows(
+ IllegalArgumentException.class,
+ () ->
+ createMultiLocationSchema("s1", "comment",
ImmutableMap.of(), illegalLocations));
+ Assertions.assertEquals("Location name must not be blank",
exception.getMessage());
+
+ // empty location name in storage location
+ exception =
+ Assertions.assertThrows(
+ IllegalArgumentException.class,
+ () ->
+ createMultiLocationFileset(
+ "fileset_test",
+ "s1",
+ null,
+ Fileset.Type.MANAGED,
+ ImmutableMap.of(),
+ ImmutableMap.of("", TEST_ROOT_PATH + "/fileset31"),
+ null));
+ Assertions.assertEquals("Location name must not be blank",
exception.getMessage());
+
+ // storage location is parent of schema location
+ Schema multipLocationSchema =
+ createMultiLocationSchema(
+ "s1",
+ "comment",
+ ImmutableMap.of(),
+ ImmutableMap.of(
+ PROPERTY_MULTIPLE_LOCATIONS_PREFIX + "v1", TEST_ROOT_PATH +
"/s1/a/b/c"));
+ exception =
+ Assertions.assertThrows(
+ IllegalArgumentException.class,
+ () ->
+ createMultiLocationFileset(
+ "fileset_test",
+ multipLocationSchema.name(),
+ null,
+ Fileset.Type.MANAGED,
+ ImmutableMap.of(),
+ ImmutableMap.of(LOCATION_NAME_UNKNOWN, TEST_ROOT_PATH +
"/s1/a"),
+ null));
+ Assertions.assertTrue(
+ exception
+ .getMessage()
+ .contains(
+ "Default location name must be set and must be one of the
fileset locations"),
+ "Exception message: " + exception.getMessage());
+ }
+ }
+
/**
 * Argument provider for {@code testMultipleLocations}. Each entry is a 7-tuple:
 * (fileset name, fileset type, catalog location properties, schema location properties,
 * fileset storage locations, fileset properties, expected resolved location map).
 *
 * <p>Location precedence under test: fileset-level storage locations override schema-level
 * locations, which override catalog-level locations. {@code {{schema}}}/{{@code {fileset}}}
 * placeholders are substituted during resolution. Entries named fileset50x use
 * {@code UNFORMALIZED_TEST_ROOT_PATH} inputs but expect {@code TEST_ROOT_PATH} outputs,
 * exercising path normalization.
 */
private static Stream<Arguments> multipleLocationsArguments() {
  return Stream.of(
      // Honor the catalog location
      Arguments.of(
          "fileset51",
          Fileset.Type.MANAGED,
          ImmutableMap.of(
              LOCATION,
              TEST_ROOT_PATH + "/catalog31_1",
              PROPERTY_MULTIPLE_LOCATIONS_PREFIX + "v1",
              TEST_ROOT_PATH + "/catalog31_1",
              PROPERTY_MULTIPLE_LOCATIONS_PREFIX + "v2",
              TEST_ROOT_PATH + "/catalog31_2/{{schema}}/{{fileset}}"),
          ImmutableMap.of(),
          ImmutableMap.of(),
          ImmutableMap.of(PROPERTY_DEFAULT_LOCATION_NAME, "v1"),
          ImmutableMap.of(
              LOCATION_NAME_UNKNOWN,
              TEST_ROOT_PATH + "/catalog31_1/s1_fileset51/fileset51",
              "v1",
              TEST_ROOT_PATH + "/catalog31_1/s1_fileset51/fileset51",
              "v2",
              TEST_ROOT_PATH + "/catalog31_2/s1_fileset51/fileset51")),
      Arguments.of(
          // honor the schema location
          "fileset52",
          Fileset.Type.MANAGED,
          ImmutableMap.of(),
          ImmutableMap.of(
              LOCATION,
              TEST_ROOT_PATH + "/s1_fileset52_1",
              PROPERTY_MULTIPLE_LOCATIONS_PREFIX + "v1",
              TEST_ROOT_PATH + "/s1_fileset52_1",
              PROPERTY_MULTIPLE_LOCATIONS_PREFIX + "v2",
              TEST_ROOT_PATH + "/s1_fileset52_2/{{fileset}}"),
          ImmutableMap.of(),
          ImmutableMap.of(PROPERTY_DEFAULT_LOCATION_NAME, "v2"),
          ImmutableMap.of(
              LOCATION_NAME_UNKNOWN,
              TEST_ROOT_PATH + "/s1_fileset52_1/fileset52",
              "v1",
              TEST_ROOT_PATH + "/s1_fileset52_1/fileset52",
              "v2",
              TEST_ROOT_PATH + "/s1_fileset52_2/fileset52")),
      Arguments.of(
          // honor the schema location (schema overrides catalog)
          "fileset53",
          Fileset.Type.MANAGED,
          ImmutableMap.of(
              LOCATION,
              TEST_ROOT_PATH + "/catalog32_1",
              PROPERTY_MULTIPLE_LOCATIONS_PREFIX + "v1",
              TEST_ROOT_PATH + "/catalog32_1",
              PROPERTY_MULTIPLE_LOCATIONS_PREFIX + "v2",
              TEST_ROOT_PATH + "/catalog32_2/{{schema}}/{{fileset}}"),
          ImmutableMap.of(
              LOCATION,
              TEST_ROOT_PATH + "/s1_fileset53_1",
              PROPERTY_MULTIPLE_LOCATIONS_PREFIX + "v1",
              TEST_ROOT_PATH + "/s1_fileset53_1",
              PROPERTY_MULTIPLE_LOCATIONS_PREFIX + "v2",
              TEST_ROOT_PATH + "/s1_fileset53_2/{{fileset}}"),
          ImmutableMap.of(),
          ImmutableMap.of(PROPERTY_DEFAULT_LOCATION_NAME, "unknown"),
          ImmutableMap.of(
              LOCATION_NAME_UNKNOWN,
              TEST_ROOT_PATH + "/s1_fileset53_1/fileset53",
              "v1",
              TEST_ROOT_PATH + "/s1_fileset53_1/fileset53",
              "v2",
              TEST_ROOT_PATH + "/s1_fileset53_2/fileset53")),
      Arguments.of(
          // honor the storage location (fileset overrides schema and catalog)
          "fileset54",
          Fileset.Type.MANAGED,
          ImmutableMap.of(
              LOCATION,
              TEST_ROOT_PATH + "/catalog33_1",
              PROPERTY_MULTIPLE_LOCATIONS_PREFIX + "v1",
              TEST_ROOT_PATH + "/catalog33_1",
              PROPERTY_MULTIPLE_LOCATIONS_PREFIX + "v2",
              TEST_ROOT_PATH + "/catalog33_2/{{schema}}/{{fileset}}"),
          ImmutableMap.of(
              LOCATION,
              TEST_ROOT_PATH + "/s1_fileset54_1",
              PROPERTY_MULTIPLE_LOCATIONS_PREFIX + "v1",
              TEST_ROOT_PATH + "/s1_fileset54_1",
              PROPERTY_MULTIPLE_LOCATIONS_PREFIX + "v2",
              TEST_ROOT_PATH + "/s1_fileset54_2/{{fileset}}"),
          ImmutableMap.of(
              LOCATION_NAME_UNKNOWN,
              TEST_ROOT_PATH + "/fileset54_1",
              "v1",
              TEST_ROOT_PATH + "/fileset54_1",
              "v2",
              TEST_ROOT_PATH + "/{{fileset}}_2"),
          ImmutableMap.of(PROPERTY_DEFAULT_LOCATION_NAME, "v1"),
          ImmutableMap.of(
              LOCATION_NAME_UNKNOWN,
              TEST_ROOT_PATH + "/fileset54_1",
              "v1",
              TEST_ROOT_PATH + "/fileset54_1",
              "v2",
              TEST_ROOT_PATH + "/fileset54_2")),
      Arguments.of(
          // honor the storage location (no catalog/schema locations at all)
          "fileset55",
          Fileset.Type.MANAGED,
          ImmutableMap.of(),
          ImmutableMap.of(),
          ImmutableMap.of(
              LOCATION_NAME_UNKNOWN,
              TEST_ROOT_PATH + "/fileset55_1",
              "v1",
              TEST_ROOT_PATH + "/fileset55_1",
              "v2",
              TEST_ROOT_PATH + "/{{fileset}}_2"),
          ImmutableMap.of(PROPERTY_DEFAULT_LOCATION_NAME, "v1"),
          ImmutableMap.of(
              LOCATION_NAME_UNKNOWN,
              TEST_ROOT_PATH + "/fileset55_1",
              "v1",
              TEST_ROOT_PATH + "/fileset55_1",
              "v2",
              TEST_ROOT_PATH + "/fileset55_2")),
      Arguments.of(
          // honor the storage location (fileset overrides catalog, no schema locations)
          "fileset56",
          Fileset.Type.MANAGED,
          ImmutableMap.of(
              LOCATION,
              TEST_ROOT_PATH + "/catalog34_1",
              PROPERTY_MULTIPLE_LOCATIONS_PREFIX + "v1",
              TEST_ROOT_PATH + "/catalog34_1",
              PROPERTY_MULTIPLE_LOCATIONS_PREFIX + "v2",
              TEST_ROOT_PATH + "/catalog34_2/{{schema}}/{{fileset}}"),
          ImmutableMap.of(),
          ImmutableMap.of(
              LOCATION_NAME_UNKNOWN,
              TEST_ROOT_PATH + "/fileset56_1",
              "v1",
              TEST_ROOT_PATH + "/fileset56_1",
              "v2",
              TEST_ROOT_PATH + "/{{fileset}}_2"),
          ImmutableMap.of(PROPERTY_DEFAULT_LOCATION_NAME, "v1"),
          ImmutableMap.of(
              LOCATION_NAME_UNKNOWN,
              TEST_ROOT_PATH + "/fileset56_1",
              "v1",
              TEST_ROOT_PATH + "/fileset56_1",
              "v2",
              TEST_ROOT_PATH + "/fileset56_2")),
      Arguments.of(
          // honor partial catalog/schema/fileset locations (each level contributes names)
          "fileset510",
          Fileset.Type.MANAGED,
          ImmutableMap.of(
              PROPERTY_MULTIPLE_LOCATIONS_PREFIX + "v1",
              TEST_ROOT_PATH + "/catalog34_1",
              PROPERTY_MULTIPLE_LOCATIONS_PREFIX + "v2",
              TEST_ROOT_PATH + "/catalog34_2",
              PROPERTY_MULTIPLE_LOCATIONS_PREFIX + "v3",
              TEST_ROOT_PATH + "/catalog34_3"),
          ImmutableMap.of(
              PROPERTY_MULTIPLE_LOCATIONS_PREFIX + "v2",
              TEST_ROOT_PATH + "/s1_fileset510_2",
              PROPERTY_MULTIPLE_LOCATIONS_PREFIX + "v3",
              TEST_ROOT_PATH + "/s1_{{fileset}}_3",
              PROPERTY_MULTIPLE_LOCATIONS_PREFIX + "v4",
              TEST_ROOT_PATH + "/s1_fileset510_4"),
          ImmutableMap.of(
              LOCATION_NAME_UNKNOWN,
              TEST_ROOT_PATH + "/fileset510",
              "v3",
              TEST_ROOT_PATH + "/fileset510_3",
              "v4",
              TEST_ROOT_PATH + "/fileset510_4",
              "v5",
              TEST_ROOT_PATH + "/{{fileset}}_5"),
          ImmutableMap.of(PROPERTY_DEFAULT_LOCATION_NAME, "v5"),
          ImmutableMap.<String, String>builder()
              .put(LOCATION_NAME_UNKNOWN, TEST_ROOT_PATH + "/fileset510")
              .put("v1", TEST_ROOT_PATH + "/catalog34_1/s1_fileset510/fileset510")
              .put("v2", TEST_ROOT_PATH + "/s1_fileset510_2/fileset510")
              .put("v3", TEST_ROOT_PATH + "/fileset510_3")
              .put("v4", TEST_ROOT_PATH + "/fileset510_4")
              .put("v5", TEST_ROOT_PATH + "/fileset510_5")
              .build()),
      Arguments.of(
          // test without unnamed storage location
          "fileset511",
          Fileset.Type.MANAGED,
          ImmutableMap.of(
              PROPERTY_MULTIPLE_LOCATIONS_PREFIX + "v1",
              TEST_ROOT_PATH + "/catalog34_1",
              PROPERTY_MULTIPLE_LOCATIONS_PREFIX + "v2",
              TEST_ROOT_PATH + "/catalog34_2/{{schema}}/{{fileset}}"),
          ImmutableMap.of(),
          ImmutableMap.of(
              "v1", TEST_ROOT_PATH + "/fileset511_1", "v2", TEST_ROOT_PATH + "/{{fileset}}_2"),
          ImmutableMap.of(PROPERTY_DEFAULT_LOCATION_NAME, "v1"),
          ImmutableMap.of(
              "v1", TEST_ROOT_PATH + "/fileset511_1", "v2", TEST_ROOT_PATH + "/fileset511_2")),
      Arguments.of(
          // test single location without unnamed storage location; null fileset props,
          // so the default location name must be derived
          "fileset512",
          Fileset.Type.MANAGED,
          ImmutableMap.of(
              PROPERTY_MULTIPLE_LOCATIONS_PREFIX + "v1",
              TEST_ROOT_PATH + "/catalog34_2/{{schema}}/{{fileset}}"),
          ImmutableMap.of(),
          ImmutableMap.of("v1", TEST_ROOT_PATH + "/{{fileset}}_2"),
          null,
          ImmutableMap.of("v1", TEST_ROOT_PATH + "/fileset512_2")),
      Arguments.of(
          // honor the storage location (EXTERNAL fileset)
          "fileset57",
          Fileset.Type.EXTERNAL,
          ImmutableMap.of(
              LOCATION,
              TEST_ROOT_PATH + "/catalog35_1",
              PROPERTY_MULTIPLE_LOCATIONS_PREFIX + "v1",
              TEST_ROOT_PATH + "/catalog35_1",
              PROPERTY_MULTIPLE_LOCATIONS_PREFIX + "v2",
              TEST_ROOT_PATH + "/catalog35_2/{{schema}}/{{fileset}}"),
          ImmutableMap.of(
              LOCATION,
              TEST_ROOT_PATH + "/s1_fileset57_1",
              PROPERTY_MULTIPLE_LOCATIONS_PREFIX + "v1",
              TEST_ROOT_PATH + "/s1_fileset57_1",
              PROPERTY_MULTIPLE_LOCATIONS_PREFIX + "v2",
              TEST_ROOT_PATH + "/s1_fileset57_2/{{fileset}}"),
          ImmutableMap.of(
              LOCATION_NAME_UNKNOWN,
              TEST_ROOT_PATH + "/fileset57_1",
              "v1",
              TEST_ROOT_PATH + "/fileset57_1",
              "v2",
              TEST_ROOT_PATH + "/{{fileset}}_2"),
          ImmutableMap.of(PROPERTY_DEFAULT_LOCATION_NAME, "v1"),
          ImmutableMap.of(
              LOCATION_NAME_UNKNOWN,
              TEST_ROOT_PATH + "/fileset57_1",
              "v1",
              TEST_ROOT_PATH + "/fileset57_1",
              "v2",
              TEST_ROOT_PATH + "/fileset57_2")),
      Arguments.of(
          // honor the storage location (EXTERNAL fileset, schema locations only)
          "fileset58",
          Fileset.Type.EXTERNAL,
          ImmutableMap.of(),
          ImmutableMap.of(
              LOCATION,
              TEST_ROOT_PATH + "/s1_fileset58_1",
              PROPERTY_MULTIPLE_LOCATIONS_PREFIX + "v1",
              TEST_ROOT_PATH + "/s1_fileset58_1",
              PROPERTY_MULTIPLE_LOCATIONS_PREFIX + "v2",
              TEST_ROOT_PATH + "/s1_fileset58_2/{{fileset}}"),
          ImmutableMap.of(
              LOCATION_NAME_UNKNOWN,
              TEST_ROOT_PATH + "/fileset58_1",
              "v1",
              TEST_ROOT_PATH + "/fileset58_1",
              "v2",
              TEST_ROOT_PATH + "/{{fileset}}_2"),
          ImmutableMap.of(PROPERTY_DEFAULT_LOCATION_NAME, "v1"),
          ImmutableMap.of(
              LOCATION_NAME_UNKNOWN,
              TEST_ROOT_PATH + "/fileset58_1",
              "v1",
              TEST_ROOT_PATH + "/fileset58_1",
              "v2",
              TEST_ROOT_PATH + "/fileset58_2")),
      Arguments.of(
          // honor the storage location (EXTERNAL fileset, no catalog/schema locations)
          "fileset59",
          Fileset.Type.EXTERNAL,
          ImmutableMap.of(),
          ImmutableMap.of(),
          ImmutableMap.of(
              LOCATION_NAME_UNKNOWN,
              TEST_ROOT_PATH + "/fileset59",
              "v1",
              TEST_ROOT_PATH + "/fileset59",
              "v2",
              TEST_ROOT_PATH + "/{{fileset}}"),
          ImmutableMap.of(PROPERTY_DEFAULT_LOCATION_NAME, "v1"),
          ImmutableMap.of(
              LOCATION_NAME_UNKNOWN,
              TEST_ROOT_PATH + "/fileset59",
              "v1",
              TEST_ROOT_PATH + "/fileset59",
              "v2",
              TEST_ROOT_PATH + "/fileset59")),
      // Honor the catalog location (unnormalized input paths)
      Arguments.of(
          "fileset501",
          Fileset.Type.MANAGED,
          ImmutableMap.of(
              LOCATION,
              UNFORMALIZED_TEST_ROOT_PATH + "/catalog301_1",
              PROPERTY_MULTIPLE_LOCATIONS_PREFIX + "v1",
              UNFORMALIZED_TEST_ROOT_PATH + "/catalog301_1",
              PROPERTY_MULTIPLE_LOCATIONS_PREFIX + "v2",
              UNFORMALIZED_TEST_ROOT_PATH + "/catalog301_2/{{schema}}/{{fileset}}"),
          ImmutableMap.of(),
          ImmutableMap.of(),
          ImmutableMap.of(PROPERTY_DEFAULT_LOCATION_NAME, "v1"),
          ImmutableMap.of(
              LOCATION_NAME_UNKNOWN,
              TEST_ROOT_PATH + "/catalog301_1/s1_fileset501/fileset501",
              "v1",
              TEST_ROOT_PATH + "/catalog301_1/s1_fileset501/fileset501",
              "v2",
              TEST_ROOT_PATH + "/catalog301_2/s1_fileset501/fileset501")),
      Arguments.of(
          // honor the schema location (unnormalized input paths)
          "fileset502",
          Fileset.Type.MANAGED,
          ImmutableMap.of(),
          ImmutableMap.of(
              LOCATION,
              UNFORMALIZED_TEST_ROOT_PATH + "/s1_fileset502_1",
              PROPERTY_MULTIPLE_LOCATIONS_PREFIX + "v1",
              UNFORMALIZED_TEST_ROOT_PATH + "/s1_fileset502_1",
              PROPERTY_MULTIPLE_LOCATIONS_PREFIX + "v2",
              UNFORMALIZED_TEST_ROOT_PATH + "/s1_fileset502_2/{{fileset}}"),
          ImmutableMap.of(),
          ImmutableMap.of(PROPERTY_DEFAULT_LOCATION_NAME, "v1"),
          ImmutableMap.of(
              LOCATION_NAME_UNKNOWN,
              TEST_ROOT_PATH + "/s1_fileset502_1/fileset502",
              "v1",
              TEST_ROOT_PATH + "/s1_fileset502_1/fileset502",
              "v2",
              TEST_ROOT_PATH + "/s1_fileset502_2/fileset502")),
      Arguments.of(
          // honor the schema location (unnormalized, schema overrides catalog)
          "fileset503",
          Fileset.Type.MANAGED,
          ImmutableMap.of(
              LOCATION,
              UNFORMALIZED_TEST_ROOT_PATH + "/catalog302_1",
              PROPERTY_MULTIPLE_LOCATIONS_PREFIX + "v1",
              UNFORMALIZED_TEST_ROOT_PATH + "/catalog302_1",
              PROPERTY_MULTIPLE_LOCATIONS_PREFIX + "v2",
              UNFORMALIZED_TEST_ROOT_PATH + "/catalog302_2/{{schema}}/{{fileset}}"),
          ImmutableMap.of(
              LOCATION,
              UNFORMALIZED_TEST_ROOT_PATH + "/s1_fileset503_1",
              PROPERTY_MULTIPLE_LOCATIONS_PREFIX + "v1",
              UNFORMALIZED_TEST_ROOT_PATH + "/s1_fileset503_1",
              PROPERTY_MULTIPLE_LOCATIONS_PREFIX + "v2",
              UNFORMALIZED_TEST_ROOT_PATH + "/s1_fileset503_2/{{fileset}}"),
          ImmutableMap.of(),
          ImmutableMap.of(PROPERTY_DEFAULT_LOCATION_NAME, "v1"),
          ImmutableMap.of(
              LOCATION_NAME_UNKNOWN,
              TEST_ROOT_PATH + "/s1_fileset503_1/fileset503",
              "v1",
              TEST_ROOT_PATH + "/s1_fileset503_1/fileset503",
              "v2",
              TEST_ROOT_PATH + "/s1_fileset503_2/fileset503")),
      Arguments.of(
          // honor the storage location (unnormalized input paths everywhere)
          "fileset504",
          Fileset.Type.MANAGED,
          ImmutableMap.of(
              LOCATION,
              UNFORMALIZED_TEST_ROOT_PATH + "/catalog303_1",
              PROPERTY_MULTIPLE_LOCATIONS_PREFIX + "v1",
              UNFORMALIZED_TEST_ROOT_PATH + "/catalog303_1",
              PROPERTY_MULTIPLE_LOCATIONS_PREFIX + "v2",
              UNFORMALIZED_TEST_ROOT_PATH + "/catalog303_2/{{schema}}/{{fileset}}"),
          ImmutableMap.of(
              LOCATION,
              UNFORMALIZED_TEST_ROOT_PATH + "/s1_fileset504_1",
              PROPERTY_MULTIPLE_LOCATIONS_PREFIX + "v1",
              UNFORMALIZED_TEST_ROOT_PATH + "/s1_fileset504_1",
              PROPERTY_MULTIPLE_LOCATIONS_PREFIX + "v2",
              UNFORMALIZED_TEST_ROOT_PATH + "/s1_fileset504_2/{{fileset}}"),
          ImmutableMap.of(
              LOCATION_NAME_UNKNOWN,
              UNFORMALIZED_TEST_ROOT_PATH + "/fileset504_1",
              "v1",
              UNFORMALIZED_TEST_ROOT_PATH + "/fileset504_1",
              "v2",
              UNFORMALIZED_TEST_ROOT_PATH + "/{{fileset}}_2"),
          ImmutableMap.of(PROPERTY_DEFAULT_LOCATION_NAME, "v1"),
          ImmutableMap.of(
              LOCATION_NAME_UNKNOWN,
              TEST_ROOT_PATH + "/fileset504_1",
              "v1",
              TEST_ROOT_PATH + "/fileset504_1",
              "v2",
              TEST_ROOT_PATH + "/fileset504_2")),
      Arguments.of(
          // honor the storage location (unnormalized, no catalog/schema locations)
          "fileset505",
          Fileset.Type.MANAGED,
          ImmutableMap.of(),
          ImmutableMap.of(),
          ImmutableMap.of(
              LOCATION_NAME_UNKNOWN,
              UNFORMALIZED_TEST_ROOT_PATH + "/fileset505_1",
              "v1",
              UNFORMALIZED_TEST_ROOT_PATH + "/fileset505_1",
              "v2",
              UNFORMALIZED_TEST_ROOT_PATH + "/{{fileset}}_2"),
          ImmutableMap.of(PROPERTY_DEFAULT_LOCATION_NAME, "v1"),
          ImmutableMap.of(
              LOCATION_NAME_UNKNOWN,
              TEST_ROOT_PATH + "/fileset505_1",
              "v1",
              TEST_ROOT_PATH + "/fileset505_1",
              "v2",
              TEST_ROOT_PATH + "/fileset505_2")),
      Arguments.of(
          // honor the storage location (unnormalized, catalog locations only)
          "fileset506",
          Fileset.Type.MANAGED,
          ImmutableMap.of(
              LOCATION,
              UNFORMALIZED_TEST_ROOT_PATH + "/catalog304_1",
              PROPERTY_MULTIPLE_LOCATIONS_PREFIX + "v1",
              UNFORMALIZED_TEST_ROOT_PATH + "/catalog304_1",
              PROPERTY_MULTIPLE_LOCATIONS_PREFIX + "v2",
              UNFORMALIZED_TEST_ROOT_PATH + "/catalog304_2/{{schema}}/{{fileset}}"),
          ImmutableMap.of(),
          ImmutableMap.of(
              LOCATION_NAME_UNKNOWN,
              UNFORMALIZED_TEST_ROOT_PATH + "/fileset506_1",
              "v1",
              UNFORMALIZED_TEST_ROOT_PATH + "/fileset506_1",
              "v2",
              UNFORMALIZED_TEST_ROOT_PATH + "/{{fileset}}_2"),
          ImmutableMap.of(PROPERTY_DEFAULT_LOCATION_NAME, "v1"),
          ImmutableMap.of(
              LOCATION_NAME_UNKNOWN,
              TEST_ROOT_PATH + "/fileset506_1",
              "v1",
              TEST_ROOT_PATH + "/fileset506_1",
              "v2",
              TEST_ROOT_PATH + "/fileset506_2")),
      Arguments.of(
          // honor the storage location (EXTERNAL, unnormalized input paths)
          "fileset507",
          Fileset.Type.EXTERNAL,
          ImmutableMap.of(
              LOCATION,
              UNFORMALIZED_TEST_ROOT_PATH + "/catalog305_1",
              PROPERTY_MULTIPLE_LOCATIONS_PREFIX + "v1",
              UNFORMALIZED_TEST_ROOT_PATH + "/catalog305_1",
              PROPERTY_MULTIPLE_LOCATIONS_PREFIX + "v2",
              UNFORMALIZED_TEST_ROOT_PATH + "/catalog305_2"),
          ImmutableMap.of(
              LOCATION,
              UNFORMALIZED_TEST_ROOT_PATH + "/s1_fileset507_1",
              PROPERTY_MULTIPLE_LOCATIONS_PREFIX + "v1",
              UNFORMALIZED_TEST_ROOT_PATH + "/s1_fileset507_1",
              PROPERTY_MULTIPLE_LOCATIONS_PREFIX + "v2",
              UNFORMALIZED_TEST_ROOT_PATH + "/s1_fileset507_2"),
          ImmutableMap.of(
              LOCATION_NAME_UNKNOWN,
              UNFORMALIZED_TEST_ROOT_PATH + "/fileset507_1",
              "v1",
              UNFORMALIZED_TEST_ROOT_PATH + "/fileset507_1",
              "v2",
              UNFORMALIZED_TEST_ROOT_PATH + "/{{fileset}}_2"),
          ImmutableMap.of(PROPERTY_DEFAULT_LOCATION_NAME, "v1"),
          ImmutableMap.of(
              LOCATION_NAME_UNKNOWN,
              TEST_ROOT_PATH + "/fileset507_1",
              "v1",
              TEST_ROOT_PATH + "/fileset507_1",
              "v2",
              TEST_ROOT_PATH + "/fileset507_2")),
      Arguments.of(
          // honor the storage location (EXTERNAL, unnormalized, schema locations only)
          "fileset508",
          Fileset.Type.EXTERNAL,
          ImmutableMap.of(),
          ImmutableMap.of(
              LOCATION,
              UNFORMALIZED_TEST_ROOT_PATH + "/s1_fileset508_1",
              PROPERTY_MULTIPLE_LOCATIONS_PREFIX + "v1",
              UNFORMALIZED_TEST_ROOT_PATH + "/s1_fileset508_1",
              PROPERTY_MULTIPLE_LOCATIONS_PREFIX + "v2",
              UNFORMALIZED_TEST_ROOT_PATH + "/s1_fileset508_2"),
          ImmutableMap.of(
              LOCATION_NAME_UNKNOWN,
              UNFORMALIZED_TEST_ROOT_PATH + "/fileset508_1",
              "v1",
              UNFORMALIZED_TEST_ROOT_PATH + "/fileset508_1",
              "v2",
              UNFORMALIZED_TEST_ROOT_PATH + "/{{fileset}}_2"),
          ImmutableMap.of(PROPERTY_DEFAULT_LOCATION_NAME, "v1"),
          ImmutableMap.of(
              LOCATION_NAME_UNKNOWN,
              TEST_ROOT_PATH + "/fileset508_1",
              "v1",
              TEST_ROOT_PATH + "/fileset508_1",
              "v2",
              TEST_ROOT_PATH + "/fileset508_2")),
      Arguments.of(
          // honor the storage location (EXTERNAL, unnormalized, no catalog/schema locations)
          "fileset509",
          Fileset.Type.EXTERNAL,
          ImmutableMap.of(),
          ImmutableMap.of(),
          ImmutableMap.of(
              LOCATION_NAME_UNKNOWN,
              UNFORMALIZED_TEST_ROOT_PATH + "/fileset509_1",
              "v1",
              UNFORMALIZED_TEST_ROOT_PATH + "/fileset509_1",
              "v2",
              UNFORMALIZED_TEST_ROOT_PATH + "/{{fileset}}_2"),
          ImmutableMap.of(PROPERTY_DEFAULT_LOCATION_NAME, "v1"),
          ImmutableMap.of(
              LOCATION_NAME_UNKNOWN,
              TEST_ROOT_PATH + "/fileset509_1",
              "v1",
              TEST_ROOT_PATH + "/fileset509_1",
              "v2",
              TEST_ROOT_PATH + "/fileset509_2")));
}
+
private static Stream<Arguments> locationWithPlaceholdersArguments() {
return Stream.of(
// placeholders in catalog location
@@ -1687,7 +2380,7 @@ public class TestHadoopCatalogOperations {
throws IOException {
Map<String, String> props = Maps.newHashMap();
if (catalogPath != null) {
- props.put(HadoopCatalogPropertiesMetadata.LOCATION, catalogPath);
+ props.put(LOCATION, catalogPath);
}
try (SecureHadoopCatalogOperations ops = new
SecureHadoopCatalogOperations(store)) {
@@ -1706,6 +2399,47 @@ public class TestHadoopCatalogOperations {
}
}
+ private Schema createMultiLocationSchema(
+ String name,
+ String comment,
+ Map<String, String> catalogPaths,
+ Map<String, String> schemaPaths)
+ throws IOException {
+ try (SecureHadoopCatalogOperations ops = new
SecureHadoopCatalogOperations(store)) {
+ ops.initialize(catalogPaths, randomCatalogInfo("m1", "c1"),
HADOOP_PROPERTIES_METADATA);
+
+ NameIdentifier schemaIdent = NameIdentifierUtil.ofSchema("m1", "c1",
name);
+ Map<String, String> schemaProps = Maps.newHashMap();
+ StringIdentifier stringId =
StringIdentifier.fromId(idGenerator.nextId());
+ schemaProps =
Maps.newHashMap(StringIdentifier.newPropertiesWithId(stringId, schemaProps));
+ schemaProps.putAll(schemaPaths);
+
+ return ops.createSchema(schemaIdent, comment, schemaProps);
+ }
+ }
+
+ private Fileset createMultiLocationFileset(
+ String name,
+ String schemaName,
+ String comment,
+ Fileset.Type type,
+ Map<String, String> catalogPaths,
+ Map<String, String> storageLocations,
+ Map<String, String> filesetProps)
+ throws IOException {
+ try (SecureHadoopCatalogOperations ops = new
SecureHadoopCatalogOperations(store)) {
+ ops.initialize(catalogPaths, randomCatalogInfo("m1", "c1"),
HADOOP_PROPERTIES_METADATA);
+
+ NameIdentifier filesetIdent = NameIdentifier.of("m1", "c1", schemaName,
name);
+ StringIdentifier stringId =
StringIdentifier.fromId(idGenerator.nextId());
+ Map<String, String> newFilesetProps =
+ Maps.newHashMap(StringIdentifier.newPropertiesWithId(stringId,
filesetProps));
+
+ return ops.createMultipleLocationFileset(
+ filesetIdent, comment, type, storageLocations, newFilesetProps);
+ }
+ }
+
private Fileset createFileset(
String name,
String schemaName,
@@ -1716,7 +2450,7 @@ public class TestHadoopCatalogOperations {
throws IOException {
Map<String, String> props = Maps.newHashMap();
if (catalogPath != null) {
- props.put(HadoopCatalogPropertiesMetadata.LOCATION, catalogPath);
+ props.put(LOCATION, catalogPath);
}
try (SecureHadoopCatalogOperations ops = new
SecureHadoopCatalogOperations(store)) {
@@ -1742,7 +2476,7 @@ public class TestHadoopCatalogOperations {
throws IOException {
Map<String, String> props = Maps.newHashMap();
if (catalogPath != null) {
- props.put(HadoopCatalogPropertiesMetadata.LOCATION, catalogPath);
+ props.put(LOCATION, catalogPath);
}
try (SecureHadoopCatalogOperations ops = new
SecureHadoopCatalogOperations(store)) {
diff --git
a/catalogs/catalog-hadoop/src/test/java/org/apache/gravitino/catalog/hadoop/integration/test/HadoopCatalogIT.java
b/catalogs/catalog-hadoop/src/test/java/org/apache/gravitino/catalog/hadoop/integration/test/HadoopCatalogIT.java
index 1f358813d9..8a45b5c8d5 100644
---
a/catalogs/catalog-hadoop/src/test/java/org/apache/gravitino/catalog/hadoop/integration/test/HadoopCatalogIT.java
+++
b/catalogs/catalog-hadoop/src/test/java/org/apache/gravitino/catalog/hadoop/integration/test/HadoopCatalogIT.java
@@ -18,6 +18,8 @@
*/
package org.apache.gravitino.catalog.hadoop.integration.test;
+import static org.apache.gravitino.file.Fileset.LOCATION_NAME_UNKNOWN;
+import static org.apache.gravitino.file.Fileset.PROPERTY_DEFAULT_LOCATION_NAME;
import static
org.apache.gravitino.file.Fileset.PROPERTY_LOCATION_PLACEHOLDER_PREFIX;
import static org.apache.gravitino.file.Fileset.Type.MANAGED;
@@ -202,8 +204,10 @@ public class HadoopCatalogIT extends BaseIT {
Assertions.assertEquals("comment", fileset.comment());
Assertions.assertEquals(MANAGED, fileset.type());
Assertions.assertEquals(storageLocation, fileset.storageLocation());
- Assertions.assertEquals(1, fileset.properties().size());
+ Assertions.assertEquals(2, fileset.properties().size());
Assertions.assertEquals("v1", fileset.properties().get("k1"));
+ Assertions.assertEquals(
+ LOCATION_NAME_UNKNOWN,
fileset.properties().get(PROPERTY_DEFAULT_LOCATION_NAME));
// test create a fileset that already exist
Assertions.assertThrows(
@@ -224,7 +228,9 @@ public class HadoopCatalogIT extends BaseIT {
storageLocation(filesetName2),
fileset2.storageLocation(),
"storage location should be created");
- Assertions.assertEquals(ImmutableMap.of(), fileset2.properties(),
"properties should be empty");
+ Assertions.assertEquals(
+ ImmutableMap.of(PROPERTY_DEFAULT_LOCATION_NAME, LOCATION_NAME_UNKNOWN),
+ fileset2.properties());
// create fileset with placeholder in storage location
String filesetName4 = "test_create_fileset_with_placeholder";
@@ -239,7 +245,9 @@ public class HadoopCatalogIT extends BaseIT {
Assertions.assertEquals("comment", fileset4.comment());
Assertions.assertEquals(MANAGED, fileset4.type());
Assertions.assertEquals(expectedStorageLocation4,
fileset4.storageLocation());
- Assertions.assertEquals(0, fileset4.properties().size(), "properties
should be empty");
+ Assertions.assertEquals(
+ 1, fileset4.properties().size(), "should only contain the default location name property");
+ Assertions.assertEquals(
+ LOCATION_NAME_UNKNOWN,
fileset4.properties().get(PROPERTY_DEFAULT_LOCATION_NAME));
// create fileset with null fileset name
Assertions.assertThrows(
@@ -303,10 +311,12 @@ public class HadoopCatalogIT extends BaseIT {
Assertions.assertEquals("这是中文comment", fileset.comment());
Assertions.assertEquals(MANAGED, fileset.type());
Assertions.assertEquals(storageLocation, fileset.storageLocation());
- Assertions.assertEquals(3, fileset.properties().size());
+ Assertions.assertEquals(4, fileset.properties().size());
Assertions.assertEquals("v1", fileset.properties().get("k1"));
Assertions.assertEquals("中文测试test", fileset.properties().get("test"));
Assertions.assertEquals("test1", fileset.properties().get("中文key"));
+ Assertions.assertEquals(
+ LOCATION_NAME_UNKNOWN,
fileset.properties().get(PROPERTY_DEFAULT_LOCATION_NAME));
}
@Test
@@ -328,10 +338,12 @@ public class HadoopCatalogIT extends BaseIT {
Assertions.assertEquals("comment", fileset.comment());
Assertions.assertEquals(Fileset.Type.EXTERNAL, fileset.type());
Assertions.assertEquals(storageLocation, fileset.storageLocation());
- Assertions.assertEquals(1, fileset.properties().size());
+ Assertions.assertEquals(2, fileset.properties().size());
Assertions.assertEquals("v1", fileset.properties().get("k1"));
Assertions.assertTrue(
fileSystem.exists(new Path(storageLocation)), "storage location should
be created");
+ Assertions.assertEquals(
+ LOCATION_NAME_UNKNOWN,
fileset.properties().get(PROPERTY_DEFAULT_LOCATION_NAME));
// create fileset with storage location that not exist
String filesetName2 = "test_external_fileset_no_exist";
@@ -538,9 +550,11 @@ public class HadoopCatalogIT extends BaseIT {
Assertions.assertEquals(MANAGED, newFileset.type(), "type should not be
change");
Assertions.assertEquals(
storageLocation, newFileset.storageLocation(), "storage location
should not be change");
- Assertions.assertEquals(1, newFileset.properties().size(), "properties
should not be change");
+ Assertions.assertEquals(2, newFileset.properties().size(), "properties
should not be change");
Assertions.assertEquals(
"v1", newFileset.properties().get("k1"), "properties should not be
change");
+ Assertions.assertEquals(
+ LOCATION_NAME_UNKNOWN,
newFileset.properties().get(PROPERTY_DEFAULT_LOCATION_NAME));
}
@Test
@@ -568,9 +582,11 @@ public class HadoopCatalogIT extends BaseIT {
Assertions.assertEquals(MANAGED, newFileset.type(), "type should not be
change");
Assertions.assertEquals(
storageLocation, newFileset.storageLocation(), "storage location
should not be change");
- Assertions.assertEquals(1, newFileset.properties().size(), "properties
should not be change");
+ Assertions.assertEquals(2, newFileset.properties().size(), "properties
should not be change");
Assertions.assertEquals(
"v1", newFileset.properties().get("k1"), "properties should not be
change");
+ Assertions.assertEquals(
+ LOCATION_NAME_UNKNOWN,
newFileset.properties().get(PROPERTY_DEFAULT_LOCATION_NAME));
}
@Test
@@ -596,9 +612,11 @@ public class HadoopCatalogIT extends BaseIT {
Assertions.assertEquals(MANAGED, newFileset.type(), "type should not be
change");
Assertions.assertEquals(
storageLocation, newFileset.storageLocation(), "storage location
should not be change");
- Assertions.assertEquals(1, newFileset.properties().size(), "properties
should not be change");
+ Assertions.assertEquals(2, newFileset.properties().size(), "properties
should not be change");
Assertions.assertEquals(
"v2", newFileset.properties().get("k1"), "properties should be
updated");
+ Assertions.assertEquals(
+ LOCATION_NAME_UNKNOWN,
newFileset.properties().get(PROPERTY_DEFAULT_LOCATION_NAME));
}
@Test
@@ -624,7 +642,9 @@ public class HadoopCatalogIT extends BaseIT {
Assertions.assertEquals(MANAGED, newFileset.type(), "type should not be
change");
Assertions.assertEquals(
storageLocation, newFileset.storageLocation(), "storage location
should not be change");
- Assertions.assertEquals(0, newFileset.properties().size(), "properties
should be removed");
+ Assertions.assertEquals(
+ 1, newFileset.properties().size(), "user properties should be removed");
+ Assertions.assertEquals(
+ LOCATION_NAME_UNKNOWN,
newFileset.properties().get(PROPERTY_DEFAULT_LOCATION_NAME));
}
@Test
@@ -650,9 +670,11 @@ public class HadoopCatalogIT extends BaseIT {
Assertions.assertEquals(MANAGED, newFileset.type(), "type should not be
changed");
Assertions.assertEquals(
storageLocation, newFileset.storageLocation(), "storage location
should not be changed");
- Assertions.assertEquals(1, newFileset.properties().size(), "properties
should not be changed");
+ Assertions.assertEquals(2, newFileset.properties().size(), "properties
should not be changed");
Assertions.assertEquals(
"v1", newFileset.properties().get("k1"), "properties should not be
changed");
+ Assertions.assertEquals(
+ LOCATION_NAME_UNKNOWN,
newFileset.properties().get(PROPERTY_DEFAULT_LOCATION_NAME));
}
@Test
diff --git
a/catalogs/catalog-hadoop/src/test/java/org/apache/gravitino/catalog/hadoop/integration/test/HadoopUserImpersonationIT.java
b/catalogs/catalog-hadoop/src/test/java/org/apache/gravitino/catalog/hadoop/integration/test/HadoopUserImpersonationIT.java
index e967a1c6af..5da898682a 100644
---
a/catalogs/catalog-hadoop/src/test/java/org/apache/gravitino/catalog/hadoop/integration/test/HadoopUserImpersonationIT.java
+++
b/catalogs/catalog-hadoop/src/test/java/org/apache/gravitino/catalog/hadoop/integration/test/HadoopUserImpersonationIT.java
@@ -367,7 +367,10 @@ public class HadoopUserImpersonationIT extends BaseIT {
Assertions.assertEquals("comment", fileset.comment());
Assertions.assertEquals(Fileset.Type.MANAGED, fileset.type());
Assertions.assertEquals(storageLocation, fileset.storageLocation());
- Assertions.assertEquals(1, fileset.properties().size());
+ Assertions.assertEquals(2, fileset.properties().size());
+ Assertions.assertTrue(
+
fileset.properties().containsKey(Fileset.PROPERTY_DEFAULT_LOCATION_NAME),
+ "properties should contain default location name");
Assertions.assertEquals("v1", fileset.properties().get("k1"));
// test create a fileset that already exist
@@ -394,7 +397,9 @@ public class HadoopUserImpersonationIT extends BaseIT {
storageLocation(filesetName2),
fileset2.storageLocation(),
"storage location should be created");
- Assertions.assertEquals(ImmutableMap.of(), fileset2.properties(),
"properties should be empty");
+ Assertions.assertTrue(
+
fileset2.properties().containsKey(Fileset.PROPERTY_DEFAULT_LOCATION_NAME),
+ "properties should contain default location name");
// create fileset with null fileset name
Assertions.assertThrows(
diff --git a/clients/client-python/tests/integration/test_fileset_catalog.py
b/clients/client-python/tests/integration/test_fileset_catalog.py
index ac3d0a8216..98a31fa0a2 100644
--- a/clients/client-python/tests/integration/test_fileset_catalog.py
+++ b/clients/client-python/tests/integration/test_fileset_catalog.py
@@ -180,7 +180,14 @@ class TestFilesetCatalog(IntegrationTestEnv):
self.assertIsNotNone(fileset)
self.assertEqual(fileset.type(), Fileset.Type.MANAGED)
self.assertEqual(fileset.comment(), self.fileset_comment)
- self.assertEqual(fileset.properties(), self.fileset_properties)
+ self.assertEqual(
+ fileset.properties(),
+ {
+ Fileset.PROPERTY_DEFAULT_LOCATION_NAME:
Fileset.LOCATION_NAME_UNKNOWN,
+ **self.fileset_properties,
+ },
+ )
+ self.assertEqual(fileset.storage_location(),
f"file:{self.fileset_location}")
def test_drop_fileset(self):
self.create_fileset()
@@ -207,7 +214,13 @@ class TestFilesetCatalog(IntegrationTestEnv):
self.assertIsNotNone(fileset)
self.assertEqual(fileset.name(), self.fileset_name)
self.assertEqual(fileset.comment(), self.fileset_comment)
- self.assertEqual(fileset.properties(), self.fileset_properties)
+ self.assertEqual(
+ fileset.properties(),
+ {
+ Fileset.PROPERTY_DEFAULT_LOCATION_NAME:
Fileset.LOCATION_NAME_UNKNOWN,
+ **self.fileset_properties,
+ },
+ )
self.assertEqual(fileset.audit_info().creator(), "anonymous")
def test_failed_load_fileset(self):
diff --git
a/core/src/main/java/org/apache/gravitino/catalog/EntityCombinedFileset.java
b/core/src/main/java/org/apache/gravitino/catalog/EntityCombinedFileset.java
index d9521ee0b6..0964a0781f 100644
--- a/core/src/main/java/org/apache/gravitino/catalog/EntityCombinedFileset.java
+++ b/core/src/main/java/org/apache/gravitino/catalog/EntityCombinedFileset.java
@@ -40,14 +40,6 @@ public final class EntityCombinedFileset implements Fileset {
this.filesetEntity = filesetEntity;
}
- public FilesetEntity filesetEntity() {
- return filesetEntity;
- }
-
- public Fileset fileset() {
- return fileset;
- }
-
public static EntityCombinedFileset of(Fileset fileset, FilesetEntity
filesetEntity) {
return new EntityCombinedFileset(fileset, filesetEntity);
}
@@ -77,8 +69,8 @@ public final class EntityCombinedFileset implements Fileset {
}
@Override
- public String storageLocation() {
- return fileset.storageLocation();
+ public Map<String, String> storageLocations() {
+ return fileset.storageLocations();
}
@Override
diff --git
a/core/src/main/java/org/apache/gravitino/catalog/FilesetNormalizeDispatcher.java
b/core/src/main/java/org/apache/gravitino/catalog/FilesetNormalizeDispatcher.java
index e1c20de702..9a14004050 100644
---
a/core/src/main/java/org/apache/gravitino/catalog/FilesetNormalizeDispatcher.java
+++
b/core/src/main/java/org/apache/gravitino/catalog/FilesetNormalizeDispatcher.java
@@ -29,6 +29,7 @@ import org.apache.gravitino.Namespace;
import org.apache.gravitino.connector.capability.Capability;
import org.apache.gravitino.exceptions.FilesetAlreadyExistsException;
import org.apache.gravitino.exceptions.NoSuchFilesetException;
+import org.apache.gravitino.exceptions.NoSuchLocationNameException;
import org.apache.gravitino.exceptions.NoSuchSchemaException;
import org.apache.gravitino.file.Fileset;
import org.apache.gravitino.file.FilesetChange;
@@ -66,15 +67,15 @@ public class FilesetNormalizeDispatcher implements
FilesetDispatcher {
}
@Override
- public Fileset createFileset(
+ public Fileset createMultipleLocationFileset(
NameIdentifier ident,
String comment,
Fileset.Type type,
- String storageLocation,
+ Map<String, String> storageLocations,
Map<String, String> properties)
throws NoSuchSchemaException, FilesetAlreadyExistsException {
- return dispatcher.createFileset(
- normalizeNameIdentifier(ident), comment, type, storageLocation,
properties);
+ return dispatcher.createMultipleLocationFileset(
+ normalizeNameIdentifier(ident), comment, type, storageLocations,
properties);
}
@Override
@@ -95,10 +96,11 @@ public class FilesetNormalizeDispatcher implements
FilesetDispatcher {
}
@Override
- public String getFileLocation(NameIdentifier ident, String subPath) {
+ public String getFileLocation(NameIdentifier ident, String subPath, String
locationName)
+ throws NoSuchFilesetException, NoSuchLocationNameException {
// The constraints of the name spec may be more strict than underlying
catalog,
// and for compatibility reasons, we only apply case-sensitive
capabilities here.
- return dispatcher.getFileLocation(normalizeCaseSensitive(ident), subPath);
+ return dispatcher.getFileLocation(normalizeCaseSensitive(ident), subPath,
locationName);
}
private NameIdentifier normalizeNameIdentifier(NameIdentifier ident) {
diff --git
a/core/src/main/java/org/apache/gravitino/catalog/FilesetOperationDispatcher.java
b/core/src/main/java/org/apache/gravitino/catalog/FilesetOperationDispatcher.java
index ff1e8b6adc..5a2eb6a997 100644
---
a/core/src/main/java/org/apache/gravitino/catalog/FilesetOperationDispatcher.java
+++
b/core/src/main/java/org/apache/gravitino/catalog/FilesetOperationDispatcher.java
@@ -30,6 +30,7 @@ import org.apache.gravitino.StringIdentifier;
import org.apache.gravitino.connector.HasPropertyMetadata;
import org.apache.gravitino.exceptions.FilesetAlreadyExistsException;
import org.apache.gravitino.exceptions.NoSuchFilesetException;
+import org.apache.gravitino.exceptions.NoSuchLocationNameException;
import org.apache.gravitino.exceptions.NoSuchSchemaException;
import org.apache.gravitino.exceptions.NonEmptyEntityException;
import org.apache.gravitino.file.Fileset;
@@ -101,26 +102,27 @@ public class FilesetOperationDispatcher extends
OperationDispatcher implements F
/**
* Create a fileset metadata in the catalog.
*
- * <p>If the type of the fileset object is "MANAGED", the underlying
storageLocation can be null,
- * and Gravitino will manage the storage location based on the location of
the schema.
+ * <p>If the type of the fileset object is "MANAGED", the underlying
storageLocations can be
+ * empty, and Gravitino will manage the storage location based on the
locations of the schema.
*
- * <p>If the type of the fileset object is "EXTERNAL", the underlying
storageLocation must be set.
+ * <p>If the type of the fileset object is "EXTERNAL", the underlying
storageLocations must be
+ * set.
*
* @param ident A fileset identifier.
* @param comment The comment of the fileset.
* @param type The type of the fileset.
- * @param storageLocation The storage location of the fileset.
+ * @param storageLocations The location names and corresponding storage
locations of the fileset.
* @param properties The properties of the fileset.
* @return The created fileset metadata
* @throws NoSuchSchemaException If the schema does not exist.
* @throws FilesetAlreadyExistsException If the fileset already exists.
*/
@Override
- public Fileset createFileset(
+ public Fileset createMultipleLocationFileset(
NameIdentifier ident,
String comment,
Fileset.Type type,
- String storageLocation,
+ Map<String, String> storageLocations,
Map<String, String> properties)
throws NoSuchSchemaException, FilesetAlreadyExistsException {
NameIdentifier catalogIdent = getCatalogIdentifier(ident);
@@ -148,8 +150,8 @@ public class FilesetOperationDispatcher extends
OperationDispatcher implements F
c ->
c.doWithFilesetOps(
f ->
- f.createFileset(
- ident, comment, type, storageLocation,
updatedProperties)),
+ f.createMultipleLocationFileset(
+ ident, comment, type, storageLocations,
updatedProperties)),
NoSuchSchemaException.class,
FilesetAlreadyExistsException.class));
return EntityCombinedFileset.of(createdFileset)
@@ -228,15 +230,17 @@ public class FilesetOperationDispatcher extends
OperationDispatcher implements F
/**
* Get the actual location of a file or directory based on the storage
location of Fileset and the
- * sub path.
+ * sub path by the location name.
*
* @param ident A fileset identifier.
* @param subPath The sub path to the file or directory.
+ * @param locationName The location name.
* @return The actual location of the file or directory.
* @throws NoSuchFilesetException If the fileset does not exist.
+ * @throws NoSuchLocationNameException If the location name does not exist.
*/
@Override
- public String getFileLocation(NameIdentifier ident, String subPath)
+ public String getFileLocation(NameIdentifier ident, String subPath, String
locationName)
throws NoSuchFilesetException {
return TreeLockUtils.doWithTreeLock(
ident,
@@ -244,7 +248,7 @@ public class FilesetOperationDispatcher extends
OperationDispatcher implements F
() ->
doWithCatalog(
getCatalogIdentifier(ident),
- c -> c.doWithFilesetOps(f -> f.getFileLocation(ident,
subPath)),
+ c -> c.doWithFilesetOps(f -> f.getFileLocation(ident, subPath,
locationName)),
NonEmptyEntityException.class));
}
}
diff --git a/core/src/main/java/org/apache/gravitino/connector/BaseFileset.java
b/core/src/main/java/org/apache/gravitino/connector/BaseFileset.java
index 7880ac4c2c..96ff2ce6e7 100644
--- a/core/src/main/java/org/apache/gravitino/connector/BaseFileset.java
+++ b/core/src/main/java/org/apache/gravitino/connector/BaseFileset.java
@@ -18,6 +18,7 @@
*/
package org.apache.gravitino.connector;
+import java.util.HashMap;
import java.util.Map;
import javax.annotation.Nullable;
import org.apache.gravitino.annotation.Evolving;
@@ -38,7 +39,7 @@ public abstract class BaseFileset implements Fileset {
protected Type type;
- protected String storageLocation;
+ protected Map<String, String> storageLocations;
@Nullable protected Map<String, String> properties;
@@ -63,10 +64,9 @@ public abstract class BaseFileset implements Fileset {
return type;
}
- /** @return The storage location string of the fileset. */
- @Override
- public String storageLocation() {
- return storageLocation;
+ /** @return The storage locations of the fileset. */
+ public Map<String, String> storageLocations() {
+ return storageLocations;
}
/** @return The audit information for the fileset. */
@@ -90,7 +90,7 @@ public abstract class BaseFileset implements Fileset {
SELF withType(Type type);
- SELF withStorageLocation(String storageLocation);
+ SELF withStorageLocations(Map<String, String> storageLocations);
SELF withProperties(Map<String, String> properties);
@@ -113,6 +113,7 @@ public abstract class BaseFileset implements Fileset {
protected String comment;
protected Type type;
protected String storageLocation;
+ protected Map<String, String> storageLocations = new HashMap<>();
protected Map<String, String> properties;
protected AuditInfo auditInfo;
@@ -153,14 +154,14 @@ public abstract class BaseFileset implements Fileset {
}
/**
- * Sets the storage location of the fileset.
+ * Sets the storage locations of the fileset.
*
- * @param storageLocation The storage location of the fileset.
+ * @param storageLocations The storage locations of the fileset.
* @return The builder instance.
*/
@Override
- public SELF withStorageLocation(String storageLocation) {
- this.storageLocation = storageLocation;
+ public SELF withStorageLocations(Map<String, String> storageLocations) {
+ this.storageLocations.putAll(storageLocations);
return self();
}
diff --git
a/core/src/main/java/org/apache/gravitino/hook/FilesetHookDispatcher.java
b/core/src/main/java/org/apache/gravitino/hook/FilesetHookDispatcher.java
index a1e19f9cfa..a6acbff441 100644
--- a/core/src/main/java/org/apache/gravitino/hook/FilesetHookDispatcher.java
+++ b/core/src/main/java/org/apache/gravitino/hook/FilesetHookDispatcher.java
@@ -30,6 +30,7 @@ import org.apache.gravitino.authorization.OwnerManager;
import org.apache.gravitino.catalog.FilesetDispatcher;
import org.apache.gravitino.exceptions.FilesetAlreadyExistsException;
import org.apache.gravitino.exceptions.NoSuchFilesetException;
+import org.apache.gravitino.exceptions.NoSuchLocationNameException;
import org.apache.gravitino.exceptions.NoSuchSchemaException;
import org.apache.gravitino.file.Fileset;
import org.apache.gravitino.file.FilesetChange;
@@ -59,18 +60,20 @@ public class FilesetHookDispatcher implements
FilesetDispatcher {
}
@Override
- public Fileset createFileset(
+ public Fileset createMultipleLocationFileset(
NameIdentifier ident,
String comment,
Fileset.Type type,
- String storageLocation,
+ Map<String, String> storageLocations,
Map<String, String> properties)
throws NoSuchSchemaException, FilesetAlreadyExistsException {
// Check whether the current user exists or not
AuthorizationUtils.checkCurrentUser(
ident.namespace().level(0), PrincipalUtils.getCurrentUserName());
- Fileset fileset = dispatcher.createFileset(ident, comment, type,
storageLocation, properties);
+ Fileset fileset =
+ dispatcher.createMultipleLocationFileset(
+ ident, comment, type, storageLocations, properties);
// Set the creator as the owner of the fileset.
OwnerManager ownerManager = GravitinoEnv.getInstance().ownerManager();
@@ -118,8 +121,8 @@ public class FilesetHookDispatcher implements
FilesetDispatcher {
}
@Override
- public String getFileLocation(NameIdentifier ident, String subPath)
- throws NoSuchFilesetException {
- return dispatcher.getFileLocation(ident, subPath);
+ public String getFileLocation(NameIdentifier ident, String subPath, String
locationName)
+ throws NoSuchFilesetException, NoSuchLocationNameException {
+ return dispatcher.getFileLocation(ident, subPath, locationName);
}
}
diff --git
a/core/src/main/java/org/apache/gravitino/listener/FilesetEventDispatcher.java
b/core/src/main/java/org/apache/gravitino/listener/FilesetEventDispatcher.java
index 79274d57b2..cb1c2ad1d2 100644
---
a/core/src/main/java/org/apache/gravitino/listener/FilesetEventDispatcher.java
+++
b/core/src/main/java/org/apache/gravitino/listener/FilesetEventDispatcher.java
@@ -27,6 +27,7 @@ import org.apache.gravitino.audit.CallerContext;
import org.apache.gravitino.catalog.FilesetDispatcher;
import org.apache.gravitino.exceptions.FilesetAlreadyExistsException;
import org.apache.gravitino.exceptions.NoSuchFilesetException;
+import org.apache.gravitino.exceptions.NoSuchLocationNameException;
import org.apache.gravitino.exceptions.NoSuchSchemaException;
import org.apache.gravitino.file.Fileset;
import org.apache.gravitino.file.FilesetChange;
@@ -97,19 +98,21 @@ public class FilesetEventDispatcher implements
FilesetDispatcher {
}
@Override
- public Fileset createFileset(
+ public Fileset createMultipleLocationFileset(
NameIdentifier ident,
String comment,
Fileset.Type type,
- String storageLocation,
+ Map<String, String> storageLocations,
Map<String, String> properties)
throws NoSuchSchemaException, FilesetAlreadyExistsException {
FilesetInfo createFileRequest =
- new FilesetInfo(ident.name(), comment, type, storageLocation,
properties, null);
+ new FilesetInfo(ident.name(), comment, type, storageLocations,
properties, null);
eventBus.dispatchEvent(
new CreateFilesetPreEvent(PrincipalUtils.getCurrentUserName(), ident,
createFileRequest));
try {
- Fileset fileset = dispatcher.createFileset(ident, comment, type,
storageLocation, properties);
+ Fileset fileset =
+ dispatcher.createMultipleLocationFileset(
+ ident, comment, type, storageLocations, properties);
eventBus.dispatchEvent(
new CreateFilesetEvent(
PrincipalUtils.getCurrentUserName(), ident, new
FilesetInfo(fileset)));
@@ -120,7 +123,7 @@ public class FilesetEventDispatcher implements
FilesetDispatcher {
PrincipalUtils.getCurrentUserName(),
ident,
e,
- new FilesetInfo(ident.name(), comment, type, storageLocation,
properties, null)));
+ new FilesetInfo(ident.name(), comment, type, storageLocations,
properties, null)));
throw e;
}
}
@@ -159,12 +162,13 @@ public class FilesetEventDispatcher implements
FilesetDispatcher {
}
@Override
- public String getFileLocation(NameIdentifier ident, String subPath)
- throws NoSuchFilesetException {
+ public String getFileLocation(NameIdentifier ident, String subPath, String
locationName)
+ throws NoSuchFilesetException, NoSuchLocationNameException {
eventBus.dispatchEvent(
- new GetFileLocationPreEvent(PrincipalUtils.getCurrentUserName(),
ident, subPath));
+ new GetFileLocationPreEvent(
+ PrincipalUtils.getCurrentUserName(), ident, subPath,
locationName));
try {
- String actualFileLocation = dispatcher.getFileLocation(ident, subPath);
+ String actualFileLocation = dispatcher.getFileLocation(ident, subPath,
locationName);
// get the audit info from the thread local context
ImmutableMap.Builder<String, String> builder = ImmutableMap.builder();
CallerContext callerContext = CallerContext.CallerContextHolder.get();
@@ -177,11 +181,13 @@ public class FilesetEventDispatcher implements
FilesetDispatcher {
ident,
actualFileLocation,
subPath,
+ locationName,
builder.build()));
return actualFileLocation;
} catch (Exception e) {
eventBus.dispatchEvent(
- new GetFileLocationFailureEvent(PrincipalUtils.getCurrentUserName(),
ident, subPath, e));
+ new GetFileLocationFailureEvent(
+ PrincipalUtils.getCurrentUserName(), ident, subPath,
locationName, e));
throw e;
}
}
diff --git
a/core/src/main/java/org/apache/gravitino/listener/api/event/GetFileLocationEvent.java
b/core/src/main/java/org/apache/gravitino/listener/api/event/GetFileLocationEvent.java
index 54d34aa6ec..72aff8498f 100644
---
a/core/src/main/java/org/apache/gravitino/listener/api/event/GetFileLocationEvent.java
+++
b/core/src/main/java/org/apache/gravitino/listener/api/event/GetFileLocationEvent.java
@@ -29,6 +29,7 @@ public final class GetFileLocationEvent extends FilesetEvent {
private final String actualFileLocation;
private final String subPath;
private final Map<String, String> context;
+ private final String locationName;
/**
* Constructs a new {@code GetFileLocationEvent}, recording the attempt to
get a file location.
@@ -45,8 +46,29 @@ public final class GetFileLocationEvent extends FilesetEvent
{
String actualFileLocation,
String subPath,
Map<String, String> context) {
+ this(user, identifier, actualFileLocation, subPath, null, context);
+ }
+
+ /**
+ * Constructs a new {@code GetFileLocationEvent}, recording the attempt to
get a file location.
+ *
+ * @param user The user who initiated the get file location operation.
+ * @param identifier The identifier of the fileset whose file location was requested.
+ * @param actualFileLocation The actual file location that was resolved.
+ * @param subPath The accessing sub path of the get file location operation.
+ * @param locationName The location name of the file location.
+ * @param context The audit context, this param can be null.
+ */
+ public GetFileLocationEvent(
+ String user,
+ NameIdentifier identifier,
+ String actualFileLocation,
+ String subPath,
+ String locationName,
+ Map<String, String> context) {
super(user, identifier);
this.actualFileLocation = actualFileLocation;
+ this.locationName = locationName;
this.subPath = subPath;
this.context = context;
}
@@ -78,6 +100,15 @@ public final class GetFileLocationEvent extends
FilesetEvent {
return context;
}
+ /**
+ * Get the location name of the file location.
+ *
+ * @return The location name.
+ */
+ public String locationName() {
+ return locationName;
+ }
+
/**
* Returns the type of operation.
*
diff --git
a/core/src/main/java/org/apache/gravitino/listener/api/event/GetFileLocationFailureEvent.java
b/core/src/main/java/org/apache/gravitino/listener/api/event/GetFileLocationFailureEvent.java
index 7fd2b48666..a4b8e5db5e 100644
---
a/core/src/main/java/org/apache/gravitino/listener/api/event/GetFileLocationFailureEvent.java
+++
b/core/src/main/java/org/apache/gravitino/listener/api/event/GetFileLocationFailureEvent.java
@@ -29,6 +29,7 @@ import org.apache.gravitino.annotation.DeveloperApi;
@DeveloperApi
public final class GetFileLocationFailureEvent extends FilesetFailureEvent {
private final String subPath;
+ private final String locationName;
/**
* Constructs a new {@code GetFileLocationFailureEvent}.
@@ -42,8 +43,29 @@ public final class GetFileLocationFailureEvent extends
FilesetFailureEvent {
*/
public GetFileLocationFailureEvent(
String user, NameIdentifier identifier, String subPath, Exception
exception) {
+ this(user, identifier, subPath, null, exception);
+ }
+
+ /**
+ * Constructs a new {@code GetFileLocationFailureEvent}.
+ *
+ * @param user The user who initiated the get file location operation.
+ * @param identifier The identifier of the fileset whose file location was requested.
+ * @param subPath The sub path of the file location that was requested.
+ * @param locationName The name of the location.
+ * @param exception The exception that was thrown during the get a file
location. This exception
+ * is key to diagnosing the failure, providing insights into what went
wrong during the
+ * operation.
+ */
+ public GetFileLocationFailureEvent(
+ String user,
+ NameIdentifier identifier,
+ String subPath,
+ String locationName,
+ Exception exception) {
super(user, identifier, exception);
this.subPath = subPath;
+ this.locationName = locationName;
}
/**
@@ -55,6 +77,15 @@ public final class GetFileLocationFailureEvent extends
FilesetFailureEvent {
return subPath;
}
+ /**
+ * Get the location name of the file location.
+ *
+ * @return The location name.
+ */
+ public String locationName() {
+ return locationName;
+ }
+
/**
* Returns the type of operation.
*
diff --git
a/core/src/main/java/org/apache/gravitino/listener/api/event/GetFileLocationPreEvent.java
b/core/src/main/java/org/apache/gravitino/listener/api/event/GetFileLocationPreEvent.java
index 6250234ba3..b9d7454958 100644
---
a/core/src/main/java/org/apache/gravitino/listener/api/event/GetFileLocationPreEvent.java
+++
b/core/src/main/java/org/apache/gravitino/listener/api/event/GetFileLocationPreEvent.java
@@ -26,6 +26,7 @@ import org.apache.gravitino.annotation.DeveloperApi;
@DeveloperApi
public final class GetFileLocationPreEvent extends FilesetPreEvent {
private final String subPath;
+ private final String locationName;
/**
* Constructs a new {@code GetFileLocationPreEvent}, recording the intent to
get a file location.
@@ -35,8 +36,22 @@ public final class GetFileLocationPreEvent extends
FilesetPreEvent {
* @param subPath The accessing sub path of the get file location operation.
*/
public GetFileLocationPreEvent(String user, NameIdentifier identifier,
String subPath) {
+ this(user, identifier, subPath, null);
+ }
+
+ /**
+ * Constructs a new {@code GetFileLocationPreEvent}, recording the intent to
get a file location.
+ *
+ * @param user The user who initiated the get file location operation.
+ * @param identifier The identifier of the file location to be accessed.
+ * @param subPath The accessing sub path of the get file location operation.
+ * @param locationName The name of the location to be accessed.
+ */
+ public GetFileLocationPreEvent(
+ String user, NameIdentifier identifier, String subPath, String
locationName) {
super(user, identifier);
this.subPath = subPath;
+ this.locationName = locationName;
}
/**
@@ -48,6 +63,15 @@ public final class GetFileLocationPreEvent extends
FilesetPreEvent {
return subPath;
}
+ /**
+ * Get the name of the location to be accessed.
+ *
+ * @return The name of the location.
+ */
+ public String locationName() {
+ return locationName;
+ }
+
/**
* Returns the type of operation.
*
diff --git
a/core/src/main/java/org/apache/gravitino/listener/api/info/FilesetInfo.java
b/core/src/main/java/org/apache/gravitino/listener/api/info/FilesetInfo.java
index 30c5c9566c..d6897bc7ec 100644
--- a/core/src/main/java/org/apache/gravitino/listener/api/info/FilesetInfo.java
+++ b/core/src/main/java/org/apache/gravitino/listener/api/info/FilesetInfo.java
@@ -19,6 +19,8 @@
package org.apache.gravitino.listener.api.info;
+import static org.apache.gravitino.file.Fileset.LOCATION_NAME_UNKNOWN;
+
import com.google.common.collect.ImmutableMap;
import java.util.Map;
import javax.annotation.Nullable;
@@ -32,7 +34,7 @@ public final class FilesetInfo {
private final String name;
@Nullable private final String comment;
private final Fileset.Type type;
- private final String storageLocation;
+ private final Map<String, String> storageLocations;
private final Map<String, String> properties;
@Nullable private final Audit audit;
@@ -46,7 +48,7 @@ public final class FilesetInfo {
fileset.name(),
fileset.comment(),
fileset.type(),
- fileset.storageLocation(),
+ fileset.storageLocations(),
fileset.properties(),
fileset.auditInfo());
}
@@ -71,7 +73,36 @@ public final class FilesetInfo {
this.name = name;
this.comment = comment;
this.type = type;
- this.storageLocation = storageLocation;
+ this.storageLocations =
+ storageLocation == null
+ ? ImmutableMap.of()
+ : ImmutableMap.of(LOCATION_NAME_UNKNOWN, storageLocation);
+ this.properties = properties == null ? ImmutableMap.of() :
ImmutableMap.copyOf(properties);
+ this.audit = audit;
+ }
+
+ /**
+ * Constructs a FilesetInfo object with specified details.
+ *
+ * @param name The name of the fileset.
+ * @param comment An optional comment about the fileset. Can be {@code null}.
+ * @param type The type of the fileset.
+ * @param storageLocations The storage locations of the fileset.
+ * @param properties A map of properties associated with the fileset. Can be
{@code null}.
+ * @param audit Optional audit information. Can be {@code null}.
+ */
+ public FilesetInfo(
+ String name,
+ String comment,
+ Fileset.Type type,
+ Map<String, String> storageLocations,
+ Map<String, String> properties,
+ Audit audit) {
+ this.name = name;
+ this.comment = comment;
+ this.type = type;
+ this.storageLocations =
+ storageLocations == null ? ImmutableMap.of() :
ImmutableMap.copyOf(storageLocations);
this.properties = properties == null ? ImmutableMap.of() :
ImmutableMap.copyOf(properties);
this.audit = audit;
}
@@ -110,7 +141,12 @@ public final class FilesetInfo {
* @return The storage location.
*/
public String storageLocation() {
- return storageLocation;
+ return storageLocations.get(LOCATION_NAME_UNKNOWN);
+ }
+
+ /** @return The storage locations of the fileset. */
+ public Map<String, String> storageLocations() {
+ return storageLocations;
}
/**
diff --git a/core/src/test/java/org/apache/gravitino/TestFileset.java
b/core/src/test/java/org/apache/gravitino/TestFileset.java
index b7129a7e76..7c65ee7b52 100644
--- a/core/src/test/java/org/apache/gravitino/TestFileset.java
+++ b/core/src/test/java/org/apache/gravitino/TestFileset.java
@@ -36,7 +36,7 @@ public class TestFileset extends BaseFileset {
fileset.properties = properties;
fileset.auditInfo = auditInfo;
fileset.type = type;
- fileset.storageLocation = storageLocation;
+ fileset.storageLocations = storageLocations;
return fileset;
}
}
diff --git
a/core/src/test/java/org/apache/gravitino/connector/TestCatalogOperations.java
b/core/src/test/java/org/apache/gravitino/connector/TestCatalogOperations.java
index 17d3f5c217..4e0d710871 100644
---
a/core/src/test/java/org/apache/gravitino/connector/TestCatalogOperations.java
+++
b/core/src/test/java/org/apache/gravitino/connector/TestCatalogOperations.java
@@ -18,6 +18,8 @@
*/
package org.apache.gravitino.connector;
+import static org.apache.gravitino.file.Fileset.PROPERTY_DEFAULT_LOCATION_NAME;
+
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
@@ -29,6 +31,7 @@ import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
@@ -56,6 +59,7 @@ import
org.apache.gravitino.exceptions.ModelAlreadyExistsException;
import
org.apache.gravitino.exceptions.ModelVersionAliasesAlreadyExistException;
import org.apache.gravitino.exceptions.NoSuchCatalogException;
import org.apache.gravitino.exceptions.NoSuchFilesetException;
+import org.apache.gravitino.exceptions.NoSuchLocationNameException;
import org.apache.gravitino.exceptions.NoSuchModelException;
import org.apache.gravitino.exceptions.NoSuchModelVersionException;
import org.apache.gravitino.exceptions.NoSuchSchemaException;
@@ -400,15 +404,32 @@ public class TestCatalogOperations
}
@Override
- public Fileset createFileset(
+ public Fileset createMultipleLocationFileset(
NameIdentifier ident,
String comment,
Fileset.Type type,
- String storageLocation,
+ Map<String, String> storageLocations,
Map<String, String> properties)
throws NoSuchSchemaException, FilesetAlreadyExistsException {
AuditInfo auditInfo =
AuditInfo.builder().withCreator("test").withCreateTime(Instant.now()).build();
+ if (storageLocations != null && storageLocations.size() == 1) {
+ properties =
+ Optional.ofNullable(properties)
+ .map(
+ props ->
+ ImmutableMap.<String, String>builder()
+ .putAll(props)
+ .put(
+ PROPERTY_DEFAULT_LOCATION_NAME,
+ storageLocations.keySet().iterator().next())
+ .build())
+ .orElseGet(
+ () ->
+ ImmutableMap.of(
+ PROPERTY_DEFAULT_LOCATION_NAME,
+ storageLocations.keySet().iterator().next()));
+ }
TestFileset fileset =
TestFileset.builder()
.withName(ident.name())
@@ -416,7 +437,7 @@ public class TestCatalogOperations
.withProperties(properties)
.withAuditInfo(auditInfo)
.withType(type)
- .withStorageLocation(storageLocation)
+ .withStorageLocations(storageLocations)
.build();
NameIdentifier schemaIdent = NameIdentifier.of(ident.namespace().levels());
@@ -482,7 +503,7 @@ public class TestCatalogOperations
.withProperties(newProps)
.withAuditInfo(updatedAuditInfo)
.withType(fileset.type())
- .withStorageLocation(fileset.storageLocation())
+ .withStorageLocations(fileset.storageLocations())
.build();
filesets.put(newIdent, updatedFileset);
return updatedFileset;
@@ -499,7 +520,7 @@ public class TestCatalogOperations
}
@Override
- public String getFileLocation(NameIdentifier ident, String subPath) {
+ public String getFileLocation(NameIdentifier ident, String subPath, String
locationName) {
Preconditions.checkArgument(subPath != null, "subPath must not be null");
String processedSubPath;
if (!subPath.trim().isEmpty() && !subPath.trim().startsWith(SLASH)) {
@@ -509,8 +530,16 @@ public class TestCatalogOperations
}
Fileset fileset = loadFileset(ident);
+ Map<String, String> storageLocations = fileset.storageLocations();
+ String targetLocationName =
+ Optional.ofNullable(locationName)
+ .orElse(fileset.properties().get(PROPERTY_DEFAULT_LOCATION_NAME));
+ if (storageLocations == null ||
!storageLocations.containsKey(targetLocationName)) {
+ throw new NoSuchLocationNameException(
+ "The location name: %s does not exist in the fileset: %s",
targetLocationName, ident);
+ }
- boolean isSingleFile = checkSingleFile(fileset);
+ boolean isSingleFile =
checkSingleFile(storageLocations.get(targetLocationName));
// if the storage location is a single file, it cannot have sub path to
access.
if (isSingleFile && StringUtils.isBlank(processedSubPath)) {
throw new GravitinoRuntimeException(
@@ -953,9 +982,9 @@ public class TestCatalogOperations
&& !CallerContext.CallerContextHolder.get().context().isEmpty();
}
- private boolean checkSingleFile(Fileset fileset) {
+ private boolean checkSingleFile(String location) {
try {
- File locationPath = new File(fileset.storageLocation());
+ File locationPath = new File(location);
return locationPath.isFile();
} catch (Exception e) {
return false;
diff --git
a/core/src/test/java/org/apache/gravitino/listener/api/event/TestFilesetEvent.java
b/core/src/test/java/org/apache/gravitino/listener/api/event/TestFilesetEvent.java
index 803b38522d..1f0795652b 100644
---
a/core/src/test/java/org/apache/gravitino/listener/api/event/TestFilesetEvent.java
+++
b/core/src/test/java/org/apache/gravitino/listener/api/event/TestFilesetEvent.java
@@ -19,6 +19,7 @@
package org.apache.gravitino.listener.api.event;
+import static org.apache.gravitino.file.Fileset.LOCATION_NAME_UNKNOWN;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.mock;
@@ -335,17 +336,18 @@ public class TestFilesetEvent {
when(fileset.properties()).thenReturn(ImmutableMap.of("a", "b"));
when(fileset.name()).thenReturn("fileset");
when(fileset.auditInfo()).thenReturn(null);
- when(fileset.storageLocation()).thenReturn("location");
+ when(fileset.storageLocation()).thenCallRealMethod();
+
when(fileset.storageLocations()).thenReturn(ImmutableMap.of(LOCATION_NAME_UNKNOWN,
"location"));
return fileset;
}
private FilesetDispatcher mockFilesetDispatcher() {
FilesetDispatcher dispatcher = mock(FilesetDispatcher.class);
- when(dispatcher.createFileset(
+ when(dispatcher.createMultipleLocationFileset(
any(NameIdentifier.class),
any(String.class),
any(Fileset.Type.class),
- any(String.class),
+ any(Map.class),
any(Map.class)))
.thenReturn(fileset);
when(dispatcher.loadFileset(any(NameIdentifier.class))).thenReturn(fileset);
diff --git
a/core/src/test/java/org/apache/gravitino/meta/TestEntityCombinedObject.java
b/core/src/test/java/org/apache/gravitino/meta/TestEntityCombinedObject.java
index a24b810bf7..4c8f1f2325 100644
--- a/core/src/test/java/org/apache/gravitino/meta/TestEntityCombinedObject.java
+++ b/core/src/test/java/org/apache/gravitino/meta/TestEntityCombinedObject.java
@@ -114,7 +114,7 @@ public class TestEntityCombinedObject {
EntityCombinedFileset.of(originFileset).withHiddenProperties(hiddenProperties);
Assertions.assertEquals(originFileset.name(),
entityCombinedFileset.name());
Assertions.assertEquals(originFileset.comment(),
entityCombinedFileset.comment());
- Map<String, String> filterProp = new HashMap<>(originTopic.properties());
+ Map<String, String> filterProp = new HashMap<>(originFileset.properties());
filterProp.remove("k3");
filterProp.remove(null);
filterProp.remove("k5");