This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 499423c85cd [HUDI-7041] Optimize the memory usage of timeline server
for table service (#10002)
499423c85cd is described below
commit 499423c85cda6e9ccf5cec405732517b0ea39355
Author: zhuanshenbsj1 <[email protected]>
AuthorDate: Sun Nov 26 10:13:46 2023 +0800
[HUDI-7041] Optimize the memory usage of timeline server for table service
(#10002)
---
.../action/clean/CleanPlanActionExecutor.java | 30 +++--
.../hudi/table/action/clean/CleanPlanner.java | 4 +-
.../cluster/strategy/ClusteringPlanStrategy.java | 2 +-
.../BaseHoodieCompactionPlanGenerator.java | 2 +-
.../table/view/AbstractTableFileSystemView.java | 133 ++++++++++++++++++---
.../table/view/HoodieTableFileSystemView.java | 5 +
.../table/view/PriorityBasedFileSystemView.java | 10 ++
.../view/RemoteHoodieTableFileSystemView.java | 28 +++++
.../table/view/RocksDbBasedFileSystemView.java | 6 +
.../common/table/view/TableFileSystemView.java | 25 ++++
.../hudi/common/util/RocksDBSchemaHelper.java | 4 +
.../table/view/TestHoodieTableFileSystemView.java | 106 +++++++++++++++-
.../hudi/timeline/service/RequestHandler.java | 16 +++
.../service/handlers/FileSliceHandler.java | 17 ++-
14 files changed, 351 insertions(+), 37 deletions(-)
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanActionExecutor.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanActionExecutor.java
index 3b5d1233214..a70bfd256c0 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanActionExecutor.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanActionExecutor.java
@@ -41,7 +41,9 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
@@ -118,17 +120,23 @@ public class CleanPlanActionExecutor<T, I, K, O> extends
BaseActionExecutor<T, I
context.setJobStatus(this.getClass().getSimpleName(), "Generating list
of file slices to be cleaned: " + config.getTableName());
- Map<String, Pair<Boolean, List<CleanFileInfo>>>
cleanOpsWithPartitionMeta = context
- .map(partitionsToClean, partitionPathToClean ->
Pair.of(partitionPathToClean, planner.getDeletePaths(partitionPathToClean,
earliestInstant)), cleanerParallelism)
- .stream()
- .collect(Collectors.toMap(Pair::getKey, Pair::getValue));
-
- Map<String, List<HoodieCleanFileInfo>> cleanOps =
cleanOpsWithPartitionMeta.entrySet().stream()
- .collect(Collectors.toMap(Map.Entry::getKey,
- e ->
CleanerUtils.convertToHoodieCleanFileInfoList(e.getValue().getValue())));
-
- List<String> partitionsToDelete =
cleanOpsWithPartitionMeta.entrySet().stream().filter(entry ->
entry.getValue().getKey()).map(Map.Entry::getKey)
- .collect(Collectors.toList());
+ Map<String, List<HoodieCleanFileInfo>> cleanOps = new HashMap<>();
+ List<String> partitionsToDelete = new ArrayList<>();
+ for (int i = 0; i < partitionsToClean.size(); i += cleanerParallelism) {
+ // Handles at most 'cleanerParallelism' partitions at a
time to avoid excessive memory pressure on the timeline server
+ // (remote or local embedded), thus reducing the risk of an OOM
exception.
+ List<String> subPartitionsToClean = partitionsToClean.subList(i,
Math.min(i + cleanerParallelism, partitionsToClean.size()));
+ Map<String, Pair<Boolean, List<CleanFileInfo>>>
cleanOpsWithPartitionMeta = context
+ .map(subPartitionsToClean, partitionPathToClean ->
Pair.of(partitionPathToClean, planner.getDeletePaths(partitionPathToClean,
earliestInstant)), cleanerParallelism)
+ .stream()
+ .collect(Collectors.toMap(Pair::getKey, Pair::getValue));
+
+ cleanOps.putAll(cleanOpsWithPartitionMeta.entrySet().stream()
+ .collect(Collectors.toMap(Map.Entry::getKey, e ->
CleanerUtils.convertToHoodieCleanFileInfoList(e.getValue().getValue()))));
+
+
partitionsToDelete.addAll(cleanOpsWithPartitionMeta.entrySet().stream().filter(entry
-> entry.getValue().getKey()).map(Map.Entry::getKey)
+ .collect(Collectors.toList()));
+ }
return new HoodieCleanerPlan(earliestInstant
.map(x -> new HoodieActionInstant(x.getTimestamp(), x.getAction(),
x.getState().name())).orElse(null),
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java
index 112034fd877..efbca863e50 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java
@@ -254,7 +254,7 @@ public class CleanPlanner<T, I, K, O> implements
Serializable {
// In other words, the file versions only apply to the active file groups.
deletePaths.addAll(getReplacedFilesEligibleToClean(savepointedFiles,
partitionPath, Option.empty()));
boolean toDeletePartition = false;
- List<HoodieFileGroup> fileGroups =
fileSystemView.getAllFileGroups(partitionPath).collect(Collectors.toList());
+ List<HoodieFileGroup> fileGroups =
fileSystemView.getAllFileGroupsStateless(partitionPath).collect(Collectors.toList());
for (HoodieFileGroup fileGroup : fileGroups) {
int keepVersions = config.getCleanerFileVersionsRetained();
// do not cleanup slice required for pending compaction
@@ -329,7 +329,7 @@ public class CleanPlanner<T, I, K, O> implements
Serializable {
// all replaced file groups before earliestCommitToRetain are eligible
to clean
deletePaths.addAll(getReplacedFilesEligibleToClean(savepointedFiles,
partitionPath, earliestCommitToRetain));
// add active files
- List<HoodieFileGroup> fileGroups =
fileSystemView.getAllFileGroups(partitionPath).collect(Collectors.toList());
+ List<HoodieFileGroup> fileGroups =
fileSystemView.getAllFileGroupsStateless(partitionPath).collect(Collectors.toList());
for (HoodieFileGroup fileGroup : fileGroups) {
List<FileSlice> fileSliceList =
fileGroup.getAllFileSlices().collect(Collectors.toList());
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/ClusteringPlanStrategy.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/ClusteringPlanStrategy.java
index 2d2c2a36643..0d07bed531a 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/ClusteringPlanStrategy.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/ClusteringPlanStrategy.java
@@ -121,7 +121,7 @@ public abstract class ClusteringPlanStrategy<T,I,K,O>
implements Serializable {
.collect(Collectors.toSet());
fgIdsInPendingCompactionLogCompactionAndClustering.addAll(fileSystemView.getFileGroupsInPendingClustering().map(Pair::getKey).collect(Collectors.toSet()));
- return hoodieTable.getSliceView().getLatestFileSlices(partition)
+ return hoodieTable.getSliceView().getLatestFileSlicesStateless(partition)
// file ids already in clustering are not eligible
.filter(slice ->
!fgIdsInPendingCompactionLogCompactionAndClustering.contains(slice.getFileGroupId()));
}
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/plan/generators/BaseHoodieCompactionPlanGenerator.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/plan/generators/BaseHoodieCompactionPlanGenerator.java
index 0f3beb136b2..2626bc59918 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/plan/generators/BaseHoodieCompactionPlanGenerator.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/plan/generators/BaseHoodieCompactionPlanGenerator.java
@@ -117,7 +117,7 @@ public abstract class BaseHoodieCompactionPlanGenerator<T
extends HoodieRecordPa
Option<InstantRange> instantRange =
CompactHelpers.getInstance().getInstantRange(metaClient);
List<HoodieCompactionOperation> operations =
engineContext.flatMap(partitionPaths, partitionPath -> fileSystemView
- .getLatestFileSlices(partitionPath)
+ .getLatestFileSlicesStateless(partitionPath)
.filter(slice -> filterFileSlice(slice, lastCompletedInstantTime,
fgIdsInPendingCompactionAndClustering, instantRange))
.map(s -> {
List<HoodieLogFile> logFiles = s.getLogFiles()
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java
index b3dc0fbce0a..14f55b76889 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java
@@ -433,6 +433,19 @@ public abstract class AbstractTableFileSystemView
implements SyncableFileSystemV
return fileStatusMap;
}
+ /**
+ * Returns all files situated at the given partition.
+ */
+ private FileStatus[] getAllFilesInPartition(String relativePartitionPath)
throws IOException {
+ Path partitionPath = FSUtils.getPartitionPath(metaClient.getBasePathV2(),
relativePartitionPath);
+ long beginLsTs = System.currentTimeMillis();
+ FileStatus[] statuses = listPartition(partitionPath);
+ long endLsTs = System.currentTimeMillis();
+ LOG.debug("#files found in partition (" + relativePartitionPath + ") =" +
statuses.length + ", Time taken ="
+ + (endLsTs - beginLsTs));
+ return statuses;
+ }
+
/**
* Allows lazily loading the partitions if needed.
*
@@ -449,15 +462,7 @@ public abstract class AbstractTableFileSystemView
implements SyncableFileSystemV
// Not loaded yet
try {
LOG.info("Building file system view for partition (" +
partitionPathStr + ")");
-
- Path partitionPath =
FSUtils.getPartitionPath(metaClient.getBasePathV2(), partitionPathStr);
- long beginLsTs = System.currentTimeMillis();
- FileStatus[] statuses = listPartition(partitionPath);
- long endLsTs = System.currentTimeMillis();
- LOG.debug("#files found in partition (" + partitionPathStr + ") =" +
statuses.length + ", Time taken ="
- + (endLsTs - beginLsTs));
- List<HoodieFileGroup> groups = addFilesToView(statuses);
-
+ List<HoodieFileGroup> groups =
addFilesToView(getAllFilesInPartition(partitionPathStr));
if (groups.isEmpty()) {
storePartitionView(partitionPathStr, new ArrayList<>());
}
@@ -598,24 +603,32 @@ public abstract class AbstractTableFileSystemView
implements SyncableFileSystemV
}
protected HoodieFileGroup addBootstrapBaseFileIfPresent(HoodieFileGroup
fileGroup) {
+ return addBootstrapBaseFileIfPresent(fileGroup,
this::getBootstrapBaseFile);
+ }
+
+ protected HoodieFileGroup addBootstrapBaseFileIfPresent(HoodieFileGroup
fileGroup, Function<HoodieFileGroupId, Option<BootstrapBaseFileMapping>>
bootstrapBaseFileMappingFunc) {
boolean hasBootstrapBaseFile = fileGroup.getAllFileSlices()
.anyMatch(fs ->
fs.getBaseInstantTime().equals(METADATA_BOOTSTRAP_INSTANT_TS));
if (hasBootstrapBaseFile) {
HoodieFileGroup newFileGroup = new HoodieFileGroup(fileGroup);
newFileGroup.getAllFileSlices().filter(fs ->
fs.getBaseInstantTime().equals(METADATA_BOOTSTRAP_INSTANT_TS))
.forEach(fs -> fs.setBaseFile(
- addBootstrapBaseFileIfPresent(fs.getFileGroupId(),
fs.getBaseFile().get())));
+ addBootstrapBaseFileIfPresent(fs.getFileGroupId(),
fs.getBaseFile().get(), bootstrapBaseFileMappingFunc)));
return newFileGroup;
}
return fileGroup;
}
protected FileSlice addBootstrapBaseFileIfPresent(FileSlice fileSlice) {
+ return addBootstrapBaseFileIfPresent(fileSlice,
this::getBootstrapBaseFile);
+ }
+
+ protected FileSlice addBootstrapBaseFileIfPresent(FileSlice fileSlice,
Function<HoodieFileGroupId, Option<BootstrapBaseFileMapping>>
bootstrapBaseFileMappingFunc) {
if (fileSlice.getBaseInstantTime().equals(METADATA_BOOTSTRAP_INSTANT_TS)) {
FileSlice copy = new FileSlice(fileSlice);
copy.getBaseFile().ifPresent(dataFile -> {
Option<BootstrapBaseFileMapping> edf =
getBootstrapBaseFile(copy.getFileGroupId());
- edf.ifPresent(e ->
dataFile.setBootstrapBaseFile(e.getBootstrapBaseFile()));
+ bootstrapBaseFileMappingFunc.apply(copy.getFileGroupId()).ifPresent(e
-> dataFile.setBootstrapBaseFile(e.getBootstrapBaseFile()));
});
return copy;
}
@@ -623,10 +636,16 @@ public abstract class AbstractTableFileSystemView
implements SyncableFileSystemV
}
protected HoodieBaseFile addBootstrapBaseFileIfPresent(HoodieFileGroupId
fileGroupId, HoodieBaseFile baseFile) {
+ return addBootstrapBaseFileIfPresent(fileGroupId, baseFile,
this::getBootstrapBaseFile);
+ }
+
+ protected HoodieBaseFile addBootstrapBaseFileIfPresent(
+ HoodieFileGroupId fileGroupId,
+ HoodieBaseFile baseFile,
+ Function<HoodieFileGroupId, Option<BootstrapBaseFileMapping>>
bootstrapBaseFileMappingFunc) {
if (baseFile.getCommitTime().equals(METADATA_BOOTSTRAP_INSTANT_TS)) {
HoodieBaseFile copy = new HoodieBaseFile(baseFile);
- Option<BootstrapBaseFileMapping> edf = getBootstrapBaseFile(fileGroupId);
- edf.ifPresent(e -> copy.setBootstrapBaseFile(e.getBootstrapBaseFile()));
+ bootstrapBaseFileMappingFunc.apply(fileGroupId).ifPresent(e ->
copy.setBootstrapBaseFile(e.getBootstrapBaseFile()));
return copy;
}
return baseFile;
@@ -706,7 +725,6 @@ public abstract class AbstractTableFileSystemView
implements SyncableFileSystemV
public final Map<String, Stream<HoodieBaseFile>>
getAllLatestBaseFilesBeforeOrOn(String maxCommitTime) {
try {
readLock.lock();
-
List<String> formattedPartitionList =
ensureAllPartitionsLoadedCorrectly();
return formattedPartitionList.stream().collect(Collectors.toMap(
Function.identity(),
@@ -824,6 +842,31 @@ public abstract class AbstractTableFileSystemView
implements SyncableFileSystemV
}
}
+ @Override
+ public final Stream<FileSlice> getLatestFileSlicesStateless(String
partitionStr) {
+ String partition = formatPartitionKey(partitionStr);
+ if (isPartitionAvailableInStore(partition)) {
+ return getLatestFileSlices(partition);
+ } else {
+ try {
+ Stream<FileSlice> fileSliceStream =
buildFileGroups(getAllFilesInPartition(partition),
visibleCommitsAndCompactionTimeline, true).stream()
+ .filter(fg -> !isFileGroupReplaced(fg))
+ .map(HoodieFileGroup::getLatestFileSlice)
+ .filter(Option::isPresent).map(Option::get)
+ .flatMap(slice -> this.filterUncommittedFiles(slice, true));
+ if (bootstrapIndex.useIndex()) {
+ final Map<HoodieFileGroupId, BootstrapBaseFileMapping>
bootstrapBaseFileMappings = getBootstrapBaseFileMappings(partition);
+ if (!bootstrapBaseFileMappings.isEmpty()) {
+ return fileSliceStream.map(fileSlice ->
addBootstrapBaseFileIfPresent(fileSlice, fileGroupId ->
Option.ofNullable(bootstrapBaseFileMappings.get(fileGroupId))));
+ }
+ }
+ return fileSliceStream;
+ } catch (IOException e) {
+ throw new HoodieIOException("Failed to fetch all files in partition "
+ partition, e);
+ }
+ }
+ }
+
/**
* Get Latest File Slice for a given fileId in a given partition.
*/
@@ -1014,6 +1057,39 @@ public abstract class AbstractTableFileSystemView
implements SyncableFileSystemV
return getAllFileGroupsIncludingReplaced(partitionStr).filter(fg ->
!isFileGroupReplaced(fg));
}
+ @Override
+ public final Stream<HoodieFileGroup> getAllFileGroupsStateless(String
partitionStr) {
+ String partition = formatPartitionKey(partitionStr);
+ if (isPartitionAvailableInStore(partition)) {
+ return getAllFileGroups(partition);
+ } else {
+ try {
+ Stream<HoodieFileGroup> fileGroupStream =
buildFileGroups(getAllFilesInPartition(partition),
visibleCommitsAndCompactionTimeline, true).stream()
+ .filter(fg -> !isFileGroupReplaced(fg));
+ if (bootstrapIndex.useIndex()) {
+ final Map<HoodieFileGroupId, BootstrapBaseFileMapping>
bootstrapBaseFileMappings = getBootstrapBaseFileMappings(partition);
+ if (!bootstrapBaseFileMappings.isEmpty()) {
+ return fileGroupStream.map(fileGroup ->
addBootstrapBaseFileIfPresent(fileGroup, fileGroupId ->
Option.ofNullable(bootstrapBaseFileMappings.get(fileGroupId))));
+ }
+ }
+ return fileGroupStream;
+ } catch (IOException e) {
+ throw new HoodieIOException("Failed to fetch all files in partition "
+ partition, e);
+ }
+ }
+ }
+
+ private Map<HoodieFileGroupId, BootstrapBaseFileMapping>
getBootstrapBaseFileMappings(String partition) {
+ try (BootstrapIndex.IndexReader reader = bootstrapIndex.createReader()) {
+ LOG.info("Bootstrap Index available for partition " + partition);
+ List<BootstrapFileMapping> sourceFileMappings =
+ reader.getSourceFileMappingForPartition(partition);
+ return sourceFileMappings.stream()
+ .map(s -> new BootstrapBaseFileMapping(new
HoodieFileGroupId(s.getPartitionPath(),
+ s.getFileId()),
s.getBootstrapFileStatus())).collect(Collectors.toMap(BootstrapBaseFileMapping::getFileGroupId,
s -> s));
+ }
+ }
+
private Stream<HoodieFileGroup> getAllFileGroupsIncludingReplaced(final
String partitionStr) {
try {
readLock.lock();
@@ -1029,22 +1105,38 @@ public abstract class AbstractTableFileSystemView
implements SyncableFileSystemV
@Override
public Stream<HoodieFileGroup> getReplacedFileGroupsBeforeOrOn(String
maxCommitTime, String partitionPath) {
- return getAllFileGroupsIncludingReplaced(partitionPath).filter(fg ->
isFileGroupReplacedBeforeOrOn(fg.getFileGroupId(), maxCommitTime));
+ String partition = formatPartitionKey(partitionPath);
+ if (hasReplacedFilesInPartition(partition)) {
+ return getAllFileGroupsIncludingReplaced(partition).filter(fg ->
isFileGroupReplacedBeforeOrOn(fg.getFileGroupId(), maxCommitTime));
+ }
+ return Stream.empty();
}
@Override
public Stream<HoodieFileGroup> getReplacedFileGroupsBefore(String
maxCommitTime, String partitionPath) {
- return getAllFileGroupsIncludingReplaced(partitionPath).filter(fg ->
isFileGroupReplacedBefore(fg.getFileGroupId(), maxCommitTime));
+ String partition = formatPartitionKey(partitionPath);
+ if (hasReplacedFilesInPartition(partition)) {
+ return getAllFileGroupsIncludingReplaced(partition).filter(fg ->
isFileGroupReplacedBefore(fg.getFileGroupId(), maxCommitTime));
+ }
+ return Stream.empty();
}
@Override
public Stream<HoodieFileGroup> getReplacedFileGroupsAfterOrOn(String
minCommitTime, String partitionPath) {
- return getAllFileGroupsIncludingReplaced(partitionPath).filter(fg ->
isFileGroupReplacedAfterOrOn(fg.getFileGroupId(), minCommitTime));
+ String partition = formatPartitionKey(partitionPath);
+ if (hasReplacedFilesInPartition(partition)) {
+ return getAllFileGroupsIncludingReplaced(partition).filter(fg ->
isFileGroupReplacedAfterOrOn(fg.getFileGroupId(), minCommitTime));
+ }
+ return Stream.empty();
}
@Override
public Stream<HoodieFileGroup> getAllReplacedFileGroups(String
partitionPath) {
- return getAllFileGroupsIncludingReplaced(partitionPath).filter(fg ->
isFileGroupReplaced(fg.getFileGroupId()));
+ String partition = formatPartitionKey(partitionPath);
+ if (hasReplacedFilesInPartition(partition)) {
+ return getAllFileGroupsIncludingReplaced(partition).filter(fg ->
isFileGroupReplaced(fg.getFileGroupId()));
+ }
+ return Stream.empty();
}
@Override
@@ -1263,6 +1355,11 @@ public abstract class AbstractTableFileSystemView
implements SyncableFileSystemV
*/
protected abstract void removeReplacedFileIdsAtInstants(Set<String>
instants);
+ /**
+ * Returns whether there are replaced files within the given partition.
+ */
+ protected abstract boolean hasReplacedFilesInPartition(String partitionPath);
+
/**
* Track instant time for file groups replaced.
*/
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/view/HoodieTableFileSystemView.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/view/HoodieTableFileSystemView.java
index bb98c97e28d..f1b56ebe519 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/table/view/HoodieTableFileSystemView.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/view/HoodieTableFileSystemView.java
@@ -408,6 +408,11 @@ public class HoodieTableFileSystemView extends
IncrementalTimelineSyncFileSystem
fgIdToReplaceInstants.entrySet().removeIf(entry ->
instants.contains(entry.getValue().getTimestamp()));
}
+ @Override
+ protected boolean hasReplacedFilesInPartition(String partitionPath) {
+ return fgIdToReplaceInstants.keySet().stream().anyMatch(fg ->
fg.getPartitionPath().equals(partitionPath));
+ }
+
@Override
protected Option<HoodieInstant> getReplaceInstant(final HoodieFileGroupId
fileGroupId) {
return Option.ofNullable(fgIdToReplaceInstants.get(fileGroupId));
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/view/PriorityBasedFileSystemView.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/view/PriorityBasedFileSystemView.java
index e30b9f425d2..56d7c7cc25c 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/table/view/PriorityBasedFileSystemView.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/view/PriorityBasedFileSystemView.java
@@ -182,6 +182,11 @@ public class PriorityBasedFileSystemView implements
SyncableFileSystemView, Seri
return execute(partitionPath, preferredView::getLatestFileSlices,
secondaryView::getLatestFileSlices);
}
+ @Override
+ public Stream<FileSlice> getLatestFileSlicesStateless(String partitionPath) {
+ return execute(partitionPath, preferredView::getLatestFileSlicesStateless,
secondaryView::getLatestFileSlicesStateless);
+ }
+
@Override
public Stream<FileSlice> getLatestUnCompactedFileSlices(String
partitionPath) {
return execute(partitionPath,
preferredView::getLatestUnCompactedFileSlices,
@@ -222,6 +227,11 @@ public class PriorityBasedFileSystemView implements
SyncableFileSystemView, Seri
return execute(partitionPath, preferredView::getAllFileGroups,
secondaryView::getAllFileGroups);
}
+ @Override
+ public Stream<HoodieFileGroup> getAllFileGroupsStateless(String
partitionPath) {
+ return execute(partitionPath, preferredView::getAllFileGroupsStateless,
secondaryView::getAllFileGroupsStateless);
+ }
+
@Override
public Stream<HoodieFileGroup> getReplacedFileGroupsBeforeOrOn(String
maxCommitTime, String partitionPath) {
return execute(maxCommitTime, partitionPath,
preferredView::getReplacedFileGroupsBeforeOrOn,
secondaryView::getReplacedFileGroupsBeforeOrOn);
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/view/RemoteHoodieTableFileSystemView.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/view/RemoteHoodieTableFileSystemView.java
index a6318608bcf..4363a7daf27 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/table/view/RemoteHoodieTableFileSystemView.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/view/RemoteHoodieTableFileSystemView.java
@@ -68,6 +68,7 @@ public class RemoteHoodieTableFileSystemView implements
SyncableFileSystemView,
private static final String BASE_URL = "/v1/hoodie/view";
public static final String LATEST_PARTITION_SLICES_URL =
String.format("%s/%s", BASE_URL, "slices/partition/latest/");
+ public static final String LATEST_PARTITION_SLICES_STATELESS_URL =
String.format("%s/%s", BASE_URL, "slices/partition/latest/stateless/");
public static final String LATEST_PARTITION_SLICE_URL =
String.format("%s/%s", BASE_URL, "slices/file/latest/");
public static final String LATEST_PARTITION_UNCOMPACTED_SLICES_URL =
String.format("%s/%s", BASE_URL, "slices/uncompacted/partition/latest/");
@@ -101,6 +102,9 @@ public class RemoteHoodieTableFileSystemView implements
SyncableFileSystemView,
public static final String ALL_FILEGROUPS_FOR_PARTITION_URL =
String.format("%s/%s", BASE_URL, "filegroups/all/partition/");
+ public static final String ALL_FILEGROUPS_FOR_PARTITION_STATELESS_URL =
+ String.format("%s/%s", BASE_URL, "filegroups/all/partition/stateless/");
+
public static final String ALL_REPLACED_FILEGROUPS_BEFORE_OR_ON =
String.format("%s/%s", BASE_URL, "filegroups/replaced/beforeoron/");
@@ -332,6 +336,18 @@ public class RemoteHoodieTableFileSystemView implements
SyncableFileSystemView,
}
}
+ @Override
+ public Stream<FileSlice> getLatestFileSlicesStateless(String partitionPath) {
+ Map<String, String> paramsMap = getParamsWithPartitionPath(partitionPath);
+ try {
+ List<FileSliceDTO> dataFiles =
executeRequest(LATEST_PARTITION_SLICES_STATELESS_URL, paramsMap,
+ new TypeReference<List<FileSliceDTO>>() {}, RequestMethod.GET);
+ return dataFiles.stream().map(FileSliceDTO::toFileSlice);
+ } catch (IOException e) {
+ throw new HoodieRemoteException(e);
+ }
+ }
+
@Override
public Option<FileSlice> getLatestFileSlice(String partitionPath, String
fileId) {
Map<String, String> paramsMap =
getParamsWithAdditionalParam(partitionPath, FILEID_PARAM, fileId);
@@ -438,6 +454,18 @@ public class RemoteHoodieTableFileSystemView implements
SyncableFileSystemView,
}
}
+ @Override
+ public Stream<HoodieFileGroup> getAllFileGroupsStateless(String
partitionPath) {
+ Map<String, String> paramsMap = getParamsWithPartitionPath(partitionPath);
+ try {
+ List<FileGroupDTO> fileGroups =
executeRequest(ALL_FILEGROUPS_FOR_PARTITION_STATELESS_URL, paramsMap,
+ new TypeReference<List<FileGroupDTO>>() {}, RequestMethod.GET);
+ return DTOUtils.fileGroupDTOsToFileGroups(fileGroups, metaClient);
+ } catch (IOException e) {
+ throw new HoodieRemoteException(e);
+ }
+ }
+
@Override
public Stream<HoodieFileGroup> getReplacedFileGroupsBeforeOrOn(String
maxCommitTime, String partitionPath) {
Map<String, String> paramsMap =
getParamsWithAdditionalParam(partitionPath, MAX_INSTANT_PARAM, maxCommitTime);
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/view/RocksDbBasedFileSystemView.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/view/RocksDbBasedFileSystemView.java
index 26795656383..b2b05e32481 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/table/view/RocksDbBasedFileSystemView.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/view/RocksDbBasedFileSystemView.java
@@ -553,6 +553,12 @@ public class RocksDbBasedFileSystemView extends
IncrementalTimelineSyncFileSyste
);
}
+ @Override
+ protected boolean hasReplacedFilesInPartition(String partitionPath) {
+ return
rocksDB.<HoodieInstant>prefixSearch(schemaHelper.getColFamilyForReplacedFileGroups(),
schemaHelper.getPrefixForReplacedFileGroup(partitionPath))
+ .findAny().isPresent();
+ }
+
@Override
protected Option<HoodieInstant> getReplaceInstant(final HoodieFileGroupId
fileGroupId) {
String lookupKey = schemaHelper.getKeyForReplacedFileGroup(fileGroupId);
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/view/TableFileSystemView.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/view/TableFileSystemView.java
index 6fedb8684c9..1bcd1de61bc 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/table/view/TableFileSystemView.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/view/TableFileSystemView.java
@@ -107,6 +107,19 @@ public interface TableFileSystemView {
*/
Stream<FileSlice> getLatestFileSlices(String partitionPath);
+ /**
+ * Stream all the latest file slices in the given partition
+ * without caching the file group mappings.
+ *
+ * <p>This is useful for table services such as compaction and
clustering: these services may search for files to clean
+ * within ancient data partitions, and if a full table service is
triggered for an enormous number of partitions, the cache could
+ * put huge memory pressure on the timeline server and induce an
OOM exception.
+ *
+ * <p>Caching these file groups rarely benefits writers,
because writers
+ * usually write to recent data partitions.
+ */
+ Stream<FileSlice> getLatestFileSlicesStateless(String partitionPath);
+
/**
* Get Latest File Slice for a given fileId in a given partition.
*/
@@ -168,6 +181,18 @@ public interface TableFileSystemView {
*/
Stream<HoodieFileGroup> getAllFileGroups(String partitionPath);
+ /**
+ * Stream all the file groups for a given partition without caching the file
group mappings.
+ *
+ * <p>This is useful for table services such as cleaning: the cleaning
service may search for files to clean
+ * within ancient data partitions, and if a full table
cleaning is triggered for an enormous number of partitions, the cache could
+ * put huge memory pressure on the timeline server and induce an OOM
exception.
+ *
+ * <p>Caching these file groups rarely benefits writers,
because writers
+ * usually write to recent data partitions.
+ */
+ Stream<HoodieFileGroup> getAllFileGroupsStateless(String partitionPath);
+
/**
* Return Pending Compaction Operations.
*
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/util/RocksDBSchemaHelper.java
b/hudi-common/src/main/java/org/apache/hudi/common/util/RocksDBSchemaHelper.java
index 45b2a13eb72..ff924e45013 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/util/RocksDBSchemaHelper.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/util/RocksDBSchemaHelper.java
@@ -87,6 +87,10 @@ public class RocksDBSchemaHelper {
return getPartitionFileIdBasedLookup(fgId);
}
+ public String getPrefixForReplacedFileGroup(String partitionPath) {
+ return String.format("part=%s,id=", partitionPath);
+ }
+
public String getKeyForFileGroupsInPendingClustering(HoodieFileGroupId fgId)
{
return getPartitionFileIdBasedLookup(fgId);
}
diff --git
a/hudi-common/src/test/java/org/apache/hudi/common/table/view/TestHoodieTableFileSystemView.java
b/hudi-common/src/test/java/org/apache/hudi/common/table/view/TestHoodieTableFileSystemView.java
index ab8a1fd3aa2..558dc59f40c 100644
---
a/hudi-common/src/test/java/org/apache/hudi/common/table/view/TestHoodieTableFileSystemView.java
+++
b/hudi-common/src/test/java/org/apache/hudi/common/table/view/TestHoodieTableFileSystemView.java
@@ -90,11 +90,12 @@ import java.util.stream.Stream;
import static
org.apache.hudi.common.model.HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE_PREFIX;
import static
org.apache.hudi.common.table.timeline.TimelineMetadataUtils.serializeCommitMetadata;
-import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotSame;
import static org.junit.jupiter.api.Assertions.assertTrue;
/**
@@ -328,6 +329,109 @@ public class TestHoodieTableFileSystemView extends
HoodieCommonTestHarness {
assertEquals(2, fsView.getAllFileGroups(partitionPath).count());
}
+ @Test
+ public void testViewForGetAllFileGroupsStateless() throws Exception {
+ String partitionPath1 = "2023/11/22";
+ new File(basePath + "/" + partitionPath1).mkdirs();
+ new File(basePath + "/" + partitionPath1 + "/" +
HOODIE_PARTITION_METAFILE_PREFIX + ".parquet").mkdirs();
+ String partitionPath2 = "2023/11/23";
+ new File(basePath + "/" + partitionPath2).mkdirs();
+ new File(basePath + "/" + partitionPath2 + "/" +
HOODIE_PARTITION_METAFILE_PREFIX + ".parquet").mkdirs();
+
+ // create 2 fileIds in partition1
+ String fileId1 = UUID.randomUUID().toString();
+ String fileId2 = UUID.randomUUID().toString();
+ String commitTime1 = "1";
+ String fileName1 = FSUtils.makeBaseFileName(commitTime1, TEST_WRITE_TOKEN,
fileId1);
+ String fileName2 = FSUtils.makeBaseFileName(commitTime1, TEST_WRITE_TOKEN,
fileId2);
+ new File(basePath + "/" + partitionPath1 + "/" +
fileName1).createNewFile();
+ new File(basePath + "/" + partitionPath1 + "/" +
fileName2).createNewFile();
+
+ HoodieActiveTimeline commitTimeline = metaClient.getActiveTimeline();
+ HoodieInstant instant1 = new HoodieInstant(true,
HoodieTimeline.COMMIT_ACTION, commitTime1);
+ saveAsComplete(commitTimeline, instant1, Option.empty());
+
+ // create 2 fileIds in partition2
+ String fileId3 = UUID.randomUUID().toString();
+ String fileId4 = UUID.randomUUID().toString();
+ String commitTime2 = "2";
+ String fileName3 = FSUtils.makeBaseFileName(commitTime2, TEST_WRITE_TOKEN,
fileId3);
+ String fileName4 = FSUtils.makeBaseFileName(commitTime2, TEST_WRITE_TOKEN,
fileId4);
+ new File(basePath + "/" + partitionPath2 + "/" +
fileName3).createNewFile();
+ new File(basePath + "/" + partitionPath2 + "/" +
fileName4).createNewFile();
+
+ HoodieInstant instant2 = new HoodieInstant(true,
HoodieTimeline.COMMIT_ACTION, commitTime2);
+ saveAsComplete(commitTimeline, instant2, Option.empty());
+
+ fsView.sync();
+ // invoke the stateless API first and then the normal API; assert that the results
are equal but backed by different file group objects
+ List<HoodieFileGroup> actual1 =
fsView.getAllFileGroupsStateless(partitionPath1).collect(Collectors.toList());
+ List<HoodieFileGroup> expected1 =
fsView.getAllFileGroups(partitionPath1).collect(Collectors.toList());
+ for (int i = 0; i < expected1.size(); i++) {
+ assertThat("The stateless API should return the same result",
actual1.get(i).toString(), is(expected1.get(i).toString()));
+ assertNotSame(actual1.get(i), expected1.get(i), "The stateless API does
not cache");
+ }
+
+ List<HoodieFileGroup> expected2 =
fsView.getAllFileGroupsStateless(partitionPath2).collect(Collectors.toList());
+ List<HoodieFileGroup> actual2 =
fsView.getAllFileGroups(partitionPath2).collect(Collectors.toList());
+ for (int i = 0; i < expected2.size(); i++) {
+ assertThat("The stateless API should return the same result",
actual2.get(i).toString(), is(expected2.get(i).toString()));
+ assertNotSame(actual2.get(i), expected2.get(i), "The stateless API does
not cache");
+ }
+ }
+
+ @Test
+ public void testViewForGetLatestFileSlicesStateless() throws Exception {
+ String partitionPath1 = "2023/11/22";
+ new File(basePath + "/" + partitionPath1).mkdirs();
+ new File(basePath + "/" + partitionPath1 + "/" +
HOODIE_PARTITION_METAFILE_PREFIX + ".parquet").mkdirs();
+ String partitionPath2 = "2023/11/23";
+ new File(basePath + "/" + partitionPath2).mkdirs();
+ new File(basePath + "/" + partitionPath2 + "/" +
HOODIE_PARTITION_METAFILE_PREFIX + ".parquet").mkdirs();
+
+ // create 2 fileIds in partition1
+ String fileId1 = UUID.randomUUID().toString();
+ String fileId2 = UUID.randomUUID().toString();
+ String commitTime1 = "1";
+ String fileName1 = FSUtils.makeBaseFileName(commitTime1, TEST_WRITE_TOKEN,
fileId1);
+ String fileName2 = FSUtils.makeBaseFileName(commitTime1, TEST_WRITE_TOKEN,
fileId2);
+ new File(basePath + "/" + partitionPath1 + "/" +
fileName1).createNewFile();
+ new File(basePath + "/" + partitionPath1 + "/" +
fileName2).createNewFile();
+
+ HoodieActiveTimeline commitTimeline = metaClient.getActiveTimeline();
+ HoodieInstant instant1 = new HoodieInstant(true,
HoodieTimeline.COMMIT_ACTION, commitTime1);
+ saveAsComplete(commitTimeline, instant1, Option.empty());
+
+ // create 2 fileIds in partition2
+ String fileId3 = UUID.randomUUID().toString();
+ String fileId4 = UUID.randomUUID().toString();
+ String commitTime2 = "2";
+ String fileName3 = FSUtils.makeBaseFileName(commitTime2, TEST_WRITE_TOKEN,
fileId3);
+ String fileName4 = FSUtils.makeBaseFileName(commitTime2, TEST_WRITE_TOKEN,
fileId4);
+ new File(basePath + "/" + partitionPath2 + "/" +
fileName3).createNewFile();
+ new File(basePath + "/" + partitionPath2 + "/" +
fileName4).createNewFile();
+
+ HoodieInstant instant2 = new HoodieInstant(true,
HoodieTimeline.COMMIT_ACTION, commitTime2);
+ saveAsComplete(commitTimeline, instant2, Option.empty());
+
+ fsView.sync();
+
+ // invoke the stateless API first and then the normal API; assert that the results
are equal but backed by different file slice objects
+ List<FileSlice> actual1 =
fsView.getLatestFileSlicesStateless(partitionPath1).collect(Collectors.toList());
+ List<FileSlice> expected1 =
fsView.getLatestFileSlices(partitionPath1).collect(Collectors.toList());
+ for (int i = 0; i < expected1.size(); i++) {
+ assertThat("The stateless API should return the same result",
actual1.get(i), is(expected1.get(i)));
+ assertNotSame(actual1.get(i), expected1.get(i), "The stateless API does
not cache");
+ }
+
+ List<FileSlice> expected2 =
fsView.getLatestFileSlicesStateless(partitionPath2).collect(Collectors.toList());
+ List<FileSlice> actual2 =
fsView.getLatestFileSlices(partitionPath2).collect(Collectors.toList());
+ for (int i = 0; i < expected2.size(); i++) {
+ assertThat("The stateless API should return the same result",
actual2.get(i), is(expected2.get(i)));
+ assertNotSame(actual2.get(i), expected2.get(i), "The stateless API does
not cache");
+ }
+ }
+
@Test
protected void testInvalidLogFiles() throws Exception {
String partitionPath = "2016/05/01";
diff --git
a/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/RequestHandler.java
b/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/RequestHandler.java
index 91adda4ee85..c72491341fe 100644
---
a/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/RequestHandler.java
+++
b/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/RequestHandler.java
@@ -337,6 +337,14 @@ public class RequestHandler {
writeValueAsString(ctx, dtos);
}, true));
+
app.get(RemoteHoodieTableFileSystemView.LATEST_PARTITION_SLICES_STATELESS_URL,
new ViewHandler(ctx -> {
+ metricsRegistry.add("LATEST_PARTITION_SLICES_STATELESS", 1);
+ List<FileSliceDTO> dtos = sliceHandler.getLatestFileSlicesStateless(
+
ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.BASEPATH_PARAM,
String.class).getOrThrow(e -> new HoodieException("Basepath is invalid")),
+
ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.PARTITION_PARAM,
String.class).getOrDefault(""));
+ writeValueAsString(ctx, dtos);
+ }, true));
+
app.get(RemoteHoodieTableFileSystemView.LATEST_PARTITION_SLICE_URL, new
ViewHandler(ctx -> {
metricsRegistry.add("LATEST_PARTITION_SLICE", 1);
List<FileSliceDTO> dtos = sliceHandler.getLatestFileSlice(
@@ -421,6 +429,14 @@ public class RequestHandler {
writeValueAsString(ctx, dtos);
}, true));
+
app.get(RemoteHoodieTableFileSystemView.ALL_FILEGROUPS_FOR_PARTITION_STATELESS_URL,
new ViewHandler(ctx -> {
+ metricsRegistry.add("ALL_FILEGROUPS_FOR_PARTITION_STATELESS", 1);
+ List<FileGroupDTO> dtos = sliceHandler.getAllFileGroupsStateless(
+
ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.BASEPATH_PARAM,
String.class).getOrThrow(e -> new HoodieException("Basepath is invalid")),
+
ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.PARTITION_PARAM,
String.class).getOrDefault(""));
+ writeValueAsString(ctx, dtos);
+ }, true));
+
app.post(RemoteHoodieTableFileSystemView.REFRESH_TABLE, new
ViewHandler(ctx -> {
metricsRegistry.add("REFRESH_TABLE", 1);
boolean success = sliceHandler
diff --git
a/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/FileSliceHandler.java
b/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/FileSliceHandler.java
index e8af55e69b3..c2b739c9f8b 100644
---
a/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/FileSliceHandler.java
+++
b/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/FileSliceHandler.java
@@ -18,9 +18,6 @@
package org.apache.hudi.timeline.service.handlers;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-
import org.apache.hudi.common.model.HoodieFileGroup;
import org.apache.hudi.common.table.timeline.dto.ClusteringOpDTO;
import org.apache.hudi.common.table.timeline.dto.CompactionOpDTO;
@@ -30,6 +27,9 @@ import org.apache.hudi.common.table.timeline.dto.FileSliceDTO;
import org.apache.hudi.common.table.view.FileSystemViewManager;
import org.apache.hudi.timeline.service.TimelineService;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
@@ -90,6 +90,11 @@ public class FileSliceHandler extends Handler {
.collect(Collectors.toList());
}
+ public List<FileSliceDTO> getLatestFileSlicesStateless(String basePath,
String partitionPath) {
+ return
viewManager.getFileSystemView(basePath).getLatestFileSlicesStateless(partitionPath).map(FileSliceDTO::fromFileSlice)
+ .collect(Collectors.toList());
+ }
+
public List<FileSliceDTO> getLatestFileSlice(String basePath, String
partitionPath, String fileId) {
return
viewManager.getFileSystemView(basePath).getLatestFileSlice(partitionPath,
fileId)
.map(FileSliceDTO::fromFileSlice).map(Arrays::asList).orElse(new
ArrayList<>());
@@ -113,6 +118,12 @@ public class FileSliceHandler extends Handler {
return DTOUtils.fileGroupDTOsfromFileGroups(fileGroups);
}
+ public List<FileGroupDTO> getAllFileGroupsStateless(String basePath, String
partitionPath) {
+ List<HoodieFileGroup> fileGroups =
viewManager.getFileSystemView(basePath).getAllFileGroupsStateless(partitionPath)
+ .collect(Collectors.toList());
+ return DTOUtils.fileGroupDTOsfromFileGroups(fileGroups);
+ }
+
public List<FileGroupDTO> getReplacedFileGroupsBeforeOrOn(String basePath,
String maxCommitTime, String partitionPath) {
List<HoodieFileGroup> fileGroups =
viewManager.getFileSystemView(basePath).getReplacedFileGroupsBeforeOrOn(maxCommitTime,
partitionPath)
.collect(Collectors.toList());