This is an automated email from the ASF dual-hosted git repository.

jshao pushed a commit to branch branch-0.9
in repository https://gitbox.apache.org/repos/asf/gravitino.git


The following commit(s) were added to refs/heads/branch-0.9 by this push:
     new 9c4199eaea [#6892] feat(core): support fileset multiple locations in 
core module (#6930)
9c4199eaea is described below

commit 9c4199eaeacc543078e33a7886edcfed45f43464
Author: github-actions[bot] 
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Tue Apr 15 11:29:07 2025 +0800

    [#6892] feat(core): support fileset multiple locations in core module 
(#6930)
    
    ### What changes were proposed in this pull request?
    
    support fileset multiple locations in the core and catalog module
    
    ### Why are the changes needed?
    
    Fix: #6892
    
    ### Does this PR introduce _any_ user-facing change?
    
    no
    
    ### How was this patch tested?
    
    tests added
    
    Co-authored-by: mchades <[email protected]>
---
 .../catalog/hadoop/HadoopCatalogOperations.java    | 518 ++++++++++----
 .../gravitino/catalog/hadoop/HadoopFileset.java    |   2 +-
 .../hadoop/HadoopFilesetPropertiesMetadata.java    |   9 +
 .../hadoop/SecureHadoopCatalogOperations.java      |  26 +-
 .../hadoop/TestHadoopCatalogOperations.java        | 766 ++++++++++++++++++++-
 .../hadoop/integration/test/HadoopCatalogIT.java   |  42 +-
 .../test/HadoopUserImpersonationIT.java            |   9 +-
 .../tests/integration/test_fileset_catalog.py      |  17 +-
 .../gravitino/catalog/EntityCombinedFileset.java   |  12 +-
 .../catalog/FilesetNormalizeDispatcher.java        |  14 +-
 .../catalog/FilesetOperationDispatcher.java        |  26 +-
 .../apache/gravitino/connector/BaseFileset.java    |  21 +-
 .../gravitino/hook/FilesetHookDispatcher.java      |  15 +-
 .../gravitino/listener/FilesetEventDispatcher.java |  26 +-
 .../listener/api/event/GetFileLocationEvent.java   |  31 +
 .../api/event/GetFileLocationFailureEvent.java     |  31 +
 .../api/event/GetFileLocationPreEvent.java         |  24 +
 .../gravitino/listener/api/info/FilesetInfo.java   |  44 +-
 .../relational/service/SchemaMetaService.java      |   1 +
 .../java/org/apache/gravitino/TestFileset.java     |   2 +-
 .../gravitino/connector/TestCatalogOperations.java |  45 +-
 .../listener/api/event/TestFilesetEvent.java       |   8 +-
 .../gravitino/meta/TestEntityCombinedObject.java   |   2 +-
 23 files changed, 1459 insertions(+), 232 deletions(-)

diff --git 
a/catalogs/catalog-hadoop/src/main/java/org/apache/gravitino/catalog/hadoop/HadoopCatalogOperations.java
 
b/catalogs/catalog-hadoop/src/main/java/org/apache/gravitino/catalog/hadoop/HadoopCatalogOperations.java
index 0cf67f6ce4..74f7a0ebe9 100644
--- 
a/catalogs/catalog-hadoop/src/main/java/org/apache/gravitino/catalog/hadoop/HadoopCatalogOperations.java
+++ 
b/catalogs/catalog-hadoop/src/main/java/org/apache/gravitino/catalog/hadoop/HadoopCatalogOperations.java
@@ -21,8 +21,10 @@ package org.apache.gravitino.catalog.hadoop;
 import static org.apache.gravitino.connector.BaseCatalog.CATALOG_BYPASS_PREFIX;
 import static org.apache.gravitino.file.Fileset.LOCATION_NAME_UNKNOWN;
 import static org.apache.gravitino.file.Fileset.PROPERTY_CATALOG_PLACEHOLDER;
+import static org.apache.gravitino.file.Fileset.PROPERTY_DEFAULT_LOCATION_NAME;
 import static org.apache.gravitino.file.Fileset.PROPERTY_FILESET_PLACEHOLDER;
 import static 
org.apache.gravitino.file.Fileset.PROPERTY_LOCATION_PLACEHOLDER_PREFIX;
+import static 
org.apache.gravitino.file.Fileset.PROPERTY_MULTIPLE_LOCATIONS_PREFIX;
 import static org.apache.gravitino.file.Fileset.PROPERTY_SCHEMA_PLACEHOLDER;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -34,9 +36,11 @@ import java.io.IOException;
 import java.time.Instant;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.regex.Matcher;
@@ -66,6 +70,7 @@ import 
org.apache.gravitino.exceptions.GravitinoRuntimeException;
 import org.apache.gravitino.exceptions.NoSuchCatalogException;
 import org.apache.gravitino.exceptions.NoSuchEntityException;
 import org.apache.gravitino.exceptions.NoSuchFilesetException;
+import org.apache.gravitino.exceptions.NoSuchLocationNameException;
 import org.apache.gravitino.exceptions.NoSuchSchemaException;
 import org.apache.gravitino.exceptions.NonEmptySchemaException;
 import org.apache.gravitino.exceptions.SchemaAlreadyExistsException;
@@ -102,7 +107,7 @@ public class HadoopCatalogOperations extends 
ManagedSchemaOperations
 
   @VisibleForTesting Configuration hadoopConf;
 
-  @VisibleForTesting Optional<Path> catalogStorageLocation;
+  @VisibleForTesting Map<String, Path> catalogStorageLocations;
 
   private Map<String, String> conf;
 
@@ -174,12 +179,7 @@ public class HadoopCatalogOperations extends 
ManagedSchemaOperations
                 .getOrDefault(config, 
HadoopCatalogPropertiesMetadata.LOCATION);
     checkPlaceholderValue(catalogLocation);
 
-    this.catalogStorageLocation =
-        StringUtils.isNotBlank(catalogLocation)
-            ? Optional.of(catalogLocation)
-                .map(s -> s.endsWith(SLASH) ? s : s + SLASH)
-                .map(Path::new)
-            : Optional.empty();
+    this.catalogStorageLocations = getAndCheckCatalogStorageLocations(config);
   }
 
   @Override
@@ -210,7 +210,7 @@ public class HadoopCatalogOperations extends 
ManagedSchemaOperations
           .withName(ident.name())
           .withType(filesetEntity.filesetType())
           .withComment(filesetEntity.comment())
-          .withStorageLocation(filesetEntity.storageLocation())
+          .withStorageLocations(filesetEntity.storageLocations())
           .withProperties(filesetEntity.properties())
           .withAuditInfo(filesetEntity.auditInfo())
           .build();
@@ -223,13 +223,20 @@ public class HadoopCatalogOperations extends 
ManagedSchemaOperations
   }
 
   @Override
-  public Fileset createFileset(
+  public Fileset createMultipleLocationFileset(
       NameIdentifier ident,
       String comment,
       Fileset.Type type,
-      String storageLocation,
+      Map<String, String> storageLocations,
       Map<String, String> properties)
       throws NoSuchSchemaException, FilesetAlreadyExistsException {
+    storageLocations.forEach(
+        (name, path) -> {
+          if (StringUtils.isBlank(name)) {
+            throw new IllegalArgumentException("Location name must not be 
blank");
+          }
+        });
+
     try {
       if (store.exists(ident, Entity.EntityType.FILESET)) {
         throw new FilesetAlreadyExistsException("Fileset %s already exists", 
ident);
@@ -249,46 +256,82 @@ public class HadoopCatalogOperations extends 
ManagedSchemaOperations
     }
 
     // For external fileset, the storageLocation must be set.
-    if (type == Fileset.Type.EXTERNAL && StringUtils.isBlank(storageLocation)) 
{
-      throw new IllegalArgumentException(
-          "Storage location must be set for external fileset " + ident);
+    if (type == Fileset.Type.EXTERNAL) {
+      if (storageLocations.isEmpty()) {
+        throw new IllegalArgumentException(
+            "Storage location must be set for external fileset " + ident);
+      }
+      storageLocations.forEach(
+          (locationName, location) -> {
+            if (StringUtils.isBlank(location)) {
+              throw new IllegalArgumentException(
+                  "Storage location must be set for external fileset "
+                      + ident
+                      + " with location name "
+                      + locationName);
+            }
+          });
     }
 
     // Either catalog property "location", or schema property "location", or 
storageLocation must be
     // set for managed fileset.
-    Path schemaPath = getSchemaPath(schemaIdent.name(), 
schemaEntity.properties());
-    if (schemaPath == null && StringUtils.isBlank(storageLocation)) {
+    Map<String, Path> schemaPaths =
+        getAndCheckSchemaPaths(schemaIdent.name(), schemaEntity.properties());
+    if (schemaPaths.isEmpty() && storageLocations.isEmpty()) {
       throw new IllegalArgumentException(
           "Storage location must be set for fileset "
               + ident
               + " when it's catalog and schema location are not set");
     }
-    checkPlaceholderValue(storageLocation);
+    storageLocations.forEach((k, location) -> checkPlaceholderValue(location));
 
-    Path filesetPath =
-        caculateFilesetPath(
-            schemaIdent.name(), ident.name(), storageLocation, schemaPath, 
properties);
+    Map<String, Path> filesetPaths =
+        calculateFilesetPaths(
+            schemaIdent.name(), ident.name(), storageLocations, schemaPaths, 
properties);
+    properties = setDefaultLocationIfAbsent(properties, filesetPaths);
 
+    ImmutableMap.Builder<String, Path> filesetPathsBuilder = 
ImmutableMap.builder();
     try {
       // formalize the path to avoid path without scheme, uri, authority, etc.
-      FileSystem fs = getFileSystem(filesetPath, conf);
-      filesetPath = filesetPath.makeQualified(fs.getUri(), 
fs.getWorkingDirectory());
-      if (!fs.exists(filesetPath)) {
-        if (!fs.mkdirs(filesetPath)) {
-          throw new RuntimeException(
-              "Failed to create fileset " + ident + " location " + 
filesetPath);
-        }
+      for (Map.Entry<String, Path> entry : filesetPaths.entrySet()) {
+        Path formalizePath = formalizePath(entry.getValue(), conf);
+        filesetPathsBuilder.put(entry.getKey(), formalizePath);
 
-        LOG.info("Created fileset {} location {}", ident, filesetPath);
-      } else {
-        LOG.info("Fileset {} manages the existing location {}", ident, 
filesetPath);
+        FileSystem fs = getFileSystem(formalizePath, conf);
+        if (!fs.exists(formalizePath)) {
+          if (!fs.mkdirs(formalizePath)) {
+            throw new RuntimeException(
+                "Failed to create fileset "
+                    + ident
+                    + " location "
+                    + formalizePath
+                    + " with location name "
+                    + entry.getKey());
+          }
+
+          LOG.info(
+              "Created fileset {} location {} with location name {}",
+              ident,
+              formalizePath,
+              entry.getKey());
+        } else {
+          LOG.info(
+              "Fileset {} manages the existing location {} with location name 
{}",
+              ident,
+              formalizePath,
+              entry.getKey());
+        }
       }
 
     } catch (IOException ioe) {
-      throw new RuntimeException(
-          "Failed to create fileset " + ident + " location " + filesetPath, 
ioe);
+      throw new RuntimeException("Failed to create fileset " + ident, ioe);
     }
 
+    Map<String, String> formattedStorageLocations =
+        Maps.transformValues(filesetPathsBuilder.build(), Path::toString);
+    validateLocationHierarchy(
+        Maps.transformValues(schemaPaths, Path::toString), 
formattedStorageLocations);
+
     StringIdentifier stringId = StringIdentifier.fromProperties(properties);
     Preconditions.checkArgument(stringId != null, "Property String identifier 
should not be null");
 
@@ -302,7 +345,7 @@ public class HadoopCatalogOperations extends 
ManagedSchemaOperations
             // Store the storageLocation to the store. If the 
"storageLocation" is null for managed
             // fileset, Gravitino will get and store the location based on the 
catalog/schema's
             // location and store it to the store.
-            .withStorageLocations(ImmutableMap.of(LOCATION_NAME_UNKNOWN, 
filesetPath.toString()))
+            .withStorageLocations(formattedStorageLocations)
             .withProperties(properties)
             .withAuditInfo(
                 AuditInfo.builder()
@@ -321,12 +364,52 @@ public class HadoopCatalogOperations extends 
ManagedSchemaOperations
         .withName(ident.name())
         .withComment(comment)
         .withType(type)
-        .withStorageLocation(filesetPath.toString())
+        .withStorageLocations(formattedStorageLocations)
         .withProperties(filesetEntity.properties())
         .withAuditInfo(filesetEntity.auditInfo())
         .build();
   }
 
+  private Map<String, String> setDefaultLocationIfAbsent(
+      Map<String, String> properties, Map<String, Path> filesetPaths) {
+    Preconditions.checkArgument(
+        filesetPaths != null && !filesetPaths.isEmpty(), "Fileset paths must 
not be null or empty");
+
+    if (filesetPaths.size() == 1) {
+      // If the fileset has only one location, it is the default location.
+      String defaultLocationName = filesetPaths.keySet().iterator().next();
+      if (properties == null || properties.isEmpty()) {
+        return Collections.singletonMap(PROPERTY_DEFAULT_LOCATION_NAME, 
defaultLocationName);
+      }
+      if (!properties.containsKey(PROPERTY_DEFAULT_LOCATION_NAME)) {
+        return ImmutableMap.<String, String>builder()
+            .putAll(properties)
+            .put(PROPERTY_DEFAULT_LOCATION_NAME, defaultLocationName)
+            .build();
+      }
+
+      Preconditions.checkArgument(
+          
defaultLocationName.equals(properties.get(PROPERTY_DEFAULT_LOCATION_NAME)),
+          "Default location name must be the same as the fileset location 
name");
+      return ImmutableMap.copyOf(properties);
+    }
+
+    // multiple locations
+    Preconditions.checkArgument(
+        properties != null
+            && !properties.isEmpty()
+            && properties.containsKey(PROPERTY_DEFAULT_LOCATION_NAME)
+            && 
filesetPaths.containsKey(properties.get(PROPERTY_DEFAULT_LOCATION_NAME)),
+        "Default location name must be set and must be one of the fileset 
locations, "
+            + "location names: "
+            + filesetPaths.keySet()
+            + ", default location name: "
+            + Optional.ofNullable(properties)
+                .map(p -> p.get(PROPERTY_DEFAULT_LOCATION_NAME))
+                .orElse(null));
+    return ImmutableMap.copyOf(properties);
+  }
+
   @Override
   public Fileset alterFileset(NameIdentifier ident, FilesetChange... changes)
       throws NoSuchFilesetException, IllegalArgumentException {
@@ -350,7 +433,7 @@ public class HadoopCatalogOperations extends 
ManagedSchemaOperations
           .withName(updatedFilesetEntity.name())
           .withComment(updatedFilesetEntity.comment())
           .withType(updatedFilesetEntity.filesetType())
-          .withStorageLocation(updatedFilesetEntity.storageLocation())
+          .withStorageLocations(updatedFilesetEntity.storageLocations())
           .withProperties(updatedFilesetEntity.properties())
           .withAuditInfo(updatedFilesetEntity.auditInfo())
           .build();
@@ -371,19 +454,43 @@ public class HadoopCatalogOperations extends 
ManagedSchemaOperations
     try {
       FilesetEntity filesetEntity =
           store.get(ident, Entity.EntityType.FILESET, FilesetEntity.class);
-      Path filesetPath = new Path(filesetEntity.storageLocation());
 
       // For managed fileset, we should delete the related files.
       if (filesetEntity.filesetType() == Fileset.Type.MANAGED) {
-        FileSystem fs = getFileSystem(filesetPath, conf);
-        if (fs.exists(filesetPath)) {
-          if (!fs.delete(filesetPath, true)) {
-            LOG.warn("Failed to delete fileset {} location {}", ident, 
filesetPath);
-            return false;
-          }
-
-        } else {
-          LOG.warn("Fileset {} location {} does not exist", ident, 
filesetPath);
+        AtomicReference<IOException> exception = new AtomicReference<>();
+        Map<String, Path> storageLocations =
+            Maps.transformValues(filesetEntity.storageLocations(), Path::new);
+        storageLocations.forEach(
+            (locationName, location) -> {
+              try {
+                FileSystem fs = getFileSystem(location, conf);
+                if (fs.exists(location)) {
+                  if (!fs.delete(location, true)) {
+                    LOG.warn(
+                        "Failed to delete fileset {} location {} with location 
name {}",
+                        ident,
+                        location,
+                        locationName);
+                  }
+                } else {
+                  LOG.warn(
+                      "Fileset {} location {} with location name {} does not 
exist",
+                      ident,
+                      location,
+                      locationName);
+                }
+              } catch (IOException ioe) {
+                LOG.warn(
+                    "Failed to delete fileset {} location {} with location 
name {}",
+                    ident,
+                    location,
+                    locationName,
+                    ioe);
+                exception.set(ioe);
+              }
+            });
+        if (exception.get() != null) {
+          throw exception.get();
         }
       }
 
@@ -397,8 +504,8 @@ public class HadoopCatalogOperations extends 
ManagedSchemaOperations
   }
 
   @Override
-  public String getFileLocation(NameIdentifier ident, String subPath)
-      throws NoSuchFilesetException {
+  public String getFileLocation(NameIdentifier ident, String subPath, String 
locationName)
+      throws NoSuchFilesetException, NoSuchLocationNameException {
     Preconditions.checkArgument(subPath != null, "subPath must not be null");
     String processedSubPath;
     if (!subPath.trim().isEmpty() && !subPath.trim().startsWith(SLASH)) {
@@ -408,10 +515,18 @@ public class HadoopCatalogOperations extends 
ManagedSchemaOperations
     }
 
     Fileset fileset = loadFileset(ident);
+    locationName =
+        locationName == null
+            ? fileset.properties().get(PROPERTY_DEFAULT_LOCATION_NAME)
+            : locationName;
+    if (!fileset.storageLocations().containsKey(locationName)) {
+      throw new NoSuchLocationNameException(
+          "Location name %s does not exist in fileset %s", locationName, 
ident);
+    }
 
-    boolean isSingleFile = checkSingleFile(fileset);
+    boolean isSingleFile = checkSingleFile(fileset, locationName);
     // if the storage location is a single file, it cannot have sub path to 
access.
-    if (isSingleFile && StringUtils.isBlank(processedSubPath)) {
+    if (isSingleFile && StringUtils.isNotBlank(processedSubPath)) {
       throw new GravitinoRuntimeException(
           "Sub path should always be blank, because the fileset only mounts a 
single file.");
     }
@@ -455,14 +570,11 @@ public class HadoopCatalogOperations extends 
ManagedSchemaOperations
     // 1. if the storage location is a single file, we pass the storage 
location directly
     // 2. if the processed sub path is blank, we pass the storage location 
directly
     if (isSingleFile || StringUtils.isBlank(processedSubPath)) {
-      fileLocation = fileset.storageLocation();
+      fileLocation = fileset.storageLocations().get(locationName);
     } else {
       // the processed sub path always starts with "/" if it is not blank,
       // so we can safely remove the tailing slash if storage location ends 
with "/".
-      String storageLocation =
-          fileset.storageLocation().endsWith(SLASH)
-              ? fileset.storageLocation().substring(0, 
fileset.storageLocation().length() - 1)
-              : fileset.storageLocation();
+      String storageLocation = 
removeTrailingSlash(fileset.storageLocations().get(locationName));
       fileLocation = String.format("%s%s", storageLocation, processedSubPath);
     }
     return fileLocation;
@@ -479,26 +591,42 @@ public class HadoopCatalogOperations extends 
ManagedSchemaOperations
       throw new RuntimeException("Failed to check if schema " + ident + " 
exists", ioe);
     }
 
-    Path schemaPath = getSchemaPath(ident.name(), properties);
-    if (schemaPath != null && !containsPlaceholder(schemaPath.toString())) {
-      try {
-        FileSystem fs = getFileSystem(schemaPath, conf);
-        if (!fs.exists(schemaPath)) {
-          if (!fs.mkdirs(schemaPath)) {
-            // Fail the operation when failed to create the schema path.
-            throw new RuntimeException(
-                "Failed to create schema " + ident + " location " + 
schemaPath);
+    Map<String, Path> schemaPaths = getAndCheckSchemaPaths(ident.name(), 
properties);
+    schemaPaths.forEach(
+        (locationName, schemaPath) -> {
+          if (schemaPath != null && 
!containsPlaceholder(schemaPath.toString())) {
+            try {
+              FileSystem fs = getFileSystem(schemaPath, conf);
+              if (!fs.exists(schemaPath)) {
+                if (!fs.mkdirs(schemaPath)) {
+                  // Fail the operation when failed to create the schema path.
+                  throw new RuntimeException(
+                      "Failed to create schema "
+                          + ident
+                          + " location: "
+                          + schemaPath
+                          + " with location name: "
+                          + locationName);
+                }
+                LOG.info(
+                    "Created schema {} location: {} with location name: {}",
+                    ident,
+                    schemaPath,
+                    locationName);
+              } else {
+                LOG.info(
+                    "Schema {} manages the existing location: {} with location 
name: {}",
+                    ident,
+                    schemaPath,
+                    locationName);
+              }
+
+            } catch (IOException ioe) {
+              throw new RuntimeException(
+                  "Failed to create schema " + ident + " location " + 
schemaPath, ioe);
+            }
           }
-          LOG.info("Created schema {} location {}", ident, schemaPath);
-        } else {
-          LOG.info("Schema {} manages the existing location {}", ident, 
schemaPath);
-        }
-
-      } catch (IOException ioe) {
-        throw new RuntimeException(
-            "Failed to create schema " + ident + " location " + schemaPath, 
ioe);
-      }
-    }
+        });
 
     return super.createSchema(ident, comment, properties);
   }
@@ -536,9 +664,11 @@ public class HadoopCatalogOperations extends 
ManagedSchemaOperations
       SchemaEntity schemaEntity = store.get(ident, Entity.EntityType.SCHEMA, 
SchemaEntity.class);
       Map<String, String> properties =
           
Optional.ofNullable(schemaEntity.properties()).orElse(Collections.emptyMap());
-      Path schemaPath = getSchemaPath(ident.name(), properties);
+      Map<String, Path> schemaPaths = getAndCheckSchemaPaths(ident.name(), 
properties);
 
       boolean dropped = super.dropSchema(ident, cascade);
+      // If the schema entity is failed to be deleted, we should not delete 
the storage location
+      // and return false immediately.
       if (!dropped) {
         return false;
       }
@@ -560,39 +690,73 @@ public class HadoopCatalogOperations extends 
ManagedSchemaOperations
                   // than the catalog thread. We need to set the context 
classloader to the
                   // catalog's classloader to avoid classloading issues.
                   Thread.currentThread().setContextClassLoader(cl);
-                  Path filesetPath = new Path(f.storageLocation());
-                  FileSystem fs = getFileSystem(filesetPath, conf);
-                  if (fs.exists(filesetPath)) {
-                    if (!fs.delete(filesetPath, true)) {
-                      LOG.warn("Failed to delete fileset {} location {}", 
f.name(), filesetPath);
-                    }
-                  }
-                } catch (IOException ioe) {
-                  LOG.warn(
-                      "Failed to delete fileset {} location {}",
-                      f.name(),
-                      f.storageLocation(),
-                      ioe);
+                  f.storageLocations()
+                      .forEach(
+                          (locationName, location) -> {
+                            try {
+                              Path filesetPath = new Path(location);
+                              FileSystem fs = getFileSystem(filesetPath, conf);
+                              if (fs.exists(filesetPath)) {
+                                if (!fs.delete(filesetPath, true)) {
+                                  LOG.warn(
+                                      "Failed to delete fileset {} location: 
{} with location name: {}",
+                                      f.name(),
+                                      filesetPath,
+                                      locationName);
+                                }
+                              }
+                            } catch (IOException ioe) {
+                              LOG.warn(
+                                  "Failed to delete fileset {} location: {} 
with location name: {}",
+                                  f.name(),
+                                  location,
+                                  locationName,
+                                  ioe);
+                            }
+                          });
                 } finally {
                   Thread.currentThread().setContextClassLoader(oldCl);
                 }
               });
 
       // Delete the schema path if it exists and is empty.
-      if (schemaPath != null) {
-        FileSystem fs = getFileSystem(schemaPath, conf);
-        if (fs.exists(schemaPath)) {
-          FileStatus[] statuses = fs.listStatus(schemaPath);
-          if (statuses.length == 0) {
-            if (fs.delete(schemaPath, true)) {
-              LOG.info("Deleted schema {} location {}", ident, schemaPath);
-            } else {
-              LOG.warn(
-                  "Failed to delete schema {} because it has files/folders 
under location {}",
-                  ident,
-                  schemaPath);
-            }
-          }
+      if (!schemaPaths.isEmpty()) {
+        AtomicReference<RuntimeException> exception = new AtomicReference<>();
+        schemaPaths.forEach(
+            (locationName, schemaPath) -> {
+              try {
+                FileSystem fs = getFileSystem(schemaPath, conf);
+                if (fs.exists(schemaPath)) {
+                  FileStatus[] statuses = fs.listStatus(schemaPath);
+                  if (statuses.length == 0) {
+                    if (fs.delete(schemaPath, true)) {
+                      LOG.info(
+                          "Deleted schema {} location {} with location name 
{}",
+                          ident,
+                          schemaPath,
+                          locationName);
+                    } else {
+                      LOG.warn(
+                          "Failed to delete schema {} because it has 
files/folders under location {} with location name {}",
+                          ident,
+                          schemaPath,
+                          locationName);
+                    }
+                  }
+                }
+              } catch (IOException ioe) {
+                LOG.warn(
+                    "Failed to delete schema {} location {} with location name 
{}",
+                    ident,
+                    schemaPath,
+                    locationName,
+                    ioe);
+                exception.set(
+                    new RuntimeException("Failed to delete schema " + ident + 
" location", ioe));
+              }
+            });
+        if (exception.get() != null) {
+          throw exception.get();
         }
       }
 
@@ -666,6 +830,108 @@ public class HadoopCatalogOperations extends 
ManagedSchemaOperations
         .build();
   }
 
+  private void validateLocationHierarchy(
+      Map<String, String> schemaLocations, Map<String, String> 
filesetLocations) {
+    if (schemaLocations == null
+        || filesetLocations == null
+        || schemaLocations.isEmpty()
+        || filesetLocations.isEmpty()) {
+      return;
+    }
+
+    filesetLocations.forEach(
+        (filesetLocationName, filesetLocation) ->
+            schemaLocations.forEach(
+                (schemaLocationName, schemaLocation) -> {
+                  if (ensureTrailingSlash(schemaLocation)
+                      .startsWith(ensureTrailingSlash(filesetLocation))) {
+                    throw new IllegalArgumentException(
+                        String.format(
+                            "The fileset location %s with location name %s is 
not allowed "
+                                + "to be the parent of the schema location %s 
with location name %s",
+                            filesetLocation,
+                            filesetLocationName,
+                            schemaLocation,
+                            schemaLocationName));
+                  }
+                }));
+  }
+
+  private Map<String, Path> getAndCheckCatalogStorageLocations(Map<String, 
String> properties) {
+    ImmutableMap.Builder<String, Path> catalogStorageLocations = 
ImmutableMap.builder();
+    String unnamedLocation =
+        (String)
+            propertiesMetadata
+                .catalogPropertiesMetadata()
+                .getOrDefault(properties, 
HadoopCatalogPropertiesMetadata.LOCATION);
+    if (StringUtils.isNotBlank(unnamedLocation)) {
+      checkPlaceholderValue(unnamedLocation);
+      catalogStorageLocations.put(
+          LOCATION_NAME_UNKNOWN, new 
Path(ensureTrailingSlash(unnamedLocation)));
+    }
+
+    properties.forEach(
+        (k, v) -> {
+          if (k.startsWith(PROPERTY_MULTIPLE_LOCATIONS_PREFIX) && 
StringUtils.isNotBlank(v)) {
+            String locationName = 
k.substring(PROPERTY_MULTIPLE_LOCATIONS_PREFIX.length());
+            if (StringUtils.isBlank(locationName)) {
+              throw new IllegalArgumentException("Location name must not be 
blank");
+            }
+            checkPlaceholderValue(v);
+            catalogStorageLocations.put(locationName, new 
Path((ensureTrailingSlash(v))));
+          }
+        });
+    return catalogStorageLocations.build();
+  }
+
+  private Map<String, Path> getAndCheckSchemaPaths(
+      String schemaName, Map<String, String> schemaProps) {
+    Map<String, Path> schemaPaths = new HashMap<>();
+    catalogStorageLocations.forEach(
+        (name, path) -> {
+          if (containsPlaceholder(path.toString())) {
+            schemaPaths.put(name, path);
+          } else {
+            schemaPaths.put(name, new Path(path, schemaName));
+          }
+        });
+
+    String unnamedSchemaLocation =
+        (String)
+            propertiesMetadata
+                .schemaPropertiesMetadata()
+                .getOrDefault(schemaProps, 
HadoopSchemaPropertiesMetadata.LOCATION);
+    checkPlaceholderValue(unnamedSchemaLocation);
+    Optional.ofNullable(unnamedSchemaLocation)
+        .map(this::ensureTrailingSlash)
+        .map(Path::new)
+        .ifPresent(p -> schemaPaths.put(LOCATION_NAME_UNKNOWN, p));
+
+    schemaProps.forEach(
+        (k, path) -> {
+          if (k.startsWith(PROPERTY_MULTIPLE_LOCATIONS_PREFIX)) {
+            checkPlaceholderValue(path);
+            String locationName = 
k.substring(PROPERTY_MULTIPLE_LOCATIONS_PREFIX.length());
+            if (StringUtils.isBlank(locationName)) {
+              throw new IllegalArgumentException("Location name must not be 
blank");
+            }
+            Optional.ofNullable(path)
+                .map(this::ensureTrailingSlash)
+                .map(Path::new)
+                .ifPresent(p -> schemaPaths.put(locationName, p));
+          }
+        });
+    return ImmutableMap.copyOf(schemaPaths);
+  }
+
+  private String ensureTrailingSlash(String path) {
+    return path.endsWith(SLASH) ? path : path + SLASH;
+  }
+
+  private String removeTrailingSlash(String path) {
+    return path.endsWith(SLASH) ? path.substring(0, path.length() - 1) : path;
+  }
+
   private FilesetEntity updateFilesetEntity(
       NameIdentifier ident, FilesetEntity filesetEntity, FilesetChange... 
changes) {
     Map<String, String> props =
@@ -700,8 +966,7 @@ public class HadoopCatalogOperations extends 
ManagedSchemaOperations
         .withId(filesetEntity.id())
         .withComment(newComment)
         .withFilesetType(filesetEntity.filesetType())
-        .withStorageLocations(
-            ImmutableMap.of(LOCATION_NAME_UNKNOWN, 
filesetEntity.storageLocation()))
+        .withStorageLocations(filesetEntity.storageLocations())
         .withProperties(props)
         .withAuditInfo(
             AuditInfo.builder()
@@ -713,23 +978,6 @@ public class HadoopCatalogOperations extends 
ManagedSchemaOperations
         .build();
   }
 
-  private Path getSchemaPath(String name, Map<String, String> properties) {
-    String schemaLocation =
-        (String)
-            propertiesMetadata
-                .schemaPropertiesMetadata()
-                .getOrDefault(properties, 
HadoopSchemaPropertiesMetadata.LOCATION);
-    checkPlaceholderValue(schemaLocation);
-
-    return Optional.ofNullable(schemaLocation)
-        .map(s -> s.endsWith(SLASH) ? s : s + SLASH)
-        .map(Path::new)
-        .orElse(
-            catalogStorageLocation
-                .map(p -> containsPlaceholder(p.toString()) ? p : new Path(p, 
name))
-                .orElse(null));
-  }
-
   /**
    * Check whether the placeholder in the location is valid. Throw an 
exception if the location
    * contains a placeholder with an empty value.
@@ -763,6 +1011,28 @@ public class HadoopCatalogOperations extends 
ManagedSchemaOperations
         && LOCATION_PLACEHOLDER_PATTERN.matcher(location).find();
   }
 
+  private Map<String, Path> calculateFilesetPaths(
+      String schemaName,
+      String filesetName,
+      Map<String, String> storageLocations,
+      Map<String, Path> schemaPaths,
+      Map<String, String> properties) {
+    ImmutableMap.Builder<String, Path> filesetPaths = ImmutableMap.builder();
+    Set<String> locationNames = new HashSet<>(schemaPaths.keySet());
+    locationNames.addAll(storageLocations.keySet());
+
+    locationNames.forEach(
+        locationName -> {
+          String storageLocation = storageLocations.get(locationName);
+          Path schemaPath = schemaPaths.get(locationName);
+          filesetPaths.put(
+              locationName,
+              caculateFilesetPath(
+                  schemaName, filesetName, storageLocation, schemaPath, 
properties));
+        });
+    return filesetPaths.build();
+  }
+
   private Path caculateFilesetPath(
       String schemaName,
       String filesetName,
@@ -844,9 +1114,9 @@ public class HadoopCatalogOperations extends 
ManagedSchemaOperations
         && !CallerContext.CallerContextHolder.get().context().isEmpty();
   }
 
-  private boolean checkSingleFile(Fileset fileset) {
+  private boolean checkSingleFile(Fileset fileset, String locationName) {
     try {
-      Path locationPath = new Path(fileset.storageLocation());
+      Path locationPath = new 
Path(fileset.storageLocations().get(locationName));
       return getFileSystem(locationPath, 
conf).getFileStatus(locationPath).isFile();
     } catch (FileNotFoundException e) {
       // We should always return false here, same with the logic in 
`FileSystem.isFile(Path f)`.
@@ -854,8 +1124,10 @@ public class HadoopCatalogOperations extends 
ManagedSchemaOperations
     } catch (IOException e) {
       throw new GravitinoRuntimeException(
           e,
-          "Exception occurs when checking whether fileset: %s mounts a single 
file",
-          fileset.name());
+          "Exception occurs when checking whether fileset: %s "
+              + "mounts a single file with location name: %s",
+          fileset.name(),
+          locationName);
     }
   }
 
diff --git 
a/catalogs/catalog-hadoop/src/main/java/org/apache/gravitino/catalog/hadoop/HadoopFileset.java
 
b/catalogs/catalog-hadoop/src/main/java/org/apache/gravitino/catalog/hadoop/HadoopFileset.java
index b292d29f15..6d622cf1d0 100644
--- 
a/catalogs/catalog-hadoop/src/main/java/org/apache/gravitino/catalog/hadoop/HadoopFileset.java
+++ 
b/catalogs/catalog-hadoop/src/main/java/org/apache/gravitino/catalog/hadoop/HadoopFileset.java
@@ -31,7 +31,7 @@ public class HadoopFileset extends BaseFileset {
       HadoopFileset fileset = new HadoopFileset();
       fileset.name = name;
       fileset.comment = comment;
-      fileset.storageLocation = storageLocation;
+      fileset.storageLocations = storageLocations;
       fileset.type = type;
       fileset.properties = properties;
       fileset.auditInfo = auditInfo;
diff --git 
a/catalogs/catalog-hadoop/src/main/java/org/apache/gravitino/catalog/hadoop/HadoopFilesetPropertiesMetadata.java
 
b/catalogs/catalog-hadoop/src/main/java/org/apache/gravitino/catalog/hadoop/HadoopFilesetPropertiesMetadata.java
index 6ff885ac83..1f6f578f76 100644
--- 
a/catalogs/catalog-hadoop/src/main/java/org/apache/gravitino/catalog/hadoop/HadoopFilesetPropertiesMetadata.java
+++ 
b/catalogs/catalog-hadoop/src/main/java/org/apache/gravitino/catalog/hadoop/HadoopFilesetPropertiesMetadata.java
@@ -19,6 +19,7 @@
 package org.apache.gravitino.catalog.hadoop;
 
 import static org.apache.gravitino.file.Fileset.PROPERTY_CATALOG_PLACEHOLDER;
+import static org.apache.gravitino.file.Fileset.PROPERTY_DEFAULT_LOCATION_NAME;
 import static org.apache.gravitino.file.Fileset.PROPERTY_FILESET_PLACEHOLDER;
 import static 
org.apache.gravitino.file.Fileset.PROPERTY_LOCATION_PLACEHOLDER_PREFIX;
 import static org.apache.gravitino.file.Fileset.PROPERTY_SCHEMA_PLACEHOLDER;
@@ -64,6 +65,14 @@ public class HadoopFilesetPropertiesMetadata extends 
BasePropertiesMetadata {
                 null /* default value */,
                 false /* hidden */,
                 false /* reserved */))
+        .put(
+            PROPERTY_DEFAULT_LOCATION_NAME,
+            PropertyEntry.stringOptionalPropertyEntry(
+                PROPERTY_DEFAULT_LOCATION_NAME,
+                "The default location name for the fileset",
+                true /* immutable */,
+                null,
+                false /* hidden */))
         .putAll(KerberosConfig.KERBEROS_PROPERTY_ENTRIES)
         .putAll(AuthenticationConfig.AUTHENTICATION_PROPERTY_ENTRIES)
         .putAll(CredentialConfig.CREDENTIAL_PROPERTY_ENTRIES);
diff --git 
a/catalogs/catalog-hadoop/src/main/java/org/apache/gravitino/catalog/hadoop/SecureHadoopCatalogOperations.java
 
b/catalogs/catalog-hadoop/src/main/java/org/apache/gravitino/catalog/hadoop/SecureHadoopCatalogOperations.java
index 7ae10805b5..2982080306 100644
--- 
a/catalogs/catalog-hadoop/src/main/java/org/apache/gravitino/catalog/hadoop/SecureHadoopCatalogOperations.java
+++ 
b/catalogs/catalog-hadoop/src/main/java/org/apache/gravitino/catalog/hadoop/SecureHadoopCatalogOperations.java
@@ -50,6 +50,7 @@ import 
org.apache.gravitino.exceptions.FilesetAlreadyExistsException;
 import org.apache.gravitino.exceptions.NoSuchCatalogException;
 import org.apache.gravitino.exceptions.NoSuchEntityException;
 import org.apache.gravitino.exceptions.NoSuchFilesetException;
+import org.apache.gravitino.exceptions.NoSuchLocationNameException;
 import org.apache.gravitino.exceptions.NoSuchSchemaException;
 import org.apache.gravitino.exceptions.NonEmptySchemaException;
 import org.apache.gravitino.exceptions.SchemaAlreadyExistsException;
@@ -105,11 +106,11 @@ public class SecureHadoopCatalogOperations
   }
 
   @Override
-  public Fileset createFileset(
+  public Fileset createMultipleLocationFileset(
       NameIdentifier ident,
       String comment,
       Fileset.Type type,
-      String storageLocation,
+      Map<String, String> storageLocations,
       Map<String, String> properties)
       throws NoSuchSchemaException, FilesetAlreadyExistsException {
     String apiUser = PrincipalUtils.getCurrentUserName();
@@ -120,8 +121,8 @@ public class SecureHadoopCatalogOperations
     return userContext.doAs(
         () -> {
           setUser(apiUser);
-          return hadoopCatalogOperations.createFileset(
-              ident, comment, type, storageLocation, properties);
+          return hadoopCatalogOperations.createMultipleLocationFileset(
+              ident, comment, type, storageLocations, properties);
         },
         ident);
   }
@@ -233,9 +234,9 @@ public class SecureHadoopCatalogOperations
   }
 
   @Override
-  public String getFileLocation(NameIdentifier ident, String subPath)
-      throws NoSuchFilesetException {
-    return hadoopCatalogOperations.getFileLocation(ident, subPath);
+  public String getFileLocation(NameIdentifier ident, String subPath, String 
locationName)
+      throws NoSuchFilesetException, NoSuchLocationNameException {
+    return hadoopCatalogOperations.getFileLocation(ident, subPath, 
locationName);
   }
 
   @Override
@@ -260,7 +261,16 @@ public class SecureHadoopCatalogOperations
   @Override
   public List<PathContext> getPathContext(NameIdentifier filesetIdentifier) {
     Fileset fileset = loadFileset(filesetIdentifier);
-    String path = fileset.storageLocation();
+    Map<String, String> locations = fileset.storageLocations();
+    Preconditions.checkArgument(
+        locations != null && !locations.isEmpty(),
+        "No storage locations found for fileset: " + filesetIdentifier);
+
+    // TODO: support multiple storage locations
+    Preconditions.checkArgument(
+        locations.size() == 1, "Only one storage location is supported for 
fileset now");
+
+    String path = locations.values().iterator().next();
     Preconditions.checkState(
         StringUtils.isNotBlank(path), "The location of fileset should not be 
empty.");
 
diff --git 
a/catalogs/catalog-hadoop/src/test/java/org/apache/gravitino/catalog/hadoop/TestHadoopCatalogOperations.java
 
b/catalogs/catalog-hadoop/src/test/java/org/apache/gravitino/catalog/hadoop/TestHadoopCatalogOperations.java
index deff740dd2..97c01704ed 100644
--- 
a/catalogs/catalog-hadoop/src/test/java/org/apache/gravitino/catalog/hadoop/TestHadoopCatalogOperations.java
+++ 
b/catalogs/catalog-hadoop/src/test/java/org/apache/gravitino/catalog/hadoop/TestHadoopCatalogOperations.java
@@ -35,6 +35,10 @@ import static 
org.apache.gravitino.Configs.VERSION_RETENTION_COUNT;
 import static 
org.apache.gravitino.catalog.hadoop.HadoopCatalog.CATALOG_PROPERTIES_META;
 import static 
org.apache.gravitino.catalog.hadoop.HadoopCatalog.FILESET_PROPERTIES_META;
 import static 
org.apache.gravitino.catalog.hadoop.HadoopCatalog.SCHEMA_PROPERTIES_META;
+import static 
org.apache.gravitino.catalog.hadoop.HadoopCatalogPropertiesMetadata.LOCATION;
+import static org.apache.gravitino.file.Fileset.LOCATION_NAME_UNKNOWN;
+import static org.apache.gravitino.file.Fileset.PROPERTY_DEFAULT_LOCATION_NAME;
+import static 
org.apache.gravitino.file.Fileset.PROPERTY_MULTIPLE_LOCATIONS_PREFIX;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.when;
 
@@ -281,6 +285,17 @@ public class TestHadoopCatalogOperations {
                       Mockito.anyString(), Mockito.anyString(), 
Mockito.eq("s1_" + name));
             });
 
+    multipleLocationsArguments()
+        .forEach(
+            arguments -> {
+              String name = (String) arguments.get()[0];
+              long schemaId = idGenerator.nextId();
+              doReturn(new SchemaIds(1L, 1L, schemaId))
+                  .when(spySchemaMetaService)
+                  .getSchemaIdByMetalakeNameAndCatalogNameAndSchemaName(
+                      Mockito.anyString(), Mockito.anyString(), 
Mockito.eq("s1_" + name));
+            });
+
     MockedStatic<MetalakeMetaService> metalakeMetaServiceMockedStatic =
         Mockito.mockStatic(MetalakeMetaService.class);
     MockedStatic<CatalogMetaService> catalogMetaServiceMockedStatic =
@@ -323,21 +338,21 @@ public class TestHadoopCatalogOperations {
     String value = conf.get("fs.defaultFS");
     Assertions.assertEquals("file:///", value);
 
-    emptyProps.put(HadoopCatalogPropertiesMetadata.LOCATION, 
"file:///tmp/catalog");
+    emptyProps.put(LOCATION, "file:///tmp/catalog");
     ops.initialize(emptyProps, catalogInfo, HADOOP_PROPERTIES_METADATA);
-    Assertions.assertTrue(ops.catalogStorageLocation.isPresent());
+    Assertions.assertEquals(1, ops.catalogStorageLocations.size());
     Path expectedPath = new Path("file:///tmp/catalog");
-    Assertions.assertEquals(expectedPath, ops.catalogStorageLocation.get());
+    Assertions.assertEquals(expectedPath, 
ops.catalogStorageLocations.get(LOCATION_NAME_UNKNOWN));
 
     // test placeholder in location
-    emptyProps.put(HadoopCatalogPropertiesMetadata.LOCATION, 
"file:///tmp/{{catalog}}");
+    emptyProps.put(LOCATION, "file:///tmp/{{catalog}}");
     ops.initialize(emptyProps, catalogInfo, HADOOP_PROPERTIES_METADATA);
-    Assertions.assertTrue(ops.catalogStorageLocation.isPresent());
+    Assertions.assertEquals(1, ops.catalogStorageLocations.size());
     expectedPath = new Path("file:///tmp/{{catalog}}");
-    Assertions.assertEquals(expectedPath, ops.catalogStorageLocation.get());
+    Assertions.assertEquals(expectedPath, 
ops.catalogStorageLocations.get(LOCATION_NAME_UNKNOWN));
 
     // test illegal placeholder in location
-    emptyProps.put(HadoopCatalogPropertiesMetadata.LOCATION, 
"file:///tmp/{{}}");
+    emptyProps.put(LOCATION, "file:///tmp/{{}}");
     Throwable exception =
         Assertions.assertThrows(
             IllegalArgumentException.class,
@@ -572,7 +587,7 @@ public class TestHadoopCatalogOperations {
 
     try (SecureHadoopCatalogOperations ops = new 
SecureHadoopCatalogOperations(store)) {
       ops.initialize(
-          ImmutableMap.of(HadoopCatalogPropertiesMetadata.LOCATION, 
catalogPath),
+          ImmutableMap.of(LOCATION, catalogPath),
           randomCatalogInfo("m1", "c1"),
           HADOOP_PROPERTIES_METADATA);
       Schema schema1 = ops.loadSchema(id);
@@ -639,7 +654,7 @@ public class TestHadoopCatalogOperations {
     String comment = "comment_s1";
     Map<String, String> catalogProps = Maps.newHashMap();
     if (catalogPath != null) {
-      catalogProps.put(HadoopCatalogPropertiesMetadata.LOCATION, catalogPath);
+      catalogProps.put(LOCATION, catalogPath);
     }
 
     NameIdentifier schemaIdent = NameIdentifierUtil.ofSchema("m1", "c1", 
schemaName);
@@ -763,7 +778,7 @@ public class TestHadoopCatalogOperations {
     String comment = "comment_s24";
     Map<String, String> catalogProps = Maps.newHashMap();
     if (catalogPath != null) {
-      catalogProps.put(HadoopCatalogPropertiesMetadata.LOCATION, catalogPath);
+      catalogProps.put(LOCATION, catalogPath);
     }
 
     NameIdentifier schemaIdent = NameIdentifierUtil.ofSchema("m1", "c1", 
schemaName);
@@ -972,7 +987,7 @@ public class TestHadoopCatalogOperations {
 
       String location = "hdfs://localhost:9000";
       Map<String, String> catalogProperties = Maps.newHashMap();
-      catalogProperties.put(HadoopCatalogPropertiesMetadata.LOCATION, 
location);
+      catalogProperties.put(LOCATION, location);
 
       ops.initialize(catalogProperties, randomCatalogInfo(), 
HADOOP_PROPERTIES_METADATA);
 
@@ -980,7 +995,7 @@ public class TestHadoopCatalogOperations {
       NameIdentifier nameIdentifier = NameIdentifierUtil.ofSchema("m1", "c1", 
schemaName);
 
       Map<String, String> schemaProperties = Maps.newHashMap();
-      schemaProperties.put(HadoopCatalogPropertiesMetadata.LOCATION, 
"hdfs://localhost:9000/user1");
+      schemaProperties.put(LOCATION, "hdfs://localhost:9000/user1");
       StringIdentifier stringId = 
StringIdentifier.fromId(idGenerator.nextId());
       schemaProperties =
           Maps.newHashMap(StringIdentifier.newPropertiesWithId(stringId, 
schemaProperties));
@@ -1106,6 +1121,10 @@ public class TestHadoopCatalogOperations {
     Fileset mockFileset = Mockito.mock(Fileset.class);
     when(mockFileset.name()).thenReturn(filesetName4);
     when(mockFileset.storageLocation()).thenReturn(filesetLocation4);
+    when(mockFileset.storageLocations())
+        .thenReturn(ImmutableMap.of(LOCATION_NAME_UNKNOWN, filesetLocation4));
+    when(mockFileset.properties())
+        .thenReturn(ImmutableMap.of(PROPERTY_DEFAULT_LOCATION_NAME, 
LOCATION_NAME_UNKNOWN));
 
     try (HadoopCatalogOperations mockOps = 
Mockito.mock(HadoopCatalogOperations.class)) {
       mockOps.hadoopConf = new Configuration();
@@ -1113,6 +1132,7 @@ public class TestHadoopCatalogOperations {
       when(mockOps.getConf()).thenReturn(Maps.newHashMap());
       String subPath = "/test/test.parquet";
       when(mockOps.getFileLocation(filesetIdent, 
subPath)).thenCallRealMethod();
+      when(mockOps.getFileLocation(filesetIdent, subPath, 
null)).thenCallRealMethod();
       when(mockOps.getFileSystem(Mockito.any(), Mockito.any()))
           .thenReturn(FileSystem.getLocal(new Configuration()));
       String fileLocation = mockOps.getFileLocation(filesetIdent, subPath);
@@ -1177,7 +1197,7 @@ public class TestHadoopCatalogOperations {
     String comment = "comment_s1";
     Map<String, String> catalogProps = Maps.newHashMap();
     if (catalogPath != null) {
-      catalogProps.put(HadoopCatalogPropertiesMetadata.LOCATION, catalogPath);
+      catalogProps.put(LOCATION, catalogPath);
     }
 
     NameIdentifier schemaIdent = NameIdentifierUtil.ofSchema("m1", "c1", 
schemaName);
@@ -1221,6 +1241,679 @@ public class TestHadoopCatalogOperations {
     }
   }
 
+  @ParameterizedTest
+  @MethodSource("multipleLocationsArguments")
+  public void testMultipleLocations(
+      String name,
+      Fileset.Type type,
+      Map<String, String> catalogPaths,
+      Map<String, String> schemaPaths,
+      Map<String, String> storageLocations,
+      Map<String, String> filesetProps,
+      Map<String, String> expect)
+      throws IOException {
+    String schemaName = "s1_" + name;
+    String comment = "comment_s1";
+
+    NameIdentifier schemaIdent = NameIdentifierUtil.ofSchema("m1", "c1", 
schemaName);
+    try (SecureHadoopCatalogOperations ops = new 
SecureHadoopCatalogOperations(store)) {
+      ops.initialize(catalogPaths, randomCatalogInfo("m1", "c1"), 
HADOOP_PROPERTIES_METADATA);
+      if (!ops.schemaExists(schemaIdent)) {
+        createMultiLocationSchema(schemaName, comment, catalogPaths, 
schemaPaths);
+      }
+      Fileset fileset =
+          createMultiLocationFileset(
+              name, schemaName, "comment", type, catalogPaths, 
storageLocations, filesetProps);
+
+      Assertions.assertEquals(name, fileset.name());
+      Assertions.assertEquals(type, fileset.type());
+      Assertions.assertEquals("comment", fileset.comment());
+      Assertions.assertEquals(expect, fileset.storageLocations());
+      Assertions.assertEquals(
+          fileset.storageLocation(), 
fileset.storageLocations().get(LOCATION_NAME_UNKNOWN));
+      
Assertions.assertNotNull(fileset.properties().get(PROPERTY_DEFAULT_LOCATION_NAME));
+      if (filesetProps != null && 
filesetProps.containsKey(PROPERTY_DEFAULT_LOCATION_NAME)) {
+        Assertions.assertEquals(
+            filesetProps.get(PROPERTY_DEFAULT_LOCATION_NAME),
+            fileset.properties().get(PROPERTY_DEFAULT_LOCATION_NAME));
+      }
+      // no explicit default location name supplied; creation still records one (asserted non-null above)
+
+      // Test load
+      NameIdentifier filesetIdent = NameIdentifier.of("m1", "c1", schemaName, 
name);
+      Fileset loadedFileset = ops.loadFileset(filesetIdent);
+      Assertions.assertEquals(name, loadedFileset.name());
+      Assertions.assertEquals(type, loadedFileset.type());
+      Assertions.assertEquals("comment", loadedFileset.comment());
+      Assertions.assertEquals(expect, loadedFileset.storageLocations());
+      Assertions.assertEquals(
+          loadedFileset.storageLocation(),
+          loadedFileset.storageLocations().get(LOCATION_NAME_UNKNOWN));
+
+      // Test drop
+      ops.dropFileset(filesetIdent);
+      for (Map.Entry<String, String> location : expect.entrySet()) {
+        Path expectedPath = new Path(location.getValue());
+        FileSystem fs = expectedPath.getFileSystem(new Configuration());
+        if (type == Fileset.Type.MANAGED) {
+          Assertions.assertFalse(fs.exists(expectedPath));
+        } else {
+          Assertions.assertTrue(
+              fs.exists(expectedPath),
+              "location with name "
+                  + location.getKey()
+                  + " should exist, path: "
+                  + location.getValue());
+        }
+      }
+
+      // clean expected path if exist
+      try (FileSystem fs = FileSystem.newInstance(new Configuration())) {
+        for (String location : expect.values()) {
+          fs.delete(new Path(location), true);
+        }
+      }
+
+      // Test drop non-existent fileset
+      Assertions.assertFalse(ops.dropFileset(filesetIdent), "fileset should be 
non-existent");
+    }
+  }
+
+  @Test
+  public void testCreateMultipleLocationsWithExceptions() throws IOException {
+    // empty location name in catalog location
+    Map<String, String> illegalLocations =
+        ImmutableMap.of(PROPERTY_MULTIPLE_LOCATIONS_PREFIX + "", 
TEST_ROOT_PATH + "/catalog31_1");
+    try (SecureHadoopCatalogOperations ops = new 
SecureHadoopCatalogOperations(store)) {
+      Exception exception =
+          Assertions.assertThrows(
+              IllegalArgumentException.class,
+              () ->
+                  ops.initialize(
+                      illegalLocations, randomCatalogInfo("m1", "c1"), 
HADOOP_PROPERTIES_METADATA));
+      Assertions.assertEquals("Location name must not be blank", 
exception.getMessage());
+
+      // empty location name in schema location
+      ops.initialize(ImmutableMap.of(), randomCatalogInfo("m1", "c1"), 
HADOOP_PROPERTIES_METADATA);
+      exception =
+          Assertions.assertThrows(
+              IllegalArgumentException.class,
+              () ->
+                  createMultiLocationSchema("s1", "comment", 
ImmutableMap.of(), illegalLocations));
+      Assertions.assertEquals("Location name must not be blank", 
exception.getMessage());
+
+      // empty location name in storage location
+      exception =
+          Assertions.assertThrows(
+              IllegalArgumentException.class,
+              () ->
+                  createMultiLocationFileset(
+                      "fileset_test",
+                      "s1",
+                      null,
+                      Fileset.Type.MANAGED,
+                      ImmutableMap.of(),
+                      ImmutableMap.of("", TEST_ROOT_PATH + "/fileset31"),
+                      null));
+      Assertions.assertEquals("Location name must not be blank", 
exception.getMessage());
+
+      // storage location is parent of schema location
+      Schema multipleLocationSchema =
+          createMultiLocationSchema(
+              "s1",
+              "comment",
+              ImmutableMap.of(),
+              ImmutableMap.of(
+                  PROPERTY_MULTIPLE_LOCATIONS_PREFIX + "v1", TEST_ROOT_PATH + "/s1/a/b/c"));
+      exception =
+          Assertions.assertThrows(
+              IllegalArgumentException.class,
+              () ->
+                  createMultiLocationFileset(
+                      "fileset_test",
+                      multipleLocationSchema.name(),
+                      null,
+                      Fileset.Type.MANAGED,
+                      ImmutableMap.of(),
+                      ImmutableMap.of(LOCATION_NAME_UNKNOWN, TEST_ROOT_PATH + "/s1/a"),
+                      null));
+      Assertions.assertTrue(
+          exception
+              .getMessage()
+              .contains(
+                  "Default location name must be set and must be one of the 
fileset locations"),
+          "Exception message: " + exception.getMessage());
+    }
+  }
+
+  private static Stream<Arguments> multipleLocationsArguments() {
+    return Stream.of(
+        // Honor the catalog location
+        Arguments.of(
+            "fileset51",
+            Fileset.Type.MANAGED,
+            ImmutableMap.of(
+                LOCATION,
+                TEST_ROOT_PATH + "/catalog31_1",
+                PROPERTY_MULTIPLE_LOCATIONS_PREFIX + "v1",
+                TEST_ROOT_PATH + "/catalog31_1",
+                PROPERTY_MULTIPLE_LOCATIONS_PREFIX + "v2",
+                TEST_ROOT_PATH + "/catalog31_2/{{schema}}/{{fileset}}"),
+            ImmutableMap.of(),
+            ImmutableMap.of(),
+            ImmutableMap.of(PROPERTY_DEFAULT_LOCATION_NAME, "v1"),
+            ImmutableMap.of(
+                LOCATION_NAME_UNKNOWN,
+                TEST_ROOT_PATH + "/catalog31_1/s1_fileset51/fileset51",
+                "v1",
+                TEST_ROOT_PATH + "/catalog31_1/s1_fileset51/fileset51",
+                "v2",
+                TEST_ROOT_PATH + "/catalog31_2/s1_fileset51/fileset51")),
+        Arguments.of(
+            // honor the schema location
+            "fileset52",
+            Fileset.Type.MANAGED,
+            ImmutableMap.of(),
+            ImmutableMap.of(
+                LOCATION,
+                TEST_ROOT_PATH + "/s1_fileset52_1",
+                PROPERTY_MULTIPLE_LOCATIONS_PREFIX + "v1",
+                TEST_ROOT_PATH + "/s1_fileset52_1",
+                PROPERTY_MULTIPLE_LOCATIONS_PREFIX + "v2",
+                TEST_ROOT_PATH + "/s1_fileset52_2/{{fileset}}"),
+            ImmutableMap.of(),
+            ImmutableMap.of(PROPERTY_DEFAULT_LOCATION_NAME, "v2"),
+            ImmutableMap.of(
+                LOCATION_NAME_UNKNOWN,
+                TEST_ROOT_PATH + "/s1_fileset52_1/fileset52",
+                "v1",
+                TEST_ROOT_PATH + "/s1_fileset52_1/fileset52",
+                "v2",
+                TEST_ROOT_PATH + "/s1_fileset52_2/fileset52")),
+        Arguments.of(
+            // honor the schema location
+            "fileset53",
+            Fileset.Type.MANAGED,
+            ImmutableMap.of(
+                LOCATION,
+                TEST_ROOT_PATH + "/catalog32_1",
+                PROPERTY_MULTIPLE_LOCATIONS_PREFIX + "v1",
+                TEST_ROOT_PATH + "/catalog32_1",
+                PROPERTY_MULTIPLE_LOCATIONS_PREFIX + "v2",
+                TEST_ROOT_PATH + "/catalog32_2/{{schema}}/{{fileset}}"),
+            ImmutableMap.of(
+                LOCATION,
+                TEST_ROOT_PATH + "/s1_fileset53_1",
+                PROPERTY_MULTIPLE_LOCATIONS_PREFIX + "v1",
+                TEST_ROOT_PATH + "/s1_fileset53_1",
+                PROPERTY_MULTIPLE_LOCATIONS_PREFIX + "v2",
+                TEST_ROOT_PATH + "/s1_fileset53_2/{{fileset}}"),
+            ImmutableMap.of(),
+            ImmutableMap.of(PROPERTY_DEFAULT_LOCATION_NAME, "unknown"),
+            ImmutableMap.of(
+                LOCATION_NAME_UNKNOWN,
+                TEST_ROOT_PATH + "/s1_fileset53_1/fileset53",
+                "v1",
+                TEST_ROOT_PATH + "/s1_fileset53_1/fileset53",
+                "v2",
+                TEST_ROOT_PATH + "/s1_fileset53_2/fileset53")),
+        Arguments.of(
+            // honor the storage location
+            "fileset54",
+            Fileset.Type.MANAGED,
+            ImmutableMap.of(
+                LOCATION,
+                TEST_ROOT_PATH + "/catalog33_1",
+                PROPERTY_MULTIPLE_LOCATIONS_PREFIX + "v1",
+                TEST_ROOT_PATH + "/catalog33_1",
+                PROPERTY_MULTIPLE_LOCATIONS_PREFIX + "v2",
+                TEST_ROOT_PATH + "/catalog33_2/{{schema}}/{{fileset}}"),
+            ImmutableMap.of(
+                LOCATION,
+                TEST_ROOT_PATH + "/s1_fileset54_1",
+                PROPERTY_MULTIPLE_LOCATIONS_PREFIX + "v1",
+                TEST_ROOT_PATH + "/s1_fileset54_1",
+                PROPERTY_MULTIPLE_LOCATIONS_PREFIX + "v2",
+                TEST_ROOT_PATH + "/s1_fileset54_2/{{fileset}}"),
+            ImmutableMap.of(
+                LOCATION_NAME_UNKNOWN,
+                TEST_ROOT_PATH + "/fileset54_1",
+                "v1",
+                TEST_ROOT_PATH + "/fileset54_1",
+                "v2",
+                TEST_ROOT_PATH + "/{{fileset}}_2"),
+            ImmutableMap.of(PROPERTY_DEFAULT_LOCATION_NAME, "v1"),
+            ImmutableMap.of(
+                LOCATION_NAME_UNKNOWN,
+                TEST_ROOT_PATH + "/fileset54_1",
+                "v1",
+                TEST_ROOT_PATH + "/fileset54_1",
+                "v2",
+                TEST_ROOT_PATH + "/fileset54_2")),
+        Arguments.of(
+            // honor the storage location
+            "fileset55",
+            Fileset.Type.MANAGED,
+            ImmutableMap.of(),
+            ImmutableMap.of(),
+            ImmutableMap.of(
+                LOCATION_NAME_UNKNOWN,
+                TEST_ROOT_PATH + "/fileset55_1",
+                "v1",
+                TEST_ROOT_PATH + "/fileset55_1",
+                "v2",
+                TEST_ROOT_PATH + "/{{fileset}}_2"),
+            ImmutableMap.of(PROPERTY_DEFAULT_LOCATION_NAME, "v1"),
+            ImmutableMap.of(
+                LOCATION_NAME_UNKNOWN,
+                TEST_ROOT_PATH + "/fileset55_1",
+                "v1",
+                TEST_ROOT_PATH + "/fileset55_1",
+                "v2",
+                TEST_ROOT_PATH + "/fileset55_2")),
+        Arguments.of(
+            // honor the storage location
+            "fileset56",
+            Fileset.Type.MANAGED,
+            ImmutableMap.of(
+                LOCATION,
+                TEST_ROOT_PATH + "/catalog34_1",
+                PROPERTY_MULTIPLE_LOCATIONS_PREFIX + "v1",
+                TEST_ROOT_PATH + "/catalog34_1",
+                PROPERTY_MULTIPLE_LOCATIONS_PREFIX + "v2",
+                TEST_ROOT_PATH + "/catalog34_2/{{schema}}/{{fileset}}"),
+            ImmutableMap.of(),
+            ImmutableMap.of(
+                LOCATION_NAME_UNKNOWN,
+                TEST_ROOT_PATH + "/fileset56_1",
+                "v1",
+                TEST_ROOT_PATH + "/fileset56_1",
+                "v2",
+                TEST_ROOT_PATH + "/{{fileset}}_2"),
+            ImmutableMap.of(PROPERTY_DEFAULT_LOCATION_NAME, "v1"),
+            ImmutableMap.of(
+                LOCATION_NAME_UNKNOWN,
+                TEST_ROOT_PATH + "/fileset56_1",
+                "v1",
+                TEST_ROOT_PATH + "/fileset56_1",
+                "v2",
+                TEST_ROOT_PATH + "/fileset56_2")),
+        Arguments.of(
+            // honor partial catalog/schema/fileset locations
+            "fileset510",
+            Fileset.Type.MANAGED,
+            ImmutableMap.of(
+                PROPERTY_MULTIPLE_LOCATIONS_PREFIX + "v1",
+                TEST_ROOT_PATH + "/catalog34_1",
+                PROPERTY_MULTIPLE_LOCATIONS_PREFIX + "v2",
+                TEST_ROOT_PATH + "/catalog34_2",
+                PROPERTY_MULTIPLE_LOCATIONS_PREFIX + "v3",
+                TEST_ROOT_PATH + "/catalog34_3"),
+            ImmutableMap.of(
+                PROPERTY_MULTIPLE_LOCATIONS_PREFIX + "v2",
+                TEST_ROOT_PATH + "/s1_fileset510_2",
+                PROPERTY_MULTIPLE_LOCATIONS_PREFIX + "v3",
+                TEST_ROOT_PATH + "/s1_{{fileset}}_3",
+                PROPERTY_MULTIPLE_LOCATIONS_PREFIX + "v4",
+                TEST_ROOT_PATH + "/s1_fileset510_4"),
+            ImmutableMap.of(
+                LOCATION_NAME_UNKNOWN,
+                TEST_ROOT_PATH + "/fileset510",
+                "v3",
+                TEST_ROOT_PATH + "/fileset510_3",
+                "v4",
+                TEST_ROOT_PATH + "/fileset510_4",
+                "v5",
+                TEST_ROOT_PATH + "/{{fileset}}_5"),
+            ImmutableMap.of(PROPERTY_DEFAULT_LOCATION_NAME, "v5"),
+            ImmutableMap.<String, String>builder()
+                .put(LOCATION_NAME_UNKNOWN, TEST_ROOT_PATH + "/fileset510")
+                .put("v1", TEST_ROOT_PATH + 
"/catalog34_1/s1_fileset510/fileset510")
+                .put("v2", TEST_ROOT_PATH + "/s1_fileset510_2/fileset510")
+                .put("v3", TEST_ROOT_PATH + "/fileset510_3")
+                .put("v4", TEST_ROOT_PATH + "/fileset510_4")
+                .put("v5", TEST_ROOT_PATH + "/fileset510_5")
+                .build()),
+        Arguments.of(
+            // test without unnamed storage location
+            "fileset511",
+            Fileset.Type.MANAGED,
+            ImmutableMap.of(
+                PROPERTY_MULTIPLE_LOCATIONS_PREFIX + "v1",
+                TEST_ROOT_PATH + "/catalog34_1",
+                PROPERTY_MULTIPLE_LOCATIONS_PREFIX + "v2",
+                TEST_ROOT_PATH + "/catalog34_2/{{schema}}/{{fileset}}"),
+            ImmutableMap.of(),
+            ImmutableMap.of(
+                "v1", TEST_ROOT_PATH + "/fileset511_1", "v2", TEST_ROOT_PATH + 
"/{{fileset}}_2"),
+            ImmutableMap.of(PROPERTY_DEFAULT_LOCATION_NAME, "v1"),
+            ImmutableMap.of(
+                "v1", TEST_ROOT_PATH + "/fileset511_1", "v2", TEST_ROOT_PATH + 
"/fileset511_2")),
+        Arguments.of(
+            // test single location without unnamed storage location
+            "fileset512",
+            Fileset.Type.MANAGED,
+            ImmutableMap.of(
+                PROPERTY_MULTIPLE_LOCATIONS_PREFIX + "v1",
+                TEST_ROOT_PATH + "/catalog34_2/{{schema}}/{{fileset}}"),
+            ImmutableMap.of(),
+            ImmutableMap.of("v1", TEST_ROOT_PATH + "/{{fileset}}_2"),
+            null,
+            ImmutableMap.of("v1", TEST_ROOT_PATH + "/fileset512_2")),
+        Arguments.of(
+            // honor the storage location
+            "fileset57",
+            Fileset.Type.EXTERNAL,
+            ImmutableMap.of(
+                LOCATION,
+                TEST_ROOT_PATH + "/catalog35_1",
+                PROPERTY_MULTIPLE_LOCATIONS_PREFIX + "v1",
+                TEST_ROOT_PATH + "/catalog35_1",
+                PROPERTY_MULTIPLE_LOCATIONS_PREFIX + "v2",
+                TEST_ROOT_PATH + "/catalog35_2/{{schema}}/{{fileset}}"),
+            ImmutableMap.of(
+                LOCATION,
+                TEST_ROOT_PATH + "/s1_fileset57_1",
+                PROPERTY_MULTIPLE_LOCATIONS_PREFIX + "v1",
+                TEST_ROOT_PATH + "/s1_fileset57_1",
+                PROPERTY_MULTIPLE_LOCATIONS_PREFIX + "v2",
+                TEST_ROOT_PATH + "/s1_fileset57_2/{{fileset}}"),
+            ImmutableMap.of(
+                LOCATION_NAME_UNKNOWN,
+                TEST_ROOT_PATH + "/fileset57_1",
+                "v1",
+                TEST_ROOT_PATH + "/fileset57_1",
+                "v2",
+                TEST_ROOT_PATH + "/{{fileset}}_2"),
+            ImmutableMap.of(PROPERTY_DEFAULT_LOCATION_NAME, "v1"),
+            ImmutableMap.of(
+                LOCATION_NAME_UNKNOWN,
+                TEST_ROOT_PATH + "/fileset57_1",
+                "v1",
+                TEST_ROOT_PATH + "/fileset57_1",
+                "v2",
+                TEST_ROOT_PATH + "/fileset57_2")),
+        Arguments.of(
+            // honor the storage location
+            "fileset58",
+            Fileset.Type.EXTERNAL,
+            ImmutableMap.of(),
+            ImmutableMap.of(
+                LOCATION,
+                TEST_ROOT_PATH + "/s1_fileset58_1",
+                PROPERTY_MULTIPLE_LOCATIONS_PREFIX + "v1",
+                TEST_ROOT_PATH + "/s1_fileset58_1",
+                PROPERTY_MULTIPLE_LOCATIONS_PREFIX + "v2",
+                TEST_ROOT_PATH + "/s1_fileset58_2/{{fileset}}"),
+            ImmutableMap.of(
+                LOCATION_NAME_UNKNOWN,
+                TEST_ROOT_PATH + "/fileset58_1",
+                "v1",
+                TEST_ROOT_PATH + "/fileset58_1",
+                "v2",
+                TEST_ROOT_PATH + "/{{fileset}}_2"),
+            ImmutableMap.of(PROPERTY_DEFAULT_LOCATION_NAME, "v1"),
+            ImmutableMap.of(
+                LOCATION_NAME_UNKNOWN,
+                TEST_ROOT_PATH + "/fileset58_1",
+                "v1",
+                TEST_ROOT_PATH + "/fileset58_1",
+                "v2",
+                TEST_ROOT_PATH + "/fileset58_2")),
+        Arguments.of(
+            // honor the storage location
+            "fileset59",
+            Fileset.Type.EXTERNAL,
+            ImmutableMap.of(),
+            ImmutableMap.of(),
+            ImmutableMap.of(
+                LOCATION_NAME_UNKNOWN,
+                TEST_ROOT_PATH + "/fileset59",
+                "v1",
+                TEST_ROOT_PATH + "/fileset59",
+                "v2",
+                TEST_ROOT_PATH + "/{{fileset}}"),
+            ImmutableMap.of(PROPERTY_DEFAULT_LOCATION_NAME, "v1"),
+            ImmutableMap.of(
+                LOCATION_NAME_UNKNOWN,
+                TEST_ROOT_PATH + "/fileset59",
+                "v1",
+                TEST_ROOT_PATH + "/fileset59",
+                "v2",
+                TEST_ROOT_PATH + "/fileset59")),
+        // honor the catalog location
+        Arguments.of(
+            "fileset501",
+            Fileset.Type.MANAGED,
+            ImmutableMap.of(
+                LOCATION,
+                UNFORMALIZED_TEST_ROOT_PATH + "/catalog301_1",
+                PROPERTY_MULTIPLE_LOCATIONS_PREFIX + "v1",
+                UNFORMALIZED_TEST_ROOT_PATH + "/catalog301_1",
+                PROPERTY_MULTIPLE_LOCATIONS_PREFIX + "v2",
+                UNFORMALIZED_TEST_ROOT_PATH + 
"/catalog301_2/{{schema}}/{{fileset}}"),
+            ImmutableMap.of(),
+            ImmutableMap.of(),
+            ImmutableMap.of(PROPERTY_DEFAULT_LOCATION_NAME, "v1"),
+            ImmutableMap.of(
+                LOCATION_NAME_UNKNOWN,
+                TEST_ROOT_PATH + "/catalog301_1/s1_fileset501/fileset501",
+                "v1",
+                TEST_ROOT_PATH + "/catalog301_1/s1_fileset501/fileset501",
+                "v2",
+                TEST_ROOT_PATH + "/catalog301_2/s1_fileset501/fileset501")),
+        Arguments.of(
+            // honor the schema location
+            "fileset502",
+            Fileset.Type.MANAGED,
+            ImmutableMap.of(),
+            ImmutableMap.of(
+                LOCATION,
+                UNFORMALIZED_TEST_ROOT_PATH + "/s1_fileset502_1",
+                PROPERTY_MULTIPLE_LOCATIONS_PREFIX + "v1",
+                UNFORMALIZED_TEST_ROOT_PATH + "/s1_fileset502_1",
+                PROPERTY_MULTIPLE_LOCATIONS_PREFIX + "v2",
+                UNFORMALIZED_TEST_ROOT_PATH + "/s1_fileset502_2/{{fileset}}"),
+            ImmutableMap.of(),
+            ImmutableMap.of(PROPERTY_DEFAULT_LOCATION_NAME, "v1"),
+            ImmutableMap.of(
+                LOCATION_NAME_UNKNOWN,
+                TEST_ROOT_PATH + "/s1_fileset502_1/fileset502",
+                "v1",
+                TEST_ROOT_PATH + "/s1_fileset502_1/fileset502",
+                "v2",
+                TEST_ROOT_PATH + "/s1_fileset502_2/fileset502")),
+        Arguments.of(
+            // honor the schema location
+            "fileset503",
+            Fileset.Type.MANAGED,
+            ImmutableMap.of(
+                LOCATION,
+                UNFORMALIZED_TEST_ROOT_PATH + "/catalog302_1",
+                PROPERTY_MULTIPLE_LOCATIONS_PREFIX + "v1",
+                UNFORMALIZED_TEST_ROOT_PATH + "/catalog302_1",
+                PROPERTY_MULTIPLE_LOCATIONS_PREFIX + "v2",
+                UNFORMALIZED_TEST_ROOT_PATH + 
"/catalog302_2/{{schema}}/{{fileset}}"),
+            ImmutableMap.of(
+                LOCATION,
+                UNFORMALIZED_TEST_ROOT_PATH + "/s1_fileset503_1",
+                PROPERTY_MULTIPLE_LOCATIONS_PREFIX + "v1",
+                UNFORMALIZED_TEST_ROOT_PATH + "/s1_fileset503_1",
+                PROPERTY_MULTIPLE_LOCATIONS_PREFIX + "v2",
+                UNFORMALIZED_TEST_ROOT_PATH + "/s1_fileset503_2/{{fileset}}"),
+            ImmutableMap.of(),
+            ImmutableMap.of(PROPERTY_DEFAULT_LOCATION_NAME, "v1"),
+            ImmutableMap.of(
+                LOCATION_NAME_UNKNOWN,
+                TEST_ROOT_PATH + "/s1_fileset503_1/fileset503",
+                "v1",
+                TEST_ROOT_PATH + "/s1_fileset503_1/fileset503",
+                "v2",
+                TEST_ROOT_PATH + "/s1_fileset503_2/fileset503")),
+        Arguments.of(
+            // honor the storage location
+            "fileset504",
+            Fileset.Type.MANAGED,
+            ImmutableMap.of(
+                LOCATION,
+                UNFORMALIZED_TEST_ROOT_PATH + "/catalog303_1",
+                PROPERTY_MULTIPLE_LOCATIONS_PREFIX + "v1",
+                UNFORMALIZED_TEST_ROOT_PATH + "/catalog303_1",
+                PROPERTY_MULTIPLE_LOCATIONS_PREFIX + "v2",
+                UNFORMALIZED_TEST_ROOT_PATH + 
"/catalog303_2/{{schema}}/{{fileset}}"),
+            ImmutableMap.of(
+                LOCATION,
+                UNFORMALIZED_TEST_ROOT_PATH + "/s1_fileset504_1",
+                PROPERTY_MULTIPLE_LOCATIONS_PREFIX + "v1",
+                UNFORMALIZED_TEST_ROOT_PATH + "/s1_fileset504_1",
+                PROPERTY_MULTIPLE_LOCATIONS_PREFIX + "v2",
+                UNFORMALIZED_TEST_ROOT_PATH + "/s1_fileset504_2/{{fileset}}"),
+            ImmutableMap.of(
+                LOCATION_NAME_UNKNOWN,
+                UNFORMALIZED_TEST_ROOT_PATH + "/fileset504_1",
+                "v1",
+                UNFORMALIZED_TEST_ROOT_PATH + "/fileset504_1",
+                "v2",
+                UNFORMALIZED_TEST_ROOT_PATH + "/{{fileset}}_2"),
+            ImmutableMap.of(PROPERTY_DEFAULT_LOCATION_NAME, "v1"),
+            ImmutableMap.of(
+                LOCATION_NAME_UNKNOWN,
+                TEST_ROOT_PATH + "/fileset504_1",
+                "v1",
+                TEST_ROOT_PATH + "/fileset504_1",
+                "v2",
+                TEST_ROOT_PATH + "/fileset504_2")),
+        Arguments.of(
+            // honor the storage location
+            "fileset505",
+            Fileset.Type.MANAGED,
+            ImmutableMap.of(),
+            ImmutableMap.of(),
+            ImmutableMap.of(
+                LOCATION_NAME_UNKNOWN,
+                UNFORMALIZED_TEST_ROOT_PATH + "/fileset505_1",
+                "v1",
+                UNFORMALIZED_TEST_ROOT_PATH + "/fileset505_1",
+                "v2",
+                UNFORMALIZED_TEST_ROOT_PATH + "/{{fileset}}_2"),
+            ImmutableMap.of(PROPERTY_DEFAULT_LOCATION_NAME, "v1"),
+            ImmutableMap.of(
+                LOCATION_NAME_UNKNOWN,
+                TEST_ROOT_PATH + "/fileset505_1",
+                "v1",
+                TEST_ROOT_PATH + "/fileset505_1",
+                "v2",
+                TEST_ROOT_PATH + "/fileset505_2")),
+        Arguments.of(
+            // honor the storage location
+            "fileset506",
+            Fileset.Type.MANAGED,
+            ImmutableMap.of(
+                LOCATION,
+                UNFORMALIZED_TEST_ROOT_PATH + "/catalog304_1",
+                PROPERTY_MULTIPLE_LOCATIONS_PREFIX + "v1",
+                UNFORMALIZED_TEST_ROOT_PATH + "/catalog304_1",
+                PROPERTY_MULTIPLE_LOCATIONS_PREFIX + "v2",
+                UNFORMALIZED_TEST_ROOT_PATH + 
"/catalog304_2/{{schema}}/{{fileset}}"),
+            ImmutableMap.of(),
+            ImmutableMap.of(
+                LOCATION_NAME_UNKNOWN,
+                UNFORMALIZED_TEST_ROOT_PATH + "/fileset506_1",
+                "v1",
+                UNFORMALIZED_TEST_ROOT_PATH + "/fileset506_1",
+                "v2",
+                UNFORMALIZED_TEST_ROOT_PATH + "/{{fileset}}_2"),
+            ImmutableMap.of(PROPERTY_DEFAULT_LOCATION_NAME, "v1"),
+            ImmutableMap.of(
+                LOCATION_NAME_UNKNOWN,
+                TEST_ROOT_PATH + "/fileset506_1",
+                "v1",
+                TEST_ROOT_PATH + "/fileset506_1",
+                "v2",
+                TEST_ROOT_PATH + "/fileset506_2")),
+        Arguments.of(
+            // honor the storage location
+            "fileset507",
+            Fileset.Type.EXTERNAL,
+            ImmutableMap.of(
+                LOCATION,
+                UNFORMALIZED_TEST_ROOT_PATH + "/catalog305_1",
+                PROPERTY_MULTIPLE_LOCATIONS_PREFIX + "v1",
+                UNFORMALIZED_TEST_ROOT_PATH + "/catalog305_1",
+                PROPERTY_MULTIPLE_LOCATIONS_PREFIX + "v2",
+                UNFORMALIZED_TEST_ROOT_PATH + "/catalog305_2"),
+            ImmutableMap.of(
+                LOCATION,
+                UNFORMALIZED_TEST_ROOT_PATH + "/s1_fileset507_1",
+                PROPERTY_MULTIPLE_LOCATIONS_PREFIX + "v1",
+                UNFORMALIZED_TEST_ROOT_PATH + "/s1_fileset507_1",
+                PROPERTY_MULTIPLE_LOCATIONS_PREFIX + "v2",
+                UNFORMALIZED_TEST_ROOT_PATH + "/s1_fileset507_2"),
+            ImmutableMap.of(
+                LOCATION_NAME_UNKNOWN,
+                UNFORMALIZED_TEST_ROOT_PATH + "/fileset507_1",
+                "v1",
+                UNFORMALIZED_TEST_ROOT_PATH + "/fileset507_1",
+                "v2",
+                UNFORMALIZED_TEST_ROOT_PATH + "/{{fileset}}_2"),
+            ImmutableMap.of(PROPERTY_DEFAULT_LOCATION_NAME, "v1"),
+            ImmutableMap.of(
+                LOCATION_NAME_UNKNOWN,
+                TEST_ROOT_PATH + "/fileset507_1",
+                "v1",
+                TEST_ROOT_PATH + "/fileset507_1",
+                "v2",
+                TEST_ROOT_PATH + "/fileset507_2")),
+        Arguments.of(
+            // honor the storage location
+            "fileset508",
+            Fileset.Type.EXTERNAL,
+            ImmutableMap.of(),
+            ImmutableMap.of(
+                LOCATION,
+                UNFORMALIZED_TEST_ROOT_PATH + "/s1_fileset508_1",
+                PROPERTY_MULTIPLE_LOCATIONS_PREFIX + "v1",
+                UNFORMALIZED_TEST_ROOT_PATH + "/s1_fileset508_1",
+                PROPERTY_MULTIPLE_LOCATIONS_PREFIX + "v2",
+                UNFORMALIZED_TEST_ROOT_PATH + "/s1_fileset508_2"),
+            ImmutableMap.of(
+                LOCATION_NAME_UNKNOWN,
+                UNFORMALIZED_TEST_ROOT_PATH + "/fileset508_1",
+                "v1",
+                UNFORMALIZED_TEST_ROOT_PATH + "/fileset508_1",
+                "v2",
+                UNFORMALIZED_TEST_ROOT_PATH + "/{{fileset}}_2"),
+            ImmutableMap.of(PROPERTY_DEFAULT_LOCATION_NAME, "v1"),
+            ImmutableMap.of(
+                LOCATION_NAME_UNKNOWN,
+                TEST_ROOT_PATH + "/fileset508_1",
+                "v1",
+                TEST_ROOT_PATH + "/fileset508_1",
+                "v2",
+                TEST_ROOT_PATH + "/fileset508_2")),
+        Arguments.of(
+            // honor the storage location
+            "fileset509",
+            Fileset.Type.EXTERNAL,
+            ImmutableMap.of(),
+            ImmutableMap.of(),
+            ImmutableMap.of(
+                LOCATION_NAME_UNKNOWN,
+                UNFORMALIZED_TEST_ROOT_PATH + "/fileset509_1",
+                "v1",
+                UNFORMALIZED_TEST_ROOT_PATH + "/fileset509_1",
+                "v2",
+                UNFORMALIZED_TEST_ROOT_PATH + "/{{fileset}}_2"),
+            ImmutableMap.of(PROPERTY_DEFAULT_LOCATION_NAME, "v1"),
+            ImmutableMap.of(
+                LOCATION_NAME_UNKNOWN,
+                TEST_ROOT_PATH + "/fileset509_1",
+                "v1",
+                TEST_ROOT_PATH + "/fileset509_1",
+                "v2",
+                TEST_ROOT_PATH + "/fileset509_2")));
+  }
+
   private static Stream<Arguments> locationWithPlaceholdersArguments() {
     return Stream.of(
         // placeholders in catalog location
@@ -1687,7 +2380,7 @@ public class TestHadoopCatalogOperations {
       throws IOException {
     Map<String, String> props = Maps.newHashMap();
     if (catalogPath != null) {
-      props.put(HadoopCatalogPropertiesMetadata.LOCATION, catalogPath);
+      props.put(LOCATION, catalogPath);
     }
 
     try (SecureHadoopCatalogOperations ops = new 
SecureHadoopCatalogOperations(store)) {
@@ -1706,6 +2399,47 @@ public class TestHadoopCatalogOperations {
     }
   }
 
+  private Schema createMultiLocationSchema(
+      String name,
+      String comment,
+      Map<String, String> catalogPaths,
+      Map<String, String> schemaPaths)
+      throws IOException {
+    try (SecureHadoopCatalogOperations ops = new 
SecureHadoopCatalogOperations(store)) {
+      ops.initialize(catalogPaths, randomCatalogInfo("m1", "c1"), 
HADOOP_PROPERTIES_METADATA);
+
+      NameIdentifier schemaIdent = NameIdentifierUtil.ofSchema("m1", "c1", 
name);
+      Map<String, String> schemaProps = Maps.newHashMap();
+      StringIdentifier stringId = 
StringIdentifier.fromId(idGenerator.nextId());
+      schemaProps = 
Maps.newHashMap(StringIdentifier.newPropertiesWithId(stringId, schemaProps));
+      schemaProps.putAll(schemaPaths);
+
+      return ops.createSchema(schemaIdent, comment, schemaProps);
+    }
+  }
+
+  private Fileset createMultiLocationFileset(
+      String name,
+      String schemaName,
+      String comment,
+      Fileset.Type type,
+      Map<String, String> catalogPaths,
+      Map<String, String> storageLocations,
+      Map<String, String> filesetProps)
+      throws IOException {
+    try (SecureHadoopCatalogOperations ops = new 
SecureHadoopCatalogOperations(store)) {
+      ops.initialize(catalogPaths, randomCatalogInfo("m1", "c1"), 
HADOOP_PROPERTIES_METADATA);
+
+      NameIdentifier filesetIdent = NameIdentifier.of("m1", "c1", schemaName, 
name);
+      StringIdentifier stringId = 
StringIdentifier.fromId(idGenerator.nextId());
+      Map<String, String> newFilesetProps =
+          Maps.newHashMap(StringIdentifier.newPropertiesWithId(stringId, 
filesetProps));
+
+      return ops.createMultipleLocationFileset(
+          filesetIdent, comment, type, storageLocations, newFilesetProps);
+    }
+  }
+
   private Fileset createFileset(
       String name,
       String schemaName,
@@ -1716,7 +2450,7 @@ public class TestHadoopCatalogOperations {
       throws IOException {
     Map<String, String> props = Maps.newHashMap();
     if (catalogPath != null) {
-      props.put(HadoopCatalogPropertiesMetadata.LOCATION, catalogPath);
+      props.put(LOCATION, catalogPath);
     }
 
     try (SecureHadoopCatalogOperations ops = new 
SecureHadoopCatalogOperations(store)) {
@@ -1742,7 +2476,7 @@ public class TestHadoopCatalogOperations {
       throws IOException {
     Map<String, String> props = Maps.newHashMap();
     if (catalogPath != null) {
-      props.put(HadoopCatalogPropertiesMetadata.LOCATION, catalogPath);
+      props.put(LOCATION, catalogPath);
     }
 
     try (SecureHadoopCatalogOperations ops = new 
SecureHadoopCatalogOperations(store)) {
diff --git 
a/catalogs/catalog-hadoop/src/test/java/org/apache/gravitino/catalog/hadoop/integration/test/HadoopCatalogIT.java
 
b/catalogs/catalog-hadoop/src/test/java/org/apache/gravitino/catalog/hadoop/integration/test/HadoopCatalogIT.java
index 1f358813d9..8a45b5c8d5 100644
--- 
a/catalogs/catalog-hadoop/src/test/java/org/apache/gravitino/catalog/hadoop/integration/test/HadoopCatalogIT.java
+++ 
b/catalogs/catalog-hadoop/src/test/java/org/apache/gravitino/catalog/hadoop/integration/test/HadoopCatalogIT.java
@@ -18,6 +18,8 @@
  */
 package org.apache.gravitino.catalog.hadoop.integration.test;
 
+import static org.apache.gravitino.file.Fileset.LOCATION_NAME_UNKNOWN;
+import static org.apache.gravitino.file.Fileset.PROPERTY_DEFAULT_LOCATION_NAME;
 import static 
org.apache.gravitino.file.Fileset.PROPERTY_LOCATION_PLACEHOLDER_PREFIX;
 import static org.apache.gravitino.file.Fileset.Type.MANAGED;
 
@@ -202,8 +204,10 @@ public class HadoopCatalogIT extends BaseIT {
     Assertions.assertEquals("comment", fileset.comment());
     Assertions.assertEquals(MANAGED, fileset.type());
     Assertions.assertEquals(storageLocation, fileset.storageLocation());
-    Assertions.assertEquals(1, fileset.properties().size());
+    Assertions.assertEquals(2, fileset.properties().size());
     Assertions.assertEquals("v1", fileset.properties().get("k1"));
+    Assertions.assertEquals(
+        LOCATION_NAME_UNKNOWN, 
fileset.properties().get(PROPERTY_DEFAULT_LOCATION_NAME));
 
     // test create a fileset that already exist
     Assertions.assertThrows(
@@ -224,7 +228,9 @@ public class HadoopCatalogIT extends BaseIT {
         storageLocation(filesetName2),
         fileset2.storageLocation(),
         "storage location should be created");
-    Assertions.assertEquals(ImmutableMap.of(), fileset2.properties(), 
"properties should be empty");
+    Assertions.assertEquals(
+        ImmutableMap.of(PROPERTY_DEFAULT_LOCATION_NAME, LOCATION_NAME_UNKNOWN),
+        fileset2.properties());
 
     // create fileset with placeholder in storage location
     String filesetName4 = "test_create_fileset_with_placeholder";
@@ -239,7 +245,9 @@ public class HadoopCatalogIT extends BaseIT {
     Assertions.assertEquals("comment", fileset4.comment());
     Assertions.assertEquals(MANAGED, fileset4.type());
     Assertions.assertEquals(expectedStorageLocation4, 
fileset4.storageLocation());
-    Assertions.assertEquals(0, fileset4.properties().size(), "properties 
should be empty");
+    Assertions.assertEquals(1, fileset4.properties().size(), "properties should only contain the default location name");
+    Assertions.assertEquals(
+        LOCATION_NAME_UNKNOWN, 
fileset4.properties().get(PROPERTY_DEFAULT_LOCATION_NAME));
 
     // create fileset with null fileset name
     Assertions.assertThrows(
@@ -303,10 +311,12 @@ public class HadoopCatalogIT extends BaseIT {
     Assertions.assertEquals("这是中文comment", fileset.comment());
     Assertions.assertEquals(MANAGED, fileset.type());
     Assertions.assertEquals(storageLocation, fileset.storageLocation());
-    Assertions.assertEquals(3, fileset.properties().size());
+    Assertions.assertEquals(4, fileset.properties().size());
     Assertions.assertEquals("v1", fileset.properties().get("k1"));
     Assertions.assertEquals("中文测试test", fileset.properties().get("test"));
     Assertions.assertEquals("test1", fileset.properties().get("中文key"));
+    Assertions.assertEquals(
+        LOCATION_NAME_UNKNOWN, 
fileset.properties().get(PROPERTY_DEFAULT_LOCATION_NAME));
   }
 
   @Test
@@ -328,10 +338,12 @@ public class HadoopCatalogIT extends BaseIT {
     Assertions.assertEquals("comment", fileset.comment());
     Assertions.assertEquals(Fileset.Type.EXTERNAL, fileset.type());
     Assertions.assertEquals(storageLocation, fileset.storageLocation());
-    Assertions.assertEquals(1, fileset.properties().size());
+    Assertions.assertEquals(2, fileset.properties().size());
     Assertions.assertEquals("v1", fileset.properties().get("k1"));
     Assertions.assertTrue(
         fileSystem.exists(new Path(storageLocation)), "storage location should 
be created");
+    Assertions.assertEquals(
+        LOCATION_NAME_UNKNOWN, 
fileset.properties().get(PROPERTY_DEFAULT_LOCATION_NAME));
 
     // create fileset with storage location that not exist
     String filesetName2 = "test_external_fileset_no_exist";
@@ -538,9 +550,11 @@ public class HadoopCatalogIT extends BaseIT {
     Assertions.assertEquals(MANAGED, newFileset.type(), "type should not be 
change");
     Assertions.assertEquals(
         storageLocation, newFileset.storageLocation(), "storage location 
should not be change");
-    Assertions.assertEquals(1, newFileset.properties().size(), "properties 
should not be change");
+    Assertions.assertEquals(2, newFileset.properties().size(), "properties should not be changed");
     Assertions.assertEquals(
         "v1", newFileset.properties().get("k1"), "properties should not be 
change");
+    Assertions.assertEquals(
+        LOCATION_NAME_UNKNOWN, 
newFileset.properties().get(PROPERTY_DEFAULT_LOCATION_NAME));
   }
 
   @Test
@@ -568,9 +582,11 @@ public class HadoopCatalogIT extends BaseIT {
     Assertions.assertEquals(MANAGED, newFileset.type(), "type should not be 
change");
     Assertions.assertEquals(
         storageLocation, newFileset.storageLocation(), "storage location 
should not be change");
-    Assertions.assertEquals(1, newFileset.properties().size(), "properties 
should not be change");
+    Assertions.assertEquals(2, newFileset.properties().size(), "properties should not be changed");
     Assertions.assertEquals(
         "v1", newFileset.properties().get("k1"), "properties should not be 
change");
+    Assertions.assertEquals(
+        LOCATION_NAME_UNKNOWN, 
newFileset.properties().get(PROPERTY_DEFAULT_LOCATION_NAME));
   }
 
   @Test
@@ -596,9 +612,11 @@ public class HadoopCatalogIT extends BaseIT {
     Assertions.assertEquals(MANAGED, newFileset.type(), "type should not be 
change");
     Assertions.assertEquals(
         storageLocation, newFileset.storageLocation(), "storage location 
should not be change");
-    Assertions.assertEquals(1, newFileset.properties().size(), "properties 
should not be change");
+    Assertions.assertEquals(2, newFileset.properties().size(), "properties should not be changed");
     Assertions.assertEquals(
         "v2", newFileset.properties().get("k1"), "properties should be 
updated");
+    Assertions.assertEquals(
+        LOCATION_NAME_UNKNOWN, 
newFileset.properties().get(PROPERTY_DEFAULT_LOCATION_NAME));
   }
 
   @Test
@@ -624,7 +642,9 @@ public class HadoopCatalogIT extends BaseIT {
     Assertions.assertEquals(MANAGED, newFileset.type(), "type should not be 
change");
     Assertions.assertEquals(
         storageLocation, newFileset.storageLocation(), "storage location 
should not be change");
-    Assertions.assertEquals(0, newFileset.properties().size(), "properties 
should be removed");
+    Assertions.assertEquals(1, newFileset.properties().size(), "custom properties should be removed");
+    Assertions.assertEquals(
+        LOCATION_NAME_UNKNOWN, 
newFileset.properties().get(PROPERTY_DEFAULT_LOCATION_NAME));
   }
 
   @Test
@@ -650,9 +670,11 @@ public class HadoopCatalogIT extends BaseIT {
     Assertions.assertEquals(MANAGED, newFileset.type(), "type should not be 
changed");
     Assertions.assertEquals(
         storageLocation, newFileset.storageLocation(), "storage location 
should not be changed");
-    Assertions.assertEquals(1, newFileset.properties().size(), "properties 
should not be changed");
+    Assertions.assertEquals(2, newFileset.properties().size(), "properties 
should not be changed");
     Assertions.assertEquals(
         "v1", newFileset.properties().get("k1"), "properties should not be 
changed");
+    Assertions.assertEquals(
+        LOCATION_NAME_UNKNOWN, 
newFileset.properties().get(PROPERTY_DEFAULT_LOCATION_NAME));
   }
 
   @Test
diff --git 
a/catalogs/catalog-hadoop/src/test/java/org/apache/gravitino/catalog/hadoop/integration/test/HadoopUserImpersonationIT.java
 
b/catalogs/catalog-hadoop/src/test/java/org/apache/gravitino/catalog/hadoop/integration/test/HadoopUserImpersonationIT.java
index e967a1c6af..5da898682a 100644
--- 
a/catalogs/catalog-hadoop/src/test/java/org/apache/gravitino/catalog/hadoop/integration/test/HadoopUserImpersonationIT.java
+++ 
b/catalogs/catalog-hadoop/src/test/java/org/apache/gravitino/catalog/hadoop/integration/test/HadoopUserImpersonationIT.java
@@ -367,7 +367,10 @@ public class HadoopUserImpersonationIT extends BaseIT {
     Assertions.assertEquals("comment", fileset.comment());
     Assertions.assertEquals(Fileset.Type.MANAGED, fileset.type());
     Assertions.assertEquals(storageLocation, fileset.storageLocation());
-    Assertions.assertEquals(1, fileset.properties().size());
+    Assertions.assertEquals(2, fileset.properties().size());
+    Assertions.assertTrue(
+        
fileset.properties().containsKey(Fileset.PROPERTY_DEFAULT_LOCATION_NAME),
+        "properties should contain default location name");
     Assertions.assertEquals("v1", fileset.properties().get("k1"));
 
     // test create a fileset that already exist
@@ -394,7 +397,9 @@ public class HadoopUserImpersonationIT extends BaseIT {
         storageLocation(filesetName2),
         fileset2.storageLocation(),
         "storage location should be created");
-    Assertions.assertEquals(ImmutableMap.of(), fileset2.properties(), 
"properties should be empty");
+    Assertions.assertTrue(
+        
fileset2.properties().containsKey(Fileset.PROPERTY_DEFAULT_LOCATION_NAME),
+        "properties should contain default location name");
 
     // create fileset with null fileset name
     Assertions.assertThrows(
diff --git a/clients/client-python/tests/integration/test_fileset_catalog.py 
b/clients/client-python/tests/integration/test_fileset_catalog.py
index ac3d0a8216..98a31fa0a2 100644
--- a/clients/client-python/tests/integration/test_fileset_catalog.py
+++ b/clients/client-python/tests/integration/test_fileset_catalog.py
@@ -180,7 +180,14 @@ class TestFilesetCatalog(IntegrationTestEnv):
         self.assertIsNotNone(fileset)
         self.assertEqual(fileset.type(), Fileset.Type.MANAGED)
         self.assertEqual(fileset.comment(), self.fileset_comment)
-        self.assertEqual(fileset.properties(), self.fileset_properties)
+        self.assertEqual(
+            fileset.properties(),
+            {
+                Fileset.PROPERTY_DEFAULT_LOCATION_NAME: 
Fileset.LOCATION_NAME_UNKNOWN,
+                **self.fileset_properties,
+            },
+        )
+        self.assertEqual(fileset.storage_location(), 
f"file:{self.fileset_location}")
 
     def test_drop_fileset(self):
         self.create_fileset()
@@ -207,7 +214,13 @@ class TestFilesetCatalog(IntegrationTestEnv):
         self.assertIsNotNone(fileset)
         self.assertEqual(fileset.name(), self.fileset_name)
         self.assertEqual(fileset.comment(), self.fileset_comment)
-        self.assertEqual(fileset.properties(), self.fileset_properties)
+        self.assertEqual(
+            fileset.properties(),
+            {
+                Fileset.PROPERTY_DEFAULT_LOCATION_NAME: 
Fileset.LOCATION_NAME_UNKNOWN,
+                **self.fileset_properties,
+            },
+        )
         self.assertEqual(fileset.audit_info().creator(), "anonymous")
 
     def test_failed_load_fileset(self):
diff --git 
a/core/src/main/java/org/apache/gravitino/catalog/EntityCombinedFileset.java 
b/core/src/main/java/org/apache/gravitino/catalog/EntityCombinedFileset.java
index d9521ee0b6..0964a0781f 100644
--- a/core/src/main/java/org/apache/gravitino/catalog/EntityCombinedFileset.java
+++ b/core/src/main/java/org/apache/gravitino/catalog/EntityCombinedFileset.java
@@ -40,14 +40,6 @@ public final class EntityCombinedFileset implements Fileset {
     this.filesetEntity = filesetEntity;
   }
 
-  public FilesetEntity filesetEntity() {
-    return filesetEntity;
-  }
-
-  public Fileset fileset() {
-    return fileset;
-  }
-
   public static EntityCombinedFileset of(Fileset fileset, FilesetEntity 
filesetEntity) {
     return new EntityCombinedFileset(fileset, filesetEntity);
   }
@@ -77,8 +69,8 @@ public final class EntityCombinedFileset implements Fileset {
   }
 
   @Override
-  public String storageLocation() {
-    return fileset.storageLocation();
+  public Map<String, String> storageLocations() {
+    return fileset.storageLocations();
   }
 
   @Override
diff --git 
a/core/src/main/java/org/apache/gravitino/catalog/FilesetNormalizeDispatcher.java
 
b/core/src/main/java/org/apache/gravitino/catalog/FilesetNormalizeDispatcher.java
index e1c20de702..9a14004050 100644
--- 
a/core/src/main/java/org/apache/gravitino/catalog/FilesetNormalizeDispatcher.java
+++ 
b/core/src/main/java/org/apache/gravitino/catalog/FilesetNormalizeDispatcher.java
@@ -29,6 +29,7 @@ import org.apache.gravitino.Namespace;
 import org.apache.gravitino.connector.capability.Capability;
 import org.apache.gravitino.exceptions.FilesetAlreadyExistsException;
 import org.apache.gravitino.exceptions.NoSuchFilesetException;
+import org.apache.gravitino.exceptions.NoSuchLocationNameException;
 import org.apache.gravitino.exceptions.NoSuchSchemaException;
 import org.apache.gravitino.file.Fileset;
 import org.apache.gravitino.file.FilesetChange;
@@ -66,15 +67,15 @@ public class FilesetNormalizeDispatcher implements 
FilesetDispatcher {
   }
 
   @Override
-  public Fileset createFileset(
+  public Fileset createMultipleLocationFileset(
       NameIdentifier ident,
       String comment,
       Fileset.Type type,
-      String storageLocation,
+      Map<String, String> storageLocations,
       Map<String, String> properties)
       throws NoSuchSchemaException, FilesetAlreadyExistsException {
-    return dispatcher.createFileset(
-        normalizeNameIdentifier(ident), comment, type, storageLocation, 
properties);
+    return dispatcher.createMultipleLocationFileset(
+        normalizeNameIdentifier(ident), comment, type, storageLocations, 
properties);
   }
 
   @Override
@@ -95,10 +96,11 @@ public class FilesetNormalizeDispatcher implements 
FilesetDispatcher {
   }
 
   @Override
-  public String getFileLocation(NameIdentifier ident, String subPath) {
+  public String getFileLocation(NameIdentifier ident, String subPath, String 
locationName)
+      throws NoSuchFilesetException, NoSuchLocationNameException {
     // The constraints of the name spec may be more strict than underlying 
catalog,
     // and for compatibility reasons, we only apply case-sensitive 
capabilities here.
-    return dispatcher.getFileLocation(normalizeCaseSensitive(ident), subPath);
+    return dispatcher.getFileLocation(normalizeCaseSensitive(ident), subPath, 
locationName);
   }
 
   private NameIdentifier normalizeNameIdentifier(NameIdentifier ident) {
diff --git 
a/core/src/main/java/org/apache/gravitino/catalog/FilesetOperationDispatcher.java
 
b/core/src/main/java/org/apache/gravitino/catalog/FilesetOperationDispatcher.java
index ff1e8b6adc..5a2eb6a997 100644
--- 
a/core/src/main/java/org/apache/gravitino/catalog/FilesetOperationDispatcher.java
+++ 
b/core/src/main/java/org/apache/gravitino/catalog/FilesetOperationDispatcher.java
@@ -30,6 +30,7 @@ import org.apache.gravitino.StringIdentifier;
 import org.apache.gravitino.connector.HasPropertyMetadata;
 import org.apache.gravitino.exceptions.FilesetAlreadyExistsException;
 import org.apache.gravitino.exceptions.NoSuchFilesetException;
+import org.apache.gravitino.exceptions.NoSuchLocationNameException;
 import org.apache.gravitino.exceptions.NoSuchSchemaException;
 import org.apache.gravitino.exceptions.NonEmptyEntityException;
 import org.apache.gravitino.file.Fileset;
@@ -101,26 +102,27 @@ public class FilesetOperationDispatcher extends 
OperationDispatcher implements F
   /**
    * Create a fileset metadata in the catalog.
    *
-   * <p>If the type of the fileset object is "MANAGED", the underlying 
storageLocation can be null,
-   * and Gravitino will manage the storage location based on the location of 
the schema.
+   * <p>If the type of the fileset object is "MANAGED", the underlying 
storageLocations can be
+   * empty, and Gravitino will manage the storage location based on the 
locations of the schema.
    *
-   * <p>If the type of the fileset object is "EXTERNAL", the underlying 
storageLocation must be set.
+   * <p>If the type of the fileset object is "EXTERNAL", the underlying 
storageLocations must be
+   * set.
    *
    * @param ident A fileset identifier.
    * @param comment The comment of the fileset.
    * @param type The type of the fileset.
-   * @param storageLocation The storage location of the fileset.
+   * @param storageLocations The location names and corresponding storage 
locations of the fileset.
    * @param properties The properties of the fileset.
    * @return The created fileset metadata
    * @throws NoSuchSchemaException If the schema does not exist.
    * @throws FilesetAlreadyExistsException If the fileset already exists.
    */
   @Override
-  public Fileset createFileset(
+  public Fileset createMultipleLocationFileset(
       NameIdentifier ident,
       String comment,
       Fileset.Type type,
-      String storageLocation,
+      Map<String, String> storageLocations,
       Map<String, String> properties)
       throws NoSuchSchemaException, FilesetAlreadyExistsException {
     NameIdentifier catalogIdent = getCatalogIdentifier(ident);
@@ -148,8 +150,8 @@ public class FilesetOperationDispatcher extends 
OperationDispatcher implements F
                     c ->
                         c.doWithFilesetOps(
                             f ->
-                                f.createFileset(
-                                    ident, comment, type, storageLocation, 
updatedProperties)),
+                                f.createMultipleLocationFileset(
+                                    ident, comment, type, storageLocations, 
updatedProperties)),
                     NoSuchSchemaException.class,
                     FilesetAlreadyExistsException.class));
     return EntityCombinedFileset.of(createdFileset)
@@ -228,15 +230,17 @@ public class FilesetOperationDispatcher extends 
OperationDispatcher implements F
 
   /**
    * Get the actual location of a file or directory based on the storage 
location of Fileset and the
-   * sub path.
+   * sub path by the location name.
    *
    * @param ident A fileset identifier.
    * @param subPath The sub path to the file or directory.
+   * @param locationName The location name.
    * @return The actual location of the file or directory.
    * @throws NoSuchFilesetException If the fileset does not exist.
+   * @throws NoSuchLocationNameException If the location name does not exist.
    */
   @Override
-  public String getFileLocation(NameIdentifier ident, String subPath)
+  public String getFileLocation(NameIdentifier ident, String subPath, String 
locationName)
       throws NoSuchFilesetException {
     return TreeLockUtils.doWithTreeLock(
         ident,
@@ -244,7 +248,7 @@ public class FilesetOperationDispatcher extends 
OperationDispatcher implements F
         () ->
             doWithCatalog(
                 getCatalogIdentifier(ident),
-                c -> c.doWithFilesetOps(f -> f.getFileLocation(ident, 
subPath)),
+                c -> c.doWithFilesetOps(f -> f.getFileLocation(ident, subPath, 
locationName)),
                 NonEmptyEntityException.class));
   }
 }
diff --git a/core/src/main/java/org/apache/gravitino/connector/BaseFileset.java 
b/core/src/main/java/org/apache/gravitino/connector/BaseFileset.java
index 7880ac4c2c..96ff2ce6e7 100644
--- a/core/src/main/java/org/apache/gravitino/connector/BaseFileset.java
+++ b/core/src/main/java/org/apache/gravitino/connector/BaseFileset.java
@@ -18,6 +18,7 @@
  */
 package org.apache.gravitino.connector;
 
+import java.util.HashMap;
 import java.util.Map;
 import javax.annotation.Nullable;
 import org.apache.gravitino.annotation.Evolving;
@@ -38,7 +39,7 @@ public abstract class BaseFileset implements Fileset {
 
   protected Type type;
 
-  protected String storageLocation;
+  protected Map<String, String> storageLocations;
 
   @Nullable protected Map<String, String> properties;
 
@@ -63,10 +64,10 @@ public abstract class BaseFileset implements Fileset {
     return type;
   }
 
-  /** @return The storage location string of the fileset. */
-  @Override
-  public String storageLocation() {
-    return storageLocation;
+  /** @return The storage locations of the fileset. */
+  @Override
+  public Map<String, String> storageLocations() {
+    return storageLocations;
   }
 
   /** @return The audit information for the fileset. */
@@ -90,7 +90,7 @@ public abstract class BaseFileset implements Fileset {
 
     SELF withType(Type type);
 
-    SELF withStorageLocation(String storageLocation);
+    SELF withStorageLocations(Map<String, String> storageLocations);
 
     SELF withProperties(Map<String, String> properties);
 
@@ -113,6 +113,7 @@ public abstract class BaseFileset implements Fileset {
     protected String comment;
     protected Type type;
     protected String storageLocation;
+    protected Map<String, String> storageLocations = new HashMap<>();
     protected Map<String, String> properties;
     protected AuditInfo auditInfo;
 
@@ -153,14 +154,14 @@ public abstract class BaseFileset implements Fileset {
     }
 
     /**
-     * Sets the storage location of the fileset.
+     * Sets the storage locations of the fileset.
      *
-     * @param storageLocation The storage location of the fileset.
+     * @param storageLocations The storage locations of the fileset.
      * @return The builder instance.
      */
     @Override
-    public SELF withStorageLocation(String storageLocation) {
-      this.storageLocation = storageLocation;
+    public SELF withStorageLocations(Map<String, String> storageLocations) {
+      this.storageLocations.putAll(storageLocations);
       return self();
     }
 
diff --git 
a/core/src/main/java/org/apache/gravitino/hook/FilesetHookDispatcher.java 
b/core/src/main/java/org/apache/gravitino/hook/FilesetHookDispatcher.java
index a1e19f9cfa..a6acbff441 100644
--- a/core/src/main/java/org/apache/gravitino/hook/FilesetHookDispatcher.java
+++ b/core/src/main/java/org/apache/gravitino/hook/FilesetHookDispatcher.java
@@ -30,6 +30,7 @@ import org.apache.gravitino.authorization.OwnerManager;
 import org.apache.gravitino.catalog.FilesetDispatcher;
 import org.apache.gravitino.exceptions.FilesetAlreadyExistsException;
 import org.apache.gravitino.exceptions.NoSuchFilesetException;
+import org.apache.gravitino.exceptions.NoSuchLocationNameException;
 import org.apache.gravitino.exceptions.NoSuchSchemaException;
 import org.apache.gravitino.file.Fileset;
 import org.apache.gravitino.file.FilesetChange;
@@ -59,18 +60,20 @@ public class FilesetHookDispatcher implements 
FilesetDispatcher {
   }
 
   @Override
-  public Fileset createFileset(
+  public Fileset createMultipleLocationFileset(
       NameIdentifier ident,
       String comment,
       Fileset.Type type,
-      String storageLocation,
+      Map<String, String> storageLocations,
       Map<String, String> properties)
       throws NoSuchSchemaException, FilesetAlreadyExistsException {
     // Check whether the current user exists or not
     AuthorizationUtils.checkCurrentUser(
         ident.namespace().level(0), PrincipalUtils.getCurrentUserName());
 
-    Fileset fileset = dispatcher.createFileset(ident, comment, type, 
storageLocation, properties);
+    Fileset fileset =
+        dispatcher.createMultipleLocationFileset(
+            ident, comment, type, storageLocations, properties);
 
     // Set the creator as the owner of the fileset.
     OwnerManager ownerManager = GravitinoEnv.getInstance().ownerManager();
@@ -118,8 +121,8 @@ public class FilesetHookDispatcher implements 
FilesetDispatcher {
   }
 
   @Override
-  public String getFileLocation(NameIdentifier ident, String subPath)
-      throws NoSuchFilesetException {
-    return dispatcher.getFileLocation(ident, subPath);
+  public String getFileLocation(NameIdentifier ident, String subPath, String 
locationName)
+      throws NoSuchFilesetException, NoSuchLocationNameException {
+    return dispatcher.getFileLocation(ident, subPath, locationName);
   }
 }
diff --git 
a/core/src/main/java/org/apache/gravitino/listener/FilesetEventDispatcher.java 
b/core/src/main/java/org/apache/gravitino/listener/FilesetEventDispatcher.java
index 79274d57b2..cb1c2ad1d2 100644
--- 
a/core/src/main/java/org/apache/gravitino/listener/FilesetEventDispatcher.java
+++ 
b/core/src/main/java/org/apache/gravitino/listener/FilesetEventDispatcher.java
@@ -27,6 +27,7 @@ import org.apache.gravitino.audit.CallerContext;
 import org.apache.gravitino.catalog.FilesetDispatcher;
 import org.apache.gravitino.exceptions.FilesetAlreadyExistsException;
 import org.apache.gravitino.exceptions.NoSuchFilesetException;
+import org.apache.gravitino.exceptions.NoSuchLocationNameException;
 import org.apache.gravitino.exceptions.NoSuchSchemaException;
 import org.apache.gravitino.file.Fileset;
 import org.apache.gravitino.file.FilesetChange;
@@ -97,19 +98,21 @@ public class FilesetEventDispatcher implements 
FilesetDispatcher {
   }
 
   @Override
-  public Fileset createFileset(
+  public Fileset createMultipleLocationFileset(
       NameIdentifier ident,
       String comment,
       Fileset.Type type,
-      String storageLocation,
+      Map<String, String> storageLocations,
       Map<String, String> properties)
       throws NoSuchSchemaException, FilesetAlreadyExistsException {
     FilesetInfo createFileRequest =
-        new FilesetInfo(ident.name(), comment, type, storageLocation, 
properties, null);
+        new FilesetInfo(ident.name(), comment, type, storageLocations, 
properties, null);
     eventBus.dispatchEvent(
         new CreateFilesetPreEvent(PrincipalUtils.getCurrentUserName(), ident, 
createFileRequest));
     try {
-      Fileset fileset = dispatcher.createFileset(ident, comment, type, 
storageLocation, properties);
+      Fileset fileset =
+          dispatcher.createMultipleLocationFileset(
+              ident, comment, type, storageLocations, properties);
       eventBus.dispatchEvent(
           new CreateFilesetEvent(
               PrincipalUtils.getCurrentUserName(), ident, new 
FilesetInfo(fileset)));
@@ -120,7 +123,7 @@ public class FilesetEventDispatcher implements 
FilesetDispatcher {
               PrincipalUtils.getCurrentUserName(),
               ident,
               e,
-              new FilesetInfo(ident.name(), comment, type, storageLocation, 
properties, null)));
+              new FilesetInfo(ident.name(), comment, type, storageLocations, 
properties, null)));
       throw e;
     }
   }
@@ -159,12 +162,13 @@ public class FilesetEventDispatcher implements 
FilesetDispatcher {
   }
 
   @Override
-  public String getFileLocation(NameIdentifier ident, String subPath)
-      throws NoSuchFilesetException {
+  public String getFileLocation(NameIdentifier ident, String subPath, String 
locationName)
+      throws NoSuchFilesetException, NoSuchLocationNameException {
     eventBus.dispatchEvent(
-        new GetFileLocationPreEvent(PrincipalUtils.getCurrentUserName(), 
ident, subPath));
+        new GetFileLocationPreEvent(
+            PrincipalUtils.getCurrentUserName(), ident, subPath, 
locationName));
     try {
-      String actualFileLocation = dispatcher.getFileLocation(ident, subPath);
+      String actualFileLocation = dispatcher.getFileLocation(ident, subPath, 
locationName);
       // get the audit info from the thread local context
       ImmutableMap.Builder<String, String> builder = ImmutableMap.builder();
       CallerContext callerContext = CallerContext.CallerContextHolder.get();
@@ -177,11 +181,13 @@ public class FilesetEventDispatcher implements 
FilesetDispatcher {
               ident,
               actualFileLocation,
               subPath,
+              locationName,
               builder.build()));
       return actualFileLocation;
     } catch (Exception e) {
       eventBus.dispatchEvent(
-          new GetFileLocationFailureEvent(PrincipalUtils.getCurrentUserName(), 
ident, subPath, e));
+          new GetFileLocationFailureEvent(
+              PrincipalUtils.getCurrentUserName(), ident, subPath, 
locationName, e));
       throw e;
     }
   }
diff --git 
a/core/src/main/java/org/apache/gravitino/listener/api/event/GetFileLocationEvent.java
 
b/core/src/main/java/org/apache/gravitino/listener/api/event/GetFileLocationEvent.java
index 54d34aa6ec..72aff8498f 100644
--- 
a/core/src/main/java/org/apache/gravitino/listener/api/event/GetFileLocationEvent.java
+++ 
b/core/src/main/java/org/apache/gravitino/listener/api/event/GetFileLocationEvent.java
@@ -29,6 +29,7 @@ public final class GetFileLocationEvent extends FilesetEvent {
   private final String actualFileLocation;
   private final String subPath;
   private final Map<String, String> context;
+  private final String locationName;
 
   /**
    * Constructs a new {@code GetFileLocationEvent}, recording the attempt to 
get a file location.
@@ -45,8 +46,29 @@ public final class GetFileLocationEvent extends FilesetEvent 
{
       String actualFileLocation,
       String subPath,
       Map<String, String> context) {
+    this(user, identifier, actualFileLocation, subPath, null, context);
+  }
+
+  /**
+   * Constructs a new {@code GetFileLocationEvent}, recording the attempt to 
get a file location.
+   *
+   * @param user The user who initiated the get file location.
+   * @param identifier The identifier of the file location that was attempted 
to be got.
+   * @param actualFileLocation The actual file location to get.
+   * @param subPath The accessing sub path of the get file location operation.
+   * @param locationName The location name of the file location.
+   * @param context The audit context, this param can be null.
+   */
+  public GetFileLocationEvent(
+      String user,
+      NameIdentifier identifier,
+      String actualFileLocation,
+      String subPath,
+      String locationName,
+      Map<String, String> context) {
     super(user, identifier);
     this.actualFileLocation = actualFileLocation;
+    this.locationName = locationName;
     this.subPath = subPath;
     this.context = context;
   }
@@ -78,6 +100,15 @@ public final class GetFileLocationEvent extends 
FilesetEvent {
     return context;
   }
 
+  /**
+   * Get the location name of the file location.
+   *
+   * @return The location name.
+   */
+  public String locationName() {
+    return locationName;
+  }
+
   /**
    * Returns the type of operation.
    *
diff --git 
a/core/src/main/java/org/apache/gravitino/listener/api/event/GetFileLocationFailureEvent.java
 
b/core/src/main/java/org/apache/gravitino/listener/api/event/GetFileLocationFailureEvent.java
index 7fd2b48666..a4b8e5db5e 100644
--- 
a/core/src/main/java/org/apache/gravitino/listener/api/event/GetFileLocationFailureEvent.java
+++ 
b/core/src/main/java/org/apache/gravitino/listener/api/event/GetFileLocationFailureEvent.java
@@ -29,6 +29,7 @@ import org.apache.gravitino.annotation.DeveloperApi;
 @DeveloperApi
 public final class GetFileLocationFailureEvent extends FilesetFailureEvent {
   private final String subPath;
+  private final String locationName;
 
   /**
    * Constructs a new {@code GetFileLocationFailureEvent}.
@@ -42,8 +43,29 @@ public final class GetFileLocationFailureEvent extends 
FilesetFailureEvent {
    */
   public GetFileLocationFailureEvent(
       String user, NameIdentifier identifier, String subPath, Exception 
exception) {
+    this(user, identifier, subPath, null, exception);
+  }
+
+  /**
+   * Constructs a new {@code GetFileLocationFailureEvent}.
+   *
+   * @param user The user who initiated the get a file location.
+   * @param identifier The identifier of the file location that was attempted 
to be got.
+   * @param subPath The sub path of the actual file location which want to get.
+   * @param locationName The name of the location.
+   * @param exception The exception that was thrown during the get a file 
location. This exception
+   *     is key to diagnosing the failure, providing insights into what went 
wrong during the
+   *     operation.
+   */
+  public GetFileLocationFailureEvent(
+      String user,
+      NameIdentifier identifier,
+      String subPath,
+      String locationName,
+      Exception exception) {
     super(user, identifier, exception);
     this.subPath = subPath;
+    this.locationName = locationName;
   }
 
   /**
@@ -55,6 +77,15 @@ public final class GetFileLocationFailureEvent extends 
FilesetFailureEvent {
     return subPath;
   }
 
+  /**
+   * Get the name of the location that was attempted to be accessed.
+   *
+   * @return The location name.
+   */
+  public String locationName() {
+    return locationName;
+  }
+
   /**
    * Returns the type of operation.
    *
diff --git 
a/core/src/main/java/org/apache/gravitino/listener/api/event/GetFileLocationPreEvent.java
 
b/core/src/main/java/org/apache/gravitino/listener/api/event/GetFileLocationPreEvent.java
index 6250234ba3..b9d7454958 100644
--- 
a/core/src/main/java/org/apache/gravitino/listener/api/event/GetFileLocationPreEvent.java
+++ 
b/core/src/main/java/org/apache/gravitino/listener/api/event/GetFileLocationPreEvent.java
@@ -26,6 +26,7 @@ import org.apache.gravitino.annotation.DeveloperApi;
 @DeveloperApi
 public final class GetFileLocationPreEvent extends FilesetPreEvent {
   private final String subPath;
+  private final String locationName;
 
   /**
    * Constructs a new {@code GetFileLocationPreEvent}, recording the intent to 
get a file location.
@@ -35,8 +36,22 @@ public final class GetFileLocationPreEvent extends 
FilesetPreEvent {
    * @param subPath The accessing sub path of the get file location operation.
    */
   public GetFileLocationPreEvent(String user, NameIdentifier identifier, 
String subPath) {
+    this(user, identifier, subPath, null);
+  }
+
+  /**
+   * Constructs a new {@code GetFileLocationPreEvent}, recording the intent to 
get a file location.
+   *
+   * @param user The user who initiated the get file location operation.
+   * @param identifier The identifier of the file location to be accessed.
+   * @param subPath The accessing sub path of the get file location operation.
+   * @param locationName The name of the location to be accessed.
+   */
+  public GetFileLocationPreEvent(
+      String user, NameIdentifier identifier, String subPath, String 
locationName) {
     super(user, identifier);
     this.subPath = subPath;
+    this.locationName = locationName;
   }
 
   /**
@@ -48,6 +63,15 @@ public final class GetFileLocationPreEvent extends 
FilesetPreEvent {
     return subPath;
   }
 
+  /**
+   * Get the name of the location to be accessed.
+   *
+   * @return The name of the location.
+   */
+  public String locationName() {
+    return locationName;
+  }
+
   /**
    * Returns the type of operation.
    *
diff --git 
a/core/src/main/java/org/apache/gravitino/listener/api/info/FilesetInfo.java 
b/core/src/main/java/org/apache/gravitino/listener/api/info/FilesetInfo.java
index 30c5c9566c..d6897bc7ec 100644
--- a/core/src/main/java/org/apache/gravitino/listener/api/info/FilesetInfo.java
+++ b/core/src/main/java/org/apache/gravitino/listener/api/info/FilesetInfo.java
@@ -19,6 +19,8 @@
 
 package org.apache.gravitino.listener.api.info;
 
+import static org.apache.gravitino.file.Fileset.LOCATION_NAME_UNKNOWN;
+
 import com.google.common.collect.ImmutableMap;
 import java.util.Map;
 import javax.annotation.Nullable;
@@ -32,7 +34,7 @@ public final class FilesetInfo {
   private final String name;
   @Nullable private final String comment;
   private final Fileset.Type type;
-  private final String storageLocation;
+  private final Map<String, String> storageLocations;
   private final Map<String, String> properties;
   @Nullable private final Audit audit;
 
@@ -46,7 +48,7 @@ public final class FilesetInfo {
         fileset.name(),
         fileset.comment(),
         fileset.type(),
-        fileset.storageLocation(),
+        fileset.storageLocations(),
         fileset.properties(),
         fileset.auditInfo());
   }
@@ -71,7 +73,36 @@ public final class FilesetInfo {
     this.name = name;
     this.comment = comment;
     this.type = type;
-    this.storageLocation = storageLocation;
+    this.storageLocations =
+        storageLocation == null
+            ? ImmutableMap.of()
+            : ImmutableMap.of(LOCATION_NAME_UNKNOWN, storageLocation);
+    this.properties = properties == null ? ImmutableMap.of() : 
ImmutableMap.copyOf(properties);
+    this.audit = audit;
+  }
+
+  /**
+   * Constructs a FilesetInfo object with specified details.
+   *
+   * @param name The name of the fileset.
+   * @param comment An optional comment about the fileset. Can be {@code null}.
+   * @param type The type of the fileset.
+   * @param storageLocations The storage locations of the fileset.
+   * @param properties A map of properties associated with the fileset. Can be 
{@code null}.
+   * @param audit Optional audit information. Can be {@code null}.
+   */
+  public FilesetInfo(
+      String name,
+      String comment,
+      Fileset.Type type,
+      Map<String, String> storageLocations,
+      Map<String, String> properties,
+      Audit audit) {
+    this.name = name;
+    this.comment = comment;
+    this.type = type;
+    this.storageLocations =
+        storageLocations == null ? ImmutableMap.of() : 
ImmutableMap.copyOf(storageLocations);
     this.properties = properties == null ? ImmutableMap.of() : 
ImmutableMap.copyOf(properties);
     this.audit = audit;
   }
@@ -110,7 +141,12 @@ public final class FilesetInfo {
    * @return The storage location.
    */
   public String storageLocation() {
-    return storageLocation;
+    return storageLocations.get(LOCATION_NAME_UNKNOWN);
+  }
+
+  /** @return The storage locations of the fileset. */
+  public Map<String, String> storageLocations() {
+    return storageLocations;
   }
 
   /**
diff --git a/core/src/test/java/org/apache/gravitino/TestFileset.java 
b/core/src/test/java/org/apache/gravitino/TestFileset.java
index b7129a7e76..7c65ee7b52 100644
--- a/core/src/test/java/org/apache/gravitino/TestFileset.java
+++ b/core/src/test/java/org/apache/gravitino/TestFileset.java
@@ -36,7 +36,7 @@ public class TestFileset extends BaseFileset {
       fileset.properties = properties;
       fileset.auditInfo = auditInfo;
       fileset.type = type;
-      fileset.storageLocation = storageLocation;
+      fileset.storageLocations = storageLocations;
       return fileset;
     }
   }
diff --git 
a/core/src/test/java/org/apache/gravitino/connector/TestCatalogOperations.java 
b/core/src/test/java/org/apache/gravitino/connector/TestCatalogOperations.java
index 17d3f5c217..4e0d710871 100644
--- 
a/core/src/test/java/org/apache/gravitino/connector/TestCatalogOperations.java
+++ 
b/core/src/test/java/org/apache/gravitino/connector/TestCatalogOperations.java
@@ -18,6 +18,8 @@
  */
 package org.apache.gravitino.connector;
 
+import static org.apache.gravitino.file.Fileset.PROPERTY_DEFAULT_LOCATION_NAME;
+
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Maps;
@@ -29,6 +31,7 @@ import java.util.Comparator;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
@@ -56,6 +59,7 @@ import 
org.apache.gravitino.exceptions.ModelAlreadyExistsException;
 import 
org.apache.gravitino.exceptions.ModelVersionAliasesAlreadyExistException;
 import org.apache.gravitino.exceptions.NoSuchCatalogException;
 import org.apache.gravitino.exceptions.NoSuchFilesetException;
+import org.apache.gravitino.exceptions.NoSuchLocationNameException;
 import org.apache.gravitino.exceptions.NoSuchModelException;
 import org.apache.gravitino.exceptions.NoSuchModelVersionException;
 import org.apache.gravitino.exceptions.NoSuchSchemaException;
@@ -400,15 +404,32 @@ public class TestCatalogOperations
   }
 
   @Override
-  public Fileset createFileset(
+  public Fileset createMultipleLocationFileset(
       NameIdentifier ident,
       String comment,
       Fileset.Type type,
-      String storageLocation,
+      Map<String, String> storageLocations,
       Map<String, String> properties)
       throws NoSuchSchemaException, FilesetAlreadyExistsException {
     AuditInfo auditInfo =
         
AuditInfo.builder().withCreator("test").withCreateTime(Instant.now()).build();
+    if (storageLocations != null && storageLocations.size() == 1) {
+      properties =
+          Optional.ofNullable(properties)
+              .map(
+                  props ->
+                      ImmutableMap.<String, String>builder()
+                          .putAll(props)
+                          .put(
+                              PROPERTY_DEFAULT_LOCATION_NAME,
+                              storageLocations.keySet().iterator().next())
+                          .build())
+              .orElseGet(
+                  () ->
+                      ImmutableMap.of(
+                          PROPERTY_DEFAULT_LOCATION_NAME,
+                          storageLocations.keySet().iterator().next()));
+    }
     TestFileset fileset =
         TestFileset.builder()
             .withName(ident.name())
@@ -416,7 +437,7 @@ public class TestCatalogOperations
             .withProperties(properties)
             .withAuditInfo(auditInfo)
             .withType(type)
-            .withStorageLocation(storageLocation)
+            .withStorageLocations(storageLocations)
             .build();
 
     NameIdentifier schemaIdent = NameIdentifier.of(ident.namespace().levels());
@@ -482,7 +503,7 @@ public class TestCatalogOperations
             .withProperties(newProps)
             .withAuditInfo(updatedAuditInfo)
             .withType(fileset.type())
-            .withStorageLocation(fileset.storageLocation())
+            .withStorageLocations(fileset.storageLocations())
             .build();
     filesets.put(newIdent, updatedFileset);
     return updatedFileset;
@@ -499,7 +520,7 @@ public class TestCatalogOperations
   }
 
   @Override
-  public String getFileLocation(NameIdentifier ident, String subPath) {
+  public String getFileLocation(NameIdentifier ident, String subPath, String 
locationName) {
     Preconditions.checkArgument(subPath != null, "subPath must not be null");
     String processedSubPath;
     if (!subPath.trim().isEmpty() && !subPath.trim().startsWith(SLASH)) {
@@ -509,8 +530,16 @@ public class TestCatalogOperations
     }
 
     Fileset fileset = loadFileset(ident);
+    Map<String, String> storageLocations = fileset.storageLocations();
+    String targetLocationName =
+        Optional.ofNullable(locationName)
+            .orElse(fileset.properties().get(PROPERTY_DEFAULT_LOCATION_NAME));
+    if (storageLocations == null || 
!storageLocations.containsKey(targetLocationName)) {
+      throw new NoSuchLocationNameException(
+          "The location name: %s does not exist in the fileset: %s", 
targetLocationName, ident);
+    }
 
-    boolean isSingleFile = checkSingleFile(fileset);
+    boolean isSingleFile = 
checkSingleFile(storageLocations.get(targetLocationName));
     // if the storage location is a single file, it cannot have sub path to 
access.
     if (isSingleFile && StringUtils.isBlank(processedSubPath)) {
       throw new GravitinoRuntimeException(
@@ -953,9 +982,9 @@ public class TestCatalogOperations
         && !CallerContext.CallerContextHolder.get().context().isEmpty();
   }
 
-  private boolean checkSingleFile(Fileset fileset) {
+  private boolean checkSingleFile(String location) {
     try {
-      File locationPath = new File(fileset.storageLocation());
+      File locationPath = new File(location);
       return locationPath.isFile();
     } catch (Exception e) {
       return false;
diff --git 
a/core/src/test/java/org/apache/gravitino/listener/api/event/TestFilesetEvent.java
 
b/core/src/test/java/org/apache/gravitino/listener/api/event/TestFilesetEvent.java
index 803b38522d..1f0795652b 100644
--- 
a/core/src/test/java/org/apache/gravitino/listener/api/event/TestFilesetEvent.java
+++ 
b/core/src/test/java/org/apache/gravitino/listener/api/event/TestFilesetEvent.java
@@ -19,6 +19,7 @@
 
 package org.apache.gravitino.listener.api.event;
 
+import static org.apache.gravitino.file.Fileset.LOCATION_NAME_UNKNOWN;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.mockito.Mockito.any;
 import static org.mockito.Mockito.mock;
@@ -335,17 +336,18 @@ public class TestFilesetEvent {
     when(fileset.properties()).thenReturn(ImmutableMap.of("a", "b"));
     when(fileset.name()).thenReturn("fileset");
     when(fileset.auditInfo()).thenReturn(null);
-    when(fileset.storageLocation()).thenReturn("location");
+    when(fileset.storageLocation()).thenCallRealMethod();
+    when(fileset.storageLocations()).thenReturn(ImmutableMap.of(LOCATION_NAME_UNKNOWN, "location"));
     return fileset;
   }
 
   private FilesetDispatcher mockFilesetDispatcher() {
     FilesetDispatcher dispatcher = mock(FilesetDispatcher.class);
-    when(dispatcher.createFileset(
+    when(dispatcher.createMultipleLocationFileset(
             any(NameIdentifier.class),
             any(String.class),
             any(Fileset.Type.class),
-            any(String.class),
+            any(Map.class),
             any(Map.class)))
         .thenReturn(fileset);
     
when(dispatcher.loadFileset(any(NameIdentifier.class))).thenReturn(fileset);
diff --git a/core/src/test/java/org/apache/gravitino/meta/TestEntityCombinedObject.java b/core/src/test/java/org/apache/gravitino/meta/TestEntityCombinedObject.java
index a24b810bf7..4c8f1f2325 100644
--- a/core/src/test/java/org/apache/gravitino/meta/TestEntityCombinedObject.java
+++ b/core/src/test/java/org/apache/gravitino/meta/TestEntityCombinedObject.java
@@ -114,7 +114,7 @@ public class TestEntityCombinedObject {
         
EntityCombinedFileset.of(originFileset).withHiddenProperties(hiddenProperties);
     Assertions.assertEquals(originFileset.name(), entityCombinedFileset.name());
     Assertions.assertEquals(originFileset.comment(), entityCombinedFileset.comment());
-    Map<String, String> filterProp = new HashMap<>(originTopic.properties());
+    Map<String, String> filterProp = new HashMap<>(originFileset.properties());
     filterProp.remove("k3");
     filterProp.remove(null);
     filterProp.remove("k5");

Reply via email to