This is an automated email from the ASF dual-hosted git repository.

jshao pushed a commit to branch branch-0.9
in repository https://gitbox.apache.org/repos/asf/gravitino.git


The following commit(s) were added to refs/heads/branch-0.9 by this push:
     new 2337c2ed1c [#6894] feat(client): Java/Python clients supports fileset 
multiple locations (#6943)
2337c2ed1c is described below

commit 2337c2ed1c15ec41874a2e6b5fcb19aa210bca16
Author: github-actions[bot] 
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Tue Apr 15 18:48:51 2025 +0800

    [#6894] feat(client): Java/Python clients supports fileset multiple 
locations (#6943)
    
    ### What changes were proposed in this pull request?
    
    Java/Python clients supports fileset multiple locations
    
    ### Why are the changes needed?
    
    Fix: #6894
    
    ### Does this PR introduce _any_ user-facing change?
    
    yes, Java/Python clients support fileset multiple locations
    
    ### How was this patch tested?
    
    tests added
    
    Co-authored-by: mchades <[email protected]>
---
 .../hadoop/integration/test/HadoopCatalogIT.java   | 145 +++++++++++++++++++++
 .../org/apache/gravitino/client/ErrorHandlers.java |  11 +-
 .../apache/gravitino/client/FilesetCatalog.java    |  27 ++--
 .../apache/gravitino/client/GenericFileset.java    |   4 +-
 .../gravitino/client/TestFilesetCatalog.java       |  49 ++++++-
 .../gravitino/client/fileset_catalog.py            |  59 +++++----
 .../gravitino/client/generic_fileset.py            |   6 +-
 clients/client-python/gravitino/dto/fileset_dto.py |  10 +-
 .../dto/requests/fileset_create_request.py         |   8 +-
 .../gravitino/dto/responses/fileset_response.py    |   8 +-
 .../exceptions/handlers/fileset_error_handler.py   |   3 +
 .../tests/integration/test_fileset_catalog.py      | 100 +++++++++++++-
 clients/client-python/tests/unittests/mock_base.py |   7 +-
 .../gravitino/dto/responses/FilesetResponse.java   |   9 +-
 14 files changed, 376 insertions(+), 70 deletions(-)

diff --git 
a/catalogs/catalog-hadoop/src/test/java/org/apache/gravitino/catalog/hadoop/integration/test/HadoopCatalogIT.java
 
b/catalogs/catalog-hadoop/src/test/java/org/apache/gravitino/catalog/hadoop/integration/test/HadoopCatalogIT.java
index 8a45b5c8d5..ee77d11e51 100644
--- 
a/catalogs/catalog-hadoop/src/test/java/org/apache/gravitino/catalog/hadoop/integration/test/HadoopCatalogIT.java
+++ 
b/catalogs/catalog-hadoop/src/test/java/org/apache/gravitino/catalog/hadoop/integration/test/HadoopCatalogIT.java
@@ -204,6 +204,7 @@ public class HadoopCatalogIT extends BaseIT {
     Assertions.assertEquals("comment", fileset.comment());
     Assertions.assertEquals(MANAGED, fileset.type());
     Assertions.assertEquals(storageLocation, fileset.storageLocation());
+    Assertions.assertEquals(storageLocation, 
fileset.storageLocations().get(LOCATION_NAME_UNKNOWN));
     Assertions.assertEquals(2, fileset.properties().size());
     Assertions.assertEquals("v1", fileset.properties().get("k1"));
     Assertions.assertEquals(
@@ -228,6 +229,10 @@ public class HadoopCatalogIT extends BaseIT {
         storageLocation(filesetName2),
         fileset2.storageLocation(),
         "storage location should be created");
+    Assertions.assertEquals(
+        storageLocation(filesetName2),
+        fileset2.storageLocations().get(LOCATION_NAME_UNKNOWN),
+        "storage location should be created");
     Assertions.assertEquals(
         ImmutableMap.of(PROPERTY_DEFAULT_LOCATION_NAME, LOCATION_NAME_UNKNOWN),
         fileset2.properties());
@@ -245,10 +250,61 @@ public class HadoopCatalogIT extends BaseIT {
     Assertions.assertEquals("comment", fileset4.comment());
     Assertions.assertEquals(MANAGED, fileset4.type());
     Assertions.assertEquals(expectedStorageLocation4, 
fileset4.storageLocation());
+    Assertions.assertEquals(
+        expectedStorageLocation4, 
fileset4.storageLocations().get(LOCATION_NAME_UNKNOWN));
     Assertions.assertEquals(1, fileset4.properties().size(), "properties 
should be empty");
     Assertions.assertEquals(
         LOCATION_NAME_UNKNOWN, 
fileset4.properties().get(PROPERTY_DEFAULT_LOCATION_NAME));
 
+    // create fileset with multiple locations
+    String filesetName5 = "test_create_fileset_with_multiple_locations";
+    Map<String, String> storageLocations =
+        ImmutableMap.of(
+            "location1",
+            storageLocation(filesetName5 + "_location1"),
+            "location2",
+            storageLocation(filesetName5 + "_location2"));
+    fileset =
+        createMultipleLocationsFileset(
+            filesetName5,
+            "comment",
+            MANAGED,
+            storageLocations,
+            ImmutableMap.of(PROPERTY_DEFAULT_LOCATION_NAME, "location1"));
+    Assertions.assertNotNull(fileset, "fileset should be created");
+    Assertions.assertEquals("comment", fileset.comment());
+    Assertions.assertEquals(MANAGED, fileset.type());
+    Map<String, String> expectedStorageLocations =
+        new HashMap<String, String>(storageLocations) {
+          {
+            put(LOCATION_NAME_UNKNOWN, storageLocation(filesetName5));
+          }
+        };
+    Assertions.assertEquals(expectedStorageLocations, 
fileset.storageLocations());
+    Assertions.assertEquals(1, fileset.properties().size());
+    Assertions.assertEquals("location1", 
fileset.properties().get(PROPERTY_DEFAULT_LOCATION_NAME));
+
+    assertFilesetExists(filesetName5);
+    fileset = 
catalog.asFilesetCatalog().loadFileset(NameIdentifier.of(schemaName, 
filesetName5));
+    Assertions.assertNotNull(fileset, "fileset should be created");
+    Assertions.assertEquals("comment", fileset.comment());
+    Assertions.assertEquals(MANAGED, fileset.type());
+    Assertions.assertEquals(expectedStorageLocations, 
fileset.storageLocations());
+    Assertions.assertEquals(1, fileset.properties().size());
+    Assertions.assertEquals("location1", 
fileset.properties().get(PROPERTY_DEFAULT_LOCATION_NAME));
+
+    // create fileset with null multiple locations
+    String filesetName6 = "test_create_fileset_with_null_multiple_locations";
+    createMultipleLocationsFileset(filesetName6, "comment", MANAGED, null, 
null);
+    assertFilesetExists(filesetName6);
+    fileset = 
catalog.asFilesetCatalog().loadFileset(NameIdentifier.of(schemaName, 
filesetName6));
+    Assertions.assertNotNull(fileset, "fileset should be created");
+    Assertions.assertEquals("comment", fileset.comment());
+    Assertions.assertEquals(MANAGED, fileset.type());
+    Assertions.assertEquals(
+        ImmutableMap.of(LOCATION_NAME_UNKNOWN, storageLocation(filesetName6)),
+        fileset.storageLocations());
+
     // create fileset with null fileset name
     Assertions.assertThrows(
         IllegalNameIdentifierException.class,
@@ -749,6 +805,68 @@ public class HadoopCatalogIT extends BaseIT {
     }
   }
 
+  @Test
+  public void testGetFileLocationWithMultipleLocations() {
+    String filesetName = GravitinoITUtils.genRandomName("fileset");
+    NameIdentifier filesetIdent = NameIdentifier.of(schemaName, filesetName);
+    
Assertions.assertFalse(catalog.asFilesetCatalog().filesetExists(filesetIdent));
+    String locationName1 = "location1";
+    String locationName2 = "location2";
+    Map<String, String> storageLocations =
+        ImmutableMap.of(
+            locationName1,
+            storageLocation(filesetName + "_location1"),
+            locationName2,
+            storageLocation(filesetName + "_location2"));
+    Fileset expectedFileset =
+        catalog
+            .asFilesetCatalog()
+            .createMultipleLocationFileset(
+                filesetIdent,
+                "fileset comment",
+                MANAGED,
+                storageLocations,
+                ImmutableMap.of(PROPERTY_DEFAULT_LOCATION_NAME, 
locationName1));
+    
Assertions.assertTrue(catalog.asFilesetCatalog().filesetExists(filesetIdent));
+    // test without caller context
+    String actualFileLocation1 =
+        catalog.asFilesetCatalog().getFileLocation(filesetIdent, "/test1.par", 
locationName1);
+    Assertions.assertEquals(
+        expectedFileset.storageLocations().get(locationName1) + "/test1.par", 
actualFileLocation1);
+
+    String actualFileLocation2 =
+        catalog.asFilesetCatalog().getFileLocation(filesetIdent, "/test2.par", 
locationName2);
+    Assertions.assertEquals(
+        expectedFileset.storageLocations().get(locationName2) + "/test2.par", 
actualFileLocation2);
+
+    // test with caller context
+    try {
+      Map<String, String> context = new HashMap<>();
+      context.put(
+          FilesetAuditConstants.HTTP_HEADER_INTERNAL_CLIENT_TYPE,
+          InternalClientType.HADOOP_GVFS.name());
+      context.put(
+          FilesetAuditConstants.HTTP_HEADER_FILESET_DATA_OPERATION,
+          FilesetDataOperation.CREATE.name());
+      CallerContext callerContext = 
CallerContext.builder().withContext(context).build();
+      CallerContext.CallerContextHolder.set(callerContext);
+
+      actualFileLocation1 =
+          catalog.asFilesetCatalog().getFileLocation(filesetIdent, 
"/test1.par", locationName1);
+      Assertions.assertEquals(
+          expectedFileset.storageLocations().get(locationName1) + "/test1.par",
+          actualFileLocation1);
+
+      actualFileLocation2 =
+          catalog.asFilesetCatalog().getFileLocation(filesetIdent, 
"/test2.par", locationName2);
+      Assertions.assertEquals(
+          expectedFileset.storageLocations().get(locationName2) + "/test2.par",
+          actualFileLocation2);
+    } finally {
+      CallerContext.CallerContextHolder.remove();
+    }
+  }
+
   @Test
   public void testGetFileLocationWithInvalidAuditHeaders() {
     try {
@@ -869,6 +987,33 @@ public class HadoopCatalogIT extends BaseIT {
             NameIdentifier.of(schemaName, filesetName), comment, type, 
storageLocation, properties);
   }
 
+  private Fileset createMultipleLocationsFileset(
+      String filesetName,
+      String comment,
+      Fileset.Type type,
+      Map<String, String> storageLocations,
+      Map<String, String> properties) {
+    if (storageLocations != null) {
+      for (String location : storageLocations.values()) {
+        Path path = new Path(location);
+        try {
+          fileSystem.deleteOnExit(path);
+        } catch (IOException e) {
+          LOG.warn("Failed to delete location: {}", path, e);
+        }
+      }
+    }
+
+    return catalog
+        .asFilesetCatalog()
+        .createMultipleLocationFileset(
+            NameIdentifier.of(schemaName, filesetName),
+            comment,
+            type,
+            storageLocations,
+            properties);
+  }
+
   private void assertFilesetExists(String filesetName) throws IOException {
     Assertions.assertTrue(
         catalog.asFilesetCatalog().filesetExists(NameIdentifier.of(schemaName, 
filesetName)),
diff --git 
a/clients/client-java/src/main/java/org/apache/gravitino/client/ErrorHandlers.java
 
b/clients/client-java/src/main/java/org/apache/gravitino/client/ErrorHandlers.java
index d9b4ddb49f..b7e47d239c 100644
--- 
a/clients/client-java/src/main/java/org/apache/gravitino/client/ErrorHandlers.java
+++ 
b/clients/client-java/src/main/java/org/apache/gravitino/client/ErrorHandlers.java
@@ -46,6 +46,7 @@ import 
org.apache.gravitino.exceptions.ModelVersionAliasesAlreadyExistException;
 import org.apache.gravitino.exceptions.NoSuchCatalogException;
 import org.apache.gravitino.exceptions.NoSuchFilesetException;
 import org.apache.gravitino.exceptions.NoSuchGroupException;
+import org.apache.gravitino.exceptions.NoSuchLocationNameException;
 import org.apache.gravitino.exceptions.NoSuchMetadataObjectException;
 import org.apache.gravitino.exceptions.NoSuchMetalakeException;
 import org.apache.gravitino.exceptions.NoSuchModelException;
@@ -602,10 +603,18 @@ public class ErrorHandlers {
           throw new IllegalArgumentException(errorMessage);
 
         case ErrorConstants.NOT_FOUND_CODE:
-          if 
(errorResponse.getType().equals(NoSuchSchemaException.class.getSimpleName())) {
+          if 
(errorResponse.getType().equals(NoSuchMetalakeException.class.getSimpleName())) 
{
+            throw new NoSuchMetalakeException(errorMessage);
+          } else if 
(errorResponse.getType().equals(NoSuchCatalogException.class.getSimpleName())) {
+            throw new NoSuchCatalogException(errorMessage);
+          } else if 
(errorResponse.getType().equals(NoSuchSchemaException.class.getSimpleName())) {
             throw new NoSuchSchemaException(errorMessage);
           } else if 
(errorResponse.getType().equals(NoSuchFilesetException.class.getSimpleName())) {
             throw new NoSuchFilesetException(errorMessage);
+          } else if (errorResponse
+              .getType()
+              .equals(NoSuchLocationNameException.class.getSimpleName())) {
+            throw new NoSuchLocationNameException(errorMessage);
           } else {
             throw new NotFoundException(errorMessage);
           }
diff --git 
a/clients/client-java/src/main/java/org/apache/gravitino/client/FilesetCatalog.java
 
b/clients/client-java/src/main/java/org/apache/gravitino/client/FilesetCatalog.java
index 2ad0157d8a..2bef734d06 100644
--- 
a/clients/client-java/src/main/java/org/apache/gravitino/client/FilesetCatalog.java
+++ 
b/clients/client-java/src/main/java/org/apache/gravitino/client/FilesetCatalog.java
@@ -44,6 +44,7 @@ import 
org.apache.gravitino.dto.responses.FileLocationResponse;
 import org.apache.gravitino.dto.responses.FilesetResponse;
 import org.apache.gravitino.exceptions.FilesetAlreadyExistsException;
 import org.apache.gravitino.exceptions.NoSuchFilesetException;
+import org.apache.gravitino.exceptions.NoSuchLocationNameException;
 import org.apache.gravitino.exceptions.NoSuchSchemaException;
 import org.apache.gravitino.file.Fileset;
 import org.apache.gravitino.file.FilesetChange;
@@ -125,28 +126,23 @@ class FilesetCatalog extends BaseSchemaCatalog
   }
 
   /**
-   * Create a fileset metadata in the catalog.
+   * Create a fileset metadata with multiple storage locations in the catalog.
    *
-   * <p>If the type of the fileset object is "MANAGED", the underlying 
storageLocation can be null,
-   * and Gravitino will manage the storage location based on the location of 
the schema.
-   *
-   * <p>If the type of the fileset object is "EXTERNAL", the underlying 
storageLocation must be set.
-   *
-   * @param ident A fileset identifier, which should be "schema.fileset" 
format.
+   * @param ident A fileset identifier.
    * @param comment The comment of the fileset.
    * @param type The type of the fileset.
-   * @param storageLocation The storage location of the fileset.
+   * @param storageLocations The location names and storage locations of the 
fileset.
    * @param properties The properties of the fileset.
    * @return The created fileset metadata
    * @throws NoSuchSchemaException If the schema does not exist.
    * @throws FilesetAlreadyExistsException If the fileset already exists.
    */
   @Override
-  public Fileset createFileset(
+  public Fileset createMultipleLocationFileset(
       NameIdentifier ident,
       String comment,
       Fileset.Type type,
-      String storageLocation,
+      Map<String, String> storageLocations,
       Map<String, String> properties)
       throws NoSuchSchemaException, FilesetAlreadyExistsException {
     checkFilesetNameIdentifier(ident);
@@ -157,9 +153,10 @@ class FilesetCatalog extends BaseSchemaCatalog
             .name(ident.name())
             .comment(comment)
             .type(type)
-            .storageLocation(storageLocation)
+            .storageLocations(storageLocations)
             .properties(properties)
             .build();
+    req.validate();
 
     FilesetResponse resp =
         restClient.post(
@@ -238,12 +235,13 @@ class FilesetCatalog extends BaseSchemaCatalog
    *
    * @param ident A fileset identifier.
    * @param subPath The sub path to the file or directory.
+   * @param locationName The name of the location to be accessed.
    * @return The actual location of the file or directory.
    * @throws NoSuchFilesetException If the fileset does not exist.
    */
   @Override
-  public String getFileLocation(NameIdentifier ident, String subPath)
-      throws NoSuchFilesetException {
+  public String getFileLocation(NameIdentifier ident, String subPath, String 
locationName)
+      throws NoSuchFilesetException, NoSuchLocationNameException {
     checkFilesetNameIdentifier(ident);
     Namespace fullNamespace = getFilesetFullNamespace(ident.namespace());
 
@@ -252,6 +250,9 @@ class FilesetCatalog extends BaseSchemaCatalog
 
       Map<String, String> params = new HashMap<>();
       params.put("sub_path", RESTUtils.encodeString(subPath));
+      if (locationName != null) {
+        params.put("location_name", RESTUtils.encodeString(locationName));
+      }
       FileLocationResponse resp =
           restClient.get(
               formatFileLocationRequestPath(fullNamespace, ident.name()),
diff --git 
a/clients/client-java/src/main/java/org/apache/gravitino/client/GenericFileset.java
 
b/clients/client-java/src/main/java/org/apache/gravitino/client/GenericFileset.java
index 6e587b1155..b2c6b789c2 100644
--- 
a/clients/client-java/src/main/java/org/apache/gravitino/client/GenericFileset.java
+++ 
b/clients/client-java/src/main/java/org/apache/gravitino/client/GenericFileset.java
@@ -79,8 +79,8 @@ class GenericFileset implements Fileset, SupportsTags, 
SupportsRoles, SupportsCr
   }
 
   @Override
-  public String storageLocation() {
-    return filesetDTO.storageLocation();
+  public Map<String, String> storageLocations() {
+    return filesetDTO.storageLocations();
   }
 
   @Override
diff --git 
a/clients/client-java/src/test/java/org/apache/gravitino/client/TestFilesetCatalog.java
 
b/clients/client-java/src/test/java/org/apache/gravitino/client/TestFilesetCatalog.java
index 6597ed5c59..d878c0d684 100644
--- 
a/clients/client-java/src/test/java/org/apache/gravitino/client/TestFilesetCatalog.java
+++ 
b/clients/client-java/src/test/java/org/apache/gravitino/client/TestFilesetCatalog.java
@@ -58,6 +58,7 @@ import org.apache.gravitino.dto.responses.FilesetResponse;
 import org.apache.gravitino.exceptions.AlreadyExistsException;
 import org.apache.gravitino.exceptions.FilesetAlreadyExistsException;
 import org.apache.gravitino.exceptions.NoSuchFilesetException;
+import org.apache.gravitino.exceptions.NoSuchLocationNameException;
 import org.apache.gravitino.exceptions.NoSuchSchemaException;
 import org.apache.gravitino.exceptions.NotFoundException;
 import org.apache.gravitino.file.Fileset;
@@ -237,7 +238,7 @@ public class TestFilesetCatalog extends TestBase {
             .properties(ImmutableMap.of("k1", "v1"))
             .build();
     FilesetResponse resp = new FilesetResponse(mockFileset);
-    buildMockResource(Method.POST, filesetPath, req, resp, SC_OK);
+    buildMockResource(Method.POST, filesetPath, toMultipleLocationReq(req), 
resp, SC_OK);
     Fileset loadedFileset =
         catalog
             .asFilesetCatalog()
@@ -254,7 +255,7 @@ public class TestFilesetCatalog extends TestBase {
     ErrorResponse errResp =
         ErrorResponse.alreadyExists(
             FilesetAlreadyExistsException.class.getSimpleName(), "fileset 
already exists");
-    buildMockResource(Method.POST, filesetPath, req, errResp, SC_CONFLICT);
+    buildMockResource(Method.POST, filesetPath, toMultipleLocationReq(req), 
errResp, SC_CONFLICT);
     Assertions.assertThrows(
         AlreadyExistsException.class,
         () ->
@@ -285,6 +286,19 @@ public class TestFilesetCatalog extends TestBase {
         "internal error");
   }
 
+  private FilesetCreateRequest toMultipleLocationReq(FilesetCreateRequest req) 
{
+    return FilesetCreateRequest.builder()
+        .name(req.getName())
+        .type(req.getType())
+        .comment(req.getComment())
+        .storageLocations(
+            req.getStorageLocation() == null
+                ? ImmutableMap.of()
+                : ImmutableMap.of(LOCATION_NAME_UNKNOWN, 
req.getStorageLocation()))
+        .properties(req.getProperties())
+        .build();
+  }
+
   @Test
   public void testDropFileset() throws JsonProcessingException {
     NameIdentifier fileset = NameIdentifier.of("schema1", "fileset1");
@@ -440,6 +454,21 @@ public class TestFilesetCatalog extends TestBase {
     Assertions.assertTrue(StringUtils.isNotBlank(actualFileLocation));
     Assertions.assertEquals(mockFileLocation, actualFileLocation);
 
+    // get location by location name
+    String mockLocationName = "location1";
+    queryParams.put("location_name", mockLocationName);
+    buildMockResource(Method.GET, filesetPath, queryParams, null, resp, SC_OK);
+
+    actualFileLocation =
+        catalog
+            .asFilesetCatalog()
+            .getFileLocation(
+                NameIdentifier.of(fileset.namespace().level(2), 
fileset.name()),
+                mockSubPath,
+                mockLocationName);
+    Assertions.assertTrue(StringUtils.isNotBlank(actualFileLocation));
+    Assertions.assertEquals(mockFileLocation, actualFileLocation);
+
     // Throw schema not found exception
     ErrorResponse errResp =
         ErrorResponse.notFound(NoSuchSchemaException.class.getSimpleName(), 
"schema not found");
@@ -475,6 +504,22 @@ public class TestFilesetCatalog extends TestBase {
                 .getFileLocation(
                     NameIdentifier.of(fileset.namespace().level(2), 
fileset.name()), mockSubPath),
         "internal error");
+
+    // throw NoSuchLocationNameException
+    ErrorResponse errResp3 =
+        ErrorResponse.notFound(
+            NoSuchLocationNameException.class.getSimpleName(), "location name 
not found");
+    buildMockResource(Method.GET, filesetPath, null, errResp3, SC_NOT_FOUND);
+    Assertions.assertThrows(
+        NoSuchLocationNameException.class,
+        () ->
+            catalog
+                .asFilesetCatalog()
+                .getFileLocation(
+                    NameIdentifier.of(fileset.namespace().level(2), 
fileset.name()),
+                    mockSubPath,
+                    mockLocationName),
+        "location name not found");
   }
 
   @Test
diff --git a/clients/client-python/gravitino/client/fileset_catalog.py 
b/clients/client-python/gravitino/client/fileset_catalog.py
index 56504a5dc4..d5bf996be4 100644
--- a/clients/client-python/gravitino/client/fileset_catalog.py
+++ b/clients/client-python/gravitino/client/fileset_catalog.py
@@ -159,28 +159,14 @@ class FilesetCatalog(BaseSchemaCatalog, 
SupportsCredentials):
         Returns:
             The created fileset metadata
         """
-        # todo: call create_multiple_location_fileset if multiple storage 
locations are supported
-        self.check_fileset_name_identifier(ident)
-
-        full_namespace = self._get_fileset_full_namespace(ident.namespace())
-
-        req = FilesetCreateRequest(
-            name=encode_string(ident.name()),
-            comment=comment,
-            fileset_type=fileset_type,
-            storage_location=storage_location,
-            properties=properties,
+        locations = (
+            {Fileset.LOCATION_NAME_UNKNOWN: storage_location}
+            if storage_location
+            else {}
         )
-
-        resp = self.rest_client.post(
-            self.format_fileset_request_path(full_namespace),
-            req,
-            error_handler=FILESET_ERROR_HANDLER,
+        return self.create_multiple_location_fileset(
+            ident, comment, fileset_type, locations, properties
         )
-        fileset_resp = FilesetResponse.from_json(resp.body, infer_missing=True)
-        fileset_resp.validate()
-
-        return GenericFileset(fileset_resp.fileset(), self.rest_client, 
full_namespace)
 
     def create_multiple_location_fileset(
         self,
@@ -206,7 +192,28 @@ class FilesetCatalog(BaseSchemaCatalog, 
SupportsCredentials):
         Returns:
             The created fileset metadata
         """
-        raise NotImplementedError("Multiple storage locations are not 
supported yet")
+        self.check_fileset_name_identifier(ident)
+
+        full_namespace = self._get_fileset_full_namespace(ident.namespace())
+
+        req = FilesetCreateRequest(
+            name=encode_string(ident.name()),
+            comment=comment,
+            fileset_type=fileset_type,
+            storage_locations=storage_locations,
+            properties=properties,
+        )
+        req.validate()
+
+        resp = self.rest_client.post(
+            self.format_fileset_request_path(full_namespace),
+            req,
+            error_handler=FILESET_ERROR_HANDLER,
+        )
+        fileset_resp = FilesetResponse.from_json(resp.body, infer_missing=True)
+        fileset_resp.validate()
+
+        return GenericFileset(fileset_resp.fileset(), self.rest_client, 
full_namespace)
 
     def alter_fileset(self, ident: NameIdentifier, *changes) -> Fileset:
         """Update a fileset metadata in the catalog.
@@ -279,7 +286,8 @@ class FilesetCatalog(BaseSchemaCatalog, 
SupportsCredentials):
         Args:
              ident: A fileset identifier, which should be "schema.fileset" 
format.
              sub_path: The sub path of the file or directory.
-             location_name: The location name of the fileset.
+             location_name: The location name of the fileset, if not specified,
+                           the default location will be used.
 
         Raises:
             NoSuchLocationNameException If the location name does not exist.
@@ -292,8 +300,11 @@ class FilesetCatalog(BaseSchemaCatalog, 
SupportsCredentials):
         full_namespace = self._get_fileset_full_namespace(ident.namespace())
         try:
             caller_context: CallerContext = CallerContextHolder.get()
-            # todo: add location name to the request
-            params = {"sub_path": encode_string(sub_path)}
+            params = {
+                "sub_path": encode_string(sub_path),
+            }
+            if location_name is not None:
+                params["location_name"] = encode_string(location_name)
 
             resp = self.rest_client.get(
                 self.format_file_location_request_path(full_namespace, 
ident.name()),
diff --git a/clients/client-python/gravitino/client/generic_fileset.py 
b/clients/client-python/gravitino/client/generic_fileset.py
index c3b99dd497..cb3728c187 100644
--- a/clients/client-python/gravitino/client/generic_fileset.py
+++ b/clients/client-python/gravitino/client/generic_fileset.py
@@ -56,12 +56,8 @@ class GenericFileset(Fileset, SupportsCredentials):
     def type(self) -> Fileset.Type:
         return self._fileset.type()
 
-    def storage_location(self) -> str:
-        return self._fileset.storage_location()
-
     def storage_locations(self) -> Dict[str, str]:
-        # todo: implement this method and remove storage_location method
-        pass
+        return self._fileset.storage_locations()
 
     def comment(self) -> Optional[str]:
         return self._fileset.comment()
diff --git a/clients/client-python/gravitino/dto/fileset_dto.py 
b/clients/client-python/gravitino/dto/fileset_dto.py
index d098cd968b..894b519721 100644
--- a/clients/client-python/gravitino/dto/fileset_dto.py
+++ b/clients/client-python/gravitino/dto/fileset_dto.py
@@ -32,8 +32,8 @@ class FilesetDTO(Fileset, DataClassJsonMixin):
     _comment: Optional[str] = field(metadata=config(field_name="comment"))
     _type: Fileset.Type = field(metadata=config(field_name="type"))
     _properties: Dict[str, str] = 
field(metadata=config(field_name="properties"))
-    _storage_location: str = field(
-        default=None, metadata=config(field_name="storageLocation")
+    _storage_locations: Dict[str, str] = field(
+        metadata=config(field_name="storageLocations")
     )
     _audit: AuditDTO = field(default=None, metadata=config(field_name="audit"))
 
@@ -43,12 +43,8 @@ class FilesetDTO(Fileset, DataClassJsonMixin):
     def type(self) -> Fileset.Type:
         return self._type
 
-    def storage_location(self) -> str:
-        return self._storage_location
-
     def storage_locations(self) -> Dict[str, str]:
-        # todo: implement this method and remove storage_location method
-        pass
+        return self._storage_locations
 
     def comment(self) -> Optional[str]:
         return self._comment
diff --git 
a/clients/client-python/gravitino/dto/requests/fileset_create_request.py 
b/clients/client-python/gravitino/dto/requests/fileset_create_request.py
index 980287ba0e..74b692f215 100644
--- a/clients/client-python/gravitino/dto/requests/fileset_create_request.py
+++ b/clients/client-python/gravitino/dto/requests/fileset_create_request.py
@@ -31,8 +31,8 @@ class FilesetCreateRequest(RESTRequest):
     _name: str = field(metadata=config(field_name="name"))
     _comment: Optional[str] = field(metadata=config(field_name="comment"))
     _type: Optional[Fileset.Type] = field(metadata=config(field_name="type"))
-    _storage_location: Optional[str] = field(
-        metadata=config(field_name="storageLocation")
+    _storage_locations: Optional[Dict[str, str]] = field(
+        metadata=config(field_name="storageLocations")
     )
     _properties: Optional[Dict[str, str]] = field(
         metadata=config(field_name="properties")
@@ -43,13 +43,13 @@ class FilesetCreateRequest(RESTRequest):
         name: str,
         comment: Optional[str] = None,
         fileset_type: Fileset.Type = None,
-        storage_location: Optional[str] = None,
+        storage_locations: Optional[Dict[str, str]] = None,
         properties: Optional[Dict[str, str]] = None,
     ):
         self._name = name
         self._comment = comment
         self._type = fileset_type
-        self._storage_location = storage_location
+        self._storage_locations = storage_locations
         self._properties = properties
 
     def validate(self):
diff --git a/clients/client-python/gravitino/dto/responses/fileset_response.py 
b/clients/client-python/gravitino/dto/responses/fileset_response.py
index faabe3d9ed..607c2966d0 100644
--- a/clients/client-python/gravitino/dto/responses/fileset_response.py
+++ b/clients/client-python/gravitino/dto/responses/fileset_response.py
@@ -44,9 +44,13 @@ class FilesetResponse(BaseResponse):
             raise IllegalArgumentException("fileset must not be null")
         if not self._fileset.name():
             raise IllegalArgumentException("fileset 'name' must not be null 
and empty")
-        if not self._fileset.storage_location():
+        if not self._fileset.storage_locations():
             raise IllegalArgumentException(
-                "fileset 'storageLocation' must not be null and empty"
+                "fileset 'storageLocations' must not be null"
+            )
+        if not self._fileset.storage_locations():
+            raise IllegalArgumentException(
+                "fileset 'storageLocations' must not be empty. At least one 
location is required."
             )
         if self._fileset.type() is None:
             raise IllegalArgumentException("fileset 'type' must not be null 
and empty")
diff --git 
a/clients/client-python/gravitino/exceptions/handlers/fileset_error_handler.py 
b/clients/client-python/gravitino/exceptions/handlers/fileset_error_handler.py
index 95baa798ce..569ce1832b 100644
--- 
a/clients/client-python/gravitino/exceptions/handlers/fileset_error_handler.py
+++ 
b/clients/client-python/gravitino/exceptions/handlers/fileset_error_handler.py
@@ -22,6 +22,7 @@ from gravitino.exceptions.base import (
     NoSuchFilesetException,
     NoSuchSchemaException,
     CatalogNotInUseException,
+    NoSuchLocationNameException,
 )
 
 
@@ -38,6 +39,8 @@ class FilesetErrorHandler(RestErrorHandler):
                 raise NoSuchSchemaException(error_message)
             if exception_type == NoSuchFilesetException.__name__:
                 raise NoSuchFilesetException(error_message)
+            if exception_type == NoSuchLocationNameException.__name__:
+                raise NoSuchLocationNameException(error_message)
 
         if code == ErrorConstants.NOT_IN_USE_CODE:
             raise CatalogNotInUseException(error_message)
diff --git a/clients/client-python/tests/integration/test_fileset_catalog.py 
b/clients/client-python/tests/integration/test_fileset_catalog.py
index 98a31fa0a2..11e83c0b20 100644
--- a/clients/client-python/tests/integration/test_fileset_catalog.py
+++ b/clients/client-python/tests/integration/test_fileset_catalog.py
@@ -17,7 +17,7 @@
 
 import logging
 from random import randint
-from typing import Dict, List
+from typing import Dict, List, Optional
 
 from gravitino import (
     NameIdentifier,
@@ -48,10 +48,12 @@ class TestFilesetCatalog(IntegrationTestEnv):
     schema_name: str = "schema"
 
     fileset_name: str = "fileset"
+    multiple_locations_fileset_name: str = "multiple_locations_fileset"
     fileset_alter_name: str = fileset_name + "Alter"
     fileset_comment: str = "fileset_comment"
 
     fileset_location: str = "/tmp/TestFilesetCatalog"
+    fileset_location2: str = "/tmp/TestFilesetCatalog2"
     fileset_properties_key1: str = "fileset_properties_key1"
     fileset_properties_value1: str = "fileset_properties_value1"
     fileset_properties_key2: str = "fileset_properties_key2"
@@ -60,6 +62,10 @@ class TestFilesetCatalog(IntegrationTestEnv):
         fileset_properties_key1: fileset_properties_value1,
         fileset_properties_key2: fileset_properties_value2,
     }
+    multiple_locations_fileset_properties: Dict[str, str] = {
+        Fileset.PROPERTY_DEFAULT_LOCATION_NAME: "location1",
+        **fileset_properties,
+    }
     fileset_new_name = fileset_name + "_new"
 
     catalog_ident: NameIdentifier = NameIdentifier.of(metalake_name, 
catalog_name)
@@ -67,6 +73,9 @@ class TestFilesetCatalog(IntegrationTestEnv):
         metalake_name, catalog_name, schema_name
     )
     fileset_ident: NameIdentifier = NameIdentifier.of(schema_name, 
fileset_name)
+    multiple_locations_fileset_ident: NameIdentifier = NameIdentifier.of(
+        schema_name, multiple_locations_fileset_name
+    )
     fileset_new_ident: NameIdentifier = NameIdentifier.of(schema_name, 
fileset_new_name)
 
     gravitino_admin_client: GravitinoAdminClient = GravitinoAdminClient(
@@ -163,16 +172,55 @@ class TestFilesetCatalog(IntegrationTestEnv):
             properties=self.fileset_properties,
         )
 
+    def create_multiple_locations_fileset(self) -> Fileset:
+        catalog = self.gravitino_client.load_catalog(name=self.catalog_name)
+        return catalog.as_fileset_catalog().create_multiple_location_fileset(
+            ident=self.multiple_locations_fileset_ident,
+            fileset_type=Fileset.Type.MANAGED,
+            comment=self.fileset_comment,
+            storage_locations={
+                "location1": self.fileset_location,
+                "location2": self.fileset_location2,
+            },
+            properties=self.multiple_locations_fileset_properties,
+        )
+
     def create_custom_fileset(
-        self, ident: NameIdentifier, storage_location: str
+        self,
+        ident: NameIdentifier,
+        storage_location: Optional[str],
+        storage_locations: Optional[Dict[str, str]] = None,
+        default_location_name: Optional[str] = None,
     ) -> Fileset:
         catalog = self.gravitino_client.load_catalog(name=self.catalog_name)
-        return catalog.as_fileset_catalog().create_fileset(
+        if storage_locations is None:
+            return catalog.as_fileset_catalog().create_fileset(
+                ident=ident,
+                fileset_type=Fileset.Type.MANAGED,
+                comment=self.fileset_comment,
+                storage_location=storage_location,
+                properties=(
+                    self.fileset_properties
+                    if default_location_name is None
+                    else {
+                        Fileset.PROPERTY_DEFAULT_LOCATION_NAME: 
default_location_name,
+                        **self.fileset_properties,
+                    }
+                ),
+            )
+        return catalog.as_fileset_catalog().create_multiple_location_fileset(
             ident=ident,
             fileset_type=Fileset.Type.MANAGED,
             comment=self.fileset_comment,
-            storage_location=storage_location,
-            properties=self.fileset_properties,
+            storage_locations=storage_locations,
+            properties=(
+                self.fileset_properties
+                if default_location_name is None
+                else {
+                    Fileset.PROPERTY_DEFAULT_LOCATION_NAME: 
default_location_name,
+                    **self.fileset_properties,
+                }
+            ),
         )
 
     def test_create_fileset(self):
@@ -189,6 +237,28 @@ class TestFilesetCatalog(IntegrationTestEnv):
         )
         self.assertEqual(fileset.storage_location(), 
f"file:{self.fileset_location}")
 
+    def test_create_fileset_with_multiple_locations(self):
+        fileset = self.create_multiple_locations_fileset()
+        self.assertIsNotNone(fileset)
+        self.assertEqual(fileset.type(), Fileset.Type.MANAGED)
+        self.assertEqual(fileset.comment(), self.fileset_comment)
+        self.assertEqual(
+            fileset.properties(), self.multiple_locations_fileset_properties
+        )
+        self.assertEqual(
+            fileset.storage_location(),
+            
f"file:/tmp/test1/{self.schema_name}/{self.multiple_locations_fileset_name}",
+        )
+        self.assertEqual(
+            fileset.storage_locations(),
+            {
+                Fileset.LOCATION_NAME_UNKNOWN: f"file:/tmp/test1/"
+                f"{self.schema_name}/{self.multiple_locations_fileset_name}",
+                "location1": f"file:{self.fileset_location}",
+                "location2": f"file:{self.fileset_location2}",
+            },
+        )
+
     def test_drop_fileset(self):
         self.create_fileset()
         catalog = self.gravitino_client.load_catalog(name=self.catalog_name)
@@ -271,6 +341,26 @@ class TestFilesetCatalog(IntegrationTestEnv):
 
         self.assertEqual(actual_file_location, 
f"file:{fileset_location}/test/test.txt")
 
+        # test get file location from multiple locations fileset
+        fileset_ident: NameIdentifier = NameIdentifier.of(
+            self.schema_name, "test_get_file_location_multiple_locations"
+        )
+        locations = {
+            "default": "/tmp/test_get_file_location",
+            "location1": "/tmp/test_get_file_location1",
+            "location2": "/tmp/test_get_file_location2",
+        }
+        self.create_custom_fileset(fileset_ident, None, locations, "location1")
+        actual_file_location = (
+            self.gravitino_client.load_catalog(name=self.catalog_name)
+            .as_fileset_catalog()
+            .get_file_location(fileset_ident, "/test/test.txt")
+        )
+
+        self.assertEqual(
+            actual_file_location, 
f"file:{locations['location1']}/test/test.txt"
+        )
+
         # test rename without sub path should throw an exception
         caller_context = CallerContext(
             {
diff --git a/clients/client-python/tests/unittests/mock_base.py 
b/clients/client-python/tests/unittests/mock_base.py
index 2c7d6e3e58..8a33d96d48 100644
--- a/clients/client-python/tests/unittests/mock_base.py
+++ b/clients/client-python/tests/unittests/mock_base.py
@@ -94,8 +94,11 @@ def mock_load_fileset(name: str, location: str):
         _name=name,
         _type=Fileset.Type.MANAGED,
         _comment="this is test",
-        _properties={"k": "v"},
-        _storage_location=location,
+        _properties={
+            "k": "v",
+            Fileset.PROPERTY_DEFAULT_LOCATION_NAME: 
Fileset.LOCATION_NAME_UNKNOWN,
+        },
+        _storage_locations={Fileset.LOCATION_NAME_UNKNOWN: location},
         _audit=audit_dto,
     )
     return fileset
diff --git 
a/common/src/main/java/org/apache/gravitino/dto/responses/FilesetResponse.java 
b/common/src/main/java/org/apache/gravitino/dto/responses/FilesetResponse.java
index 785f19204a..ae18ec3518 100644
--- 
a/common/src/main/java/org/apache/gravitino/dto/responses/FilesetResponse.java
+++ 
b/common/src/main/java/org/apache/gravitino/dto/responses/FilesetResponse.java
@@ -63,8 +63,11 @@ public class FilesetResponse extends BaseResponse {
     Preconditions.checkArgument(
         StringUtils.isNotBlank(fileset.name()), "fileset 'name' must not be 
null and empty");
     Preconditions.checkArgument(
-        StringUtils.isNotBlank(fileset.storageLocation()),
-        "fileset 'storageLocation' must not be null and empty");
-    Preconditions.checkNotNull(fileset.type(), "fileset 'type' must not be 
null and empty");
+        fileset.type() != null, "fileset 'type' must not be null and empty");
+    Preconditions.checkArgument(
+        fileset.storageLocations() != null, "fileset 'storageLocations' must 
not be null");
+    Preconditions.checkArgument(
+        !fileset.storageLocations().isEmpty(),
+        "fileset 'storageLocations' must not be empty. At least one location 
is required.");
   }
 }


Reply via email to