This is an automated email from the ASF dual-hosted git repository.
jshao pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gravitino.git
The following commit(s) were added to refs/heads/main by this push:
new b7ceca005 [#4709] improvement(client-python): Add the implementation
for `getFileLocation` interface in python client (#5017)
b7ceca005 is described below
commit b7ceca005830bd0b098c2b036b18f8a48914a8f1
Author: xloya <[email protected]>
AuthorDate: Thu Sep 26 14:13:24 2024 +0800
[#4709] improvement(client-python): Add the implementation for
`getFileLocation` interface in python client (#5017)
### What changes were proposed in this pull request?
Add the implementation of the `getFileLocation` interface to the Python
client.
### Why are the changes needed?
Fix: #4709
### How was this patch tested?
Added unit tests (UTs) and integration tests (ITs).
---
.../gravitino/audit/caller_context.py | 3 +-
.../gravitino/catalog/fileset_catalog.py | 34 ++++++++-
.../client-python/gravitino/utils/http_client.py | 9 ++-
.../tests/integration/test_fileset_catalog.py | 48 ++++++++++++-
.../tests/unittests/audit/test_caller_context.py | 3 +
.../tests/unittests/test_fileset_catalog_api.py | 80 ++++++++++++++++++++++
6 files changed, 172 insertions(+), 5 deletions(-)
diff --git a/clients/client-python/gravitino/audit/caller_context.py
b/clients/client-python/gravitino/audit/caller_context.py
index 07e30a4d3..0b57bb3b4 100644
--- a/clients/client-python/gravitino/audit/caller_context.py
+++ b/clients/client-python/gravitino/audit/caller_context.py
@@ -68,4 +68,5 @@ class CallerContextHolder:
@staticmethod
def remove():
"""Remove the CallerContext from the thread local."""
- del caller_context_holder.caller_context
+ if hasattr(caller_context_holder, "caller_context"):
+ del caller_context_holder.caller_context
diff --git a/clients/client-python/gravitino/catalog/fileset_catalog.py
b/clients/client-python/gravitino/catalog/fileset_catalog.py
index 3b2f0f717..cf91dbdfb 100644
--- a/clients/client-python/gravitino/catalog/fileset_catalog.py
+++ b/clients/client-python/gravitino/catalog/fileset_catalog.py
@@ -21,6 +21,7 @@ from typing import List, Dict
from gravitino.api.catalog import Catalog
from gravitino.api.fileset import Fileset
from gravitino.api.fileset_change import FilesetChange
+from gravitino.audit.caller_context import CallerContextHolder, CallerContext
from gravitino.catalog.base_schema_catalog import BaseSchemaCatalog
from gravitino.dto.audit_dto import AuditDTO
from gravitino.dto.requests.fileset_create_request import FilesetCreateRequest
@@ -28,6 +29,7 @@ from gravitino.dto.requests.fileset_update_request import
FilesetUpdateRequest
from gravitino.dto.requests.fileset_updates_request import
FilesetUpdatesRequest
from gravitino.dto.responses.drop_response import DropResponse
from gravitino.dto.responses.entity_list_response import EntityListResponse
+from gravitino.dto.responses.file_location_response import FileLocationResponse
from gravitino.dto.responses.fileset_response import FilesetResponse
from gravitino.name_identifier import NameIdentifier
from gravitino.namespace import Namespace
@@ -244,7 +246,29 @@ class FilesetCatalog(BaseSchemaCatalog):
Returns:
The actual location of the file or directory.
"""
- raise NotImplementedError("Not implemented yet")
+ self.check_fileset_name_identifier(ident)
+
+ full_namespace = self._get_fileset_full_namespace(ident.namespace())
+ try:
+ caller_context: CallerContext = CallerContextHolder.get()
+ params = {"sub_path": encode_string(sub_path)}
+
+ resp = self.rest_client.get(
+ self.format_file_location_request_path(full_namespace,
ident.name()),
+ params=params,
+ headers=(
+ caller_context.context() if caller_context is not None
else None
+ ),
+ error_handler=FILESET_ERROR_HANDLER,
+ )
+ file_location_resp = FileLocationResponse.from_json(
+ resp.body, infer_missing=True
+ )
+ file_location_resp.validate()
+
+ return file_location_resp.file_location()
+ finally:
+ CallerContextHolder.remove()
@staticmethod
def check_fileset_namespace(namespace: Namespace):
@@ -272,6 +296,14 @@ class FilesetCatalog(BaseSchemaCatalog):
schema_ns = Namespace.of(namespace.level(0), namespace.level(1))
return
f"{BaseSchemaCatalog.format_schema_request_path(schema_ns)}/{encode_string(namespace.level(2))}/filesets"
+ @staticmethod
+ def format_file_location_request_path(namespace: Namespace, name: str) ->
str:
+ schema_ns = Namespace.of(namespace.level(0), namespace.level(1))
+ return (
+
f"{BaseSchemaCatalog.format_schema_request_path(schema_ns)}/{encode_string(namespace.level(2))}"
+ f"/filesets/{encode_string(name)}/location"
+ )
+
@staticmethod
def to_fileset_update_request(change: FilesetChange):
if isinstance(change, FilesetChange.RenameFileset):
diff --git a/clients/client-python/gravitino/utils/http_client.py
b/clients/client-python/gravitino/utils/http_client.py
index 696fe415c..262c73c2b 100644
--- a/clients/client-python/gravitino/utils/http_client.py
+++ b/clients/client-python/gravitino/utils/http_client.py
@@ -217,9 +217,14 @@ class HTTPClient:
f"Error handler {type(error_handler).__name__} can't handle this
response, error response body: {resp}"
) from None
- def get(self, endpoint, params=None, error_handler=None, **kwargs):
+ def get(self, endpoint, params=None, headers=None, error_handler=None,
**kwargs):
return self._request(
- "get", endpoint, params=params, error_handler=error_handler,
**kwargs
+ "get",
+ endpoint,
+ params=params,
+ headers=headers,
+ error_handler=error_handler,
+ **kwargs,
)
def delete(self, endpoint, error_handler=None, **kwargs):
diff --git a/clients/client-python/tests/integration/test_fileset_catalog.py
b/clients/client-python/tests/integration/test_fileset_catalog.py
index 6ea1831b9..0e92ec1b0 100644
--- a/clients/client-python/tests/integration/test_fileset_catalog.py
+++ b/clients/client-python/tests/integration/test_fileset_catalog.py
@@ -27,7 +27,13 @@ from gravitino import (
Fileset,
FilesetChange,
)
-from gravitino.exceptions.base import NoSuchFilesetException,
GravitinoRuntimeException
+from gravitino.audit.caller_context import CallerContext, CallerContextHolder
+from gravitino.audit.fileset_audit_constants import FilesetAuditConstants
+from gravitino.audit.fileset_data_operation import FilesetDataOperation
+from gravitino.exceptions.base import (
+ NoSuchFilesetException,
+ GravitinoRuntimeException,
+)
from tests.integration.integration_test_env import IntegrationTestEnv
logger = logging.getLogger(__name__)
@@ -155,6 +161,18 @@ class TestFilesetCatalog(IntegrationTestEnv):
properties=self.fileset_properties,
)
+ def create_custom_fileset(
+ self, ident: NameIdentifier, storage_location: str
+ ) -> Fileset:
+ catalog = self.gravitino_client.load_catalog(name=self.catalog_name)
+ return catalog.as_fileset_catalog().create_fileset(
+ ident=ident,
+ fileset_type=Fileset.Type.MANAGED,
+ comment=self.fileset_comment,
+ storage_location=storage_location,
+ properties=self.fileset_properties,
+ )
+
def test_create_fileset(self):
fileset = self.create_fileset()
self.assertIsNotNone(fileset)
@@ -223,3 +241,31 @@ class TestFilesetCatalog(IntegrationTestEnv):
)
self.assertEqual(fileset_comment_removed.name(), self.fileset_name)
self.assertIsNone(fileset_comment_removed.comment())
+
+ def test_get_file_location(self):
+ fileset_ident: NameIdentifier = NameIdentifier.of(
+ self.schema_name, "test_get_file_location"
+ )
+ fileset_location = "/tmp/test_get_file_location"
+ self.create_custom_fileset(fileset_ident, fileset_location)
+ actual_file_location = (
+ self.gravitino_client.load_catalog(name=self.catalog_name)
+ .as_fileset_catalog()
+ .get_file_location(fileset_ident, "/test/test.txt")
+ )
+
+ self.assertEqual(actual_file_location,
f"file:{fileset_location}/test/test.txt")
+
+ # test rename without sub path should throw an exception
+ caller_context = CallerContext(
+ {
+ FilesetAuditConstants.HTTP_HEADER_FILESET_DATA_OPERATION:
FilesetDataOperation.RENAME.name
+ }
+ )
+ with self.assertRaises(GravitinoRuntimeException):
+ CallerContextHolder.set(caller_context)
+ (
+ self.gravitino_client.load_catalog(name=self.catalog_name)
+ .as_fileset_catalog()
+ .get_file_location(fileset_ident, "")
+ )
diff --git a/clients/client-python/tests/unittests/audit/test_caller_context.py
b/clients/client-python/tests/unittests/audit/test_caller_context.py
index 93031a256..4243d2d5c 100644
--- a/clients/client-python/tests/unittests/audit/test_caller_context.py
+++ b/clients/client-python/tests/unittests/audit/test_caller_context.py
@@ -62,3 +62,6 @@ class TestCallerContext(unittest.TestCase):
CallerContextHolder.remove()
self.assertIsNone(CallerContextHolder.get())
+
+ # will not throw an exception if the context does not exist
+ CallerContextHolder.remove()
diff --git a/clients/client-python/tests/unittests/test_fileset_catalog_api.py
b/clients/client-python/tests/unittests/test_fileset_catalog_api.py
new file mode 100644
index 000000000..06c27d3df
--- /dev/null
+++ b/clients/client-python/tests/unittests/test_fileset_catalog_api.py
@@ -0,0 +1,80 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import json
+import unittest
+from http.client import HTTPResponse
+from unittest.mock import patch, Mock
+
+from gravitino import GravitinoClient, Catalog, NameIdentifier
+from gravitino.audit.caller_context import CallerContext, CallerContextHolder
+from gravitino.audit.fileset_audit_constants import FilesetAuditConstants
+from gravitino.audit.fileset_data_operation import FilesetDataOperation
+from gravitino.exceptions.handlers.fileset_error_handler import
FILESET_ERROR_HANDLER
+from gravitino.namespace import Namespace
+from gravitino.utils import Response
+from tests.unittests import mock_base
+
+
+@mock_base.mock_data
+class TestFilesetCatalogApi(unittest.TestCase):
+
+ def test_get_file_location(self, *mock_method):
+ json_data = {"code": 0, "fileLocation": "file:/test/1"}
+ json_str = json.dumps(json_data)
+
+ mock_http_resp = Mock(HTTPResponse)
+ mock_http_resp.getcode.return_value = 200
+ mock_http_resp.read.return_value = json_str
+ mock_http_resp.info.return_value = None
+ mock_http_resp.url = None
+ mock_resp = Response(mock_http_resp)
+
+ metalake_name: str = "metalake_demo"
+ catalog_name: str = "fileset_catalog"
+ gravitino_client = GravitinoClient(
+ uri="http://localhost:8090", metalake_name=metalake_name
+ )
+ catalog: Catalog = gravitino_client.load_catalog(catalog_name)
+
+ with patch(
+ "gravitino.utils.http_client.HTTPClient.get",
+ return_value=mock_resp,
+ ) as mock_get:
+ fileset_ident: NameIdentifier = NameIdentifier.of(
+ "test", "test_get_file_location"
+ )
+ context = {
+ FilesetAuditConstants.HTTP_HEADER_FILESET_DATA_OPERATION:
FilesetDataOperation.RENAME.name
+ }
+ CallerContextHolder.set(CallerContext(context))
+ file_location: str =
catalog.as_fileset_catalog().get_file_location(
+ fileset_ident, "/test/1"
+ )
+ # check the get input params as expected
+ mock_get.assert_called_once_with(
+ catalog.as_fileset_catalog().format_file_location_request_path(
+ Namespace.of("metalake_demo", "fileset_catalog", "test"),
+ fileset_ident.name(),
+ ),
+ params={"sub_path": "/test/1"},
+ headers=context,
+ error_handler=FILESET_ERROR_HANDLER,
+ )
+ # check the caller context is removed
+ self.assertIsNone(CallerContextHolder.get())
+ # check the response is as expected
+ self.assertEqual(file_location, "file:/test/1")