This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 692d367f3ce [HUDI-6007] Add log files to savepoint metadata (#8364)
692d367f3ce is described below
commit 692d367f3ce30d1fe45823803602af228c378c1d
Author: Zouxxyy <[email protected]>
AuthorDate: Wed May 10 13:20:11 2023 +0800
[HUDI-6007] Add log files to savepoint metadata (#8364)
---
.../hudi/table/action/clean/CleanPlanner.java | 23 ++++++++--
.../action/savepoint/SavepointActionExecutor.java | 27 ++++++++---
.../java/org/apache/hudi/client/TestSavepoint.java | 34 +++++++++-----
.../table/view/AbstractTableFileSystemView.java | 19 ++++++++
.../table/view/PriorityBasedFileSystemView.java | 6 +++
.../view/RemoteHoodieTableFileSystemView.java | 20 +++++++++
.../common/table/view/TableFileSystemView.java | 8 ++++
.../hudi/procedure/TestSavepointsProcedure.scala | 52 ++++++++++++++++++++++
.../hudi/timeline/service/RequestHandler.java | 8 ++++
.../service/handlers/FileSliceHandler.java | 11 +++++
10 files changed, 188 insertions(+), 20 deletions(-)
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java
index 005e3f1dc34..518058700d3 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java
@@ -30,6 +30,7 @@ import org.apache.hudi.common.model.HoodieCleaningPolicy;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieFileGroup;
import org.apache.hudi.common.model.HoodieFileGroupId;
+import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
@@ -214,6 +215,21 @@ public class CleanPlanner<T, I, K, O> implements
Serializable {
return FSUtils.getAllPartitionPaths(context, config.getMetadataConfig(),
config.getBasePath());
}
+ /**
+ * Verify whether file slice exists in savepointedFiles, check both base
file and log files
+ */
+ private boolean isFileSliceExistInSavepointedFiles(FileSlice fs,
List<String> savepointedFiles) {
+ if (fs.getBaseFile().isPresent() &&
savepointedFiles.contains(fs.getBaseFile().get().getFileName())) {
+ return true;
+ }
+ for (HoodieLogFile hoodieLogFile :
fs.getLogFiles().collect(Collectors.toList())) {
+ if (savepointedFiles.contains(hoodieLogFile.getFileName())) {
+ return true;
+ }
+ }
+ return false;
+ }
+
/**
* Selects the older versions of files for cleaning, such that it bounds the
number of versions of each file. This
* policy is useful, if you are simply interested in querying the table, and
you don't want too many versions for a
@@ -253,8 +269,7 @@ public class CleanPlanner<T, I, K, O> implements
Serializable {
// Delete the remaining files
while (fileSliceIterator.hasNext()) {
FileSlice nextSlice = fileSliceIterator.next();
- Option<HoodieBaseFile> dataFile = nextSlice.getBaseFile();
- if (dataFile.isPresent() &&
savepointedFiles.contains(dataFile.get().getFileName())) {
+ if (isFileSliceExistInSavepointedFiles(nextSlice, savepointedFiles)) {
// do not clean up a savepoint data file
continue;
}
@@ -325,7 +340,7 @@ public class CleanPlanner<T, I, K, O> implements
Serializable {
for (FileSlice aSlice : fileSliceList) {
Option<HoodieBaseFile> aFile = aSlice.getBaseFile();
String fileCommitTime = aSlice.getBaseInstantTime();
- if (aFile.isPresent() &&
savepointedFiles.contains(aFile.get().getFileName())) {
+ if (isFileSliceExistInSavepointedFiles(aSlice, savepointedFiles)) {
// do not clean up a savepoint data file
continue;
}
@@ -419,7 +434,7 @@ public class CleanPlanner<T, I, K, O> implements
Serializable {
}
return replacedGroups.flatMap(HoodieFileGroup::getAllFileSlices)
// do not delete savepointed files (archival will make sure
corresponding replacecommit file is not deleted)
- .filter(slice -> !slice.getBaseFile().isPresent() ||
!savepointedFiles.contains(slice.getBaseFile().get().getFileName()))
+ .filter(slice -> !isFileSliceExistInSavepointedFiles(slice,
savepointedFiles))
.flatMap(slice -> getCleanFileInfoForSlice(slice).stream())
.collect(Collectors.toList());
}
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/savepoint/SavepointActionExecutor.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/savepoint/SavepointActionExecutor.java
index 31288ed5554..5d780127b7f 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/savepoint/SavepointActionExecutor.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/savepoint/SavepointActionExecutor.java
@@ -21,7 +21,7 @@ package org.apache.hudi.table.action.savepoint;
import org.apache.hudi.avro.model.HoodieSavepointMetadata;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.fs.FSUtils;
-import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
@@ -39,6 +39,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
@@ -96,7 +97,7 @@ public class SavepointActionExecutor<T, I, K, O> extends
BaseActionExecutor<T, I
"Could not savepoint commit " + instantTime + " as this is beyond
the lookup window " + lastCommitRetained);
context.setJobStatus(this.getClass().getSimpleName(), "Collecting latest
files for savepoint " + instantTime + " " + table.getConfig().getTableName());
- TableFileSystemView.BaseFileOnlyView view = table.getBaseFileOnlyView();
+ TableFileSystemView.SliceView view = table.getSliceView();
Map<String, List<String>> latestFilesMap;
// NOTE: for performance, we have to use different logic here for
listing the latest files
@@ -108,17 +109,31 @@ public class SavepointActionExecutor<T, I, K, O> extends
BaseActionExecutor<T, I
// each partition can be listed on the file system concurrently through
Spark.
// Note that
if (shouldUseBatchLookup(config)) {
- latestFilesMap =
view.getAllLatestBaseFilesBeforeOrOn(instantTime).entrySet().stream()
+ latestFilesMap =
view.getAllLatestFileSlicesBeforeOrOn(instantTime).entrySet().stream()
.collect(Collectors.toMap(
Map.Entry::getKey,
- entry ->
entry.getValue().map(HoodieBaseFile::getFileName).collect(Collectors.toList())));
+ entry -> {
+ List<String> latestFiles = new ArrayList<>();
+ entry.getValue().forEach(fileSlice -> {
+ if (fileSlice.getBaseFile().isPresent()) {
+
latestFiles.add(fileSlice.getBaseFile().get().getFileName());
+ }
+
latestFiles.addAll(fileSlice.getLogFiles().map(HoodieLogFile::getFileName).collect(Collectors.toList()));
+ });
+ return latestFiles;
+ }));
} else {
List<String> partitions = FSUtils.getAllPartitionPaths(context,
config.getMetadataConfig(), table.getMetaClient().getBasePath());
latestFilesMap = context.mapToPair(partitions, partitionPath -> {
// Scan all partitions files with this commit time
LOG.info("Collecting latest files in partition path " +
partitionPath);
- List<String> latestFiles =
view.getLatestBaseFilesBeforeOrOn(partitionPath, instantTime)
- .map(HoodieBaseFile::getFileName).collect(Collectors.toList());
+ List<String> latestFiles = new ArrayList<>();
+ view.getLatestFileSlicesBeforeOrOn(partitionPath, instantTime,
true).forEach(fileSlice -> {
+ if (fileSlice.getBaseFile().isPresent()) {
+ latestFiles.add(fileSlice.getBaseFile().get().getFileName());
+ }
+
latestFiles.addAll(fileSlice.getLogFiles().map(HoodieLogFile::getFileName).collect(Collectors.toList()));
+ });
return new ImmutablePair<>(partitionPath, latestFiles);
}, null);
}
diff --git
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestSavepoint.java
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestSavepoint.java
index 765c24d6a65..bea66a7fefd 100644
---
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestSavepoint.java
+++
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestSavepoint.java
@@ -25,6 +25,7 @@ import org.apache.hudi.common.config.HoodieStorageConfig;
import org.apache.hudi.common.fs.ConsistencyGuardConfig;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
@@ -64,26 +65,39 @@ public class TestSavepoint extends HoodieClientTestBase {
private static Stream<Arguments> testSavepointParams() {
return Arrays.stream(new Object[][] {
- {true, MEMORY}, {true, EMBEDDED_KV_STORE},
- {false, MEMORY}, {false, EMBEDDED_KV_STORE}
+ {true, MEMORY, HoodieTableType.COPY_ON_WRITE}, {true,
EMBEDDED_KV_STORE, HoodieTableType.COPY_ON_WRITE},
+ {false, MEMORY, HoodieTableType.COPY_ON_WRITE}, {false,
EMBEDDED_KV_STORE, HoodieTableType.COPY_ON_WRITE},
+ {true, MEMORY, HoodieTableType.MERGE_ON_READ}, {true,
EMBEDDED_KV_STORE, HoodieTableType.MERGE_ON_READ},
+ {false, MEMORY, HoodieTableType.MERGE_ON_READ}, {false,
EMBEDDED_KV_STORE, HoodieTableType.MERGE_ON_READ}
}).map(Arguments::of);
}
@ParameterizedTest
@MethodSource("testSavepointParams")
public void testSavepoint(boolean enableMetadataTable,
- FileSystemViewStorageType storageType) throws
IOException {
+ FileSystemViewStorageType storageType,
+ HoodieTableType tableType) throws IOException {
HoodieWriteConfig cfg = getWriteConfig(enableMetadataTable, storageType);
HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator(0x17AB);
+
+ initMetaClient(tableType);
+
try (SparkRDDWriteClient client = getHoodieWriteClient(cfg)) {
- String newCommitTime = "001";
- client.startCommitWithTime(newCommitTime);
- List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 200);
- JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
+ String commitTime1 = "001";
+ client.startCommitWithTime(commitTime1);
+ List<HoodieRecord> records1 = dataGen.generateInserts(commitTime1, 200);
+ JavaRDD<HoodieRecord> writeRecords1 = jsc.parallelize(records1, 1);
+ List<WriteStatus> statuses1 = client.upsert(writeRecords1,
commitTime1).collect();
+ assertNoWriteErrors(statuses1);
+
+ String commitTime2 = "002";
+ client.startCommitWithTime(commitTime2);
+ List<HoodieRecord> records2 = dataGen.generateInserts(commitTime2, 200);
+ JavaRDD<HoodieRecord> writeRecords2 = jsc.parallelize(records2, 1);
+ List<WriteStatus> statuses2 = client.upsert(writeRecords2,
commitTime2).collect();
+ assertNoWriteErrors(statuses2);
- List<WriteStatus> statuses = client.upsert(writeRecords,
newCommitTime).collect();
- assertNoWriteErrors(statuses);
client.savepoint("user", "hoodie-savepoint-unit-test");
metaClient = HoodieTableMetaClient.reload(metaClient);
@@ -98,7 +112,7 @@ public class TestSavepoint extends HoodieClientTestBase {
HoodieTimeline commitsTimeline =
table.getActiveTimeline().getCommitsTimeline();
Map<String, List<HoodieWriteStat>> partitionToWriteStats =
HoodieCommitMetadata.fromBytes(
-
commitsTimeline.getInstantDetails(commitsTimeline.firstInstant().get()).get(),
+
commitsTimeline.getInstantDetails(commitsTimeline.lastInstant().get()).get(),
HoodieCommitMetadata.class)
.getPartitionToWriteStats();
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java
index 482255c45fb..1101ccd9825 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java
@@ -850,6 +850,25 @@ public abstract class AbstractTableFileSystemView
implements SyncableFileSystemV
}
}
+ @Override
+ public final Map<String, Stream<FileSlice>>
getAllLatestFileSlicesBeforeOrOn(String maxCommitTime) {
+ try {
+ readLock.lock();
+ List<String> formattedPartitionList =
ensureAllPartitionsLoadedCorrectly();
+ return formattedPartitionList.stream().collect(Collectors.toMap(
+ Function.identity(),
+ partitionPath -> fetchAllStoredFileGroups(partitionPath)
+ .filter(slice ->
!isFileGroupReplacedBeforeOrOn(slice.getFileGroupId(), maxCommitTime))
+ .map(fg -> fg.getAllFileSlicesBeforeOn(maxCommitTime))
+ .map(sliceStream -> sliceStream.flatMap(slice ->
this.filterBaseFileAfterPendingCompaction(slice, false)))
+ .map(sliceStream ->
Option.fromJavaOptional(sliceStream.findFirst())).filter(Option::isPresent).map(Option::get)
+ .map(this::addBootstrapBaseFileIfPresent)
+ ));
+ } finally {
+ readLock.unlock();
+ }
+ }
+
@Override
public final Stream<FileSlice> getLatestMergedFileSlicesBeforeOrOn(String
partitionStr, String maxInstantTime) {
try {
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/view/PriorityBasedFileSystemView.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/view/PriorityBasedFileSystemView.java
index aac47f49219..a2a6b6f86ac 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/table/view/PriorityBasedFileSystemView.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/view/PriorityBasedFileSystemView.java
@@ -190,6 +190,12 @@ public class PriorityBasedFileSystemView implements
SyncableFileSystemView, Seri
preferredView::getLatestFileSlicesBeforeOrOn,
secondaryView::getLatestFileSlicesBeforeOrOn);
}
+ @Override
+ public Map<String, Stream<FileSlice>>
getAllLatestFileSlicesBeforeOrOn(String maxCommitTime) {
+ return execute(maxCommitTime,
preferredView::getAllLatestFileSlicesBeforeOrOn,
+ secondaryView::getAllLatestFileSlicesBeforeOrOn);
+ }
+
@Override
public Stream<FileSlice> getLatestMergedFileSlicesBeforeOrOn(String
partitionPath, String maxInstantTime) {
return execute(partitionPath, maxInstantTime,
preferredView::getLatestMergedFileSlicesBeforeOrOn,
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/view/RemoteHoodieTableFileSystemView.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/view/RemoteHoodieTableFileSystemView.java
index c1772db6bfc..ae4b387dc76 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/table/view/RemoteHoodieTableFileSystemView.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/view/RemoteHoodieTableFileSystemView.java
@@ -74,6 +74,8 @@ public class RemoteHoodieTableFileSystemView implements
SyncableFileSystemView,
public static final String LATEST_SLICES_RANGE_INSTANT_URL =
String.format("%s/%s", BASE_URL, "slices/range/latest/");
public static final String LATEST_SLICES_BEFORE_ON_INSTANT_URL =
String.format("%s/%s", BASE_URL, "slices/beforeoron/latest/");
+ public static final String ALL_LATEST_SLICES_BEFORE_ON_INSTANT_URL =
+ String.format("%s/%s", BASE_URL, "slices/all/beforeoron/latest/");
public static final String PENDING_COMPACTION_OPS = String.format("%s/%s",
BASE_URL, "compactions/pending/");
public static final String PENDING_LOG_COMPACTION_OPS =
String.format("%s/%s", BASE_URL, "logcompactions/pending/");
@@ -356,6 +358,24 @@ public class RemoteHoodieTableFileSystemView implements
SyncableFileSystemView,
}
}
+ @Override
+ public Map<String, Stream<FileSlice>>
getAllLatestFileSlicesBeforeOrOn(String maxCommitTime) {
+ Map<String, String> paramsMap = new HashMap<>();
+ paramsMap.put(BASEPATH_PARAM, basePath);
+ paramsMap.put(MAX_INSTANT_PARAM, maxCommitTime);
+
+ try {
+ Map<String, List<FileSliceDTO>> fileSliceMap =
executeRequest(ALL_LATEST_SLICES_BEFORE_ON_INSTANT_URL, paramsMap,
+ new TypeReference<Map<String, List<FileSliceDTO>>>() {},
RequestMethod.GET);
+ return fileSliceMap.entrySet().stream().collect(
+ Collectors.toMap(
+ Map.Entry::getKey,
+ entry ->
entry.getValue().stream().map(FileSliceDTO::toFileSlice)));
+ } catch (IOException e) {
+ throw new HoodieRemoteException(e);
+ }
+ }
+
@Override
public Stream<FileSlice> getLatestMergedFileSlicesBeforeOrOn(String
partitionPath, String maxInstantTime) {
Map<String, String> paramsMap =
getParamsWithAdditionalParam(partitionPath, MAX_INSTANT_PARAM, maxInstantTime);
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/view/TableFileSystemView.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/view/TableFileSystemView.java
index 97b972a75d1..b2a8bb77841 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/table/view/TableFileSystemView.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/view/TableFileSystemView.java
@@ -127,6 +127,14 @@ public interface TableFileSystemView {
Stream<FileSlice> getLatestFileSlicesBeforeOrOn(String partitionPath,
String maxCommitTime,
boolean includeFileSlicesInPendingCompaction);
+ /**
+ * Stream all latest file slices with precondition that commitTime(file)
before maxCommitTime.
+ *
+ * @param maxCommitTime Max Instant Time
+ * @return A {@link Map} of partition path to the latest file slices
before maxCommitTime.
+ */
+ Map<String, Stream<FileSlice>> getAllLatestFileSlicesBeforeOrOn(String
maxCommitTime);
+
/**
* Stream all "merged" file-slices before on an instant time If a
file-group has a pending compaction request, the
* file-slice before and after compaction request instant is merged and
returned.
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestSavepointsProcedure.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestSavepointsProcedure.scala
index 0aca7ce534a..004fbca6170 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestSavepointsProcedure.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestSavepointsProcedure.scala
@@ -257,4 +257,56 @@ class TestSavepointsProcedure extends
HoodieSparkProcedureTestBase {
assertCached(spark.table(s"$tableName").select("id"), 0)
}
}
+
+ test("Test Savepoint with Log Only MOR Table") {
+ withRecordType()(withTempDir { tmp =>
+ // Create table with INMEMORY index to generate log only mor table.
+ val tableName = generateTableName
+ spark.sql(
+ s"""
+ |create table $tableName (
+ | id int,
+ | name string,
+ | price double,
+ | ts long
+ |) using hudi
+ | location '${tmp.getCanonicalPath}'
+ | tblproperties (
+ | primaryKey ='id',
+ | type = 'mor',
+ | preCombineField = 'ts',
+ | hoodie.index.type = 'INMEMORY',
+ | hoodie.compact.inline = 'false',
+ | hoodie.compact.inline.max.delta.commits = '1',
+ | hoodie.clean.automatic = 'false'
+ | )
+ """.stripMargin)
+
+ spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000L)")
+ spark.sql(s"insert into $tableName values(2, 'a2', 10, 1001L)")
+
+ checkAnswer(s"select id, name, price, ts from $tableName")(
+ Seq(1, "a1", 10, 1000L),
+ Seq(2, "a2", 10, 1001L)
+ )
+
+ // Create savepoint
+ val savePointTime = spark.sql(s"call show_commits(table =>
'$tableName')").sort("commit_time").collect().last.getString(0)
+ spark.sql(s"call create_savepoint('$tableName', '$savePointTime')")
+
+ // Run compaction
+ spark.sql(s"call run_compaction(table => '$tableName', op=> 'run')")
+
+ // Run clean
+ spark.sql(s"call run_clean(table => '$tableName', clean_policy =>
'KEEP_LATEST_FILE_VERSIONS', file_versions_retained => 1)")
+
+ // Rollback to savepoint
+ spark.sql(s"call rollback_to_savepoint('$tableName', '$savePointTime')")
+
+ checkAnswer(s"select id, name, price, ts from $tableName")(
+ Seq(1, "a1", 10, 1000L),
+ Seq(2, "a2", 10, 1001L)
+ )
+ })
+ }
}
diff --git
a/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/RequestHandler.java
b/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/RequestHandler.java
index 9f18cc3453b..0c7154a6787 100644
---
a/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/RequestHandler.java
+++
b/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/RequestHandler.java
@@ -379,6 +379,14 @@ public class RequestHandler {
writeValueAsString(ctx, dtos);
}, true));
+
app.get(RemoteHoodieTableFileSystemView.ALL_LATEST_SLICES_BEFORE_ON_INSTANT_URL,
new ViewHandler(ctx -> {
+ metricsRegistry.add("ALL_LATEST_SLICES_BEFORE_ON_INSTANT", 1);
+ Map<String, List<FileSliceDTO>> dtos =
sliceHandler.getAllLatestFileSlicesBeforeOrOn(
+
ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.BASEPATH_PARAM,
String.class).getOrThrow(e -> new HoodieException("Basepath is invalid")),
+
ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.MAX_INSTANT_PARAM,
String.class).getOrThrow(e -> new HoodieException("MAX_INSTANT_PARAM is
invalid")));
+ writeValueAsString(ctx, dtos);
+ }, true));
+
app.get(RemoteHoodieTableFileSystemView.PENDING_COMPACTION_OPS, new
ViewHandler(ctx -> {
metricsRegistry.add("PEDING_COMPACTION_OPS", 1);
List<CompactionOpDTO> dtos = sliceHandler.getPendingCompactionOperations(
diff --git
a/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/FileSliceHandler.java
b/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/FileSliceHandler.java
index 44e12855ac7..245fe31aaae 100644
---
a/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/FileSliceHandler.java
+++
b/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/FileSliceHandler.java
@@ -34,6 +34,7 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
+import java.util.Map;
import java.util.stream.Collectors;
/**
@@ -69,6 +70,16 @@ public class FileSliceHandler extends Handler {
.map(FileSliceDTO::fromFileSlice).collect(Collectors.toList());
}
+ public Map<String, List<FileSliceDTO>>
getAllLatestFileSlicesBeforeOrOn(String basePath, String maxInstantTime) {
+ return viewManager.getFileSystemView(basePath)
+ .getAllLatestFileSlicesBeforeOrOn(maxInstantTime)
+ .entrySet().stream()
+ .collect(Collectors.toMap(
+ Map.Entry::getKey,
+ entry ->
entry.getValue().map(FileSliceDTO::fromFileSlice).collect(Collectors.toList())
+ ));
+ }
+
public List<FileSliceDTO> getLatestUnCompactedFileSlices(String basePath,
String partitionPath) {
return
viewManager.getFileSystemView(basePath).getLatestUnCompactedFileSlices(partitionPath)
.map(FileSliceDTO::fromFileSlice).collect(Collectors.toList());