This is an automated email from the ASF dual-hosted git repository.

mchades pushed a commit to branch branch-0.9
in repository https://gitbox.apache.org/repos/asf/gravitino.git


The following commit(s) were added to refs/heads/branch-0.9 by this push:
     new 7614360c5d [#6860] feat(fileset): add file viewer support for filesets 
(#7329)
7614360c5d is described below

commit 7614360c5d29528b433bc90fb1df3d0d2fd81c44
Author: github-actions[bot] 
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Wed Jun 4 15:07:09 2025 +0800

    [#6860] feat(fileset): add file viewer support for filesets (#7329)
    
    ### What changes were proposed in this pull request?
    
    Enables web pages to view the directories and files in a `Fileset`.
    
    ### Why are the changes needed?
    
    Fix: #6860
    
    ### Does this PR introduce _any_ user-facing change?
    
    yes, new REST API
    
    ### How was this patch tested?
    
    tests added
    
    Co-authored-by: Yunchi Pang <[email protected]>
---
 .../catalog/hadoop/HadoopCatalogOperations.java    |  62 +++++-
 .../hadoop/SecureHadoopCatalogOperations.java      |  14 +-
 .../hadoop/TestHadoopCatalogOperations.java        | 107 +++++++++-
 .../hadoop/integration/test/HadoopCatalogIT.java   | 233 +++++++++++++++++++++
 .../org/apache/gravitino/dto/file/FileInfoDTO.java | 171 +++++++++++++++
 .../dto/responses/FileInfoListResponse.java        |  63 ++++++
 .../apache/gravitino/dto/util/DTOConverters.java   |  22 ++
 .../java/org/apache/gravitino/file/FileInfo.java   |  32 ++-
 .../apache/gravitino/catalog/CatalogManager.java   |  15 ++
 .../gravitino/catalog/FilesetDispatcher.java       |   8 +-
 .../apache/gravitino/catalog/FilesetFileOps.java   |  47 +++++
 .../catalog/FilesetNormalizeDispatcher.java        |   8 +
 .../catalog/FilesetOperationDispatcher.java        |  15 ++
 .../gravitino/hook/FilesetHookDispatcher.java      |   8 +
 .../gravitino/listener/FilesetEventDispatcher.java |   9 +
 .../server/web/rest/FilesetOperations.java         |  57 +++++
 16 files changed, 853 insertions(+), 18 deletions(-)

diff --git 
a/catalogs/catalog-hadoop/src/main/java/org/apache/gravitino/catalog/hadoop/HadoopCatalogOperations.java
 
b/catalogs/catalog-hadoop/src/main/java/org/apache/gravitino/catalog/hadoop/HadoopCatalogOperations.java
index 0aceee05e5..c6b98791c1 100644
--- 
a/catalogs/catalog-hadoop/src/main/java/org/apache/gravitino/catalog/hadoop/HadoopCatalogOperations.java
+++ 
b/catalogs/catalog-hadoop/src/main/java/org/apache/gravitino/catalog/hadoop/HadoopCatalogOperations.java
@@ -39,6 +39,7 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.time.Instant;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -65,12 +66,14 @@ import org.apache.gravitino.StringIdentifier;
 import org.apache.gravitino.audit.CallerContext;
 import org.apache.gravitino.audit.FilesetAuditConstants;
 import org.apache.gravitino.audit.FilesetDataOperation;
+import org.apache.gravitino.catalog.FilesetFileOps;
 import org.apache.gravitino.catalog.ManagedSchemaOperations;
 import org.apache.gravitino.catalog.hadoop.fs.FileSystemProvider;
 import org.apache.gravitino.catalog.hadoop.fs.FileSystemUtils;
 import org.apache.gravitino.connector.CatalogInfo;
 import org.apache.gravitino.connector.CatalogOperations;
 import org.apache.gravitino.connector.HasPropertyMetadata;
+import org.apache.gravitino.dto.file.FileInfoDTO;
 import org.apache.gravitino.exceptions.AlreadyExistsException;
 import org.apache.gravitino.exceptions.FilesetAlreadyExistsException;
 import org.apache.gravitino.exceptions.GravitinoRuntimeException;
@@ -81,6 +84,7 @@ import 
org.apache.gravitino.exceptions.NoSuchLocationNameException;
 import org.apache.gravitino.exceptions.NoSuchSchemaException;
 import org.apache.gravitino.exceptions.NonEmptySchemaException;
 import org.apache.gravitino.exceptions.SchemaAlreadyExistsException;
+import org.apache.gravitino.file.FileInfo;
 import org.apache.gravitino.file.Fileset;
 import org.apache.gravitino.file.FilesetCatalog;
 import org.apache.gravitino.file.FilesetChange;
@@ -99,7 +103,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class HadoopCatalogOperations extends ManagedSchemaOperations
-    implements CatalogOperations, FilesetCatalog {
+    implements CatalogOperations, FilesetCatalog, FilesetFileOps {
   private static final String SCHEMA_DOES_NOT_EXIST_MSG = "Schema %s does not 
exist";
   private static final String FILESET_DOES_NOT_EXIST_MSG = "Fileset %s does 
not exist";
   private static final String SLASH = "/";
@@ -237,6 +241,46 @@ public class HadoopCatalogOperations extends 
ManagedSchemaOperations
         });
   }
 
+  @Override
+  public FileInfo[] listFiles(NameIdentifier filesetIdent, String 
locationName, String subPath)
+      throws NoSuchFilesetException, IOException {
+    if (disableFSOps) {
+      LOG.warn("Filesystem operations disabled, rejecting listFiles for {}", 
filesetIdent);
+      throw new UnsupportedOperationException("Filesystem operations are 
disabled on this server");
+    }
+
+    String actualPath = getFileLocation(filesetIdent, subPath, locationName);
+    Path formalizedPath = formalizePath(new Path(actualPath), conf);
+
+    FileSystem fs = getFileSystem(formalizedPath, conf);
+    if (!fs.exists(formalizedPath)) {
+      throw new IllegalArgumentException(
+          String.format(
+              "Path %s does not exist in fileset %s", 
formalizedPath.toString(), filesetIdent));
+    }
+
+    String catalogName = filesetIdent.namespace().level(1);
+    String schemaName = filesetIdent.namespace().level(2);
+    String filesetName = filesetIdent.name();
+
+    try {
+      return Arrays.stream(fs.listStatus(formalizedPath))
+          .map(
+              status ->
+                  FileInfoDTO.builder()
+                      .name(status.getPath().getName())
+                      .isDir(status.isDirectory())
+                      .size(status.isDirectory() ? 0L : status.getLen())
+                      .lastModified(status.getModificationTime())
+                      .path(buildGVFSFilePath(catalogName, schemaName, 
filesetName, subPath))
+                      .build())
+          .toArray(FileInfo[]::new);
+
+    } catch (IOException e) {
+      throw new RuntimeException("Failed to list files in fileset" + 
filesetIdent, e);
+    }
+  }
+
   @Override
   public Fileset createMultipleLocationFileset(
       NameIdentifier ident,
@@ -1049,6 +1093,10 @@ public class HadoopCatalogOperations extends 
ManagedSchemaOperations
     return path.endsWith(SLASH) ? path.substring(0, path.length() - 1) : path;
   }
 
+  private String ensureLeadingSlash(String path) {
+    return path.startsWith(SLASH) ? path : SLASH + path;
+  }
+
   private FilesetEntity updateFilesetEntity(
       NameIdentifier ident, FilesetEntity filesetEntity, FilesetChange... 
changes) {
     Map<String, String> props =
@@ -1144,13 +1192,13 @@ public class HadoopCatalogOperations extends 
ManagedSchemaOperations
           Path schemaPath = schemaPaths.get(locationName);
           filesetPaths.put(
               locationName,
-              caculateFilesetPath(
+              calculateFilesetPath(
                   schemaName, filesetName, storageLocation, schemaPath, 
properties));
         });
     return filesetPaths.build();
   }
 
-  private Path caculateFilesetPath(
+  private Path calculateFilesetPath(
       String schemaName,
       String filesetName,
       String storageLocation,
@@ -1248,6 +1296,14 @@ public class HadoopCatalogOperations extends 
ManagedSchemaOperations
     }
   }
 
+  private String buildGVFSFilePath(
+      String catalogName, String schemaName, String filesetName, String 
subPath) {
+    String prefix = String.join(SLASH, "/fileset", catalogName, schemaName, 
filesetName);
+    return StringUtils.isBlank(subPath)
+        ? prefix
+        : prefix + ensureLeadingSlash(removeTrailingSlash(subPath));
+  }
+
   FileSystem getFileSystem(Path path, Map<String, String> config) throws 
IOException {
     if (path == null) {
       throw new IllegalArgumentException("Path should not be null");
diff --git 
a/catalogs/catalog-hadoop/src/main/java/org/apache/gravitino/catalog/hadoop/SecureHadoopCatalogOperations.java
 
b/catalogs/catalog-hadoop/src/main/java/org/apache/gravitino/catalog/hadoop/SecureHadoopCatalogOperations.java
index 4427ff2483..1fa3ef60d7 100644
--- 
a/catalogs/catalog-hadoop/src/main/java/org/apache/gravitino/catalog/hadoop/SecureHadoopCatalogOperations.java
+++ 
b/catalogs/catalog-hadoop/src/main/java/org/apache/gravitino/catalog/hadoop/SecureHadoopCatalogOperations.java
@@ -41,6 +41,7 @@ import org.apache.gravitino.Schema;
 import org.apache.gravitino.SchemaChange;
 import org.apache.gravitino.UserPrincipal;
 import org.apache.gravitino.audit.CallerContext;
+import org.apache.gravitino.catalog.FilesetFileOps;
 import org.apache.gravitino.catalog.hadoop.authentication.UserContext;
 import org.apache.gravitino.connector.CatalogInfo;
 import org.apache.gravitino.connector.CatalogOperations;
@@ -58,6 +59,7 @@ import 
org.apache.gravitino.exceptions.NoSuchLocationNameException;
 import org.apache.gravitino.exceptions.NoSuchSchemaException;
 import org.apache.gravitino.exceptions.NonEmptySchemaException;
 import org.apache.gravitino.exceptions.SchemaAlreadyExistsException;
+import org.apache.gravitino.file.FileInfo;
 import org.apache.gravitino.file.Fileset;
 import org.apache.gravitino.file.FilesetCatalog;
 import org.apache.gravitino.file.FilesetChange;
@@ -70,7 +72,11 @@ import org.slf4j.LoggerFactory;
 
 @SuppressWarnings("removal")
 public class SecureHadoopCatalogOperations
-    implements CatalogOperations, SupportsSchemas, FilesetCatalog, 
SupportsPathBasedCredentials {
+    implements CatalogOperations,
+        SupportsSchemas,
+        FilesetCatalog,
+        FilesetFileOps,
+        SupportsPathBasedCredentials {
 
   public static final Logger LOG = 
LoggerFactory.getLogger(SecureHadoopCatalogOperations.class);
 
@@ -232,6 +238,12 @@ public class SecureHadoopCatalogOperations
     return hadoopCatalogOperations.listFilesets(namespace);
   }
 
+  @Override
+  public FileInfo[] listFiles(NameIdentifier ident, String locationName, 
String subPath)
+      throws NoSuchFilesetException, IOException {
+    return hadoopCatalogOperations.listFiles(ident, locationName, subPath);
+  }
+
   @Override
   public Fileset loadFileset(NameIdentifier ident) throws 
NoSuchFilesetException {
     return hadoopCatalogOperations.loadFileset(ident);
diff --git 
a/catalogs/catalog-hadoop/src/test/java/org/apache/gravitino/catalog/hadoop/TestHadoopCatalogOperations.java
 
b/catalogs/catalog-hadoop/src/test/java/org/apache/gravitino/catalog/hadoop/TestHadoopCatalogOperations.java
index 25c523ea70..41f7b2aee9 100644
--- 
a/catalogs/catalog-hadoop/src/test/java/org/apache/gravitino/catalog/hadoop/TestHadoopCatalogOperations.java
+++ 
b/catalogs/catalog-hadoop/src/test/java/org/apache/gravitino/catalog/hadoop/TestHadoopCatalogOperations.java
@@ -79,6 +79,7 @@ import org.apache.gravitino.exceptions.NoSuchFilesetException;
 import org.apache.gravitino.exceptions.NoSuchSchemaException;
 import org.apache.gravitino.exceptions.NonEmptySchemaException;
 import org.apache.gravitino.exceptions.SchemaAlreadyExistsException;
+import org.apache.gravitino.file.FileInfo;
 import org.apache.gravitino.file.Fileset;
 import org.apache.gravitino.file.FilesetChange;
 import org.apache.gravitino.storage.IdGenerator;
@@ -241,7 +242,7 @@ public class TestHadoopCatalogOperations {
         .getSchemaIdByMetalakeNameAndCatalogNameAndSchemaName(
             Mockito.anyString(), Mockito.anyString(), Mockito.eq("schema11"));
 
-    for (int i = 10; i < 30; i++) {
+    for (int i = 10; i < 33; i++) {
       doReturn(new SchemaIds(1L, 1L, (long) i))
           .when(spySchemaMetaService)
           .getSchemaIdByMetalakeNameAndCatalogNameAndSchemaName(
@@ -899,6 +900,110 @@ public class TestHadoopCatalogOperations {
     }
   }
 
+  @Test
+  public void testListFilesetFiles() throws IOException {
+    String schemaName = "schema30";
+    String comment = "comment30";
+    String filesetName = "fileset30";
+    String schemaPath = TEST_ROOT_PATH + "/" + schemaName;
+
+    createSchema(schemaName, comment, null, schemaPath);
+    createFileset(filesetName, schemaName, comment, Fileset.Type.MANAGED, 
null, null);
+
+    try (SecureHadoopCatalogOperations ops = new 
SecureHadoopCatalogOperations(store)) {
+      ops.initialize(Maps.newHashMap(), randomCatalogInfo(), 
HADOOP_PROPERTIES_METADATA);
+      NameIdentifier filesetIdent = NameIdentifier.of("m1", "c1", schemaName, 
filesetName);
+
+      Path testDir = new Path(schemaPath + "/" + filesetName);
+      FileSystem fs = testDir.getFileSystem(new Configuration());
+      fs.mkdirs(testDir);
+      fs.create(new Path(testDir, "test_file1.txt")).close();
+      fs.create(new Path(testDir, "test_file2.txt")).close();
+      fs.mkdirs(new Path(testDir, "test_subdir"));
+
+      FileInfo[] files = ops.listFiles(filesetIdent, null, "/");
+
+      Assertions.assertNotNull(files);
+      Assertions.assertTrue(files.length >= 3);
+
+      Set<String> fileNames = 
Arrays.stream(files).map(FileInfo::name).collect(Collectors.toSet());
+
+      Assertions.assertTrue(fileNames.contains("test_file1.txt"));
+      Assertions.assertTrue(fileNames.contains("test_file2.txt"));
+      Assertions.assertTrue(fileNames.contains("test_subdir"));
+
+      for (FileInfo file : files) {
+        // verify file type related properties
+        if (file.name().equals("test_file1.txt") || 
file.name().equals("test_file2.txt")) {
+          Assertions.assertFalse(file.isDir(), "File should not be directory: 
" + file.name());
+          Assertions.assertTrue(file.size() >= 0, "File size should be 
non-negative");
+        } else if (file.name().equals("test_subdir")) {
+          Assertions.assertTrue(file.isDir(), "Directory should be marked as 
directory");
+          Assertions.assertEquals(0, file.size(), "Directory size should be 
0");
+        }
+        // verify other properties
+        Assertions.assertNotNull(file.name(), "File name should not be null");
+        Assertions.assertNotNull(file.path(), "File path should not be null");
+        Assertions.assertTrue(file.lastModified() > 0, "Last modified time 
should be positive");
+      }
+    }
+  }
+
+  @Test
+  public void testListFilesetFilesWithFSOpsDisabled() throws Exception {
+    String schemaName = "schema31";
+    String comment = "comment31";
+    String filesetName = "fileset31";
+    String schemaPath = TEST_ROOT_PATH + "/" + schemaName;
+
+    createSchema(schemaName, comment, null, schemaPath);
+    createFileset(filesetName, schemaName, comment, Fileset.Type.MANAGED, 
null, null);
+
+    Map<String, String> catalogProps = Maps.newHashMap();
+    catalogProps.put(DISABLE_FILESYSTEM_OPS, "true");
+
+    try (SecureHadoopCatalogOperations ops = new 
SecureHadoopCatalogOperations(store)) {
+      ops.initialize(catalogProps, randomCatalogInfo(), 
HADOOP_PROPERTIES_METADATA);
+      NameIdentifier filesetIdent = NameIdentifier.of("m1", "c1", schemaName, 
filesetName);
+
+      UnsupportedOperationException ex =
+          Assertions.assertThrows(
+              UnsupportedOperationException.class,
+              () -> ops.listFiles(filesetIdent, null, "/"),
+              "Expected listFiles to throw UnsupportedOperationException when 
disableFSOps is true");
+      Assertions.assertTrue(
+          ex.getMessage().contains("Filesystem operations are disabled on this 
server"),
+          "Exception message should mention 'Filesystem operations are 
disabled on this server'");
+    }
+  }
+
+  @Test
+  public void testListFilesetFilesWithNonExistentPath() throws IOException {
+    String schemaName = "schema32";
+    String comment = "comment32";
+    String filesetName = "fileset32";
+    String schemaPath = TEST_ROOT_PATH + "/" + schemaName;
+
+    createSchema(schemaName, comment, null, schemaPath);
+    createFileset(filesetName, schemaName, comment, Fileset.Type.MANAGED, 
null, null);
+
+    try (SecureHadoopCatalogOperations ops = new 
SecureHadoopCatalogOperations(store)) {
+      ops.initialize(Maps.newHashMap(), randomCatalogInfo(), 
HADOOP_PROPERTIES_METADATA);
+      NameIdentifier filesetIdent = NameIdentifier.of("m1", "c1", schemaName, 
filesetName);
+
+      String nonExistentSubPath = "/non_existent_file.txt";
+      IllegalArgumentException ex =
+          Assertions.assertThrows(
+              IllegalArgumentException.class,
+              () -> ops.listFiles(filesetIdent, null, nonExistentSubPath),
+              "Listing a non-existent fileset directory should throw 
IllegalArgumentException");
+
+      Assertions.assertTrue(
+          ex.getMessage().contains("does not exist"),
+          "Exception message should mention that the path does not exist");
+    }
+  }
+
   @ParameterizedTest
   @MethodSource("testRenameArguments")
   public void testRenameFileset(
diff --git 
a/catalogs/catalog-hadoop/src/test/java/org/apache/gravitino/catalog/hadoop/integration/test/HadoopCatalogIT.java
 
b/catalogs/catalog-hadoop/src/test/java/org/apache/gravitino/catalog/hadoop/integration/test/HadoopCatalogIT.java
index 5dbba66593..0f7c8889ab 100644
--- 
a/catalogs/catalog-hadoop/src/test/java/org/apache/gravitino/catalog/hadoop/integration/test/HadoopCatalogIT.java
+++ 
b/catalogs/catalog-hadoop/src/test/java/org/apache/gravitino/catalog/hadoop/integration/test/HadoopCatalogIT.java
@@ -26,11 +26,15 @@ import static 
org.apache.gravitino.file.Fileset.Type.MANAGED;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Maps;
 import java.io.IOException;
+import java.net.URISyntaxException;
+import java.nio.charset.StandardCharsets;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.regex.Pattern;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.gravitino.Catalog;
 import org.apache.gravitino.CatalogChange;
 import org.apache.gravitino.NameIdentifier;
@@ -52,8 +56,15 @@ import 
org.apache.gravitino.integration.test.container.HiveContainer;
 import org.apache.gravitino.integration.test.util.BaseIT;
 import org.apache.gravitino.integration.test.util.GravitinoITUtils;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.client.utils.URIBuilder;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClients;
+import org.apache.http.util.EntityUtils;
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Assumptions;
@@ -78,6 +89,8 @@ public class HadoopCatalogIT extends BaseIT {
   protected FileSystem fileSystem;
   protected String defaultBaseLocation;
 
+  protected CloseableHttpClient httpClient;
+
   protected void startNecessaryContainer() {
     containerSuite.startHiveContainer();
   }
@@ -95,6 +108,11 @@ public class HadoopCatalogIT extends BaseIT {
     createSchema();
   }
 
+  @BeforeAll
+  protected void setupHttpClient() {
+    httpClient = HttpClients.createDefault();
+  }
+
   @AfterAll
   public void stop() throws IOException {
     Catalog catalog = metalake.loadCatalog(catalogName);
@@ -104,6 +122,12 @@ public class HadoopCatalogIT extends BaseIT {
     if (fileSystem != null) {
       fileSystem.close();
     }
+    if (httpClient != null) {
+      try {
+        httpClient.close();
+      } catch (Exception ignored) {
+      }
+    }
 
     try {
       closer.close();
@@ -151,6 +175,44 @@ public class HadoopCatalogIT extends BaseIT {
     Assertions.assertFalse(catalog.asSchemas().schemaExists(schemaName));
   }
 
+  private String getFileInfos(NameIdentifier filesetIdent, String subPath, 
String locationName)
+      throws IOException {
+    String targetPath =
+        "/api/metalakes/"
+            + metalakeName
+            + "/catalogs/"
+            + catalogName
+            + "/schemas/"
+            + schemaName
+            + "/filesets/"
+            + filesetIdent.name()
+            + "/files";
+
+    URIBuilder uriBuilder;
+    try {
+      uriBuilder = new URIBuilder(serverUri + targetPath);
+    } catch (URISyntaxException e) {
+      throw new IOException("Error constructing URI: " + serverUri + 
targetPath, e);
+    }
+
+    if (!StringUtils.isBlank(subPath)) {
+      uriBuilder.addParameter("subPath", subPath);
+    }
+    if (!StringUtils.isBlank(locationName)) {
+      uriBuilder.addParameter("locationName", locationName);
+    }
+
+    HttpGet httpGet;
+    try {
+      httpGet = new HttpGet(uriBuilder.build());
+    } catch (URISyntaxException e) {
+      throw new IOException("Failed to build URI with query parameters: " + 
uriBuilder, e);
+    }
+    try (CloseableHttpResponse httpResponse = httpClient.execute(httpGet)) {
+      return EntityUtils.toString(httpResponse.getEntity(), 
StandardCharsets.UTF_8);
+    }
+  }
+
   @Test
   public void testFilesetCache() {
     Assumptions.assumeTrue(getClass() == HadoopCatalogIT.class);
@@ -656,6 +718,177 @@ public class HadoopCatalogIT extends BaseIT {
     Assertions.assertEquals(fileset2.name(), nameIdentifiers1[1].name());
   }
 
+  @Test
+  public void testListFilesetFiles() throws IOException {
+    dropSchema();
+    createSchema();
+
+    String filesetName = "test_list_files";
+    String storageLocation = storageLocation(filesetName);
+    createFileset(filesetName, "comment", MANAGED, storageLocation, 
ImmutableMap.of("k1", "v1"));
+    assertFilesetExists(filesetName);
+
+    Path basePath = new Path(storageLocation);
+    fileSystem.mkdirs(basePath);
+
+    String fileName = "test.txt";
+    Path filePath = new Path(basePath, fileName);
+    String content = "hello";
+    try (FSDataOutputStream out = fileSystem.create(filePath, true)) {
+      out.write(content.getBytes(StandardCharsets.UTF_8));
+    }
+
+    NameIdentifier filesetIdent = NameIdentifier.of(schemaName, filesetName);
+    String actualJson = getFileInfos(filesetIdent, null, null);
+
+    Assertions.assertTrue(
+        actualJson.contains(String.format("\"name\":\"%s\"", fileName)),
+        String.format("Response JSON should contain \"name\":\"%s\"", 
fileName));
+    Assertions.assertTrue(
+        actualJson.contains("\"isDir\":false"), "Response JSON should contain 
\"isDir\":false");
+    long actualSize = content.getBytes(StandardCharsets.UTF_8).length;
+    Assertions.assertTrue(
+        actualJson.contains(String.format("\"size\":%d", actualSize)),
+        String.format("Response JSON should contain \"size\":%d", actualSize));
+    long lastMod = fileSystem.getFileStatus(filePath).getModificationTime();
+    Assertions.assertTrue(
+        actualJson.contains(String.format("\"lastModified\":%d", lastMod)),
+        String.format("Response JSON should contain \"lastModified\":%d", 
lastMod));
+    String suffix = "/" + filesetName + "/";
+    String regex = "\"path\"\\s*:\\s*\"[^\"]*" + Pattern.quote(suffix) + "\"";
+    Pattern suffixChecker = Pattern.compile(regex);
+    Assertions.assertTrue(
+        suffixChecker.matcher(actualJson).find(),
+        () ->
+            String.format("Response JSON should contain a path value ending 
with \"%s\"", suffix));
+  }
+
+  @Test
+  public void testListFilesetFilesWithSubPath() throws IOException {
+    dropSchema();
+    createSchema();
+
+    String filesetName = "test_list_files_with_subpath";
+    String storageLocation = storageLocation(filesetName);
+    createFileset(filesetName, "comment", MANAGED, storageLocation, 
ImmutableMap.of("k1", "v1"));
+    assertFilesetExists(filesetName);
+
+    Path basePath = new Path(storageLocation);
+    String subDirName = "subdir";
+    Path subDir = new Path(basePath, subDirName);
+    fileSystem.mkdirs(subDir);
+
+    String fileName = "test.txt";
+    Path filePath = new Path(subDir, fileName);
+    String content = "hello";
+    try (FSDataOutputStream out = fileSystem.create(filePath, true)) {
+      out.write(content.getBytes(StandardCharsets.UTF_8));
+    }
+
+    NameIdentifier filesetIdent = NameIdentifier.of(schemaName, filesetName);
+    String actualJson = getFileInfos(filesetIdent, "subdir", null);
+
+    Assertions.assertTrue(
+        actualJson.contains(String.format("\"name\":\"%s\"", fileName)),
+        String.format("Response JSON should contain \"name\":\"%s\"", 
fileName));
+    Assertions.assertTrue(
+        actualJson.contains("\"isDir\":false"), "Response JSON should contain 
\"isDir\":false");
+    long actualSize = content.getBytes(StandardCharsets.UTF_8).length;
+    Assertions.assertTrue(
+        actualJson.contains(String.format("\"size\":%d", actualSize)),
+        String.format("Response JSON should contain \"size\":%d", actualSize));
+    long lastMod = fileSystem.getFileStatus(filePath).getModificationTime();
+    Assertions.assertTrue(
+        actualJson.contains(String.format("\"lastModified\":%d", lastMod)),
+        String.format("Response JSON should contain \"lastModified\":%d", 
lastMod));
+    String suffix = "/" + filesetName + "/" + subDirName;
+    String regex = "\"path\"\\s*:\\s*\"[^\"]*" + Pattern.quote(suffix) + "\"";
+    Pattern suffixChecker = Pattern.compile(regex);
+    Assertions.assertTrue(
+        suffixChecker.matcher(actualJson).find(),
+        () ->
+            String.format("Response JSON should contain a path value ending 
with \"%s\"", suffix));
+  }
+
+  @Test
+  public void testListFilesetFilesWithLocationName() throws IOException {
+    dropSchema();
+    createSchema();
+
+    String filesetName = "test_list_files_with_location_name";
+    String locA = storageLocation(filesetName + "_locA");
+    String locB = storageLocation(filesetName + "_locB");
+    Map<String, String> storageLocations =
+        ImmutableMap.of(
+            "locA", locA,
+            "locB", locB);
+    Map<String, String> props = 
ImmutableMap.of(PROPERTY_DEFAULT_LOCATION_NAME, "locA");
+
+    createMultipleLocationsFileset(filesetName, "comment", MANAGED, 
storageLocations, props);
+    assertFilesetExists(filesetName);
+
+    Path basePathA = new Path(locA);
+    fileSystem.mkdirs(basePathA);
+    String fileNameA = "testA.txt";
+    Path filePathA = new Path(basePathA, fileNameA);
+    String contentA = "hello";
+    try (FSDataOutputStream out = fileSystem.create(filePathA, true)) {
+      out.write(contentA.getBytes(StandardCharsets.UTF_8));
+    }
+
+    Path basePathB = new Path(locB);
+    fileSystem.mkdirs(basePathB);
+    String fileNameB = "testB.txt";
+    Path filePathB = new Path(basePathB, fileNameB);
+    String contentB = "hello hello";
+    try (FSDataOutputStream out = fileSystem.create(filePathB, true)) {
+      out.write(contentB.getBytes(StandardCharsets.UTF_8));
+    }
+
+    NameIdentifier filesetIdent = NameIdentifier.of(schemaName, filesetName);
+    // get file info from locA without locationName, and from locB with 
locationName
+    String actualJsonA = getFileInfos(filesetIdent, null, null);
+    String actualJsonB = getFileInfos(filesetIdent, null, "locB");
+
+    // verify locA files
+    Assertions.assertTrue(
+        actualJsonA.contains(String.format("\"name\":\"%s\"", fileNameA)),
+        String.format("Response JSON should contain \"name\":\"%s\"", 
fileNameA));
+    long actualSizeA = contentA.getBytes(StandardCharsets.UTF_8).length;
+    Assertions.assertTrue(
+        actualJsonA.contains(String.format("\"size\":%d", actualSizeA)),
+        String.format("Response JSON should contain \"size\":%d", 
actualSizeA));
+
+    // verify locB files
+    Assertions.assertTrue(
+        actualJsonB.contains(String.format("\"name\":\"%s\"", fileNameB)),
+        String.format("Response JSON should contain \"name\":\"%s\"", 
fileNameB));
+    long actualSizeB = contentA.getBytes(StandardCharsets.UTF_8).length;
+    Assertions.assertTrue(
+        actualJsonA.contains(String.format("\"size\":%d", actualSizeB)),
+        String.format("Response JSON should contain \"size\":%d", 
actualSizeB));
+  }
+
+  @Test
+  public void testListFilesetFilesWithNonExistentPath() throws IOException {
+    dropSchema();
+    createSchema();
+
+    String filesetName = "test_list_files_with_non_existent_path";
+    String storageLocation = storageLocation(filesetName);
+    createFileset(filesetName, "comment", MANAGED, storageLocation, 
ImmutableMap.of("k1", "v1"));
+    assertFilesetExists(filesetName);
+
+    NameIdentifier filesetIdent = NameIdentifier.of(schemaName, filesetName);
+    String invalidPath = "nonexistent";
+
+    String actualJson = getFileInfos(filesetIdent, invalidPath, null);
+    Assertions.assertTrue(
+        actualJson.contains("does not exist in fileset"),
+        String.format(
+            "Response JSON should contain \"does not exist in fileset\", but 
was: %s", actualJson));
+  }
+
   @Test
   public void testRenameFileset() throws IOException {
     // create fileset
diff --git 
a/common/src/main/java/org/apache/gravitino/dto/file/FileInfoDTO.java 
b/common/src/main/java/org/apache/gravitino/dto/file/FileInfoDTO.java
new file mode 100644
index 0000000000..c826fa14f4
--- /dev/null
+++ b/common/src/main/java/org/apache/gravitino/dto/file/FileInfoDTO.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.dto.file;
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Preconditions;
+import lombok.EqualsAndHashCode;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.gravitino.file.FileInfo;
+
+/** Represents a FileInfo DTO (Data Transfer Object). */
+@EqualsAndHashCode
+@JsonIgnoreProperties({"dir"})
+public class FileInfoDTO implements FileInfo {
+
+  @JsonProperty("name")
+  private String name;
+
+  @JsonProperty("isDir")
+  private boolean isDir;
+
+  @JsonProperty("size")
+  private long size;
+
+  @JsonProperty("lastModified")
+  private long lastModified;
+
+  @JsonProperty("path")
+  private String path;
+
+  private FileInfoDTO() {}
+
+  private FileInfoDTO(String name, boolean isDir, long size, long 
lastModified, String path) {
+    this.name = name;
+    this.isDir = isDir;
+    this.size = size;
+    this.lastModified = lastModified;
+    this.path = path;
+  }
+
+  @Override
+  public String name() {
+    return name;
+  }
+
+  @Override
+  public boolean isDir() {
+    return isDir;
+  }
+
+  @Override
+  public long size() {
+    return size;
+  }
+
+  @Override
+  public long lastModified() {
+    return lastModified;
+  }
+
+  @Override
+  public String path() {
+    return path;
+  }
+
+  /**
+   * Create a new FileInfoDTO builder.
+   *
+   * @return A new FileInfoDTO builder.
+   */
+  public static FileInfoDTO.FileInfoDTOBuilder builder() {
+    return new FileInfoDTO.FileInfoDTOBuilder();
+  }
+
+  /** Builder for FileInfoDTO. */
+  public static class FileInfoDTOBuilder {
+    private String name;
+    private boolean isDir;
+    private long size;
+    private long lastModified;
+    private String path;
+
+    private FileInfoDTOBuilder() {}
+
+    /**
+     * Set the name of the FileInfo.
+     *
+     * @param name The name of the file.
+     * @return The builder instance.
+     */
+    public FileInfoDTO.FileInfoDTOBuilder name(String name) {
+      this.name = name;
+      return this;
+    }
+
+    /**
+     * Set the isDir of the FileInfo.
+     *
+     * @param isDir The isDir of the file.
+     * @return The builder instance.
+     */
+    public FileInfoDTO.FileInfoDTOBuilder isDir(boolean isDir) {
+      this.isDir = isDir;
+      return this;
+    }
+
+    /**
+     * Set the size of the FileInfo.
+     *
+     * @param size The size of the file.
+     * @return The builder instance.
+     */
+    public FileInfoDTO.FileInfoDTOBuilder size(long size) {
+      this.size = size;
+      return this;
+    }
+
+    /**
+     * Set the lastModified of the FileInfo.
+     *
+     * @param lastModified The lastModified of the file.
+     * @return The builder instance.
+     */
+    public FileInfoDTO.FileInfoDTOBuilder lastModified(long lastModified) {
+      this.lastModified = lastModified;
+      return this;
+    }
+
+    /**
+     * Set the path of the FileInfo.
+     *
+     * @param path The path of the file.
+     * @return The builder instance.
+     */
+    public FileInfoDTO.FileInfoDTOBuilder path(String path) {
+      this.path = path;
+      return this;
+    }
+
+    /**
+     * Build the FileInfoDTO.
+     *
+     * @return The built FileInfoDTO.
+     */
+    public FileInfoDTO build() {
+      Preconditions.checkArgument(StringUtils.isNotBlank(name), "name cannot 
be null or empty");
+      Preconditions.checkArgument(size >= 0, "size cannot be negative");
+      Preconditions.checkArgument(lastModified > 0, "lastModified must be a 
valid timestamp");
+      Preconditions.checkArgument(StringUtils.isNotBlank(path), "path cannot 
be null or empty");
+
+      return new FileInfoDTO(name, isDir, size, lastModified, path);
+    }
+  }
+}
diff --git 
a/common/src/main/java/org/apache/gravitino/dto/responses/FileInfoListResponse.java
 
b/common/src/main/java/org/apache/gravitino/dto/responses/FileInfoListResponse.java
new file mode 100644
index 0000000000..5e305a1f0c
--- /dev/null
+++ 
b/common/src/main/java/org/apache/gravitino/dto/responses/FileInfoListResponse.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.dto.responses;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Preconditions;
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+import lombok.ToString;
+import org.apache.gravitino.dto.file.FileInfoDTO;
+
+/** Response for listing files info in a fileset. */
+@Getter
+@ToString
+@EqualsAndHashCode(callSuper = true)
+public class FileInfoListResponse extends BaseResponse {
+
+  @JsonProperty("files")
+  private final FileInfoDTO[] files;
+
+  /** Constructor for FileInfoListResponse. */
+  public FileInfoListResponse() {
+    super(0);
+    this.files = null;
+  }
+
+  /**
+   * Constructor for FileInfoListResponse.
+   *
+   * @param files Array of FileInfo objects to be returned.
+   */
+  public FileInfoListResponse(FileInfoDTO[] files) {
+    super(0);
+    this.files = files;
+  }
+
+  /**
+   * Validates the response.
+   *
+   * @throws IllegalArgumentException if the response is invalid.
+   */
+  @Override
+  public void validate() throws IllegalArgumentException {
+    super.validate();
+    Preconditions.checkArgument(files != null, "files must not be null");
+  }
+}
diff --git 
a/common/src/main/java/org/apache/gravitino/dto/util/DTOConverters.java 
b/common/src/main/java/org/apache/gravitino/dto/util/DTOConverters.java
index 5dd503ce84..1f7c8f5f5a 100644
--- a/common/src/main/java/org/apache/gravitino/dto/util/DTOConverters.java
+++ b/common/src/main/java/org/apache/gravitino/dto/util/DTOConverters.java
@@ -49,6 +49,7 @@ import org.apache.gravitino.dto.authorization.RoleDTO;
 import org.apache.gravitino.dto.authorization.SecurableObjectDTO;
 import org.apache.gravitino.dto.authorization.UserDTO;
 import org.apache.gravitino.dto.credential.CredentialDTO;
+import org.apache.gravitino.dto.file.FileInfoDTO;
 import org.apache.gravitino.dto.file.FilesetDTO;
 import org.apache.gravitino.dto.messaging.TopicDTO;
 import org.apache.gravitino.dto.model.ModelDTO;
@@ -80,6 +81,7 @@ import org.apache.gravitino.dto.rel.partitions.PartitionDTO;
 import org.apache.gravitino.dto.rel.partitions.RangePartitionDTO;
 import org.apache.gravitino.dto.tag.MetadataObjectDTO;
 import org.apache.gravitino.dto.tag.TagDTO;
+import org.apache.gravitino.file.FileInfo;
 import org.apache.gravitino.file.Fileset;
 import org.apache.gravitino.messaging.Topic;
 import org.apache.gravitino.model.Model;
@@ -618,6 +620,26 @@ public class DTOConverters {
         .build();
   }
 
+  /**
+   * Converts array of FileInfo to array of FileInfoDTO.
+   *
+   * @param files The FileInfo array to convert.
+   * @return The converted FileInfoDTO array.
+   */
+  public static FileInfoDTO[] toDTO(FileInfo[] files) {
+    return Arrays.stream(files)
+        .map(
+            file ->
+                FileInfoDTO.builder()
+                    .name(file.name())
+                    .isDir(file.isDir())
+                    .size(file.size())
+                    .lastModified(file.lastModified())
+                    .path(file.path())
+                    .build())
+        .toArray(FileInfoDTO[]::new);
+  }
+
   /**
    * Converts a Topic to a TopicDTO.
    *
diff --git 
a/core/src/main/java/org/apache/gravitino/catalog/FilesetDispatcher.java 
b/common/src/main/java/org/apache/gravitino/file/FileInfo.java
similarity index 51%
copy from core/src/main/java/org/apache/gravitino/catalog/FilesetDispatcher.java
copy to common/src/main/java/org/apache/gravitino/file/FileInfo.java
index 2c6c6a47f0..9e15b2cad7 100644
--- a/core/src/main/java/org/apache/gravitino/catalog/FilesetDispatcher.java
+++ b/common/src/main/java/org/apache/gravitino/file/FileInfo.java
@@ -16,15 +16,29 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-
-package org.apache.gravitino.catalog;
-
-import org.apache.gravitino.file.FilesetCatalog;
+package org.apache.gravitino.file;
 
 /**
- * {@code FilesetDispatcher} interface acts as a specialization of the {@link 
FilesetCatalog}
- * interface. This interface is designed to potentially add custom behaviors 
or operations related
- * to dispatching or handling fileset-related events or actions that are not 
covered by the standard
- * {@code FilesetCatalog} operations.
+ * Represents metadata about a single file or directory within a fileset.
+ *
+ * <p>Implementations of this interface provide access to the basic attributes 
of a fileset entry,
+ * including its name, type (file vs. directory), size, last-modified 
timestamp, and its logical
+ * path within the enclosing fileset.
  */
-public interface FilesetDispatcher extends FilesetCatalog {}
+public interface FileInfo {
+
+  /** @return The filename or directory name of file object. */
+  String name();
+
+  /** @return Whether this is a directory (true). */
+  boolean isDir();
+
+  /** @return The file size in bytes (0 if directory). */
+  long size();
+
+  /** @return The last modification time as an Instant. */
+  long lastModified();
+
+  /** @return The full path of the file or directory within the fileset. */
+  String path();
+}
diff --git 
a/core/src/main/java/org/apache/gravitino/catalog/CatalogManager.java 
b/core/src/main/java/org/apache/gravitino/catalog/CatalogManager.java
index 00397ad254..ba6a4d962b 100644
--- a/core/src/main/java/org/apache/gravitino/catalog/CatalogManager.java
+++ b/core/src/main/java/org/apache/gravitino/catalog/CatalogManager.java
@@ -172,6 +172,17 @@ public class CatalogManager implements CatalogDispatcher, 
Closeable {
           });
     }
 
+    public <R> R doWithFilesetFileOps(ThrowableFunction<FilesetFileOps, R> fn) 
throws Exception {
+      return classLoader.withClassLoader(
+          cl -> {
+            if (asFilesetFileOps() == null) {
+              throw new UnsupportedOperationException(
+                  "Catalog does not support fileset file operations");
+            }
+            return fn.apply(asFilesetFileOps());
+          });
+    }
+
     public <R> R doWithCredentialOps(ThrowableFunction<BaseCatalog, R> fn) 
throws Exception {
       return classLoader.withClassLoader(cl -> fn.apply(catalog));
     }
@@ -251,6 +262,10 @@ public class CatalogManager implements CatalogDispatcher, 
Closeable {
       return catalog.ops() instanceof FilesetCatalog ? (FilesetCatalog) 
catalog.ops() : null;
     }
 
+    private FilesetFileOps asFilesetFileOps() {
+      return catalog.ops() instanceof FilesetFileOps ? (FilesetFileOps) 
catalog.ops() : null;
+    }
+
     private TopicCatalog asTopics() {
       return catalog.ops() instanceof TopicCatalog ? (TopicCatalog) 
catalog.ops() : null;
     }
diff --git 
a/core/src/main/java/org/apache/gravitino/catalog/FilesetDispatcher.java 
b/core/src/main/java/org/apache/gravitino/catalog/FilesetDispatcher.java
index 2c6c6a47f0..8c4f4d747e 100644
--- a/core/src/main/java/org/apache/gravitino/catalog/FilesetDispatcher.java
+++ b/core/src/main/java/org/apache/gravitino/catalog/FilesetDispatcher.java
@@ -23,8 +23,8 @@ import org.apache.gravitino.file.FilesetCatalog;
 
 /**
  * {@code FilesetDispatcher} interface acts as a specialization of the {@link 
FilesetCatalog}
- * interface. This interface is designed to potentially add custom behaviors 
or operations related
- * to dispatching or handling fileset-related events or actions that are not 
covered by the standard
- * {@code FilesetCatalog} operations.
+ * interface, and extends {@link FilesetFileOps} for file operations.This 
interface is designed to
+ * potentially add custom behaviors or operations related to dispatching or 
handling fileset-related
+ * events or actions that are not covered by the standard {@code 
FilesetCatalog} operations.
  */
-public interface FilesetDispatcher extends FilesetCatalog {}
+public interface FilesetDispatcher extends FilesetCatalog, FilesetFileOps {}
diff --git 
a/core/src/main/java/org/apache/gravitino/catalog/FilesetFileOps.java 
b/core/src/main/java/org/apache/gravitino/catalog/FilesetFileOps.java
new file mode 100644
index 0000000000..c06db4eb42
--- /dev/null
+++ b/core/src/main/java/org/apache/gravitino/catalog/FilesetFileOps.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.catalog;
+
+import java.io.IOException;
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.exceptions.NoSuchFilesetException;
+import org.apache.gravitino.file.FileInfo;
+
+/**
+ * The {@code FilesetFileOps} interface defines operations for managing files 
within filesets. This
+ * interface is designed to be used internally by the server and not exposed 
to public client APIs
+ * to avoid confusion.
+ */
+public interface FilesetFileOps {
+
+  /**
+   * List the files in a fileset with a specific location name and sub path.
+   *
+   * @param ident A fileset identifier.
+   * @param locationName The location name. If null, the default location will 
be used.
+   * @param subPath The sub path under the fileset.
+   * @return An array of file information objects.
+   */
+  default FileInfo[] listFiles(NameIdentifier ident, String locationName, 
String subPath)
+      throws NoSuchFilesetException, IOException {
+    throw new UnsupportedOperationException(
+        "listFiles not supported by " + getClass().getSimpleName());
+  }
+}
diff --git 
a/core/src/main/java/org/apache/gravitino/catalog/FilesetNormalizeDispatcher.java
 
b/core/src/main/java/org/apache/gravitino/catalog/FilesetNormalizeDispatcher.java
index 9a14004050..44a0c1c729 100644
--- 
a/core/src/main/java/org/apache/gravitino/catalog/FilesetNormalizeDispatcher.java
+++ 
b/core/src/main/java/org/apache/gravitino/catalog/FilesetNormalizeDispatcher.java
@@ -22,6 +22,7 @@ import static 
org.apache.gravitino.catalog.CapabilityHelpers.applyCapabilities;
 import static 
org.apache.gravitino.catalog.CapabilityHelpers.applyCaseSensitive;
 import static org.apache.gravitino.catalog.CapabilityHelpers.getCapability;
 
+import java.io.IOException;
 import java.util.Map;
 import org.apache.commons.lang3.ArrayUtils;
 import org.apache.gravitino.NameIdentifier;
@@ -31,6 +32,7 @@ import 
org.apache.gravitino.exceptions.FilesetAlreadyExistsException;
 import org.apache.gravitino.exceptions.NoSuchFilesetException;
 import org.apache.gravitino.exceptions.NoSuchLocationNameException;
 import org.apache.gravitino.exceptions.NoSuchSchemaException;
+import org.apache.gravitino.file.FileInfo;
 import org.apache.gravitino.file.Fileset;
 import org.apache.gravitino.file.FilesetChange;
 
@@ -52,6 +54,12 @@ public class FilesetNormalizeDispatcher implements 
FilesetDispatcher {
     return normalizeCaseSensitive(identifiers);
   }
 
+  @Override
+  public FileInfo[] listFiles(NameIdentifier ident, String locationName, 
String subPath)
+      throws NoSuchFilesetException, IOException {
+    return dispatcher.listFiles(normalizeCaseSensitive(ident), locationName, 
subPath);
+  }
+
   @Override
   public Fileset loadFileset(NameIdentifier ident) throws 
NoSuchFilesetException {
     // The constraints of the name spec may be more strict than underlying 
catalog,
diff --git 
a/core/src/main/java/org/apache/gravitino/catalog/FilesetOperationDispatcher.java
 
b/core/src/main/java/org/apache/gravitino/catalog/FilesetOperationDispatcher.java
index 5a2eb6a997..4f9de3dcbc 100644
--- 
a/core/src/main/java/org/apache/gravitino/catalog/FilesetOperationDispatcher.java
+++ 
b/core/src/main/java/org/apache/gravitino/catalog/FilesetOperationDispatcher.java
@@ -33,6 +33,7 @@ import org.apache.gravitino.exceptions.NoSuchFilesetException;
 import org.apache.gravitino.exceptions.NoSuchLocationNameException;
 import org.apache.gravitino.exceptions.NoSuchSchemaException;
 import org.apache.gravitino.exceptions.NonEmptyEntityException;
+import org.apache.gravitino.file.FileInfo;
 import org.apache.gravitino.file.Fileset;
 import org.apache.gravitino.file.FilesetChange;
 import org.apache.gravitino.lock.LockType;
@@ -99,6 +100,20 @@ public class FilesetOperationDispatcher extends 
OperationDispatcher implements F
                 fileset.properties()));
   }
 
+  @Override
+  public FileInfo[] listFiles(NameIdentifier ident, String locationName, 
String subPath)
+      throws NoSuchFilesetException {
+    NameIdentifier catalogIdent = getCatalogIdentifier(ident);
+    return TreeLockUtils.doWithTreeLock(
+        ident,
+        LockType.READ,
+        () ->
+            doWithCatalog(
+                catalogIdent,
+                c -> c.doWithFilesetFileOps(f -> f.listFiles(ident, 
locationName, subPath)),
+                NoSuchFilesetException.class));
+  }
+
   /**
    * Create a fileset metadata in the catalog.
    *
diff --git 
a/core/src/main/java/org/apache/gravitino/hook/FilesetHookDispatcher.java 
b/core/src/main/java/org/apache/gravitino/hook/FilesetHookDispatcher.java
index a6acbff441..d1653f2772 100644
--- a/core/src/main/java/org/apache/gravitino/hook/FilesetHookDispatcher.java
+++ b/core/src/main/java/org/apache/gravitino/hook/FilesetHookDispatcher.java
@@ -18,6 +18,7 @@
  */
 package org.apache.gravitino.hook;
 
+import java.io.IOException;
 import java.util.List;
 import java.util.Map;
 import org.apache.gravitino.Entity;
@@ -32,6 +33,7 @@ import 
org.apache.gravitino.exceptions.FilesetAlreadyExistsException;
 import org.apache.gravitino.exceptions.NoSuchFilesetException;
 import org.apache.gravitino.exceptions.NoSuchLocationNameException;
 import org.apache.gravitino.exceptions.NoSuchSchemaException;
+import org.apache.gravitino.file.FileInfo;
 import org.apache.gravitino.file.Fileset;
 import org.apache.gravitino.file.FilesetChange;
 import org.apache.gravitino.utils.NameIdentifierUtil;
@@ -54,6 +56,12 @@ public class FilesetHookDispatcher implements 
FilesetDispatcher {
     return dispatcher.listFilesets(namespace);
   }
 
+  @Override
+  public FileInfo[] listFiles(NameIdentifier ident, String locationName, 
String subPath)
+      throws NoSuchFilesetException, IOException {
+    return dispatcher.listFiles(ident, locationName, subPath);
+  }
+
   @Override
   public Fileset loadFileset(NameIdentifier ident) throws 
NoSuchFilesetException {
     return dispatcher.loadFileset(ident);
diff --git 
a/core/src/main/java/org/apache/gravitino/listener/FilesetEventDispatcher.java 
b/core/src/main/java/org/apache/gravitino/listener/FilesetEventDispatcher.java
index cb1c2ad1d2..f332f84c10 100644
--- 
a/core/src/main/java/org/apache/gravitino/listener/FilesetEventDispatcher.java
+++ 
b/core/src/main/java/org/apache/gravitino/listener/FilesetEventDispatcher.java
@@ -20,6 +20,7 @@
 package org.apache.gravitino.listener;
 
 import com.google.common.collect.ImmutableMap;
+import java.io.IOException;
 import java.util.Map;
 import org.apache.gravitino.NameIdentifier;
 import org.apache.gravitino.Namespace;
@@ -29,6 +30,7 @@ import 
org.apache.gravitino.exceptions.FilesetAlreadyExistsException;
 import org.apache.gravitino.exceptions.NoSuchFilesetException;
 import org.apache.gravitino.exceptions.NoSuchLocationNameException;
 import org.apache.gravitino.exceptions.NoSuchSchemaException;
+import org.apache.gravitino.file.FileInfo;
 import org.apache.gravitino.file.Fileset;
 import org.apache.gravitino.file.FilesetChange;
 import org.apache.gravitino.listener.api.event.AlterFilesetEvent;
@@ -81,6 +83,13 @@ public class FilesetEventDispatcher implements 
FilesetDispatcher {
     }
   }
 
+  @Override
+  public FileInfo[] listFiles(NameIdentifier ident, String locationName, 
String subPath)
+      throws NoSuchFilesetException, IOException {
+    // TODO: implement the ListFilesEvent
+    return dispatcher.listFiles(ident, locationName, subPath);
+  }
+
   @Override
   public Fileset loadFileset(NameIdentifier ident) throws 
NoSuchFilesetException {
     eventBus.dispatchEvent(new 
LoadFilesetPreEvent(PrincipalUtils.getCurrentUserName(), ident));
diff --git 
a/server/src/main/java/org/apache/gravitino/server/web/rest/FilesetOperations.java
 
b/server/src/main/java/org/apache/gravitino/server/web/rest/FilesetOperations.java
index 06342d4131..51baa3c655 100644
--- 
a/server/src/main/java/org/apache/gravitino/server/web/rest/FilesetOperations.java
+++ 
b/server/src/main/java/org/apache/gravitino/server/web/rest/FilesetOperations.java
@@ -23,6 +23,9 @@ import static 
org.apache.gravitino.file.Fileset.LOCATION_NAME_UNKNOWN;
 import com.codahale.metrics.annotation.ResponseMetered;
 import com.codahale.metrics.annotation.Timed;
 import com.google.common.collect.ImmutableMap;
+import java.io.UnsupportedEncodingException;
+import java.net.URLDecoder;
+import java.nio.charset.StandardCharsets;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
@@ -31,6 +34,7 @@ import javax.inject.Inject;
 import javax.servlet.http.HttpServletRequest;
 import javax.validation.constraints.NotNull;
 import javax.ws.rs.DELETE;
+import javax.ws.rs.DefaultValue;
 import javax.ws.rs.GET;
 import javax.ws.rs.POST;
 import javax.ws.rs.PUT;
@@ -40,6 +44,7 @@ import javax.ws.rs.Produces;
 import javax.ws.rs.QueryParam;
 import javax.ws.rs.core.Context;
 import javax.ws.rs.core.Response;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.gravitino.NameIdentifier;
 import org.apache.gravitino.Namespace;
 import org.apache.gravitino.audit.CallerContext;
@@ -49,9 +54,11 @@ import 
org.apache.gravitino.dto.requests.FilesetUpdateRequest;
 import org.apache.gravitino.dto.requests.FilesetUpdatesRequest;
 import org.apache.gravitino.dto.responses.DropResponse;
 import org.apache.gravitino.dto.responses.EntityListResponse;
+import org.apache.gravitino.dto.responses.FileInfoListResponse;
 import org.apache.gravitino.dto.responses.FileLocationResponse;
 import org.apache.gravitino.dto.responses.FilesetResponse;
 import org.apache.gravitino.dto.util.DTOConverters;
+import org.apache.gravitino.file.FileInfo;
 import org.apache.gravitino.file.Fileset;
 import org.apache.gravitino.file.FilesetChange;
 import org.apache.gravitino.metrics.MetricNames;
@@ -182,6 +189,56 @@ public class FilesetOperations {
     }
   }
 
+  @GET
+  @Path("{fileset}/files")
+  @Produces("application/vnd.gravitino.v1+json")
+  @Timed(name = "list-fileset-files." + MetricNames.HTTP_PROCESS_DURATION, 
absolute = true)
+  @ResponseMetered(name = "list-fileset-files", absolute = true)
+  public Response listFiles(
+      @PathParam("metalake") String metalake,
+      @PathParam("catalog") String catalog,
+      @PathParam("schema") String schema,
+      @PathParam("fileset") String fileset,
+      @QueryParam("subPath") @DefaultValue("/") String subPath,
+      @QueryParam("locationName") String locationName)
+      throws UnsupportedEncodingException {
+    LOG.info(
+        "Received list files request: {}.{}.{}.{}, subPath: {}, 
locationName:{}",
+        metalake,
+        catalog,
+        schema,
+        fileset,
+        subPath,
+        locationName);
+
+    final String decodedSubPath =
+        StringUtils.isNotBlank(subPath)
+            ? URLDecoder.decode(subPath, StandardCharsets.UTF_8.name())
+            : subPath;
+
+    try {
+      return Utils.doAs(
+          httpRequest,
+          () -> {
+            NameIdentifier filesetIdent =
+                NameIdentifierUtil.ofFileset(metalake, catalog, schema, 
fileset);
+            FileInfo[] files = dispatcher.listFiles(filesetIdent, 
locationName, decodedSubPath);
+            Response response = Utils.ok(new 
FileInfoListResponse(DTOConverters.toDTO(files)));
+            LOG.info(
+                "Files listed for fileset: {}.{}.{}.{}, subPath: {}, 
locationName:{}",
+                metalake,
+                catalog,
+                schema,
+                fileset,
+                subPath,
+                locationName);
+            return response;
+          });
+    } catch (Exception e) {
+      return ExceptionHandlers.handleFilesetException(OperationType.LOAD, 
fileset, schema, e);
+    }
+  }
+
   @PUT
   @Path("{fileset}")
   @Produces("application/vnd.gravitino.v1+json")

Reply via email to