This is an automated email from the ASF dual-hosted git repository.
jshao pushed a commit to branch branch-0.9
in repository https://gitbox.apache.org/repos/asf/gravitino.git
The following commit(s) were added to refs/heads/branch-0.9 by this push:
new 2337c2ed1c [#6894] feat(client): Java/Python clients supports fileset
multiple locations (#6943)
2337c2ed1c is described below
commit 2337c2ed1c15ec41874a2e6b5fcb19aa210bca16
Author: github-actions[bot]
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Tue Apr 15 18:48:51 2025 +0800
[#6894] feat(client): Java/Python clients supports fileset multiple
locations (#6943)
### What changes were proposed in this pull request?
The Java and Python clients now support filesets with multiple storage locations.
### Why are the changes needed?
Fix: #6894
### Does this PR introduce _any_ user-facing change?
Yes. The Java and Python clients can now create filesets with multiple named storage locations and resolve a file location by location name.
### How was this patch tested?
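Tests were added or updated for both clients (Java: HadoopCatalogIT, TestFilesetCatalog; Python: test_fileset_catalog.py, mock_base.py).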
Co-authored-by: mchades <[email protected]>
---
.../hadoop/integration/test/HadoopCatalogIT.java | 145 +++++++++++++++++++++
.../org/apache/gravitino/client/ErrorHandlers.java | 11 +-
.../apache/gravitino/client/FilesetCatalog.java | 27 ++--
.../apache/gravitino/client/GenericFileset.java | 4 +-
.../gravitino/client/TestFilesetCatalog.java | 49 ++++++-
.../gravitino/client/fileset_catalog.py | 59 +++++----
.../gravitino/client/generic_fileset.py | 6 +-
clients/client-python/gravitino/dto/fileset_dto.py | 10 +-
.../dto/requests/fileset_create_request.py | 8 +-
.../gravitino/dto/responses/fileset_response.py | 8 +-
.../exceptions/handlers/fileset_error_handler.py | 3 +
.../tests/integration/test_fileset_catalog.py | 100 +++++++++++++-
clients/client-python/tests/unittests/mock_base.py | 7 +-
.../gravitino/dto/responses/FilesetResponse.java | 9 +-
14 files changed, 376 insertions(+), 70 deletions(-)
diff --git
a/catalogs/catalog-hadoop/src/test/java/org/apache/gravitino/catalog/hadoop/integration/test/HadoopCatalogIT.java
b/catalogs/catalog-hadoop/src/test/java/org/apache/gravitino/catalog/hadoop/integration/test/HadoopCatalogIT.java
index 8a45b5c8d5..ee77d11e51 100644
---
a/catalogs/catalog-hadoop/src/test/java/org/apache/gravitino/catalog/hadoop/integration/test/HadoopCatalogIT.java
+++
b/catalogs/catalog-hadoop/src/test/java/org/apache/gravitino/catalog/hadoop/integration/test/HadoopCatalogIT.java
@@ -204,6 +204,7 @@ public class HadoopCatalogIT extends BaseIT {
Assertions.assertEquals("comment", fileset.comment());
Assertions.assertEquals(MANAGED, fileset.type());
Assertions.assertEquals(storageLocation, fileset.storageLocation());
+ Assertions.assertEquals(storageLocation,
fileset.storageLocations().get(LOCATION_NAME_UNKNOWN));
Assertions.assertEquals(2, fileset.properties().size());
Assertions.assertEquals("v1", fileset.properties().get("k1"));
Assertions.assertEquals(
@@ -228,6 +229,10 @@ public class HadoopCatalogIT extends BaseIT {
storageLocation(filesetName2),
fileset2.storageLocation(),
"storage location should be created");
+ Assertions.assertEquals(
+ storageLocation(filesetName2),
+ fileset2.storageLocations().get(LOCATION_NAME_UNKNOWN),
+ "storage location should be created");
Assertions.assertEquals(
ImmutableMap.of(PROPERTY_DEFAULT_LOCATION_NAME, LOCATION_NAME_UNKNOWN),
fileset2.properties());
@@ -245,10 +250,61 @@ public class HadoopCatalogIT extends BaseIT {
Assertions.assertEquals("comment", fileset4.comment());
Assertions.assertEquals(MANAGED, fileset4.type());
Assertions.assertEquals(expectedStorageLocation4,
fileset4.storageLocation());
+ Assertions.assertEquals(
+ expectedStorageLocation4,
fileset4.storageLocations().get(LOCATION_NAME_UNKNOWN));
Assertions.assertEquals(1, fileset4.properties().size(), "properties
should be empty");
Assertions.assertEquals(
LOCATION_NAME_UNKNOWN,
fileset4.properties().get(PROPERTY_DEFAULT_LOCATION_NAME));
+ // create fileset with multiple locations
+ String filesetName5 = "test_create_fileset_with_multiple_locations";
+ Map<String, String> storageLocations =
+ ImmutableMap.of(
+ "location1",
+ storageLocation(filesetName5 + "_location1"),
+ "location2",
+ storageLocation(filesetName5 + "_location2"));
+ fileset =
+ createMultipleLocationsFileset(
+ filesetName5,
+ "comment",
+ MANAGED,
+ storageLocations,
+ ImmutableMap.of(PROPERTY_DEFAULT_LOCATION_NAME, "location1"));
+ Assertions.assertNotNull(fileset, "fileset should be created");
+ Assertions.assertEquals("comment", fileset.comment());
+ Assertions.assertEquals(MANAGED, fileset.type());
+ Map<String, String> expectedStorageLocations =
+ new HashMap<String, String>(storageLocations) {
+ {
+ put(LOCATION_NAME_UNKNOWN, storageLocation(filesetName5));
+ }
+ };
+ Assertions.assertEquals(expectedStorageLocations,
fileset.storageLocations());
+ Assertions.assertEquals(1, fileset.properties().size());
+ Assertions.assertEquals("location1",
fileset.properties().get(PROPERTY_DEFAULT_LOCATION_NAME));
+
+ assertFilesetExists(filesetName5);
+ fileset =
catalog.asFilesetCatalog().loadFileset(NameIdentifier.of(schemaName,
filesetName5));
+ Assertions.assertNotNull(fileset, "fileset should be created");
+ Assertions.assertEquals("comment", fileset.comment());
+ Assertions.assertEquals(MANAGED, fileset.type());
+ Assertions.assertEquals(expectedStorageLocations,
fileset.storageLocations());
+ Assertions.assertEquals(1, fileset.properties().size());
+ Assertions.assertEquals("location1",
fileset.properties().get(PROPERTY_DEFAULT_LOCATION_NAME));
+
+ // create fileset with null multiple locations
+ String filesetName6 = "test_create_fileset_with_null_multiple_locations";
+ createMultipleLocationsFileset(filesetName6, "comment", MANAGED, null,
null);
+ assertFilesetExists(filesetName6);
+ fileset =
catalog.asFilesetCatalog().loadFileset(NameIdentifier.of(schemaName,
filesetName6));
+ Assertions.assertNotNull(fileset, "fileset should be created");
+ Assertions.assertEquals("comment", fileset.comment());
+ Assertions.assertEquals(MANAGED, fileset.type());
+ Assertions.assertEquals(
+ ImmutableMap.of(LOCATION_NAME_UNKNOWN, storageLocation(filesetName6)),
+ fileset.storageLocations());
+
// create fileset with null fileset name
Assertions.assertThrows(
IllegalNameIdentifierException.class,
@@ -749,6 +805,68 @@ public class HadoopCatalogIT extends BaseIT {
}
}
+ @Test
+ public void testGetFileLocationWithMultipleLocations() {
+ String filesetName = GravitinoITUtils.genRandomName("fileset");
+ NameIdentifier filesetIdent = NameIdentifier.of(schemaName, filesetName);
+
Assertions.assertFalse(catalog.asFilesetCatalog().filesetExists(filesetIdent));
+ String locationName1 = "location1";
+ String locationName2 = "location2";
+ Map<String, String> storageLocations =
+ ImmutableMap.of(
+ locationName1,
+ storageLocation(filesetName + "_location1"),
+ locationName2,
+ storageLocation(filesetName + "_location2"));
+ Fileset expectedFileset =
+ catalog
+ .asFilesetCatalog()
+ .createMultipleLocationFileset(
+ filesetIdent,
+ "fileset comment",
+ MANAGED,
+ storageLocations,
+ ImmutableMap.of(PROPERTY_DEFAULT_LOCATION_NAME,
locationName1));
+
Assertions.assertTrue(catalog.asFilesetCatalog().filesetExists(filesetIdent));
+ // test without caller context
+ String actualFileLocation1 =
+ catalog.asFilesetCatalog().getFileLocation(filesetIdent, "/test1.par",
locationName1);
+ Assertions.assertEquals(
+ expectedFileset.storageLocations().get(locationName1) + "/test1.par",
actualFileLocation1);
+
+ String actualFileLocation2 =
+ catalog.asFilesetCatalog().getFileLocation(filesetIdent, "/test2.par",
locationName2);
+ Assertions.assertEquals(
+ expectedFileset.storageLocations().get(locationName2) + "/test2.par",
actualFileLocation2);
+
+ // test with caller context
+ try {
+ Map<String, String> context = new HashMap<>();
+ context.put(
+ FilesetAuditConstants.HTTP_HEADER_INTERNAL_CLIENT_TYPE,
+ InternalClientType.HADOOP_GVFS.name());
+ context.put(
+ FilesetAuditConstants.HTTP_HEADER_FILESET_DATA_OPERATION,
+ FilesetDataOperation.CREATE.name());
+ CallerContext callerContext =
CallerContext.builder().withContext(context).build();
+ CallerContext.CallerContextHolder.set(callerContext);
+
+ actualFileLocation1 =
+ catalog.asFilesetCatalog().getFileLocation(filesetIdent,
"/test1.par", locationName1);
+ Assertions.assertEquals(
+ expectedFileset.storageLocations().get(locationName1) + "/test1.par",
+ actualFileLocation1);
+
+ actualFileLocation2 =
+ catalog.asFilesetCatalog().getFileLocation(filesetIdent,
"/test2.par", locationName2);
+ Assertions.assertEquals(
+ expectedFileset.storageLocations().get(locationName2) + "/test2.par",
+ actualFileLocation2);
+ } finally {
+ CallerContext.CallerContextHolder.remove();
+ }
+ }
+
@Test
public void testGetFileLocationWithInvalidAuditHeaders() {
try {
@@ -869,6 +987,33 @@ public class HadoopCatalogIT extends BaseIT {
NameIdentifier.of(schemaName, filesetName), comment, type,
storageLocation, properties);
}
+ private Fileset createMultipleLocationsFileset(
+ String filesetName,
+ String comment,
+ Fileset.Type type,
+ Map<String, String> storageLocations,
+ Map<String, String> properties) {
+ if (storageLocations != null) {
+ for (String location : storageLocations.values()) {
+ Path path = new Path(location);
+ try {
+ fileSystem.deleteOnExit(path);
+ } catch (IOException e) {
+ LOG.warn("Failed to delete location: {}", path, e);
+ }
+ }
+ }
+
+ return catalog
+ .asFilesetCatalog()
+ .createMultipleLocationFileset(
+ NameIdentifier.of(schemaName, filesetName),
+ comment,
+ type,
+ storageLocations,
+ properties);
+ }
+
private void assertFilesetExists(String filesetName) throws IOException {
Assertions.assertTrue(
catalog.asFilesetCatalog().filesetExists(NameIdentifier.of(schemaName,
filesetName)),
diff --git
a/clients/client-java/src/main/java/org/apache/gravitino/client/ErrorHandlers.java
b/clients/client-java/src/main/java/org/apache/gravitino/client/ErrorHandlers.java
index d9b4ddb49f..b7e47d239c 100644
---
a/clients/client-java/src/main/java/org/apache/gravitino/client/ErrorHandlers.java
+++
b/clients/client-java/src/main/java/org/apache/gravitino/client/ErrorHandlers.java
@@ -46,6 +46,7 @@ import
org.apache.gravitino.exceptions.ModelVersionAliasesAlreadyExistException;
import org.apache.gravitino.exceptions.NoSuchCatalogException;
import org.apache.gravitino.exceptions.NoSuchFilesetException;
import org.apache.gravitino.exceptions.NoSuchGroupException;
+import org.apache.gravitino.exceptions.NoSuchLocationNameException;
import org.apache.gravitino.exceptions.NoSuchMetadataObjectException;
import org.apache.gravitino.exceptions.NoSuchMetalakeException;
import org.apache.gravitino.exceptions.NoSuchModelException;
@@ -602,10 +603,18 @@ public class ErrorHandlers {
throw new IllegalArgumentException(errorMessage);
case ErrorConstants.NOT_FOUND_CODE:
- if
(errorResponse.getType().equals(NoSuchSchemaException.class.getSimpleName())) {
+ if
(errorResponse.getType().equals(NoSuchMetalakeException.class.getSimpleName()))
{
+ throw new NoSuchMetalakeException(errorMessage);
+ } else if
(errorResponse.getType().equals(NoSuchCatalogException.class.getSimpleName())) {
+ throw new NoSuchCatalogException(errorMessage);
+ } else if
(errorResponse.getType().equals(NoSuchSchemaException.class.getSimpleName())) {
throw new NoSuchSchemaException(errorMessage);
} else if
(errorResponse.getType().equals(NoSuchFilesetException.class.getSimpleName())) {
throw new NoSuchFilesetException(errorMessage);
+ } else if (errorResponse
+ .getType()
+ .equals(NoSuchLocationNameException.class.getSimpleName())) {
+ throw new NoSuchLocationNameException(errorMessage);
} else {
throw new NotFoundException(errorMessage);
}
diff --git
a/clients/client-java/src/main/java/org/apache/gravitino/client/FilesetCatalog.java
b/clients/client-java/src/main/java/org/apache/gravitino/client/FilesetCatalog.java
index 2ad0157d8a..2bef734d06 100644
---
a/clients/client-java/src/main/java/org/apache/gravitino/client/FilesetCatalog.java
+++
b/clients/client-java/src/main/java/org/apache/gravitino/client/FilesetCatalog.java
@@ -44,6 +44,7 @@ import
org.apache.gravitino.dto.responses.FileLocationResponse;
import org.apache.gravitino.dto.responses.FilesetResponse;
import org.apache.gravitino.exceptions.FilesetAlreadyExistsException;
import org.apache.gravitino.exceptions.NoSuchFilesetException;
+import org.apache.gravitino.exceptions.NoSuchLocationNameException;
import org.apache.gravitino.exceptions.NoSuchSchemaException;
import org.apache.gravitino.file.Fileset;
import org.apache.gravitino.file.FilesetChange;
@@ -125,28 +126,23 @@ class FilesetCatalog extends BaseSchemaCatalog
}
/**
- * Create a fileset metadata in the catalog.
+ * Create a fileset metadata with multiple storage locations in the catalog.
*
- * <p>If the type of the fileset object is "MANAGED", the underlying
storageLocation can be null,
- * and Gravitino will manage the storage location based on the location of
the schema.
- *
- * <p>If the type of the fileset object is "EXTERNAL", the underlying
storageLocation must be set.
- *
- * @param ident A fileset identifier, which should be "schema.fileset"
format.
+ * @param ident A fileset identifier.
* @param comment The comment of the fileset.
* @param type The type of the fileset.
- * @param storageLocation The storage location of the fileset.
+ * @param storageLocations The location names and storage locations of the
fileset.
* @param properties The properties of the fileset.
* @return The created fileset metadata
* @throws NoSuchSchemaException If the schema does not exist.
* @throws FilesetAlreadyExistsException If the fileset already exists.
*/
@Override
- public Fileset createFileset(
+ public Fileset createMultipleLocationFileset(
NameIdentifier ident,
String comment,
Fileset.Type type,
- String storageLocation,
+ Map<String, String> storageLocations,
Map<String, String> properties)
throws NoSuchSchemaException, FilesetAlreadyExistsException {
checkFilesetNameIdentifier(ident);
@@ -157,9 +153,10 @@ class FilesetCatalog extends BaseSchemaCatalog
.name(ident.name())
.comment(comment)
.type(type)
- .storageLocation(storageLocation)
+ .storageLocations(storageLocations)
.properties(properties)
.build();
+ req.validate();
FilesetResponse resp =
restClient.post(
@@ -238,12 +235,13 @@ class FilesetCatalog extends BaseSchemaCatalog
*
* @param ident A fileset identifier.
* @param subPath The sub path to the file or directory.
+ * @param locationName The name of the location to be accessed.
* @return The actual location of the file or directory.
* @throws NoSuchFilesetException If the fileset does not exist.
*/
@Override
- public String getFileLocation(NameIdentifier ident, String subPath)
- throws NoSuchFilesetException {
+ public String getFileLocation(NameIdentifier ident, String subPath, String
locationName)
+ throws NoSuchFilesetException, NoSuchLocationNameException {
checkFilesetNameIdentifier(ident);
Namespace fullNamespace = getFilesetFullNamespace(ident.namespace());
@@ -252,6 +250,9 @@ class FilesetCatalog extends BaseSchemaCatalog
Map<String, String> params = new HashMap<>();
params.put("sub_path", RESTUtils.encodeString(subPath));
+ if (locationName != null) {
+ params.put("location_name", RESTUtils.encodeString(locationName));
+ }
FileLocationResponse resp =
restClient.get(
formatFileLocationRequestPath(fullNamespace, ident.name()),
diff --git
a/clients/client-java/src/main/java/org/apache/gravitino/client/GenericFileset.java
b/clients/client-java/src/main/java/org/apache/gravitino/client/GenericFileset.java
index 6e587b1155..b2c6b789c2 100644
---
a/clients/client-java/src/main/java/org/apache/gravitino/client/GenericFileset.java
+++
b/clients/client-java/src/main/java/org/apache/gravitino/client/GenericFileset.java
@@ -79,8 +79,8 @@ class GenericFileset implements Fileset, SupportsTags,
SupportsRoles, SupportsCr
}
@Override
- public String storageLocation() {
- return filesetDTO.storageLocation();
+ public Map<String, String> storageLocations() {
+ return filesetDTO.storageLocations();
}
@Override
diff --git
a/clients/client-java/src/test/java/org/apache/gravitino/client/TestFilesetCatalog.java
b/clients/client-java/src/test/java/org/apache/gravitino/client/TestFilesetCatalog.java
index 6597ed5c59..d878c0d684 100644
---
a/clients/client-java/src/test/java/org/apache/gravitino/client/TestFilesetCatalog.java
+++
b/clients/client-java/src/test/java/org/apache/gravitino/client/TestFilesetCatalog.java
@@ -58,6 +58,7 @@ import org.apache.gravitino.dto.responses.FilesetResponse;
import org.apache.gravitino.exceptions.AlreadyExistsException;
import org.apache.gravitino.exceptions.FilesetAlreadyExistsException;
import org.apache.gravitino.exceptions.NoSuchFilesetException;
+import org.apache.gravitino.exceptions.NoSuchLocationNameException;
import org.apache.gravitino.exceptions.NoSuchSchemaException;
import org.apache.gravitino.exceptions.NotFoundException;
import org.apache.gravitino.file.Fileset;
@@ -237,7 +238,7 @@ public class TestFilesetCatalog extends TestBase {
.properties(ImmutableMap.of("k1", "v1"))
.build();
FilesetResponse resp = new FilesetResponse(mockFileset);
- buildMockResource(Method.POST, filesetPath, req, resp, SC_OK);
+ buildMockResource(Method.POST, filesetPath, toMultipleLocationReq(req),
resp, SC_OK);
Fileset loadedFileset =
catalog
.asFilesetCatalog()
@@ -254,7 +255,7 @@ public class TestFilesetCatalog extends TestBase {
ErrorResponse errResp =
ErrorResponse.alreadyExists(
FilesetAlreadyExistsException.class.getSimpleName(), "fileset
already exists");
- buildMockResource(Method.POST, filesetPath, req, errResp, SC_CONFLICT);
+ buildMockResource(Method.POST, filesetPath, toMultipleLocationReq(req),
errResp, SC_CONFLICT);
Assertions.assertThrows(
AlreadyExistsException.class,
() ->
@@ -285,6 +286,19 @@ public class TestFilesetCatalog extends TestBase {
"internal error");
}
+ private FilesetCreateRequest toMultipleLocationReq(FilesetCreateRequest req)
{
+ return FilesetCreateRequest.builder()
+ .name(req.getName())
+ .type(req.getType())
+ .comment(req.getComment())
+ .storageLocations(
+ req.getStorageLocation() == null
+ ? ImmutableMap.of()
+ : ImmutableMap.of(LOCATION_NAME_UNKNOWN,
req.getStorageLocation()))
+ .properties(req.getProperties())
+ .build();
+ }
+
@Test
public void testDropFileset() throws JsonProcessingException {
NameIdentifier fileset = NameIdentifier.of("schema1", "fileset1");
@@ -440,6 +454,21 @@ public class TestFilesetCatalog extends TestBase {
Assertions.assertTrue(StringUtils.isNotBlank(actualFileLocation));
Assertions.assertEquals(mockFileLocation, actualFileLocation);
+ // get location by location name
+ String mockLocationName = "location1";
+ queryParams.put("location_name", mockLocationName);
+ buildMockResource(Method.GET, filesetPath, queryParams, null, resp, SC_OK);
+
+ actualFileLocation =
+ catalog
+ .asFilesetCatalog()
+ .getFileLocation(
+ NameIdentifier.of(fileset.namespace().level(2),
fileset.name()),
+ mockSubPath,
+ mockLocationName);
+ Assertions.assertTrue(StringUtils.isNotBlank(actualFileLocation));
+ Assertions.assertEquals(mockFileLocation, actualFileLocation);
+
// Throw schema not found exception
ErrorResponse errResp =
ErrorResponse.notFound(NoSuchSchemaException.class.getSimpleName(),
"schema not found");
@@ -475,6 +504,22 @@ public class TestFilesetCatalog extends TestBase {
.getFileLocation(
NameIdentifier.of(fileset.namespace().level(2),
fileset.name()), mockSubPath),
"internal error");
+
+ // throw NoSuchLocationNameException
+ ErrorResponse errResp3 =
+ ErrorResponse.notFound(
+ NoSuchLocationNameException.class.getSimpleName(), "location name
not found");
+ buildMockResource(Method.GET, filesetPath, null, errResp3, SC_NOT_FOUND);
+ Assertions.assertThrows(
+ NoSuchLocationNameException.class,
+ () ->
+ catalog
+ .asFilesetCatalog()
+ .getFileLocation(
+ NameIdentifier.of(fileset.namespace().level(2),
fileset.name()),
+ mockSubPath,
+ mockLocationName),
+ "location name not found");
}
@Test
diff --git a/clients/client-python/gravitino/client/fileset_catalog.py
b/clients/client-python/gravitino/client/fileset_catalog.py
index 56504a5dc4..d5bf996be4 100644
--- a/clients/client-python/gravitino/client/fileset_catalog.py
+++ b/clients/client-python/gravitino/client/fileset_catalog.py
@@ -159,28 +159,14 @@ class FilesetCatalog(BaseSchemaCatalog,
SupportsCredentials):
Returns:
The created fileset metadata
"""
- # todo: call create_multiple_location_fileset if multiple storage
locations are supported
- self.check_fileset_name_identifier(ident)
-
- full_namespace = self._get_fileset_full_namespace(ident.namespace())
-
- req = FilesetCreateRequest(
- name=encode_string(ident.name()),
- comment=comment,
- fileset_type=fileset_type,
- storage_location=storage_location,
- properties=properties,
+ locations = (
+ {Fileset.LOCATION_NAME_UNKNOWN: storage_location}
+ if storage_location
+ else {}
)
-
- resp = self.rest_client.post(
- self.format_fileset_request_path(full_namespace),
- req,
- error_handler=FILESET_ERROR_HANDLER,
+ return self.create_multiple_location_fileset(
+ ident, comment, fileset_type, locations, properties
)
- fileset_resp = FilesetResponse.from_json(resp.body, infer_missing=True)
- fileset_resp.validate()
-
- return GenericFileset(fileset_resp.fileset(), self.rest_client,
full_namespace)
def create_multiple_location_fileset(
self,
@@ -206,7 +192,28 @@ class FilesetCatalog(BaseSchemaCatalog,
SupportsCredentials):
Returns:
The created fileset metadata
"""
- raise NotImplementedError("Multiple storage locations are not
supported yet")
+ self.check_fileset_name_identifier(ident)
+
+ full_namespace = self._get_fileset_full_namespace(ident.namespace())
+
+ req = FilesetCreateRequest(
+ name=encode_string(ident.name()),
+ comment=comment,
+ fileset_type=fileset_type,
+ storage_locations=storage_locations,
+ properties=properties,
+ )
+ req.validate()
+
+ resp = self.rest_client.post(
+ self.format_fileset_request_path(full_namespace),
+ req,
+ error_handler=FILESET_ERROR_HANDLER,
+ )
+ fileset_resp = FilesetResponse.from_json(resp.body, infer_missing=True)
+ fileset_resp.validate()
+
+ return GenericFileset(fileset_resp.fileset(), self.rest_client,
full_namespace)
def alter_fileset(self, ident: NameIdentifier, *changes) -> Fileset:
"""Update a fileset metadata in the catalog.
@@ -279,7 +286,8 @@ class FilesetCatalog(BaseSchemaCatalog,
SupportsCredentials):
Args:
ident: A fileset identifier, which should be "schema.fileset"
format.
sub_path: The sub path of the file or directory.
- location_name: The location name of the fileset.
+ location_name: The location name of the fileset, if not specified,
+ the default location will be used.
Raises:
NoSuchLocationNameException If the location name does not exist.
@@ -292,8 +300,11 @@ class FilesetCatalog(BaseSchemaCatalog,
SupportsCredentials):
full_namespace = self._get_fileset_full_namespace(ident.namespace())
try:
caller_context: CallerContext = CallerContextHolder.get()
- # todo: add location name to the request
- params = {"sub_path": encode_string(sub_path)}
+ params = {
+ "sub_path": encode_string(sub_path),
+ }
+ if location_name is not None:
+ params["location_name"] = encode_string(location_name)
resp = self.rest_client.get(
self.format_file_location_request_path(full_namespace,
ident.name()),
diff --git a/clients/client-python/gravitino/client/generic_fileset.py
b/clients/client-python/gravitino/client/generic_fileset.py
index c3b99dd497..cb3728c187 100644
--- a/clients/client-python/gravitino/client/generic_fileset.py
+++ b/clients/client-python/gravitino/client/generic_fileset.py
@@ -56,12 +56,8 @@ class GenericFileset(Fileset, SupportsCredentials):
def type(self) -> Fileset.Type:
return self._fileset.type()
- def storage_location(self) -> str:
- return self._fileset.storage_location()
-
def storage_locations(self) -> Dict[str, str]:
- # todo: implement this method and remove storage_location method
- pass
+ return self._fileset.storage_locations()
def comment(self) -> Optional[str]:
return self._fileset.comment()
diff --git a/clients/client-python/gravitino/dto/fileset_dto.py
b/clients/client-python/gravitino/dto/fileset_dto.py
index d098cd968b..894b519721 100644
--- a/clients/client-python/gravitino/dto/fileset_dto.py
+++ b/clients/client-python/gravitino/dto/fileset_dto.py
@@ -32,8 +32,8 @@ class FilesetDTO(Fileset, DataClassJsonMixin):
_comment: Optional[str] = field(metadata=config(field_name="comment"))
_type: Fileset.Type = field(metadata=config(field_name="type"))
_properties: Dict[str, str] =
field(metadata=config(field_name="properties"))
- _storage_location: str = field(
- default=None, metadata=config(field_name="storageLocation")
+ _storage_locations: Dict[str, str] = field(
+ metadata=config(field_name="storageLocations")
)
_audit: AuditDTO = field(default=None, metadata=config(field_name="audit"))
@@ -43,12 +43,8 @@ class FilesetDTO(Fileset, DataClassJsonMixin):
def type(self) -> Fileset.Type:
return self._type
- def storage_location(self) -> str:
- return self._storage_location
-
def storage_locations(self) -> Dict[str, str]:
- # todo: implement this method and remove storage_location method
- pass
+ return self._storage_locations
def comment(self) -> Optional[str]:
return self._comment
diff --git
a/clients/client-python/gravitino/dto/requests/fileset_create_request.py
b/clients/client-python/gravitino/dto/requests/fileset_create_request.py
index 980287ba0e..74b692f215 100644
--- a/clients/client-python/gravitino/dto/requests/fileset_create_request.py
+++ b/clients/client-python/gravitino/dto/requests/fileset_create_request.py
@@ -31,8 +31,8 @@ class FilesetCreateRequest(RESTRequest):
_name: str = field(metadata=config(field_name="name"))
_comment: Optional[str] = field(metadata=config(field_name="comment"))
_type: Optional[Fileset.Type] = field(metadata=config(field_name="type"))
- _storage_location: Optional[str] = field(
- metadata=config(field_name="storageLocation")
+ _storage_locations: Optional[Dict[str, str]] = field(
+ metadata=config(field_name="storageLocations")
)
_properties: Optional[Dict[str, str]] = field(
metadata=config(field_name="properties")
@@ -43,13 +43,13 @@ class FilesetCreateRequest(RESTRequest):
name: str,
comment: Optional[str] = None,
fileset_type: Fileset.Type = None,
- storage_location: Optional[str] = None,
+ storage_locations: Optional[Dict[str, str]] = None,
properties: Optional[Dict[str, str]] = None,
):
self._name = name
self._comment = comment
self._type = fileset_type
- self._storage_location = storage_location
+ self._storage_locations = storage_locations
self._properties = properties
def validate(self):
diff --git a/clients/client-python/gravitino/dto/responses/fileset_response.py
b/clients/client-python/gravitino/dto/responses/fileset_response.py
index faabe3d9ed..607c2966d0 100644
--- a/clients/client-python/gravitino/dto/responses/fileset_response.py
+++ b/clients/client-python/gravitino/dto/responses/fileset_response.py
@@ -44,9 +44,13 @@ class FilesetResponse(BaseResponse):
raise IllegalArgumentException("fileset must not be null")
if not self._fileset.name():
raise IllegalArgumentException("fileset 'name' must not be null
and empty")
- if not self._fileset.storage_location():
+ if not self._fileset.storage_locations():
raise IllegalArgumentException(
- "fileset 'storageLocation' must not be null and empty"
+ "fileset 'storageLocations' must not be null"
+ )
+ if not self._fileset.storage_locations():
+ raise IllegalArgumentException(
+ "fileset 'storageLocations' must not be empty. At least one
location is required."
)
if self._fileset.type() is None:
raise IllegalArgumentException("fileset 'type' must not be null
and empty")
diff --git
a/clients/client-python/gravitino/exceptions/handlers/fileset_error_handler.py
b/clients/client-python/gravitino/exceptions/handlers/fileset_error_handler.py
index 95baa798ce..569ce1832b 100644
---
a/clients/client-python/gravitino/exceptions/handlers/fileset_error_handler.py
+++
b/clients/client-python/gravitino/exceptions/handlers/fileset_error_handler.py
@@ -22,6 +22,7 @@ from gravitino.exceptions.base import (
NoSuchFilesetException,
NoSuchSchemaException,
CatalogNotInUseException,
+ NoSuchLocationNameException,
)
@@ -38,6 +39,8 @@ class FilesetErrorHandler(RestErrorHandler):
raise NoSuchSchemaException(error_message)
if exception_type == NoSuchFilesetException.__name__:
raise NoSuchFilesetException(error_message)
+ if exception_type == NoSuchLocationNameException.__name__:
+ raise NoSuchLocationNameException(error_message)
if code == ErrorConstants.NOT_IN_USE_CODE:
raise CatalogNotInUseException(error_message)
diff --git a/clients/client-python/tests/integration/test_fileset_catalog.py
b/clients/client-python/tests/integration/test_fileset_catalog.py
index 98a31fa0a2..11e83c0b20 100644
--- a/clients/client-python/tests/integration/test_fileset_catalog.py
+++ b/clients/client-python/tests/integration/test_fileset_catalog.py
@@ -17,7 +17,7 @@
import logging
from random import randint
-from typing import Dict, List
+from typing import Dict, List, Optional
from gravitino import (
NameIdentifier,
@@ -48,10 +48,12 @@ class TestFilesetCatalog(IntegrationTestEnv):
schema_name: str = "schema"
fileset_name: str = "fileset"
+ multiple_locations_fileset_name: str = "multiple_locations_fileset"
fileset_alter_name: str = fileset_name + "Alter"
fileset_comment: str = "fileset_comment"
fileset_location: str = "/tmp/TestFilesetCatalog"
+ fileset_location2: str = "/tmp/TestFilesetCatalog2"
fileset_properties_key1: str = "fileset_properties_key1"
fileset_properties_value1: str = "fileset_properties_value1"
fileset_properties_key2: str = "fileset_properties_key2"
@@ -60,6 +62,10 @@ class TestFilesetCatalog(IntegrationTestEnv):
fileset_properties_key1: fileset_properties_value1,
fileset_properties_key2: fileset_properties_value2,
}
+ multiple_locations_fileset_properties: Dict[str, str] = {
+ Fileset.PROPERTY_DEFAULT_LOCATION_NAME: "location1",
+ **fileset_properties,
+ }
fileset_new_name = fileset_name + "_new"
catalog_ident: NameIdentifier = NameIdentifier.of(metalake_name,
catalog_name)
@@ -67,6 +73,9 @@ class TestFilesetCatalog(IntegrationTestEnv):
metalake_name, catalog_name, schema_name
)
fileset_ident: NameIdentifier = NameIdentifier.of(schema_name,
fileset_name)
+ multiple_locations_fileset_ident: NameIdentifier = NameIdentifier.of(
+ schema_name, multiple_locations_fileset_name
+ )
fileset_new_ident: NameIdentifier = NameIdentifier.of(schema_name,
fileset_new_name)
gravitino_admin_client: GravitinoAdminClient = GravitinoAdminClient(
@@ -163,16 +172,55 @@ class TestFilesetCatalog(IntegrationTestEnv):
properties=self.fileset_properties,
)
+ def create_multiple_locations_fileset(self) -> Fileset:
+ catalog = self.gravitino_client.load_catalog(name=self.catalog_name)
+ return catalog.as_fileset_catalog().create_multiple_location_fileset(
+ ident=self.multiple_locations_fileset_ident,
+ fileset_type=Fileset.Type.MANAGED,
+ comment=self.fileset_comment,
+ storage_locations={
+ "location1": self.fileset_location,
+ "location2": self.fileset_location2,
+ },
+ properties=self.multiple_locations_fileset_properties,
+ )
+
def create_custom_fileset(
- self, ident: NameIdentifier, storage_location: str
+ self,
+ ident: NameIdentifier,
+ storage_location: Optional[str],
+ storage_locations: Optional[Dict[str, str]] = None,
+ default_location_name: Optional[str] = None,
) -> Fileset:
catalog = self.gravitino_client.load_catalog(name=self.catalog_name)
- return catalog.as_fileset_catalog().create_fileset(
+ if storage_locations is None:
+ return catalog.as_fileset_catalog().create_fileset(
+ ident=ident,
+ fileset_type=Fileset.Type.MANAGED,
+ comment=self.fileset_comment,
+ storage_location=storage_location,
+ properties=(
+ self.fileset_properties
+ if default_location_name is None
+ else {
+ Fileset.PROPERTY_DEFAULT_LOCATION_NAME:
default_location_name,
+ **self.fileset_properties,
+ }
+ ),
+ )
+ return catalog.as_fileset_catalog().create_multiple_location_fileset(
ident=ident,
fileset_type=Fileset.Type.MANAGED,
comment=self.fileset_comment,
- storage_location=storage_location,
- properties=self.fileset_properties,
+ storage_locations=storage_locations,
+ properties=(
+ self.fileset_properties
+ if default_location_name is None
+ else {
+ Fileset.PROPERTY_DEFAULT_LOCATION_NAME:
default_location_name,
+ **self.fileset_properties,
+ }
+ ),
)
def test_create_fileset(self):
@@ -189,6 +237,28 @@ class TestFilesetCatalog(IntegrationTestEnv):
)
self.assertEqual(fileset.storage_location(), f"file:{self.fileset_location}")
+ def test_create_fileset_with_multiple_locations(self):
+ fileset = self.create_multiple_locations_fileset()
+ self.assertIsNotNone(fileset)
+ self.assertEqual(fileset.type(), Fileset.Type.MANAGED)
+ self.assertEqual(fileset.comment(), self.fileset_comment)
+ self.assertEqual(
+ fileset.properties(), self.multiple_locations_fileset_properties
+ )
+ self.assertEqual(
+ fileset.storage_location(),
+ f"file:/tmp/test1/{self.schema_name}/{self.multiple_locations_fileset_name}",
+ )
+ self.assertEqual(
+ fileset.storage_locations(),
+ {
+ Fileset.LOCATION_NAME_UNKNOWN: f"file:/tmp/test1/"
+ f"{self.schema_name}/{self.multiple_locations_fileset_name}",
+ "location1": f"file:{self.fileset_location}",
+ "location2": f"file:{self.fileset_location2}",
+ },
+ )
+
def test_drop_fileset(self):
self.create_fileset()
catalog = self.gravitino_client.load_catalog(name=self.catalog_name)
@@ -271,6 +341,26 @@ class TestFilesetCatalog(IntegrationTestEnv):
self.assertEqual(actual_file_location, f"file:{fileset_location}/test/test.txt")
+ # test get file location from multiple locations fileset
+ fileset_ident: NameIdentifier = NameIdentifier.of(
+ self.schema_name, "test_get_file_location_multiple_locations"
+ )
+ locations = {
+ "default": "/tmp/test_get_file_location",
+ "location1": "/tmp/test_get_file_location1",
+ "location2": "/tmp/test_get_file_location2",
+ }
+ self.create_custom_fileset(fileset_ident, None, locations, "location1")
+ actual_file_location = (
+ self.gravitino_client.load_catalog(name=self.catalog_name)
+ .as_fileset_catalog()
+ .get_file_location(fileset_ident, "/test/test.txt")
+ )
+
+ self.assertEqual(
+ actual_file_location, f"file:{locations['location1']}/test/test.txt"
+ )
+
# test rename without sub path should throw an exception
caller_context = CallerContext(
{
diff --git a/clients/client-python/tests/unittests/mock_base.py b/clients/client-python/tests/unittests/mock_base.py
index 2c7d6e3e58..8a33d96d48 100644
--- a/clients/client-python/tests/unittests/mock_base.py
+++ b/clients/client-python/tests/unittests/mock_base.py
@@ -94,8 +94,11 @@ def mock_load_fileset(name: str, location: str):
_name=name,
_type=Fileset.Type.MANAGED,
_comment="this is test",
- _properties={"k": "v"},
- _storage_location=location,
+ _properties={
+ "k": "v",
+ Fileset.PROPERTY_DEFAULT_LOCATION_NAME: Fileset.LOCATION_NAME_UNKNOWN,
+ },
+ _storage_locations={Fileset.LOCATION_NAME_UNKNOWN: location},
_audit=audit_dto,
)
return fileset
diff --git a/common/src/main/java/org/apache/gravitino/dto/responses/FilesetResponse.java b/common/src/main/java/org/apache/gravitino/dto/responses/FilesetResponse.java
index 785f19204a..ae18ec3518 100644
--- a/common/src/main/java/org/apache/gravitino/dto/responses/FilesetResponse.java
+++ b/common/src/main/java/org/apache/gravitino/dto/responses/FilesetResponse.java
@@ -63,8 +63,11 @@ public class FilesetResponse extends BaseResponse {
Preconditions.checkArgument(
StringUtils.isNotBlank(fileset.name()), "fileset 'name' must not be null and empty");
Preconditions.checkArgument(
- StringUtils.isNotBlank(fileset.storageLocation()),
- "fileset 'storageLocation' must not be null and empty");
- Preconditions.checkNotNull(fileset.type(), "fileset 'type' must not be null and empty");
+ fileset.type() != null, "fileset 'type' must not be null and empty");
+ Preconditions.checkArgument(
+ fileset.storageLocations() != null, "fileset 'storageLocations' must not be null");
+ Preconditions.checkArgument(
+ !fileset.storageLocations().isEmpty(),
+ "fileset 'storageLocations' must not be empty. At least one location is required.");
}
}