This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 692d367f3ce [HUDI-6007] Add log files to savepoint metadata (#8364)
692d367f3ce is described below

commit 692d367f3ce30d1fe45823803602af228c378c1d
Author: Zouxxyy <[email protected]>
AuthorDate: Wed May 10 13:20:11 2023 +0800

    [HUDI-6007] Add log files to savepoint metadata (#8364)
---
 .../hudi/table/action/clean/CleanPlanner.java      | 23 ++++++++--
 .../action/savepoint/SavepointActionExecutor.java  | 27 ++++++++---
 .../java/org/apache/hudi/client/TestSavepoint.java | 34 +++++++++-----
 .../table/view/AbstractTableFileSystemView.java    | 19 ++++++++
 .../table/view/PriorityBasedFileSystemView.java    |  6 +++
 .../view/RemoteHoodieTableFileSystemView.java      | 20 +++++++++
 .../common/table/view/TableFileSystemView.java     |  8 ++++
 .../hudi/procedure/TestSavepointsProcedure.scala   | 52 ++++++++++++++++++++++
 .../hudi/timeline/service/RequestHandler.java      |  8 ++++
 .../service/handlers/FileSliceHandler.java         | 11 +++++
 10 files changed, 188 insertions(+), 20 deletions(-)
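
Context for the change: a merge-on-read file slice may consist of log files only
(or a base file plus log files), but savepoint metadata previously recorded
base-file names alone. The patch records and checks log files as well. A minimal
sketch of the idea (the helper name is hypothetical; the FileSlice/HoodieLogFile
accessors are the ones used in the diffs below):

    import org.apache.hudi.common.model.FileSlice;
    import org.apache.hudi.common.model.HoodieLogFile;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.stream.Collectors;

    // Collect the names of ALL files in a slice: the base file (if present)
    // plus every log file. Savepoint metadata now stores this full list.
    static List<String> collectSliceFileNames(FileSlice fileSlice) {
      List<String> names = new ArrayList<>();
      if (fileSlice.getBaseFile().isPresent()) {
        names.add(fileSlice.getBaseFile().get().getFileName());
      }
      names.addAll(fileSlice.getLogFiles().map(HoodieLogFile::getFileName).collect(Collectors.toList()));
      return names;
    }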

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java
index 005e3f1dc34..518058700d3 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java
@@ -30,6 +30,7 @@ import org.apache.hudi.common.model.HoodieCleaningPolicy;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.model.HoodieFileGroup;
 import org.apache.hudi.common.model.HoodieFileGroupId;
+import org.apache.hudi.common.model.HoodieLogFile;
 import org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
 import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
@@ -214,6 +215,21 @@ public class CleanPlanner<T, I, K, O> implements Serializable {
     return FSUtils.getAllPartitionPaths(context, config.getMetadataConfig(), config.getBasePath());
   }
 
+  /**
+   * Verify whether the file slice exists in savepointedFiles; both the base file and the log files are checked.
+   */
+  private boolean isFileSliceExistInSavepointedFiles(FileSlice fs, List<String> savepointedFiles) {
+    if (fs.getBaseFile().isPresent() && savepointedFiles.contains(fs.getBaseFile().get().getFileName())) {
+      return true;
+    }
+    for (HoodieLogFile hoodieLogFile : fs.getLogFiles().collect(Collectors.toList())) {
+      if (savepointedFiles.contains(hoodieLogFile.getFileName())) {
+        return true;
+      }
+    }
+    return false;
+  }
+
   /**
    * Selects the older versions of files for cleaning, such that it bounds the number of versions of each file. This
    * policy is useful, if you are simply interested in querying the table, and you don't want too many versions for a
@@ -253,8 +269,7 @@ public class CleanPlanner<T, I, K, O> implements Serializable {
       // Delete the remaining files
       while (fileSliceIterator.hasNext()) {
         FileSlice nextSlice = fileSliceIterator.next();
-        Option<HoodieBaseFile> dataFile = nextSlice.getBaseFile();
-        if (dataFile.isPresent() && savepointedFiles.contains(dataFile.get().getFileName())) {
+        if (isFileSliceExistInSavepointedFiles(nextSlice, savepointedFiles)) {
           // do not clean up a savepoint data file
           continue;
         }
@@ -325,7 +340,7 @@ public class CleanPlanner<T, I, K, O> implements Serializable {
         for (FileSlice aSlice : fileSliceList) {
           Option<HoodieBaseFile> aFile = aSlice.getBaseFile();
           String fileCommitTime = aSlice.getBaseInstantTime();
-          if (aFile.isPresent() && savepointedFiles.contains(aFile.get().getFileName())) {
+          if (isFileSliceExistInSavepointedFiles(aSlice, savepointedFiles)) {
             // do not clean up a savepoint data file
             continue;
           }
@@ -419,7 +434,7 @@ public class CleanPlanner<T, I, K, O> implements Serializable {
     }
     return replacedGroups.flatMap(HoodieFileGroup::getAllFileSlices)
         // do not delete savepointed files  (archival will make sure corresponding replacecommit file is not deleted)
-        .filter(slice -> !slice.getBaseFile().isPresent() || !savepointedFiles.contains(slice.getBaseFile().get().getFileName()))
+        .filter(slice -> !isFileSliceExistInSavepointedFiles(slice, savepointedFiles))
         .flatMap(slice -> getCleanFileInfoForSlice(slice).stream())
         .collect(Collectors.toList());
   }
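
The design choice above: a savepoint pins the entire file slice, so if any of
its files (base or log) appears in the savepoint, none of the slice's files may
be cleaned. A trimmed sketch, not a verbatim excerpt, of how the new guard slots
into the version-based cleaning loop (deletePaths is a stand-in name):

    // Skip any slice referenced by a savepoint; clean the rest.
    while (fileSliceIterator.hasNext()) {
      FileSlice nextSlice = fileSliceIterator.next();
      if (isFileSliceExistInSavepointedFiles(nextSlice, savepointedFiles)) {
        continue; // slice pinned by a savepoint: keep base AND log files
      }
      deletePaths.addAll(getCleanFileInfoForSlice(nextSlice));
    }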
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/savepoint/SavepointActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/savepoint/SavepointActionExecutor.java
index 31288ed5554..5d780127b7f 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/savepoint/SavepointActionExecutor.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/savepoint/SavepointActionExecutor.java
@@ -21,7 +21,7 @@ package org.apache.hudi.table.action.savepoint;
 import org.apache.hudi.avro.model.HoodieSavepointMetadata;
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.fs.FSUtils;
-import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieLogFile;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
@@ -39,6 +39,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
@@ -96,7 +97,7 @@ public class SavepointActionExecutor<T, I, K, O> extends BaseActionExecutor<T, I
           "Could not savepoint commit " + instantTime + " as this is beyond the lookup window " + lastCommitRetained);
 
       context.setJobStatus(this.getClass().getSimpleName(), "Collecting latest files for savepoint " + instantTime + " " + table.getConfig().getTableName());
-      TableFileSystemView.BaseFileOnlyView view = table.getBaseFileOnlyView();
+      TableFileSystemView.SliceView view = table.getSliceView();
 
       Map<String, List<String>> latestFilesMap;
       // NOTE: for performance, we have to use different logic here for listing the latest files
@@ -108,17 +109,31 @@ public class SavepointActionExecutor<T, I, K, O> extends BaseActionExecutor<T, I
       // each partition can be listed on the file system concurrently through Spark.
       // Note that
       if (shouldUseBatchLookup(config)) {
-        latestFilesMap = view.getAllLatestBaseFilesBeforeOrOn(instantTime).entrySet().stream()
+        latestFilesMap = view.getAllLatestFileSlicesBeforeOrOn(instantTime).entrySet().stream()
             .collect(Collectors.toMap(
                 Map.Entry::getKey,
-                entry -> entry.getValue().map(HoodieBaseFile::getFileName).collect(Collectors.toList())));
+                entry -> {
+                  List<String> latestFiles = new ArrayList<>();
+                  entry.getValue().forEach(fileSlice -> {
+                    if (fileSlice.getBaseFile().isPresent()) {
+                      latestFiles.add(fileSlice.getBaseFile().get().getFileName());
+                    }
+                    latestFiles.addAll(fileSlice.getLogFiles().map(HoodieLogFile::getFileName).collect(Collectors.toList()));
+                  });
+                  return latestFiles;
+                }));
       } else {
         List<String> partitions = FSUtils.getAllPartitionPaths(context, config.getMetadataConfig(), table.getMetaClient().getBasePath());
         latestFilesMap = context.mapToPair(partitions, partitionPath -> {
           // Scan all partitions files with this commit time
           LOG.info("Collecting latest files in partition path " + partitionPath);
-          List<String> latestFiles = view.getLatestBaseFilesBeforeOrOn(partitionPath, instantTime)
-              .map(HoodieBaseFile::getFileName).collect(Collectors.toList());
+          List<String> latestFiles = new ArrayList<>();
+          view.getLatestFileSlicesBeforeOrOn(partitionPath, instantTime, true).forEach(fileSlice -> {
+            if (fileSlice.getBaseFile().isPresent()) {
+              latestFiles.add(fileSlice.getBaseFile().get().getFileName());
+            }
+            latestFiles.addAll(fileSlice.getLogFiles().map(HoodieLogFile::getFileName).collect(Collectors.toList()));
+          });
           return new ImmutablePair<>(partitionPath, latestFiles);
         }, null);
       }
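
To see the effect on what gets persisted, a hedged sketch of reading the
savepoint back (assuming TimelineMetadataUtils' savepoint deserializer and the
Avro-generated accessors named here; the file names are illustrative):

    // After this patch the per-partition list contains log-file names
    // (e.g. ".abc123_001.log.1_0-1-0") alongside base-file names.
    HoodieSavepointMetadata savepoint = TimelineMetadataUtils.deserializeHoodieSavepointMetadata(
        table.getActiveTimeline().getInstantDetails(savepointInstant).get());
    savepoint.getPartitionMetadata().forEach((partition, partitionMetadata) ->
        partitionMetadata.getSavepointDataFile().forEach(fileName ->
            LOG.info("savepointed: " + partition + "/" + fileName)));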
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestSavepoint.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestSavepoint.java
index 765c24d6a65..bea66a7fefd 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestSavepoint.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestSavepoint.java
@@ -25,6 +25,7 @@ import org.apache.hudi.common.config.HoodieStorageConfig;
 import org.apache.hudi.common.fs.ConsistencyGuardConfig;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.model.HoodieWriteStat;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
@@ -64,26 +65,39 @@ public class TestSavepoint extends HoodieClientTestBase {
 
   private static Stream<Arguments> testSavepointParams() {
     return Arrays.stream(new Object[][] {
-        {true, MEMORY}, {true, EMBEDDED_KV_STORE},
-        {false, MEMORY}, {false, EMBEDDED_KV_STORE}
+        {true, MEMORY, HoodieTableType.COPY_ON_WRITE}, {true, EMBEDDED_KV_STORE, HoodieTableType.COPY_ON_WRITE},
+        {false, MEMORY, HoodieTableType.COPY_ON_WRITE}, {false, EMBEDDED_KV_STORE, HoodieTableType.COPY_ON_WRITE},
+        {true, MEMORY, HoodieTableType.MERGE_ON_READ}, {true, EMBEDDED_KV_STORE, HoodieTableType.MERGE_ON_READ},
+        {false, MEMORY, HoodieTableType.MERGE_ON_READ}, {false, EMBEDDED_KV_STORE, HoodieTableType.MERGE_ON_READ}
     }).map(Arguments::of);
   }
 
   @ParameterizedTest
   @MethodSource("testSavepointParams")
   public void testSavepoint(boolean enableMetadataTable,
-                            FileSystemViewStorageType storageType) throws IOException {
+                            FileSystemViewStorageType storageType,
+                            HoodieTableType tableType) throws IOException {
     HoodieWriteConfig cfg = getWriteConfig(enableMetadataTable, storageType);
     HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator(0x17AB);
+
+    initMetaClient(tableType);
+
     try (SparkRDDWriteClient client = getHoodieWriteClient(cfg)) {
-      String newCommitTime = "001";
-      client.startCommitWithTime(newCommitTime);
 
-      List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 200);
-      JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
+      String commitTime1 = "001";
+      client.startCommitWithTime(commitTime1);
+      List<HoodieRecord> records1 = dataGen.generateInserts(commitTime1, 200);
+      JavaRDD<HoodieRecord> writeRecords1 = jsc.parallelize(records1, 1);
+      List<WriteStatus> statuses1 = client.upsert(writeRecords1, commitTime1).collect();
+      assertNoWriteErrors(statuses1);
+
+      String commitTime2 = "002";
+      client.startCommitWithTime(commitTime2);
+      List<HoodieRecord> records2 = dataGen.generateInserts(commitTime2, 200);
+      JavaRDD<HoodieRecord> writeRecords2 = jsc.parallelize(records2, 1);
+      List<WriteStatus> statuses2 = client.upsert(writeRecords2, commitTime2).collect();
+      assertNoWriteErrors(statuses2);
 
-      List<WriteStatus> statuses = client.upsert(writeRecords, newCommitTime).collect();
-      assertNoWriteErrors(statuses);
       client.savepoint("user", "hoodie-savepoint-unit-test");
 
       metaClient = HoodieTableMetaClient.reload(metaClient);
@@ -98,7 +112,7 @@ public class TestSavepoint extends HoodieClientTestBase {
 
       HoodieTimeline commitsTimeline = table.getActiveTimeline().getCommitsTimeline();
       Map<String, List<HoodieWriteStat>> partitionToWriteStats = HoodieCommitMetadata.fromBytes(
-              commitsTimeline.getInstantDetails(commitsTimeline.firstInstant().get()).get(),
+              commitsTimeline.getInstantDetails(commitsTimeline.lastInstant().get()).get(),
               HoodieCommitMetadata.class)
           .getPartitionToWriteStats();
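
A note on the assertion change: the test now writes two commits and takes the
savepoint after the second one, so the write stats to compare against are those
of the last instant rather than the first. A sketch of the lookup (same API as
used above):

    // The savepoint follows commit "002", so its metadata corresponds to the
    // most recent commit on the timeline: lastInstant(), not firstInstant().
    HoodieCommitMetadata latestCommit = HoodieCommitMetadata.fromBytes(
        commitsTimeline.getInstantDetails(commitsTimeline.lastInstant().get()).get(),
        HoodieCommitMetadata.class);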
 
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java b/hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java
index 482255c45fb..1101ccd9825 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java
@@ -850,6 +850,25 @@ public abstract class AbstractTableFileSystemView implements SyncableFileSystemV
     }
   }
 
+  @Override
+  public final Map<String, Stream<FileSlice>> getAllLatestFileSlicesBeforeOrOn(String maxCommitTime) {
+    try {
+      readLock.lock();
+      List<String> formattedPartitionList = ensureAllPartitionsLoadedCorrectly();
+      return formattedPartitionList.stream().collect(Collectors.toMap(
+          Function.identity(),
+          partitionPath -> fetchAllStoredFileGroups(partitionPath)
+              .filter(slice -> !isFileGroupReplacedBeforeOrOn(slice.getFileGroupId(), maxCommitTime))
+              .map(fg -> fg.getAllFileSlicesBeforeOn(maxCommitTime))
+              .map(sliceStream -> sliceStream.flatMap(slice -> this.filterBaseFileAfterPendingCompaction(slice, false)))
+              .map(sliceStream -> Option.fromJavaOptional(sliceStream.findFirst())).filter(Option::isPresent).map(Option::get)
+              .map(this::addBootstrapBaseFileIfPresent)
+      ));
+    } finally {
+      readLock.unlock();
+    }
+  }
+
   @Override
   public final Stream<FileSlice> getLatestMergedFileSlicesBeforeOrOn(String partitionStr, String maxInstantTime) {
     try {
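
Implementation note for the new method above: getAllFileSlicesBeforeOn streams a
file group's slices newest-first, so findFirst() yields the latest slice at or
before maxCommitTime. A hedged usage sketch of the new bulk API (variable names
and the instant value are illustrative):

    // One call returns the latest file slice per file group for EVERY
    // partition, keyed by partition path; this is what lets the savepoint
    // executor avoid per-partition listings when batch lookup is enabled.
    Map<String, Stream<FileSlice>> slicesByPartition =
        fileSystemView.getAllLatestFileSlicesBeforeOrOn("20230510132011");
    slicesByPartition.forEach((partition, slices) ->
        slices.forEach(slice -> System.out.println(partition + " -> " + slice.getFileId())));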
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/view/PriorityBasedFileSystemView.java b/hudi-common/src/main/java/org/apache/hudi/common/table/view/PriorityBasedFileSystemView.java
index aac47f49219..a2a6b6f86ac 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/view/PriorityBasedFileSystemView.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/view/PriorityBasedFileSystemView.java
@@ -190,6 +190,12 @@ public class PriorityBasedFileSystemView implements SyncableFileSystemView, Seri
         preferredView::getLatestFileSlicesBeforeOrOn, secondaryView::getLatestFileSlicesBeforeOrOn);
   }
 
+  @Override
+  public Map<String, Stream<FileSlice>> getAllLatestFileSlicesBeforeOrOn(String maxCommitTime) {
+    return execute(maxCommitTime, preferredView::getAllLatestFileSlicesBeforeOrOn,
+        secondaryView::getAllLatestFileSlicesBeforeOrOn);
+  }
+
   @Override
   public Stream<FileSlice> getLatestMergedFileSlicesBeforeOrOn(String partitionPath, String maxInstantTime) {
     return execute(partitionPath, maxInstantTime, preferredView::getLatestMergedFileSlicesBeforeOrOn,
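
PriorityBasedFileSystemView needs only this delegation because all of its
methods share one failover pattern, roughly as follows (a simplified sketch; the
real execute(...) also remembers a preferred-view failure so that later calls go
straight to the secondary view):

    // Try the preferred (typically remote) view first; on error, fall back
    // to the secondary (local) view.
    private <T, R> R execute(T val, Function<T, R> preferred, Function<T, R> secondary) {
      try {
        return preferred.apply(val);
      } catch (RuntimeException e) {
        return secondary.apply(val);
      }
    }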
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/view/RemoteHoodieTableFileSystemView.java b/hudi-common/src/main/java/org/apache/hudi/common/table/view/RemoteHoodieTableFileSystemView.java
index c1772db6bfc..ae4b387dc76 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/view/RemoteHoodieTableFileSystemView.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/view/RemoteHoodieTableFileSystemView.java
@@ -74,6 +74,8 @@ public class RemoteHoodieTableFileSystemView implements SyncableFileSystemView,
   public static final String LATEST_SLICES_RANGE_INSTANT_URL = String.format("%s/%s", BASE_URL, "slices/range/latest/");
   public static final String LATEST_SLICES_BEFORE_ON_INSTANT_URL =
       String.format("%s/%s", BASE_URL, "slices/beforeoron/latest/");
+  public static final String ALL_LATEST_SLICES_BEFORE_ON_INSTANT_URL =
+      String.format("%s/%s", BASE_URL, "slices/all/beforeoron/latest/");
 
   public static final String PENDING_COMPACTION_OPS = String.format("%s/%s", BASE_URL, "compactions/pending/");
   public static final String PENDING_LOG_COMPACTION_OPS = String.format("%s/%s", BASE_URL, "logcompactions/pending/");
@@ -356,6 +358,24 @@ public class RemoteHoodieTableFileSystemView implements SyncableFileSystemView,
     }
   }
 
+  @Override
+  public Map<String, Stream<FileSlice>> getAllLatestFileSlicesBeforeOrOn(String maxCommitTime) {
+    Map<String, String> paramsMap = new HashMap<>();
+    paramsMap.put(BASEPATH_PARAM, basePath);
+    paramsMap.put(MAX_INSTANT_PARAM, maxCommitTime);
+
+    try {
+      Map<String, List<FileSliceDTO>> fileSliceMap = executeRequest(ALL_LATEST_SLICES_BEFORE_ON_INSTANT_URL, paramsMap,
+          new TypeReference<Map<String, List<FileSliceDTO>>>() {}, RequestMethod.GET);
+      return fileSliceMap.entrySet().stream().collect(
+          Collectors.toMap(
+              Map.Entry::getKey,
+              entry -> entry.getValue().stream().map(FileSliceDTO::toFileSlice)));
+    } catch (IOException e) {
+      throw new HoodieRemoteException(e);
+    }
+  }
+
   @Override
   public Stream<FileSlice> getLatestMergedFileSlicesBeforeOrOn(String partitionPath, String maxInstantTime) {
     Map<String, String> paramsMap = getParamsWithAdditionalParam(partitionPath, MAX_INSTANT_PARAM, maxInstantTime);
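
On the wire, the new client method is a single GET against the timeline server.
A hedged sketch of the round trip (assuming BASE_URL resolves to
"/v1/hoodie/view" and the usual "basepath"/"maxinstant" query parameter names):

    // GET {server}/v1/hoodie/view/slices/all/beforeoron/latest/
    //     ?basepath=/tmp/hudi_trips&maxinstant=20230510132011
    // The JSON body deserializes to Map<String, List<FileSliceDTO>>, which is
    // then re-streamed as Map<String, Stream<FileSlice>> on the client:
    Map<String, Stream<FileSlice>> result =
        remoteView.getAllLatestFileSlicesBeforeOrOn("20230510132011");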
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/view/TableFileSystemView.java b/hudi-common/src/main/java/org/apache/hudi/common/table/view/TableFileSystemView.java
index 97b972a75d1..b2a8bb77841 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/view/TableFileSystemView.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/view/TableFileSystemView.java
@@ -127,6 +127,14 @@ public interface TableFileSystemView {
     Stream<FileSlice> getLatestFileSlicesBeforeOrOn(String partitionPath, String maxCommitTime,
         boolean includeFileSlicesInPendingCompaction);
 
+    /**
+     * Stream all the latest file slices, with the precondition that commitTime(file) is before maxCommitTime.
+     *
+     * @param maxCommitTime Max Instant Time
+     * @return A {@link Map} of partition path to the latest file slices before maxCommitTime.
+     */
+    Map<String, Stream<FileSlice>> getAllLatestFileSlicesBeforeOrOn(String maxCommitTime);
+
     /**
      * Stream all "merged" file-slices before on an instant time If a file-group has a pending compaction request, the
      * file-slice before and after compaction request instant is merged and returned.
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestSavepointsProcedure.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestSavepointsProcedure.scala
index 0aca7ce534a..004fbca6170 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestSavepointsProcedure.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestSavepointsProcedure.scala
@@ -257,4 +257,56 @@ class TestSavepointsProcedure extends HoodieSparkProcedureTestBase {
       assertCached(spark.table(s"$tableName").select("id"), 0)
     }
   }
+
+  test("Test Savepoint with Log Only MOR Table") {
+    withRecordType()(withTempDir { tmp =>
+      // Create table with INMEMORY index to generate log only mor table.
+      val tableName = generateTableName
+      spark.sql(
+        s"""
+           |create table $tableName (
+           |  id int,
+           |  name string,
+           |  price double,
+           |  ts long
+           |) using hudi
+           | location '${tmp.getCanonicalPath}'
+           | tblproperties (
+           |  primaryKey ='id',
+           |  type = 'mor',
+           |  preCombineField = 'ts',
+           |  hoodie.index.type = 'INMEMORY',
+           |  hoodie.compact.inline = 'false',
+           |  hoodie.compact.inline.max.delta.commits = '1',
+           |  hoodie.clean.automatic = 'false'
+           | )
+       """.stripMargin)
+
+      spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000L)")
+      spark.sql(s"insert into $tableName values(2, 'a2', 10, 1001L)")
+
+      checkAnswer(s"select id, name, price, ts from $tableName")(
+        Seq(1, "a1", 10, 1000L),
+        Seq(2, "a2", 10, 1001L)
+      )
+
+      // Create savepoint
+      val savePointTime = spark.sql(s"call show_commits(table => '$tableName')").sort("commit_time").collect().last.getString(0)
+      spark.sql(s"call create_savepoint('$tableName', '$savePointTime')")
+
+      // Run compaction
+      spark.sql(s"call run_compaction(table => '$tableName', op=> 'run')")
+
+      // Run clean
+      spark.sql(s"call run_clean(table => '$tableName', clean_policy => 'KEEP_LATEST_FILE_VERSIONS', file_versions_retained => 1)")
+
+      // Rollback to savepoint
+      spark.sql(s"call rollback_to_savepoint('$tableName', '$savePointTime')")
+
+      checkAnswer(s"select id, name, price, ts from $tableName")(
+        Seq(1, "a1", 10, 1000L),
+        Seq(2, "a2", 10, 1001L)
+      )
+    })
+  }
 }
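
The scenario this test guards against: with the INMEMORY index, inserts go
straight to log files, so the savepointed slice may have no base file at all.
Previously only base-file names were recorded, so the clean could delete the
slice's log files and rollback_to_savepoint would lose data. A contrast sketch
in Java (variable names illustrative; accessors as in the diffs above):

    // Pre-patch: a log-only slice contributed nothing to the savepoint.
    List<String> preFix = new ArrayList<>();
    if (fileSlice.getBaseFile().isPresent()) {
      preFix.add(fileSlice.getBaseFile().get().getFileName());
    }
    // Post-patch: log files are recorded too, so the cleaner keeps them.
    List<String> postFix = new ArrayList<>(preFix);
    fileSlice.getLogFiles().map(HoodieLogFile::getFileName).forEach(postFix::add);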
diff --git a/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/RequestHandler.java b/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/RequestHandler.java
index 9f18cc3453b..0c7154a6787 100644
--- a/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/RequestHandler.java
+++ b/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/RequestHandler.java
@@ -379,6 +379,14 @@ public class RequestHandler {
       writeValueAsString(ctx, dtos);
     }, true));
 
+    app.get(RemoteHoodieTableFileSystemView.ALL_LATEST_SLICES_BEFORE_ON_INSTANT_URL, new ViewHandler(ctx -> {
+      metricsRegistry.add("ALL_LATEST_SLICES_BEFORE_ON_INSTANT", 1);
+      Map<String, List<FileSliceDTO>> dtos = sliceHandler.getAllLatestFileSlicesBeforeOrOn(
+          ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.BASEPATH_PARAM, String.class).getOrThrow(e -> new HoodieException("Basepath is invalid")),
+          ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.MAX_INSTANT_PARAM, String.class).getOrThrow(e -> new HoodieException("MAX_INSTANT_PARAM is invalid")));
+      writeValueAsString(ctx, dtos);
+    }, true));
+
     app.get(RemoteHoodieTableFileSystemView.PENDING_COMPACTION_OPS, new ViewHandler(ctx -> {
       metricsRegistry.add("PEDING_COMPACTION_OPS", 1);
       List<CompactionOpDTO> dtos = sliceHandler.getPendingCompactionOperations(
diff --git a/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/FileSliceHandler.java b/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/FileSliceHandler.java
index 44e12855ac7..245fe31aaae 100644
--- a/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/FileSliceHandler.java
+++ b/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/FileSliceHandler.java
@@ -34,6 +34,7 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
+import java.util.Map;
 import java.util.stream.Collectors;
 
 /**
@@ -69,6 +70,16 @@ public class FileSliceHandler extends Handler {
         .map(FileSliceDTO::fromFileSlice).collect(Collectors.toList());
   }
 
+  public Map<String, List<FileSliceDTO>> getAllLatestFileSlicesBeforeOrOn(String basePath, String maxInstantTime) {
+    return viewManager.getFileSystemView(basePath)
+        .getAllLatestFileSlicesBeforeOrOn(maxInstantTime)
+        .entrySet().stream()
+        .collect(Collectors.toMap(
+            Map.Entry::getKey,
+            entry -> entry.getValue().map(FileSliceDTO::fromFileSlice).collect(Collectors.toList())
+        ));
+  }
+
   public List<FileSliceDTO> getLatestUnCompactedFileSlices(String basePath, String partitionPath) {
     return viewManager.getFileSystemView(basePath).getLatestUnCompactedFileSlices(partitionPath)
         .map(FileSliceDTO::fromFileSlice).collect(Collectors.toList());
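
Finally, the DTO round trip that ties the two sides together (a sketch; the
FileSliceDTO conversions are the ones used in the diffs above):

    // Server side (FileSliceHandler): domain object -> wire DTO.
    FileSliceDTO dto = FileSliceDTO.fromFileSlice(fileSlice);
    // Client side (RemoteHoodieTableFileSystemView): wire DTO -> domain object.
    FileSlice restored = FileSliceDTO.toFileSlice(dto);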
