This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 499423c85cd [HUDI-7041] Optimize the memory usage of timeline server 
for table service (#10002)
499423c85cd is described below

commit 499423c85cda6e9ccf5cec405732517b0ea39355
Author: zhuanshenbsj1 <[email protected]>
AuthorDate: Sun Nov 26 10:13:46 2023 +0800

    [HUDI-7041] Optimize the memory usage of timeline server for table service 
(#10002)
---
 .../action/clean/CleanPlanActionExecutor.java      |  30 +++--
 .../hudi/table/action/clean/CleanPlanner.java      |   4 +-
 .../cluster/strategy/ClusteringPlanStrategy.java   |   2 +-
 .../BaseHoodieCompactionPlanGenerator.java         |   2 +-
 .../table/view/AbstractTableFileSystemView.java    | 133 ++++++++++++++++++---
 .../table/view/HoodieTableFileSystemView.java      |   5 +
 .../table/view/PriorityBasedFileSystemView.java    |  10 ++
 .../view/RemoteHoodieTableFileSystemView.java      |  28 +++++
 .../table/view/RocksDbBasedFileSystemView.java     |   6 +
 .../common/table/view/TableFileSystemView.java     |  25 ++++
 .../hudi/common/util/RocksDBSchemaHelper.java      |   4 +
 .../table/view/TestHoodieTableFileSystemView.java  | 106 +++++++++++++++-
 .../hudi/timeline/service/RequestHandler.java      |  16 +++
 .../service/handlers/FileSliceHandler.java         |  17 ++-
 14 files changed, 351 insertions(+), 37 deletions(-)

diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanActionExecutor.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanActionExecutor.java
index 3b5d1233214..a70bfd256c0 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanActionExecutor.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanActionExecutor.java
@@ -41,7 +41,9 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
@@ -118,17 +120,23 @@ public class CleanPlanActionExecutor<T, I, K, O> extends 
BaseActionExecutor<T, I
 
       context.setJobStatus(this.getClass().getSimpleName(), "Generating list 
of file slices to be cleaned: " + config.getTableName());
 
-      Map<String, Pair<Boolean, List<CleanFileInfo>>> 
cleanOpsWithPartitionMeta = context
-          .map(partitionsToClean, partitionPathToClean -> 
Pair.of(partitionPathToClean, planner.getDeletePaths(partitionPathToClean, 
earliestInstant)), cleanerParallelism)
-          .stream()
-          .collect(Collectors.toMap(Pair::getKey, Pair::getValue));
-
-      Map<String, List<HoodieCleanFileInfo>> cleanOps = 
cleanOpsWithPartitionMeta.entrySet().stream()
-          .collect(Collectors.toMap(Map.Entry::getKey,
-              e -> 
CleanerUtils.convertToHoodieCleanFileInfoList(e.getValue().getValue())));
-
-      List<String> partitionsToDelete = 
cleanOpsWithPartitionMeta.entrySet().stream().filter(entry -> 
entry.getValue().getKey()).map(Map.Entry::getKey)
-          .collect(Collectors.toList());
+      Map<String, List<HoodieCleanFileInfo>> cleanOps = new HashMap<>();
+      List<String> partitionsToDelete = new ArrayList<>();
+      for (int i = 0; i < partitionsToClean.size(); i += cleanerParallelism) {
+        // Handles at most 'cleanerParallelism' number of partitions once at a 
time to avoid overlarge memory pressure to the timeline server
+        // (remote or local embedded), thus to reduce the risk of an OOM 
exception.
+        List<String> subPartitionsToClean = partitionsToClean.subList(i, 
Math.min(i + cleanerParallelism, partitionsToClean.size()));
+        Map<String, Pair<Boolean, List<CleanFileInfo>>> 
cleanOpsWithPartitionMeta = context
+            .map(subPartitionsToClean, partitionPathToClean -> 
Pair.of(partitionPathToClean, planner.getDeletePaths(partitionPathToClean, 
earliestInstant)), cleanerParallelism)
+            .stream()
+            .collect(Collectors.toMap(Pair::getKey, Pair::getValue));
+
+        cleanOps.putAll(cleanOpsWithPartitionMeta.entrySet().stream()
+            .collect(Collectors.toMap(Map.Entry::getKey, e -> 
CleanerUtils.convertToHoodieCleanFileInfoList(e.getValue().getValue()))));
+
+        
partitionsToDelete.addAll(cleanOpsWithPartitionMeta.entrySet().stream().filter(entry
 -> entry.getValue().getKey()).map(Map.Entry::getKey)
+            .collect(Collectors.toList()));
+      }
 
       return new HoodieCleanerPlan(earliestInstant
           .map(x -> new HoodieActionInstant(x.getTimestamp(), x.getAction(), 
x.getState().name())).orElse(null),
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java
index 112034fd877..efbca863e50 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java
@@ -254,7 +254,7 @@ public class CleanPlanner<T, I, K, O> implements 
Serializable {
     // In other words, the file versions only apply to the active file groups.
     deletePaths.addAll(getReplacedFilesEligibleToClean(savepointedFiles, 
partitionPath, Option.empty()));
     boolean toDeletePartition = false;
-    List<HoodieFileGroup> fileGroups = 
fileSystemView.getAllFileGroups(partitionPath).collect(Collectors.toList());
+    List<HoodieFileGroup> fileGroups = 
fileSystemView.getAllFileGroupsStateless(partitionPath).collect(Collectors.toList());
     for (HoodieFileGroup fileGroup : fileGroups) {
       int keepVersions = config.getCleanerFileVersionsRetained();
       // do not cleanup slice required for pending compaction
@@ -329,7 +329,7 @@ public class CleanPlanner<T, I, K, O> implements 
Serializable {
       // all replaced file groups before earliestCommitToRetain are eligible 
to clean
       deletePaths.addAll(getReplacedFilesEligibleToClean(savepointedFiles, 
partitionPath, earliestCommitToRetain));
       // add active files
-      List<HoodieFileGroup> fileGroups = 
fileSystemView.getAllFileGroups(partitionPath).collect(Collectors.toList());
+      List<HoodieFileGroup> fileGroups = 
fileSystemView.getAllFileGroupsStateless(partitionPath).collect(Collectors.toList());
       for (HoodieFileGroup fileGroup : fileGroups) {
         List<FileSlice> fileSliceList = 
fileGroup.getAllFileSlices().collect(Collectors.toList());
 
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/ClusteringPlanStrategy.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/ClusteringPlanStrategy.java
index 2d2c2a36643..0d07bed531a 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/ClusteringPlanStrategy.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/ClusteringPlanStrategy.java
@@ -121,7 +121,7 @@ public abstract class ClusteringPlanStrategy<T,I,K,O> 
implements Serializable {
             .collect(Collectors.toSet());
     
fgIdsInPendingCompactionLogCompactionAndClustering.addAll(fileSystemView.getFileGroupsInPendingClustering().map(Pair::getKey).collect(Collectors.toSet()));
 
-    return hoodieTable.getSliceView().getLatestFileSlices(partition)
+    return hoodieTable.getSliceView().getLatestFileSlicesStateless(partition)
         // file ids already in clustering are not eligible
         .filter(slice -> 
!fgIdsInPendingCompactionLogCompactionAndClustering.contains(slice.getFileGroupId()));
   }
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/plan/generators/BaseHoodieCompactionPlanGenerator.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/plan/generators/BaseHoodieCompactionPlanGenerator.java
index 0f3beb136b2..2626bc59918 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/plan/generators/BaseHoodieCompactionPlanGenerator.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/plan/generators/BaseHoodieCompactionPlanGenerator.java
@@ -117,7 +117,7 @@ public abstract class BaseHoodieCompactionPlanGenerator<T 
extends HoodieRecordPa
     Option<InstantRange> instantRange = 
CompactHelpers.getInstance().getInstantRange(metaClient);
 
     List<HoodieCompactionOperation> operations = 
engineContext.flatMap(partitionPaths, partitionPath -> fileSystemView
-        .getLatestFileSlices(partitionPath)
+        .getLatestFileSlicesStateless(partitionPath)
         .filter(slice -> filterFileSlice(slice, lastCompletedInstantTime, 
fgIdsInPendingCompactionAndClustering, instantRange))
         .map(s -> {
           List<HoodieLogFile> logFiles = s.getLogFiles()
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java
index b3dc0fbce0a..14f55b76889 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java
@@ -433,6 +433,19 @@ public abstract class AbstractTableFileSystemView 
implements SyncableFileSystemV
     return fileStatusMap;
   }
 
+  /**
+   * Returns all files situated at the given partition.
+   */
+  private FileStatus[] getAllFilesInPartition(String relativePartitionPath) 
throws IOException {
+    Path partitionPath = FSUtils.getPartitionPath(metaClient.getBasePathV2(), 
relativePartitionPath);
+    long beginLsTs = System.currentTimeMillis();
+    FileStatus[] statuses = listPartition(partitionPath);
+    long endLsTs = System.currentTimeMillis();
+    LOG.debug("#files found in partition (" + relativePartitionPath + ") =" + 
statuses.length + ", Time taken ="
+            + (endLsTs - beginLsTs));
+    return statuses;
+  }
+
   /**
    * Allows lazily loading the partitions if needed.
    *
@@ -449,15 +462,7 @@ public abstract class AbstractTableFileSystemView 
implements SyncableFileSystemV
         // Not loaded yet
         try {
           LOG.info("Building file system view for partition (" + 
partitionPathStr + ")");
-
-          Path partitionPath = 
FSUtils.getPartitionPath(metaClient.getBasePathV2(), partitionPathStr);
-          long beginLsTs = System.currentTimeMillis();
-          FileStatus[] statuses = listPartition(partitionPath);
-          long endLsTs = System.currentTimeMillis();
-          LOG.debug("#files found in partition (" + partitionPathStr + ") =" + 
statuses.length + ", Time taken ="
-              + (endLsTs - beginLsTs));
-          List<HoodieFileGroup> groups = addFilesToView(statuses);
-
+          List<HoodieFileGroup> groups = 
addFilesToView(getAllFilesInPartition(partitionPathStr));
           if (groups.isEmpty()) {
             storePartitionView(partitionPathStr, new ArrayList<>());
           }
@@ -598,24 +603,32 @@ public abstract class AbstractTableFileSystemView 
implements SyncableFileSystemV
   }
 
   protected HoodieFileGroup addBootstrapBaseFileIfPresent(HoodieFileGroup 
fileGroup) {
+    return addBootstrapBaseFileIfPresent(fileGroup, 
this::getBootstrapBaseFile);
+  }
+
+  protected HoodieFileGroup addBootstrapBaseFileIfPresent(HoodieFileGroup 
fileGroup, Function<HoodieFileGroupId, Option<BootstrapBaseFileMapping>> 
bootstrapBaseFileMappingFunc) {
     boolean hasBootstrapBaseFile = fileGroup.getAllFileSlices()
         .anyMatch(fs -> 
fs.getBaseInstantTime().equals(METADATA_BOOTSTRAP_INSTANT_TS));
     if (hasBootstrapBaseFile) {
       HoodieFileGroup newFileGroup = new HoodieFileGroup(fileGroup);
       newFileGroup.getAllFileSlices().filter(fs -> 
fs.getBaseInstantTime().equals(METADATA_BOOTSTRAP_INSTANT_TS))
           .forEach(fs -> fs.setBaseFile(
-              addBootstrapBaseFileIfPresent(fs.getFileGroupId(), 
fs.getBaseFile().get())));
+              addBootstrapBaseFileIfPresent(fs.getFileGroupId(), 
fs.getBaseFile().get(), bootstrapBaseFileMappingFunc)));
       return newFileGroup;
     }
     return fileGroup;
   }
 
   protected FileSlice addBootstrapBaseFileIfPresent(FileSlice fileSlice) {
+    return addBootstrapBaseFileIfPresent(fileSlice, 
this::getBootstrapBaseFile);
+  }
+
+  protected FileSlice addBootstrapBaseFileIfPresent(FileSlice fileSlice, 
Function<HoodieFileGroupId, Option<BootstrapBaseFileMapping>> 
bootstrapBaseFileMappingFunc) {
     if (fileSlice.getBaseInstantTime().equals(METADATA_BOOTSTRAP_INSTANT_TS)) {
       FileSlice copy = new FileSlice(fileSlice);
       copy.getBaseFile().ifPresent(dataFile -> {
         Option<BootstrapBaseFileMapping> edf = 
getBootstrapBaseFile(copy.getFileGroupId());
-        edf.ifPresent(e -> 
dataFile.setBootstrapBaseFile(e.getBootstrapBaseFile()));
+        bootstrapBaseFileMappingFunc.apply(copy.getFileGroupId()).ifPresent(e 
-> dataFile.setBootstrapBaseFile(e.getBootstrapBaseFile()));
       });
       return copy;
     }
@@ -623,10 +636,16 @@ public abstract class AbstractTableFileSystemView 
implements SyncableFileSystemV
   }
 
   protected HoodieBaseFile addBootstrapBaseFileIfPresent(HoodieFileGroupId 
fileGroupId, HoodieBaseFile baseFile) {
+    return addBootstrapBaseFileIfPresent(fileGroupId, baseFile, 
this::getBootstrapBaseFile);
+  }
+
+  protected HoodieBaseFile addBootstrapBaseFileIfPresent(
+      HoodieFileGroupId fileGroupId,
+      HoodieBaseFile baseFile,
+      Function<HoodieFileGroupId, Option<BootstrapBaseFileMapping>> 
bootstrapBaseFileMappingFunc) {
     if (baseFile.getCommitTime().equals(METADATA_BOOTSTRAP_INSTANT_TS)) {
       HoodieBaseFile copy = new HoodieBaseFile(baseFile);
-      Option<BootstrapBaseFileMapping> edf = getBootstrapBaseFile(fileGroupId);
-      edf.ifPresent(e -> copy.setBootstrapBaseFile(e.getBootstrapBaseFile()));
+      bootstrapBaseFileMappingFunc.apply(fileGroupId).ifPresent(e -> 
copy.setBootstrapBaseFile(e.getBootstrapBaseFile()));
       return copy;
     }
     return baseFile;
@@ -706,7 +725,6 @@ public abstract class AbstractTableFileSystemView 
implements SyncableFileSystemV
   public final Map<String, Stream<HoodieBaseFile>> 
getAllLatestBaseFilesBeforeOrOn(String maxCommitTime) {
     try {
       readLock.lock();
-
       List<String> formattedPartitionList = 
ensureAllPartitionsLoadedCorrectly();
       return formattedPartitionList.stream().collect(Collectors.toMap(
           Function.identity(),
@@ -824,6 +842,31 @@ public abstract class AbstractTableFileSystemView 
implements SyncableFileSystemV
     }
   }
 
+  @Override
+  public final Stream<FileSlice> getLatestFileSlicesStateless(String 
partitionStr) {
+    String partition = formatPartitionKey(partitionStr);
+    if (isPartitionAvailableInStore(partition)) {
+      return getLatestFileSlices(partition);
+    } else {
+      try {
+        Stream<FileSlice> fileSliceStream = 
buildFileGroups(getAllFilesInPartition(partition), 
visibleCommitsAndCompactionTimeline, true).stream()
+            .filter(fg -> !isFileGroupReplaced(fg))
+            .map(HoodieFileGroup::getLatestFileSlice)
+            .filter(Option::isPresent).map(Option::get)
+            .flatMap(slice -> this.filterUncommittedFiles(slice, true));
+        if (bootstrapIndex.useIndex()) {
+          final Map<HoodieFileGroupId, BootstrapBaseFileMapping> 
bootstrapBaseFileMappings = getBootstrapBaseFileMappings(partition);
+          if (!bootstrapBaseFileMappings.isEmpty()) {
+            return fileSliceStream.map(fileSlice -> 
addBootstrapBaseFileIfPresent(fileSlice, fileGroupId -> 
Option.ofNullable(bootstrapBaseFileMappings.get(fileGroupId))));
+          }
+        }
+        return fileSliceStream;
+      } catch (IOException e) {
+        throw new HoodieIOException("Failed to fetch all files in partition " 
+ partition, e);
+      }
+    }
+  }
+
   /**
    * Get Latest File Slice for a given fileId in a given partition.
    */
@@ -1014,6 +1057,39 @@ public abstract class AbstractTableFileSystemView 
implements SyncableFileSystemV
     return getAllFileGroupsIncludingReplaced(partitionStr).filter(fg -> 
!isFileGroupReplaced(fg));
   }
 
+  @Override
+  public final Stream<HoodieFileGroup> getAllFileGroupsStateless(String 
partitionStr) {
+    String partition = formatPartitionKey(partitionStr);
+    if (isPartitionAvailableInStore(partition)) {
+      return getAllFileGroups(partition);
+    } else {
+      try {
+        Stream<HoodieFileGroup> fileGroupStream = 
buildFileGroups(getAllFilesInPartition(partition), 
visibleCommitsAndCompactionTimeline, true).stream()
+            .filter(fg -> !isFileGroupReplaced(fg));
+        if (bootstrapIndex.useIndex()) {
+          final Map<HoodieFileGroupId, BootstrapBaseFileMapping> 
bootstrapBaseFileMappings = getBootstrapBaseFileMappings(partition);
+          if (!bootstrapBaseFileMappings.isEmpty()) {
+            return fileGroupStream.map(fileGroup -> 
addBootstrapBaseFileIfPresent(fileGroup, fileGroupId -> 
Option.ofNullable(bootstrapBaseFileMappings.get(fileGroupId))));
+          }
+        }
+        return fileGroupStream;
+      } catch (IOException e) {
+        throw new HoodieIOException("Failed to fetch all files in partition " 
+ partition, e);
+      }
+    }
+  }
+
+  private Map<HoodieFileGroupId, BootstrapBaseFileMapping> 
getBootstrapBaseFileMappings(String partition) {
+    try (BootstrapIndex.IndexReader reader = bootstrapIndex.createReader()) {
+      LOG.info("Bootstrap Index available for partition " + partition);
+      List<BootstrapFileMapping> sourceFileMappings =
+          reader.getSourceFileMappingForPartition(partition);
+      return sourceFileMappings.stream()
+          .map(s -> new BootstrapBaseFileMapping(new 
HoodieFileGroupId(s.getPartitionPath(),
+              s.getFileId()), 
s.getBootstrapFileStatus())).collect(Collectors.toMap(BootstrapBaseFileMapping::getFileGroupId,
 s -> s));
+    }
+  }
+
   private Stream<HoodieFileGroup> getAllFileGroupsIncludingReplaced(final 
String partitionStr) {
     try {
       readLock.lock();
@@ -1029,22 +1105,38 @@ public abstract class AbstractTableFileSystemView 
implements SyncableFileSystemV
 
   @Override
   public Stream<HoodieFileGroup> getReplacedFileGroupsBeforeOrOn(String 
maxCommitTime, String partitionPath) {
-    return getAllFileGroupsIncludingReplaced(partitionPath).filter(fg -> 
isFileGroupReplacedBeforeOrOn(fg.getFileGroupId(), maxCommitTime));
+    String partition = formatPartitionKey(partitionPath);
+    if (hasReplacedFilesInPartition(partition)) {
+      return getAllFileGroupsIncludingReplaced(partition).filter(fg -> 
isFileGroupReplacedBeforeOrOn(fg.getFileGroupId(), maxCommitTime));
+    }
+    return Stream.empty();
   }
 
   @Override
   public Stream<HoodieFileGroup> getReplacedFileGroupsBefore(String 
maxCommitTime, String partitionPath) {
-    return getAllFileGroupsIncludingReplaced(partitionPath).filter(fg -> 
isFileGroupReplacedBefore(fg.getFileGroupId(), maxCommitTime));
+    String partition = formatPartitionKey(partitionPath);
+    if (hasReplacedFilesInPartition(partition)) {
+      return getAllFileGroupsIncludingReplaced(partition).filter(fg -> 
isFileGroupReplacedBefore(fg.getFileGroupId(), maxCommitTime));
+    }
+    return Stream.empty();
   }
 
   @Override
   public Stream<HoodieFileGroup> getReplacedFileGroupsAfterOrOn(String 
minCommitTime, String partitionPath) {
-    return getAllFileGroupsIncludingReplaced(partitionPath).filter(fg -> 
isFileGroupReplacedAfterOrOn(fg.getFileGroupId(), minCommitTime));
+    String partition = formatPartitionKey(partitionPath);
+    if (hasReplacedFilesInPartition(partition)) {
+      return getAllFileGroupsIncludingReplaced(partition).filter(fg -> 
isFileGroupReplacedAfterOrOn(fg.getFileGroupId(), minCommitTime));
+    }
+    return Stream.empty();
   }
 
   @Override
   public Stream<HoodieFileGroup> getAllReplacedFileGroups(String 
partitionPath) {
-    return getAllFileGroupsIncludingReplaced(partitionPath).filter(fg -> 
isFileGroupReplaced(fg.getFileGroupId()));
+    String partition = formatPartitionKey(partitionPath);
+    if (hasReplacedFilesInPartition(partition)) {
+      return getAllFileGroupsIncludingReplaced(partition).filter(fg -> 
isFileGroupReplaced(fg.getFileGroupId()));
+    }
+    return Stream.empty();
   }
 
   @Override
@@ -1263,6 +1355,11 @@ public abstract class AbstractTableFileSystemView 
implements SyncableFileSystemV
    */
   protected abstract void removeReplacedFileIdsAtInstants(Set<String> 
instants);
 
+  /**
+   * Returns whether there are replaced files within the given partition.
+   */
+  protected abstract boolean hasReplacedFilesInPartition(String partitionPath);
+
   /**
    * Track instant time for file groups replaced.
    */
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/view/HoodieTableFileSystemView.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/view/HoodieTableFileSystemView.java
index bb98c97e28d..f1b56ebe519 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/view/HoodieTableFileSystemView.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/view/HoodieTableFileSystemView.java
@@ -408,6 +408,11 @@ public class HoodieTableFileSystemView extends 
IncrementalTimelineSyncFileSystem
     fgIdToReplaceInstants.entrySet().removeIf(entry -> 
instants.contains(entry.getValue().getTimestamp()));
   }
 
+  @Override
+  protected boolean hasReplacedFilesInPartition(String partitionPath) {
+    return fgIdToReplaceInstants.keySet().stream().anyMatch(fg -> 
fg.getPartitionPath().equals(partitionPath));
+  }
+
   @Override
   protected Option<HoodieInstant> getReplaceInstant(final HoodieFileGroupId 
fileGroupId) {
     return Option.ofNullable(fgIdToReplaceInstants.get(fileGroupId));
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/view/PriorityBasedFileSystemView.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/view/PriorityBasedFileSystemView.java
index e30b9f425d2..56d7c7cc25c 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/view/PriorityBasedFileSystemView.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/view/PriorityBasedFileSystemView.java
@@ -182,6 +182,11 @@ public class PriorityBasedFileSystemView implements 
SyncableFileSystemView, Seri
     return execute(partitionPath, preferredView::getLatestFileSlices, 
secondaryView::getLatestFileSlices);
   }
 
+  @Override
+  public Stream<FileSlice> getLatestFileSlicesStateless(String partitionPath) {
+    return execute(partitionPath, preferredView::getLatestFileSlicesStateless, 
secondaryView::getLatestFileSlicesStateless);
+  }
+
   @Override
   public Stream<FileSlice> getLatestUnCompactedFileSlices(String 
partitionPath) {
     return execute(partitionPath, 
preferredView::getLatestUnCompactedFileSlices,
@@ -222,6 +227,11 @@ public class PriorityBasedFileSystemView implements 
SyncableFileSystemView, Seri
     return execute(partitionPath, preferredView::getAllFileGroups, 
secondaryView::getAllFileGroups);
   }
 
+  @Override
+  public Stream<HoodieFileGroup> getAllFileGroupsStateless(String 
partitionPath) {
+    return execute(partitionPath, preferredView::getAllFileGroupsStateless, 
secondaryView::getAllFileGroupsStateless);
+  }
+
   @Override
   public Stream<HoodieFileGroup> getReplacedFileGroupsBeforeOrOn(String 
maxCommitTime, String partitionPath) {
     return execute(maxCommitTime, partitionPath, 
preferredView::getReplacedFileGroupsBeforeOrOn, 
secondaryView::getReplacedFileGroupsBeforeOrOn);
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/view/RemoteHoodieTableFileSystemView.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/view/RemoteHoodieTableFileSystemView.java
index a6318608bcf..4363a7daf27 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/view/RemoteHoodieTableFileSystemView.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/view/RemoteHoodieTableFileSystemView.java
@@ -68,6 +68,7 @@ public class RemoteHoodieTableFileSystemView implements 
SyncableFileSystemView,
 
   private static final String BASE_URL = "/v1/hoodie/view";
   public static final String LATEST_PARTITION_SLICES_URL = 
String.format("%s/%s", BASE_URL, "slices/partition/latest/");
+  public static final String LATEST_PARTITION_SLICES_STATELESS_URL = 
String.format("%s/%s", BASE_URL, "slices/partition/latest/stateless/");
   public static final String LATEST_PARTITION_SLICE_URL = 
String.format("%s/%s", BASE_URL, "slices/file/latest/");
   public static final String LATEST_PARTITION_UNCOMPACTED_SLICES_URL =
       String.format("%s/%s", BASE_URL, "slices/uncompacted/partition/latest/");
@@ -101,6 +102,9 @@ public class RemoteHoodieTableFileSystemView implements 
SyncableFileSystemView,
   public static final String ALL_FILEGROUPS_FOR_PARTITION_URL =
       String.format("%s/%s", BASE_URL, "filegroups/all/partition/");
 
+  public static final String ALL_FILEGROUPS_FOR_PARTITION_STATELESS_URL =
+      String.format("%s/%s", BASE_URL, "filegroups/all/partition/stateless/");
+
   public static final String ALL_REPLACED_FILEGROUPS_BEFORE_OR_ON =
       String.format("%s/%s", BASE_URL, "filegroups/replaced/beforeoron/");
 
@@ -332,6 +336,18 @@ public class RemoteHoodieTableFileSystemView implements 
SyncableFileSystemView,
     }
   }
 
+  @Override
+  public Stream<FileSlice> getLatestFileSlicesStateless(String partitionPath) {
+    Map<String, String> paramsMap = getParamsWithPartitionPath(partitionPath);
+    try {
+      List<FileSliceDTO> dataFiles = 
executeRequest(LATEST_PARTITION_SLICES_STATELESS_URL, paramsMap,
+          new TypeReference<List<FileSliceDTO>>() {}, RequestMethod.GET);
+      return dataFiles.stream().map(FileSliceDTO::toFileSlice);
+    } catch (IOException e) {
+      throw new HoodieRemoteException(e);
+    }
+  }
+
   @Override
   public Option<FileSlice> getLatestFileSlice(String partitionPath, String 
fileId) {
     Map<String, String> paramsMap = 
getParamsWithAdditionalParam(partitionPath, FILEID_PARAM, fileId);
@@ -438,6 +454,18 @@ public class RemoteHoodieTableFileSystemView implements 
SyncableFileSystemView,
     }
   }
 
+  @Override
+  public Stream<HoodieFileGroup> getAllFileGroupsStateless(String 
partitionPath) {
+    Map<String, String> paramsMap = getParamsWithPartitionPath(partitionPath);
+    try {
+      List<FileGroupDTO> fileGroups = 
executeRequest(ALL_FILEGROUPS_FOR_PARTITION_STATELESS_URL, paramsMap,
+              new TypeReference<List<FileGroupDTO>>() {}, RequestMethod.GET);
+      return DTOUtils.fileGroupDTOsToFileGroups(fileGroups, metaClient);
+    } catch (IOException e) {
+      throw new HoodieRemoteException(e);
+    }
+  }
+
   @Override
   public Stream<HoodieFileGroup> getReplacedFileGroupsBeforeOrOn(String 
maxCommitTime, String partitionPath) {
     Map<String, String> paramsMap = 
getParamsWithAdditionalParam(partitionPath, MAX_INSTANT_PARAM, maxCommitTime);
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/view/RocksDbBasedFileSystemView.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/view/RocksDbBasedFileSystemView.java
index 26795656383..b2b05e32481 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/view/RocksDbBasedFileSystemView.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/view/RocksDbBasedFileSystemView.java
@@ -553,6 +553,12 @@ public class RocksDbBasedFileSystemView extends 
IncrementalTimelineSyncFileSyste
     );
   }
 
+  @Override
+  protected boolean hasReplacedFilesInPartition(String partitionPath) {
+    return 
rocksDB.<HoodieInstant>prefixSearch(schemaHelper.getColFamilyForReplacedFileGroups(),
 schemaHelper.getPrefixForReplacedFileGroup(partitionPath))
+        .findAny().isPresent();
+  }
+
   @Override
   protected Option<HoodieInstant> getReplaceInstant(final HoodieFileGroupId 
fileGroupId) {
     String lookupKey = schemaHelper.getKeyForReplacedFileGroup(fileGroupId);
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/view/TableFileSystemView.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/view/TableFileSystemView.java
index 6fedb8684c9..1bcd1de61bc 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/view/TableFileSystemView.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/view/TableFileSystemView.java
@@ -107,6 +107,19 @@ public interface TableFileSystemView {
      */
     Stream<FileSlice> getLatestFileSlices(String partitionPath);
 
+    /**
+     * Stream all the latest file slices in the given partition
+     * without caching the file group mappings.
+     *
+     * <p>This is useful for some table services such as compaction and 
clustering, these services may search around the files to clean
+     * within some ancient data partitions, if there triggers a full table 
service for enormous number of partitions, the cache could
+     * cause a huge memory pressure to the timeline server which induces an 
OOM exception.
+     *
+     * <p>The caching of these file groups does not benefit to writers most 
often because the writers
+     * write to recent data partitions usually.
+     */
+    Stream<FileSlice> getLatestFileSlicesStateless(String partitionPath);
+
     /**
      * Get Latest File Slice for a given fileId in a given partition.
      */
@@ -168,6 +181,18 @@ public interface TableFileSystemView {
    */
   Stream<HoodieFileGroup> getAllFileGroups(String partitionPath);
 
+  /**
+   * Stream all the file groups for a given partition without caching the file 
group mappings.
+   *
+   * <p>This is useful for some table services such as cleaning, the cleaning 
service may search around the files to clean
+   * within some ancient data partitions, if there triggers a full table 
cleaning for enormous number of partitions, the cache could
+   * cause a huge memory pressure to the timeline server which induces an OOM 
exception.
+   *
+   * <p>The caching of these file groups does not benefit to writers most 
often because the writers
+   * write to recent data partitions usually.
+   */
+  Stream<HoodieFileGroup> getAllFileGroupsStateless(String partitionPath);
+
   /**
    * Return Pending Compaction Operations.
    *
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/util/RocksDBSchemaHelper.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/util/RocksDBSchemaHelper.java
index 45b2a13eb72..ff924e45013 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/util/RocksDBSchemaHelper.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/util/RocksDBSchemaHelper.java
@@ -87,6 +87,10 @@ public class RocksDBSchemaHelper {
     return getPartitionFileIdBasedLookup(fgId);
   }
 
+  public String getPrefixForReplacedFileGroup(String partitionPath) {
+    return String.format("part=%s,id=", partitionPath);
+  }
+
   public String getKeyForFileGroupsInPendingClustering(HoodieFileGroupId fgId) 
{
     return getPartitionFileIdBasedLookup(fgId);
   }
diff --git 
a/hudi-common/src/test/java/org/apache/hudi/common/table/view/TestHoodieTableFileSystemView.java
 
b/hudi-common/src/test/java/org/apache/hudi/common/table/view/TestHoodieTableFileSystemView.java
index ab8a1fd3aa2..558dc59f40c 100644
--- 
a/hudi-common/src/test/java/org/apache/hudi/common/table/view/TestHoodieTableFileSystemView.java
+++ 
b/hudi-common/src/test/java/org/apache/hudi/common/table/view/TestHoodieTableFileSystemView.java
@@ -90,11 +90,12 @@ import java.util.stream.Stream;
 
 import static 
org.apache.hudi.common.model.HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE_PREFIX;
 import static 
org.apache.hudi.common.table.timeline.TimelineMetadataUtils.serializeCommitMetadata;
-import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotSame;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 /**
@@ -328,6 +329,109 @@ public class TestHoodieTableFileSystemView extends 
HoodieCommonTestHarness {
     assertEquals(2, fsView.getAllFileGroups(partitionPath).count());
   }
 
+  @Test
+  public void testViewForGetAllFileGroupsStateless() throws Exception {
+    String partitionPath1 = "2023/11/22";
+    new File(basePath + "/" + partitionPath1).mkdirs();
+    new File(basePath + "/" + partitionPath1 + "/" + 
HOODIE_PARTITION_METAFILE_PREFIX + ".parquet").mkdirs();
+    String partitionPath2 = "2023/11/23";
+    new File(basePath + "/" + partitionPath2).mkdirs();
+    new File(basePath + "/" + partitionPath2 + "/" + 
HOODIE_PARTITION_METAFILE_PREFIX + ".parquet").mkdirs();
+
+    // create 2 fileId in partition1
+    String fileId1 = UUID.randomUUID().toString();
+    String fileId2 = UUID.randomUUID().toString();
+    String commitTime1 = "1";
+    String fileName1 = FSUtils.makeBaseFileName(commitTime1, TEST_WRITE_TOKEN, 
fileId1);
+    String fileName2 = FSUtils.makeBaseFileName(commitTime1, TEST_WRITE_TOKEN, 
fileId2);
+    new File(basePath + "/" + partitionPath1 + "/" + 
fileName1).createNewFile();
+    new File(basePath + "/" + partitionPath1 + "/" + 
fileName2).createNewFile();
+
+    HoodieActiveTimeline commitTimeline = metaClient.getActiveTimeline();
+    HoodieInstant instant1 = new HoodieInstant(true, 
HoodieTimeline.COMMIT_ACTION, commitTime1);
+    saveAsComplete(commitTimeline, instant1, Option.empty());
+
+    // create 2 fileId in partition2
+    String fileId3 = UUID.randomUUID().toString();
+    String fileId4 = UUID.randomUUID().toString();
+    String commitTime2 = "2";
+    String fileName3 = FSUtils.makeBaseFileName(commitTime2, TEST_WRITE_TOKEN, 
fileId3);
+    String fileName4 = FSUtils.makeBaseFileName(commitTime2, TEST_WRITE_TOKEN, 
fileId4);
+    new File(basePath + "/" + partitionPath2 + "/" + 
fileName3).createNewFile();
+    new File(basePath + "/" + partitionPath2 + "/" + 
fileName4).createNewFile();
+
+    HoodieInstant instant2 = new HoodieInstant(true, 
HoodieTimeline.COMMIT_ACTION, commitTime2);
+    saveAsComplete(commitTimeline, instant2, Option.empty());
+
+    fsView.sync();
+    // invokes the stateless API first then the normal API, assert the result 
equality with different file group objects
+    List<HoodieFileGroup> actual1 = 
fsView.getAllFileGroupsStateless(partitionPath1).collect(Collectors.toList());
+    List<HoodieFileGroup> expected1 = 
fsView.getAllFileGroups(partitionPath1).collect(Collectors.toList());
+    for (int i = 0; i < expected1.size(); i++) {
+      assertThat("The stateless API should return the same result", 
actual1.get(i).toString(), is(expected1.get(i).toString()));
+      assertNotSame(actual1.get(i), expected1.get(i), "The stateless API does 
not cache");
+    }
+
+    List<HoodieFileGroup> expected2 = 
fsView.getAllFileGroupsStateless(partitionPath2).collect(Collectors.toList());
+    List<HoodieFileGroup> actual2 = 
fsView.getAllFileGroups(partitionPath2).collect(Collectors.toList());
+    for (int i = 0; i < expected2.size(); i++) {
+      assertThat("The stateless API should return the same result", 
actual2.get(i).toString(), is(expected2.get(i).toString()));
+      assertNotSame(actual2.get(i), expected2.get(i), "The stateless API does 
not cache");
+    }
+  }
+
+  @Test
+  public void testViewForGetLatestFileSlicesStateless() throws Exception {
+    String partitionPath1 = "2023/11/22";
+    new File(basePath + "/" + partitionPath1).mkdirs();
+    new File(basePath + "/" + partitionPath1 + "/" + 
HOODIE_PARTITION_METAFILE_PREFIX + ".parquet").mkdirs();
+    String partitionPath2 = "2023/11/23";
+    new File(basePath + "/" + partitionPath2).mkdirs();
+    new File(basePath + "/" + partitionPath2 + "/" + 
HOODIE_PARTITION_METAFILE_PREFIX + ".parquet").mkdirs();
+
+    // create 2 fileId in partition1
+    String fileId1 = UUID.randomUUID().toString();
+    String fileId2 = UUID.randomUUID().toString();
+    String commitTime1 = "1";
+    String fileName1 = FSUtils.makeBaseFileName(commitTime1, TEST_WRITE_TOKEN, 
fileId1);
+    String fileName2 = FSUtils.makeBaseFileName(commitTime1, TEST_WRITE_TOKEN, 
fileId2);
+    new File(basePath + "/" + partitionPath1 + "/" + 
fileName1).createNewFile();
+    new File(basePath + "/" + partitionPath1 + "/" + 
fileName2).createNewFile();
+
+    HoodieActiveTimeline commitTimeline = metaClient.getActiveTimeline();
+    HoodieInstant instant1 = new HoodieInstant(true, 
HoodieTimeline.COMMIT_ACTION, commitTime1);
+    saveAsComplete(commitTimeline, instant1, Option.empty());
+
+    // create 2 fileId in partition2
+    String fileId3 = UUID.randomUUID().toString();
+    String fileId4 = UUID.randomUUID().toString();
+    String commitTime2 = "2";
+    String fileName3 = FSUtils.makeBaseFileName(commitTime2, TEST_WRITE_TOKEN, 
fileId3);
+    String fileName4 = FSUtils.makeBaseFileName(commitTime2, TEST_WRITE_TOKEN, 
fileId4);
+    new File(basePath + "/" + partitionPath2 + "/" + 
fileName3).createNewFile();
+    new File(basePath + "/" + partitionPath2 + "/" + 
fileName4).createNewFile();
+
+    HoodieInstant instant2 = new HoodieInstant(true, 
HoodieTimeline.COMMIT_ACTION, commitTime2);
+    saveAsComplete(commitTimeline, instant2, Option.empty());
+
+    fsView.sync();
+
+    // invokes the stateless API first then the normal API, assert the result 
equality with different file slice objects
+    List<FileSlice> actual1 = 
fsView.getLatestFileSlicesStateless(partitionPath1).collect(Collectors.toList());
+    List<FileSlice> expected1 = 
fsView.getLatestFileSlices(partitionPath1).collect(Collectors.toList());
+    for (int i = 0; i < expected1.size(); i++) {
+      assertThat("The stateless API should return the same result", 
actual1.get(i), is(expected1.get(i)));
+      assertNotSame(actual1.get(i), expected1.get(i), "The stateless API does 
not cache");
+    }
+
+    List<FileSlice> expected2 = 
fsView.getLatestFileSlicesStateless(partitionPath2).collect(Collectors.toList());
+    List<FileSlice> actual2 = 
fsView.getLatestFileSlices(partitionPath2).collect(Collectors.toList());
+    for (int i = 0; i < expected2.size(); i++) {
+      assertThat("The stateless API should return the same result", 
actual2.get(i), is(expected2.get(i)));
+      assertNotSame(actual2.get(i), expected2.get(i), "The stateless API does 
not cache");
+    }
+  }
+
   @Test
   protected void testInvalidLogFiles() throws Exception {
     String partitionPath = "2016/05/01";
diff --git 
a/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/RequestHandler.java
 
b/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/RequestHandler.java
index 91adda4ee85..c72491341fe 100644
--- 
a/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/RequestHandler.java
+++ 
b/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/RequestHandler.java
@@ -337,6 +337,14 @@ public class RequestHandler {
       writeValueAsString(ctx, dtos);
     }, true));
 
+    
app.get(RemoteHoodieTableFileSystemView.LATEST_PARTITION_SLICES_STATELESS_URL, 
new ViewHandler(ctx -> {
+      metricsRegistry.add("LATEST_PARTITION_SLICES_STATELESS", 1);
+      List<FileSliceDTO> dtos = sliceHandler.getLatestFileSlicesStateless(
+          
ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.BASEPATH_PARAM, 
String.class).getOrThrow(e -> new HoodieException("Basepath is invalid")),
+          
ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.PARTITION_PARAM, 
String.class).getOrDefault(""));
+      writeValueAsString(ctx, dtos);
+    }, true));
+
     app.get(RemoteHoodieTableFileSystemView.LATEST_PARTITION_SLICE_URL, new 
ViewHandler(ctx -> {
       metricsRegistry.add("LATEST_PARTITION_SLICE", 1);
       List<FileSliceDTO> dtos = sliceHandler.getLatestFileSlice(
@@ -421,6 +429,14 @@ public class RequestHandler {
       writeValueAsString(ctx, dtos);
     }, true));
 
+    
app.get(RemoteHoodieTableFileSystemView.ALL_FILEGROUPS_FOR_PARTITION_STATELESS_URL,
 new ViewHandler(ctx -> {
+      metricsRegistry.add("ALL_FILEGROUPS_FOR_PARTITION_STATELESS", 1);
+      List<FileGroupDTO> dtos = sliceHandler.getAllFileGroupsStateless(
+          
ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.BASEPATH_PARAM, 
String.class).getOrThrow(e -> new HoodieException("Basepath is invalid")),
+          
ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.PARTITION_PARAM, 
String.class).getOrDefault(""));
+      writeValueAsString(ctx, dtos);
+    }, true));
+
     app.post(RemoteHoodieTableFileSystemView.REFRESH_TABLE, new 
ViewHandler(ctx -> {
       metricsRegistry.add("REFRESH_TABLE", 1);
       boolean success = sliceHandler
diff --git 
a/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/FileSliceHandler.java
 
b/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/FileSliceHandler.java
index e8af55e69b3..c2b739c9f8b 100644
--- 
a/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/FileSliceHandler.java
+++ 
b/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/FileSliceHandler.java
@@ -18,9 +18,6 @@
 
 package org.apache.hudi.timeline.service.handlers;
 
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-
 import org.apache.hudi.common.model.HoodieFileGroup;
 import org.apache.hudi.common.table.timeline.dto.ClusteringOpDTO;
 import org.apache.hudi.common.table.timeline.dto.CompactionOpDTO;
@@ -30,6 +27,9 @@ import org.apache.hudi.common.table.timeline.dto.FileSliceDTO;
 import org.apache.hudi.common.table.view.FileSystemViewManager;
 import org.apache.hudi.timeline.service.TimelineService;
 
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -90,6 +90,11 @@ public class FileSliceHandler extends Handler {
         .collect(Collectors.toList());
   }
 
+  public List<FileSliceDTO> getLatestFileSlicesStateless(String basePath, 
String partitionPath) {
+    return 
viewManager.getFileSystemView(basePath).getLatestFileSlicesStateless(partitionPath).map(FileSliceDTO::fromFileSlice)
+        .collect(Collectors.toList());
+  }
+
   public List<FileSliceDTO> getLatestFileSlice(String basePath, String 
partitionPath, String fileId) {
     return 
viewManager.getFileSystemView(basePath).getLatestFileSlice(partitionPath, 
fileId)
         .map(FileSliceDTO::fromFileSlice).map(Arrays::asList).orElse(new 
ArrayList<>());
@@ -113,6 +118,12 @@ public class FileSliceHandler extends Handler {
     return DTOUtils.fileGroupDTOsfromFileGroups(fileGroups);
   }
 
+  public List<FileGroupDTO> getAllFileGroupsStateless(String basePath, String 
partitionPath) {
+    List<HoodieFileGroup> fileGroups =  
viewManager.getFileSystemView(basePath).getAllFileGroupsStateless(partitionPath)
+        .collect(Collectors.toList());
+    return DTOUtils.fileGroupDTOsfromFileGroups(fileGroups);
+  }
+
   public List<FileGroupDTO> getReplacedFileGroupsBeforeOrOn(String basePath, 
String maxCommitTime, String partitionPath) {
     List<HoodieFileGroup> fileGroups =  
viewManager.getFileSystemView(basePath).getReplacedFileGroupsBeforeOrOn(maxCommitTime,
 partitionPath)
         .collect(Collectors.toList());


Reply via email to