This is an automated email from the ASF dual-hosted git repository.
jshao pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gravitino.git
The following commit(s) were added to refs/heads/main by this push:
new ea6ae5510 [#4586] fix(filesystem): Fix convert the actual path which
storage location end with slash (#4592)
ea6ae5510 is described below
commit ea6ae5510437eff17061d0eefbd381930d49ccea
Author: xloya <[email protected]>
AuthorDate: Fri Aug 23 12:48:42 2024 +0800
[#4586] fix(filesystem): Fix convert the actual path which storage location
end with slash (#4592)
### What changes were proposed in this pull request?
When a fileset's storage location ends with a trailing slash, files may not
be found when traversing the directory, e.g. with the Hadoop command
`hadoop dfs -ls -R gvfs://fileset/catalog/tmp/test/`. This PR detects the
trailing slash when converting between actual and virtual paths and aligns
the prefix to be replaced.
### Why are the changes needed?
Fix: #4586
### How was this patch tested?
Added unit tests for the Python and Java clients and tested the shell command locally.

---------
Co-authored-by: xiaojiebao <[email protected]>
---
clients/client-python/gravitino/filesystem/gvfs.py | 25 ++++
.../tests/unittests/test_gvfs_with_local.py | 147 ++++++++++++++++++++-
.../hadoop/GravitinoVirtualFileSystem.java | 41 +++++-
.../gravitino/filesystem/hadoop/TestGvfsBase.java | 92 +++++++++++++
4 files changed, 298 insertions(+), 7 deletions(-)
diff --git a/clients/client-python/gravitino/filesystem/gvfs.py
b/clients/client-python/gravitino/filesystem/gvfs.py
index a2b2461b3..34352bb1c 100644
--- a/clients/client-python/gravitino/filesystem/gvfs.py
+++ b/clients/client-python/gravitino/filesystem/gvfs.py
@@ -93,6 +93,7 @@ class GravitinoVirtualFileSystem(fsspec.AbstractFileSystem):
# Override the parent variable
protocol = PROTOCOL_NAME
_identifier_pattern =
re.compile("^fileset/([^/]+)/([^/]+)/([^/]+)(?:/[^/]+)*/?$")
+ SLASH = "/"
def __init__(
self,
@@ -500,6 +501,12 @@ class
GravitinoVirtualFileSystem(fsspec.AbstractFileSystem):
f"Path {path} does not start with valid prefix
{actual_prefix}."
)
virtual_location =
self._get_virtual_location(context.get_name_identifier())
+ # if the storage location is end with "/",
+ # we should truncate this to avoid replace issues.
+ if actual_prefix.endswith(self.SLASH) and not
virtual_location.endswith(
+ self.SLASH
+ ):
+ return f"{path.replace(actual_prefix[:-1], virtual_location)}"
return f"{path.replace(actual_prefix, virtual_location)}"
def _convert_actual_info(self, entry: Dict, context: FilesetContext):
@@ -645,6 +652,24 @@ class
GravitinoVirtualFileSystem(fsspec.AbstractFileSystem):
" when the fileset only mounts a single file."
)
return storage_location
+ # if the storage location ends with "/",
+ # we should handle the conversion specially
+ if storage_location.endswith(self.SLASH):
+ sub_path = virtual_path[len(virtual_location) :]
+ # For example, if the virtual path is
`gvfs://fileset/catalog/schema/test_fileset/ttt`,
+ # and the storage location is `hdfs://cluster:8020/user/`,
+ # we should replace `gvfs://fileset/catalog/schema/test_fileset`
+ # with `hdfs://localhost:8020/user` which truncates the tailing
slash.
+ # If the storage location is `hdfs://cluster:8020/user`,
+ # we can replace `gvfs://fileset/catalog/schema/test_fileset`
+ # with `hdfs://localhost:8020/user` directly.
+ if sub_path.startswith(self.SLASH):
+ new_storage_location = storage_location[:-1]
+ else:
+ new_storage_location = storage_location
+
+ # Replace virtual_location with the adjusted storage_location
+ return virtual_path.replace(virtual_location,
new_storage_location, 1)
return virtual_path.replace(virtual_location, storage_location, 1)
@staticmethod
diff --git a/clients/client-python/tests/unittests/test_gvfs_with_local.py
b/clients/client-python/tests/unittests/test_gvfs_with_local.py
index a9a4afb5b..1dcffa970 100644
--- a/clients/client-python/tests/unittests/test_gvfs_with_local.py
+++ b/clients/client-python/tests/unittests/test_gvfs_with_local.py
@@ -17,7 +17,7 @@ specific language governing permissions and limitations
under the License.
"""
-# pylint: disable=protected-access,too-many-lines
+# pylint: disable=protected-access,too-many-lines,too-many-locals
import base64
import os
@@ -25,6 +25,7 @@ import random
import string
import time
import unittest
+from unittest import mock
from unittest.mock import patch
import pandas
@@ -34,7 +35,7 @@ import pyarrow.parquet as pq
from fsspec.implementations.local import LocalFileSystem
from llama_index.core import SimpleDirectoryReader
-from gravitino import gvfs
+from gravitino import gvfs, Fileset
from gravitino import NameIdentifier
from gravitino.auth.auth_constants import AuthConstants
from gravitino.dto.audit_dto import AuditDTO
@@ -796,6 +797,44 @@ class TestLocalFilesystem(unittest.TestCase):
"fileset/test_catalog/test_schema/test_f1/actual_path",
virtual_path
)
+ # test storage location without "/"
+ actual_path = "/tmp/test_convert_actual_path/sub_dir/1.parquet"
+ storage_location1 = "file:/tmp/test_convert_actual_path"
+ mock_fileset1: Fileset = mock.Mock(spec=Fileset)
+ mock_fileset1.storage_location.return_value = storage_location1
+
+ mock_fileset_context1: FilesetContext = mock.Mock(spec=FilesetContext)
+ mock_fileset_context1.get_storage_type.return_value = StorageType.LOCAL
+ mock_fileset_context1.get_name_identifier.return_value =
NameIdentifier.of(
+ "test_metalake", "catalog", "schema", "test_convert_actual_path"
+ )
+ mock_fileset_context1.get_fileset.return_value = mock_fileset1
+
+ virtual_path = fs._convert_actual_path(actual_path,
mock_fileset_context1)
+ self.assertEqual(
+
"fileset/catalog/schema/test_convert_actual_path/sub_dir/1.parquet",
+ virtual_path,
+ )
+
+ # test storage location with "/"
+ actual_path = "/tmp/test_convert_actual_path/sub_dir/1.parquet"
+ storage_location2 = "file:/tmp/test_convert_actual_path/"
+ mock_fileset2: Fileset = mock.Mock(spec=Fileset)
+ mock_fileset2.storage_location.return_value = storage_location2
+
+ mock_fileset_context2: FilesetContext = mock.Mock(spec=FilesetContext)
+ mock_fileset_context2.get_storage_type.return_value = StorageType.LOCAL
+ mock_fileset_context2.get_name_identifier.return_value =
NameIdentifier.of(
+ "test_metalake", "catalog", "schema", "test_convert_actual_path"
+ )
+ mock_fileset_context2.get_fileset.return_value = mock_fileset2
+
+ virtual_path = fs._convert_actual_path(actual_path,
mock_fileset_context2)
+ self.assertEqual(
+
"fileset/catalog/schema/test_convert_actual_path/sub_dir/1.parquet",
+ virtual_path,
+ )
+
def test_convert_info(self, *mock_methods3):
# test convert actual hdfs path
audit_dto = AuditDTO(
@@ -1031,3 +1070,107 @@ class TestLocalFilesystem(unittest.TestCase):
self.assertEqual(row[1], "19")
elif row[0] == "D":
self.assertEqual(row[1], "18")
+
+ @patch(
+ "gravitino.catalog.fileset_catalog.FilesetCatalog.load_fileset",
+ return_value=mock_base.mock_load_fileset(
+ "test_location_with_tailing_slash",
+ f"{_fileset_dir}/test_location_with_tailing_slash/",
+ ),
+ )
+ def test_location_with_tailing_slash(self, *mock_methods):
+ local_fs = LocalFileSystem()
+ # storage location is ending with a "/"
+ fileset_storage_location = (
+ f"{self._fileset_dir}/test_location_with_tailing_slash/"
+ )
+ fileset_virtual_location = (
+ "fileset/fileset_catalog/tmp/test_location_with_tailing_slash"
+ )
+ local_fs.mkdir(fileset_storage_location)
+ sub_dir_path = f"{fileset_storage_location}test_1"
+ local_fs.mkdir(sub_dir_path)
+ self.assertTrue(local_fs.exists(sub_dir_path))
+ sub_file_path = f"{sub_dir_path}/test_file_1.par"
+ local_fs.touch(sub_file_path)
+ self.assertTrue(local_fs.exists(sub_file_path))
+
+ fs = gvfs.GravitinoVirtualFileSystem(
+ server_uri="http://localhost:9090", metalake_name="metalake_demo"
+ )
+ self.assertTrue(fs.exists(fileset_virtual_location))
+
+ dir_virtual_path = fileset_virtual_location + "/test_1"
+ dir_info = fs.info(dir_virtual_path)
+ self.assertEqual(dir_info["name"], dir_virtual_path)
+
+ file_virtual_path = fileset_virtual_location +
"/test_1/test_file_1.par"
+ file_info = fs.info(file_virtual_path)
+ self.assertEqual(file_info["name"], file_virtual_path)
+
+ file_status = fs.ls(fileset_virtual_location, detail=True)
+ for status in file_status:
+ if status["name"].endswith("test_1"):
+ self.assertEqual(status["name"], dir_virtual_path)
+ elif status["name"].endswith("test_file_1.par"):
+ self.assertEqual(status["name"], file_virtual_path)
+ else:
+ raise GravitinoRuntimeException("Unexpected file found")
+
+ def test_get_actual_path_by_ident(self, *mock_methods):
+ ident1 = NameIdentifier.of(
+ "test_metalake", "catalog", "schema",
"test_get_actual_path_by_ident"
+ )
+ storage_type = gvfs.StorageType.LOCAL
+ local_fs = LocalFileSystem()
+
+ fs = gvfs.GravitinoVirtualFileSystem(
+ server_uri="http://localhost:9090", metalake_name="metalake_demo"
+ )
+
+ # test storage location end with "/"
+ storage_location_1 =
f"{self._fileset_dir}/test_get_actual_path_by_ident/"
+ # virtual path end with "/"
+ virtual_path1 = "fileset/catalog/schema/test_get_actual_path_by_ident/"
+ local_fs.mkdir(storage_location_1)
+ self.assertTrue(local_fs.exists(storage_location_1))
+
+ mock_fileset1: Fileset = mock.Mock(spec=Fileset)
+ mock_fileset1.storage_location.return_value = storage_location_1
+
+ actual_path1 = fs._get_actual_path_by_ident(
+ ident1, mock_fileset1, local_fs, storage_type, virtual_path1
+ )
+ self.assertEqual(actual_path1, storage_location_1)
+
+ # virtual path end without "/"
+ virtual_path2 = "fileset/catalog/schema/test_get_actual_path_by_ident"
+ actual_path2 = fs._get_actual_path_by_ident(
+ ident1, mock_fileset1, local_fs, storage_type, virtual_path2
+ )
+ self.assertEqual(actual_path2, storage_location_1)
+
+ # test storage location end without "/"
+ ident2 = NameIdentifier.of(
+ "test_metalake", "catalog", "schema", "test_without_slash"
+ )
+ storage_location_2 = f"{self._fileset_dir}/test_without_slash"
+ # virtual path end with "/"
+ virtual_path3 = "fileset/catalog/schema/test_without_slash/"
+ local_fs.mkdir(storage_location_2)
+ self.assertTrue(local_fs.exists(storage_location_2))
+
+ mock_fileset2: Fileset = mock.Mock(spec=Fileset)
+ mock_fileset2.storage_location.return_value = storage_location_2
+
+ actual_path3 = fs._get_actual_path_by_ident(
+ ident2, mock_fileset2, local_fs, storage_type, virtual_path3
+ )
+ self.assertEqual(actual_path3, f"{storage_location_2}/")
+
+ # virtual path end without "/"
+ virtual_path4 = "fileset/catalog/schema/test_without_slash"
+ actual_path4 = fs._get_actual_path_by_ident(
+ ident2, mock_fileset2, local_fs, storage_type, virtual_path4
+ )
+ self.assertEqual(actual_path4, storage_location_2)
diff --git
a/clients/filesystem-hadoop3/src/main/java/org/apache/gravitino/filesystem/hadoop/GravitinoVirtualFileSystem.java
b/clients/filesystem-hadoop3/src/main/java/org/apache/gravitino/filesystem/hadoop/GravitinoVirtualFileSystem.java
index bbcf0c71e..59ef5091c 100644
---
a/clients/filesystem-hadoop3/src/main/java/org/apache/gravitino/filesystem/hadoop/GravitinoVirtualFileSystem.java
+++
b/clients/filesystem-hadoop3/src/main/java/org/apache/gravitino/filesystem/hadoop/GravitinoVirtualFileSystem.java
@@ -74,6 +74,7 @@ public class GravitinoVirtualFileSystem extends FileSystem {
// /fileset_catalog/fileset_schema/fileset1/sub_dir/
private static final Pattern IDENTIFIER_PATTERN =
Pattern.compile("^(?:gvfs://fileset)?/([^/]+)/([^/]+)/([^/]+)(?>/[^/]+)*/?$");
+ private static final String SLASH = "/";
@Override
public void initialize(URI name, Configuration configuration) throws
IOException {
@@ -277,7 +278,8 @@ public class GravitinoVirtualFileSystem extends FileSystem {
identifier.name());
}
- private Path getActualPathByIdentifier(
+ @VisibleForTesting
+ Path getActualPathByIdentifier(
NameIdentifier identifier, Pair<Fileset, FileSystem> filesetPair, Path
path) {
String virtualPath = path.toString();
boolean withScheme =
@@ -294,7 +296,27 @@ public class GravitinoVirtualFileSystem extends FileSystem
{
return new Path(storageLocation);
} else {
- return new Path(virtualPath.replaceFirst(virtualLocation,
storageLocation));
+ // if the storage location ends with "/",
+ // we should handle the conversion specially
+ if (storageLocation.endsWith(SLASH)) {
+ String subPath = virtualPath.substring(virtualLocation.length());
+ // For example, if the virtual path is
`gvfs://fileset/catalog/schema/test_fileset/ttt`,
+ // and the storage location is `hdfs://cluster:8020/user/`,
+ // we should replace `gvfs://fileset/catalog/schema/test_fileset`
with
+ // `hdfs://localhost:8020/user` which truncates the tailing slash.
+ // If the storage location is `hdfs://cluster:8020/user`,
+ // we can replace `gvfs://fileset/catalog/schema/test_fileset` with
+ // `hdfs://localhost:8020/user` directly.
+ if (subPath.startsWith(SLASH)) {
+ return new Path(
+ virtualPath.replaceFirst(
+ virtualLocation, storageLocation.substring(0,
storageLocation.length() - 1)));
+ } else {
+ return new Path(virtualPath.replaceFirst(virtualLocation,
storageLocation));
+ }
+ } else {
+ return new Path(virtualPath.replaceFirst(virtualLocation,
storageLocation));
+ }
}
} catch (Exception e) {
throw new RuntimeException(
@@ -320,7 +342,8 @@ public class GravitinoVirtualFileSystem extends FileSystem {
}
}
- private FileStatus convertFileStatusPathPrefix(
+ @VisibleForTesting
+ FileStatus convertFileStatusPathPrefix(
FileStatus fileStatus, String actualPrefix, String virtualPrefix) {
String filePath = fileStatus.getPath().toString();
Preconditions.checkArgument(
@@ -328,7 +351,15 @@ public class GravitinoVirtualFileSystem extends FileSystem
{
"Path %s doesn't start with prefix \"%s\".",
filePath,
actualPrefix);
- Path path = new Path(filePath.replaceFirst(actualPrefix, virtualPrefix));
+ // if the storage location is end with "/",
+ // we should truncate this to avoid replace issues.
+ Path path =
+ new Path(
+ filePath.replaceFirst(
+ actualPrefix.endsWith(SLASH) && !virtualPrefix.endsWith(SLASH)
+ ? actualPrefix.substring(0, actualPrefix.length() - 1)
+ : actualPrefix,
+ virtualPrefix));
fileStatus.setPath(path);
return fileStatus;
@@ -491,7 +522,7 @@ public class GravitinoVirtualFileSystem extends FileSystem {
FileStatus fileStatus =
context.getFileSystem().getFileStatus(context.getActualPath());
return convertFileStatusPathPrefix(
fileStatus,
- context.getFileset().storageLocation(),
+ new Path(context.getFileset().storageLocation()).toString(),
getVirtualLocation(context.getIdentifier(), true));
}
diff --git
a/clients/filesystem-hadoop3/src/test/java/org/apache/gravitino/filesystem/hadoop/TestGvfsBase.java
b/clients/filesystem-hadoop3/src/test/java/org/apache/gravitino/filesystem/hadoop/TestGvfsBase.java
index 448d42f81..0c19b563b 100644
---
a/clients/filesystem-hadoop3/src/test/java/org/apache/gravitino/filesystem/hadoop/TestGvfsBase.java
+++
b/clients/filesystem-hadoop3/src/test/java/org/apache/gravitino/filesystem/hadoop/TestGvfsBase.java
@@ -24,6 +24,7 @@ import static
org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
import java.io.File;
import java.io.IOException;
@@ -39,6 +40,7 @@ import java.util.concurrent.TimeUnit;
import org.apache.commons.io.FileUtils;
import org.apache.gravitino.NameIdentifier;
import org.apache.gravitino.file.Fileset;
+import org.apache.gravitino.shaded.org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
@@ -52,6 +54,7 @@ import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
+import org.mockito.Mockito;
public class TestGvfsBase extends GravitinoMockServerBase {
protected static final String GVFS_IMPL_CLASS =
GravitinoVirtualFileSystem.class.getName();
@@ -446,6 +449,27 @@ public class TestGvfsBase extends GravitinoMockServerBase {
.replaceFirst(
GravitinoVirtualFileSystemConfiguration.GVFS_FILESET_PREFIX,
FileSystemTestUtils.localRootPrefix()));
+
+ // test get file status with storageLocation end with "/"
+ String fName = "test_location_with_slash";
+ String tmpPathString =
+ String.format(
+ "%s/%s/%s/%s",
+ FileSystemTestUtils.localRootPrefix(), catalogName, schemaName,
fName + "/");
+ Path tempPath = new Path(tmpPathString);
+
+ mockFilesetDTO(
+ metalakeName, catalogName, schemaName, fName, Fileset.Type.MANAGED,
tmpPathString);
+ Path fPath = FileSystemTestUtils.createFilesetPath(catalogName,
schemaName, fName, true);
+ localFileSystem.mkdirs(tempPath);
+ assertTrue(localFileSystem.exists(tempPath));
+ Path subDir = new Path(tempPath + "/sub_dir");
+ localFileSystem.mkdirs(subDir);
+ assertTrue(localFileSystem.exists(tempPath));
+
+ Path subDirVirtualPath = new Path(fPath + "/sub_dir");
+ FileStatus subDirStatus =
gravitinoFileSystem.getFileStatus(subDirVirtualPath);
+ assertEquals(fPath + "/sub_dir", subDirStatus.getPath().toString());
}
}
@@ -490,6 +514,26 @@ public class TestGvfsBase extends GravitinoMockServerBase {
FileSystemTestUtils.localRootPrefix()));
gravitinoFileSystem.delete(gravitinoStatuses.get(i).getPath(), true);
}
+
+ // test list file status with storageLocation end with "/"
+ String fName = "test_list_location_with_slash";
+ String tmpPathString =
+ String.format(
+ "%s/%s/%s/%s",
+ FileSystemTestUtils.localRootPrefix(), catalogName, schemaName,
fName + "/");
+ Path tempPath = new Path(tmpPathString);
+
+ mockFilesetDTO(
+ metalakeName, catalogName, schemaName, fName, Fileset.Type.MANAGED,
tmpPathString);
+ Path fPath = FileSystemTestUtils.createFilesetPath(catalogName,
schemaName, fName, true);
+ localFileSystem.mkdirs(tempPath);
+ assertTrue(localFileSystem.exists(tempPath));
+ Path subDir = new Path(tempPath + "/sub_dir");
+ localFileSystem.mkdirs(subDir);
+ assertTrue(localFileSystem.exists(tempPath));
+
+ FileStatus[] subDirStatus = gravitinoFileSystem.listStatus(fPath);
+ assertEquals(fPath + "/sub_dir", subDirStatus[0].getPath().toString());
}
}
@@ -607,4 +651,52 @@ public class TestGvfsBase extends GravitinoMockServerBase {
assertEquals(32 * 1024 * 1024,
fs.getDefaultBlockSize(managedFilesetPath));
}
}
+
+ @Test
+ public void testConvertFileStatusPathPrefix() throws IOException {
+ try (GravitinoVirtualFileSystem fs =
+ (GravitinoVirtualFileSystem) managedFilesetPath.getFileSystem(conf)) {
+ FileStatus fileStatus =
+ new FileStatus(1024, false, 1, 32 * 1024 * 1024, 1024, new
Path("hdfs://hive:9000/test"));
+ // storage location end with "/"
+ String storageLocation = "hdfs://hive:9000/";
+ String virtualLocation = "gvfs://fileset/test_catalog/tmp/test_fileset";
+ FileStatus convertedStatus =
+ fs.convertFileStatusPathPrefix(fileStatus, storageLocation,
virtualLocation);
+ Path expectedPath = new
Path("gvfs://fileset/test_catalog/tmp/test_fileset/test");
+ assertEquals(expectedPath, convertedStatus.getPath());
+ }
+ }
+
+ @Test
+ public void testGetActualPathByIdentifier() throws IOException {
+ try (GravitinoVirtualFileSystem fs =
+ (GravitinoVirtualFileSystem) managedFilesetPath.getFileSystem(conf)) {
+ // test storage location end with "/"
+ NameIdentifier ident1 =
+ NameIdentifier.of("test_metalake", "catalog", "schema",
"testGetActualPath");
+ Fileset mockFileset1 = Mockito.mock(Fileset.class);
+
Mockito.when(mockFileset1.storageLocation()).thenReturn("file:/tmp/test/123/");
+ FileSystem mockFs1 = Mockito.mock(FileSystem.class);
+ FileStatus mockFileStatus1 = Mockito.mock(FileStatus.class);
+ Mockito.when(mockFileStatus1.isFile()).thenReturn(false);
+ Mockito.when(mockFs1.getFileStatus(any())).thenReturn(mockFileStatus1);
+ // test virtual path sub dir with "/"
+ Path virtualPath1 =
+ new
Path("gvfs://fileset/catalog/schema/testGetActualPath/sub_dir/1.parquet");
+ Path actualPath1 =
+ fs.getActualPathByIdentifier(ident1, Pair.of(mockFileset1, mockFs1),
virtualPath1);
+ assertEquals(
+ String.format("%ssub_dir/1.parquet", mockFileset1.storageLocation()),
+ actualPath1.toString());
+
+ // test virtual path sub dir without "/"
+ Path virtualPath2 = new
Path("gvfs://fileset/catalog/schema/testGetActualPath");
+ Path actualPath2 =
+ fs.getActualPathByIdentifier(ident1, Pair.of(mockFileset1, mockFs1),
virtualPath2);
+ assertEquals(
+ mockFileset1.storageLocation().substring(0,
mockFileset1.storageLocation().length() - 1),
+ actualPath2.toString());
+ }
+ }
}