This is an automated email from the ASF dual-hosted git repository.
yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 45da30dc3ec [HUDI-5485] Add File System View API for batch listing and
improve savepoint performance with metadata table (#7690)
45da30dc3ec is described below
commit 45da30dc3ecfd4e7315d8f33d95504a5ac7cbd1a
Author: Y Ethan Guo <[email protected]>
AuthorDate: Wed Jan 25 23:39:46 2023 -0800
[HUDI-5485] Add File System View API for batch listing and improve
savepoint performance with metadata table (#7690)
The savepoint action needs to list all the latest base files before or on
the target instant time. When the metadata table is enabled, for each
partition, the metadata table is scanned from the Spark executor, which leads
to a lot of S3 requests. This significantly degrades the savepoint performance
when the number of table partitions is huge. Such repeated scanning of the
metadata table is unnecessary and can be avoided by scanning the metadata table
once.
This commit fixes the issue by adding a new file system view API that
batches the file listing of all partitions so that only one metadata table
lookup call (`tableMetadata.getAllFilesInPartitions`) is invoked on the driver
side for savepoint action.
---
.../action/savepoint/SavepointActionExecutor.java | 55 +++++-
.../java/org/apache/hudi/client/TestSavepoint.java | 152 +++++++++++++++++
.../hudi/testutils/HoodieClientTestBase.java | 2 +-
.../table/view/AbstractTableFileSystemView.java | 190 +++++++++++++++++----
.../table/view/PriorityBasedFileSystemView.java | 7 +
.../view/RemoteHoodieTableFileSystemView.java | 45 ++++-
.../common/table/view/TableFileSystemView.java | 11 ++
.../metadata/HoodieMetadataFileSystemView.java | 25 +++
.../hudi/timeline/service/RequestHandler.java | 9 +
.../timeline/service/handlers/BaseFileHandler.java | 11 ++
10 files changed, 458 insertions(+), 49 deletions(-)
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/savepoint/SavepointActionExecutor.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/savepoint/SavepointActionExecutor.java
index 3bfdd20721e..4e0ae1da223 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/savepoint/SavepointActionExecutor.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/savepoint/SavepointActionExecutor.java
@@ -26,6 +26,7 @@ import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
+import org.apache.hudi.common.table.view.FileSystemViewStorageType;
import org.apache.hudi.common.table.view.TableFileSystemView;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ValidationUtils;
@@ -84,15 +85,33 @@ public class SavepointActionExecutor<T, I, K, O> extends
BaseActionExecutor<T, I
"Could not savepoint commit " + instantTime + " as this is beyond
the lookup window " + lastCommitRetained);
context.setJobStatus(this.getClass().getSimpleName(), "Collecting latest
files for savepoint " + instantTime + " " + table.getConfig().getTableName());
- List<String> partitions = FSUtils.getAllPartitionPaths(context,
config.getMetadataConfig(), table.getMetaClient().getBasePath());
- Map<String, List<String>> latestFilesMap = context.mapToPair(partitions,
partitionPath -> {
- // Scan all partitions files with this commit time
- LOG.info("Collecting latest files in partition path " + partitionPath);
- TableFileSystemView.BaseFileOnlyView view =
table.getBaseFileOnlyView();
- List<String> latestFiles =
view.getLatestBaseFilesBeforeOrOn(partitionPath, instantTime)
- .map(HoodieBaseFile::getFileName).collect(Collectors.toList());
- return new ImmutablePair<>(partitionPath, latestFiles);
- }, null);
+ TableFileSystemView.BaseFileOnlyView view = table.getBaseFileOnlyView();
+
+ Map<String, List<String>> latestFilesMap;
+ // NOTE: for performance, we have to use different logic here for
listing the latest files
+ // before or on the given instant:
+ // (1) using metadata-table-based file listing: instead of parallelizing
the partition
+ // listing which incurs unnecessary metadata table reads, we directly
read the metadata
+ // table once in a batch manner through the timeline server;
+ // (2) using direct file system listing: we parallelize the partition
listing so that
+ // each partition can be listed on the file system concurrently through
Spark.
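+ // Note that (1) is only used when the metadata table is enabled and the storage type of
+ // the file system view is neither EMBEDDED_KV_STORE nor SPILLABLE_DISK.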
+ if (shouldUseBatchLookup(config)) {
+ latestFilesMap =
view.getAllLatestBaseFilesBeforeOrOn(instantTime).entrySet().stream()
+ .collect(Collectors.toMap(
+ Map.Entry::getKey,
+ entry ->
entry.getValue().map(HoodieBaseFile::getFileName).collect(Collectors.toList())));
+ } else {
+ List<String> partitions = FSUtils.getAllPartitionPaths(context,
config.getMetadataConfig(), table.getMetaClient().getBasePath());
+ latestFilesMap = context.mapToPair(partitions, partitionPath -> {
+ // Scan all partitions files with this commit time
+ LOG.info("Collecting latest files in partition path " +
partitionPath);
+ List<String> latestFiles =
view.getLatestBaseFilesBeforeOrOn(partitionPath, instantTime)
+ .map(HoodieBaseFile::getFileName).collect(Collectors.toList());
+ return new ImmutablePair<>(partitionPath, latestFiles);
+ }, null);
+ }
+
HoodieSavepointMetadata metadata =
TimelineMetadataUtils.convertSavepointMetadata(user, comment, latestFilesMap);
// Nothing to save in the savepoint
table.getActiveTimeline().createNewInstant(
@@ -106,4 +125,22 @@ public class SavepointActionExecutor<T, I, K, O> extends
BaseActionExecutor<T, I
throw new HoodieSavepointException("Failed to savepoint " + instantTime,
e);
}
}
+
+ /**
+ * Whether to use batch lookup for listing the latest base files in metadata
table.
+ * <p>
+ * Note that metadata table has to be enabled, and the storage type of the
file system view
+ * cannot be EMBEDDED_KV_STORE or SPILLABLE_DISK (these two types are not
integrated with
+ * metadata table, see HUDI-5612).
+ *
+ * @param config Write configs.
+ * @return {@code true} if using batch lookup; {@code false} otherwise.
+ */
+ private boolean shouldUseBatchLookup(HoodieWriteConfig config) {
+ FileSystemViewStorageType storageType =
+ config.getClientSpecifiedViewStorageConfig().getStorageType();
+ return config.getMetadataConfig().enabled()
+ && !FileSystemViewStorageType.EMBEDDED_KV_STORE.equals(storageType)
+ && !FileSystemViewStorageType.SPILLABLE_DISK.equals(storageType);
+ }
}
diff --git
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestSavepoint.java
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestSavepoint.java
new file mode 100644
index 00000000000..765c24d6a65
--- /dev/null
+++
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestSavepoint.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.client;
+
+import org.apache.hudi.avro.model.HoodieSavepointPartitionMetadata;
+import org.apache.hudi.common.config.HoodieMetadataConfig;
+import org.apache.hudi.common.config.HoodieStorageConfig;
+import org.apache.hudi.common.fs.ConsistencyGuardConfig;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
+import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
+import org.apache.hudi.common.table.view.FileSystemViewStorageType;
+import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
+import org.apache.hudi.config.HoodieCompactionConfig;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.table.HoodieSparkTable;
+import org.apache.hudi.testutils.HoodieClientTestBase;
+import org.apache.hudi.testutils.MetadataMergeWriteStatus;
+
+import org.apache.spark.api.java.JavaRDD;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static
org.apache.hudi.common.table.view.FileSystemViewStorageType.EMBEDDED_KV_STORE;
+import static
org.apache.hudi.common.table.view.FileSystemViewStorageType.MEMORY;
+import static
org.apache.hudi.common.testutils.HoodieTestUtils.RAW_TRIPS_TEST_NAME;
+import static org.apache.hudi.testutils.Assertions.assertNoWriteErrors;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Test cases for savepoint operation.
+ */
+public class TestSavepoint extends HoodieClientTestBase {
+
+ private static Stream<Arguments> testSavepointParams() {
+ return Arrays.stream(new Object[][] {
+ {true, MEMORY}, {true, EMBEDDED_KV_STORE},
+ {false, MEMORY}, {false, EMBEDDED_KV_STORE}
+ }).map(Arguments::of);
+ }
+
+ @ParameterizedTest
+ @MethodSource("testSavepointParams")
+ public void testSavepoint(boolean enableMetadataTable,
+ FileSystemViewStorageType storageType) throws
IOException {
+ HoodieWriteConfig cfg = getWriteConfig(enableMetadataTable, storageType);
+ HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator(0x17AB);
+ try (SparkRDDWriteClient client = getHoodieWriteClient(cfg)) {
+ String newCommitTime = "001";
+ client.startCommitWithTime(newCommitTime);
+
+ List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 200);
+ JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
+
+ List<WriteStatus> statuses = client.upsert(writeRecords,
newCommitTime).collect();
+ assertNoWriteErrors(statuses);
+ client.savepoint("user", "hoodie-savepoint-unit-test");
+
+ metaClient = HoodieTableMetaClient.reload(metaClient);
+ HoodieSparkTable table = HoodieSparkTable.create(getConfig(), context,
metaClient);
+ HoodieTimeline savepointTimeline =
table.getActiveTimeline().getSavePointTimeline();
+ assertEquals(1, savepointTimeline.countInstants());
+
+ Map<String, HoodieSavepointPartitionMetadata>
savepointPartitionMetadataMap =
+ TimelineMetadataUtils.deserializeHoodieSavepointMetadata(
+
savepointTimeline.getInstantDetails(savepointTimeline.firstInstant().get()).get())
+ .getPartitionMetadata();
+
+ HoodieTimeline commitsTimeline =
table.getActiveTimeline().getCommitsTimeline();
+ Map<String, List<HoodieWriteStat>> partitionToWriteStats =
HoodieCommitMetadata.fromBytes(
+
commitsTimeline.getInstantDetails(commitsTimeline.firstInstant().get()).get(),
+ HoodieCommitMetadata.class)
+ .getPartitionToWriteStats();
+
+ assertEquals(partitionToWriteStats.size(),
savepointPartitionMetadataMap.size());
+ for (Map.Entry<String, List<HoodieWriteStat>> entry :
partitionToWriteStats.entrySet()) {
+ String partition = entry.getKey();
+ assertTrue(savepointPartitionMetadataMap.containsKey(partition));
+ assertEquals(
+ entry.getValue().stream().map(path ->
getFileNameFromPath(path.getPath()))
+ .sorted().collect(Collectors.toList()),
+ savepointPartitionMetadataMap.get(partition).getSavepointDataFile()
+ .stream().sorted().collect(Collectors.toList())
+ );
+ }
+ }
+ }
+
+ private HoodieWriteConfig getWriteConfig(boolean enableMetadataTable,
+ FileSystemViewStorageType
storageType) {
+ return HoodieWriteConfig.newBuilder()
+ .withPath(basePath)
+ .withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA)
+ .withParallelism(2, 2)
+ .withBulkInsertParallelism(2)
+ .withFinalizeWriteParallelism(2)
+ .withDeleteParallelism(2)
+ .withWriteStatusClass(MetadataMergeWriteStatus.class)
+ .withConsistencyGuardConfig(
+
ConsistencyGuardConfig.newBuilder().withConsistencyCheckEnabled(true).build())
+ .withCompactionConfig(
+ HoodieCompactionConfig.newBuilder().compactionSmallFileSize(1024 *
1024).build())
+ .withStorageConfig(HoodieStorageConfig.newBuilder()
+ .hfileMaxFileSize(1024 * 1024).parquetMaxFileSize(1024 * 1024)
+ .orcMaxFileSize(1024 * 1024).build())
+ .forTable(RAW_TRIPS_TEST_NAME)
+ .withEmbeddedTimelineServerEnabled(true)
+ .withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder()
+ .withEnableBackupForRemoteFileSystemView(false) // Fail test if
problem connecting to timeline-server
+ .withRemoteServerPort(timelineServicePort)
+ .withStorageType(storageType)
+ .build())
+ .withMetadataConfig(
+
HoodieMetadataConfig.newBuilder().enable(enableMetadataTable).build())
+ .build();
+ }
+
+ private String getFileNameFromPath(String path) {
+ String[] parts = path.split("/");
+ return parts[parts.length - 1];
+ }
+}
diff --git
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestBase.java
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestBase.java
index 94e15206c39..2b4172781c6 100644
---
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestBase.java
+++
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestBase.java
@@ -21,8 +21,8 @@ package org.apache.hudi.testutils;
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
-import org.apache.hudi.common.config.HoodieStorageConfig;
import org.apache.hudi.common.HoodieCleanStat;
+import org.apache.hudi.common.config.HoodieStorageConfig;
import org.apache.hudi.common.fs.ConsistencyGuardConfig;
import org.apache.hudi.common.model.EmptyHoodieRecordPayload;
import org.apache.hudi.common.model.HoodieAvroRecord;
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java
index ca847bedaf0..8ea017977e9 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java
@@ -38,6 +38,7 @@ import org.apache.hudi.common.util.HoodieTimer;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hadoop.fs.FileStatus;
@@ -52,6 +53,7 @@ import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@@ -60,6 +62,7 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
+import java.util.function.Function;
import java.util.function.Predicate;
import java.util.regex.Matcher;
import java.util.stream.Collectors;
@@ -122,7 +125,7 @@ public abstract class AbstractTableFileSystemView
implements SyncableFileSystemV
/**
* Refresh commits timeline.
- *
+ *
* @param visibleActiveTimeline Visible Active Timeline
*/
protected void refreshTimeline(HoodieTimeline visibleActiveTimeline) {
@@ -164,13 +167,13 @@ public abstract class AbstractTableFileSystemView
implements SyncableFileSystemV
* Build FileGroups from passed in file-status.
*/
protected List<HoodieFileGroup> buildFileGroups(FileStatus[] statuses,
HoodieTimeline timeline,
- boolean addPendingCompactionFileSlice) {
+ boolean
addPendingCompactionFileSlice) {
return buildFileGroups(convertFileStatusesToBaseFiles(statuses),
convertFileStatusesToLogFiles(statuses), timeline,
addPendingCompactionFileSlice);
}
protected List<HoodieFileGroup> buildFileGroups(Stream<HoodieBaseFile>
baseFileStream,
- Stream<HoodieLogFile> logFileStream, HoodieTimeline timeline, boolean
addPendingCompactionFileSlice) {
+ Stream<HoodieLogFile>
logFileStream, HoodieTimeline timeline, boolean addPendingCompactionFileSlice) {
Map<Pair<String, String>, List<HoodieBaseFile>> baseFiles =
baseFileStream.collect(Collectors.groupingBy(baseFile -> {
String partitionPathStr = getPartitionPathFor(baseFile);
@@ -227,7 +230,7 @@ public abstract class AbstractTableFileSystemView
implements SyncableFileSystemV
// get replace instant mapping for each partition, fileId
return
replaceMetadata.getPartitionToReplaceFileIds().entrySet().stream().flatMap(entry
-> entry.getValue().stream().map(e ->
- new AbstractMap.SimpleEntry<>(new
HoodieFileGroupId(entry.getKey(), e), instant)));
+ new AbstractMap.SimpleEntry<>(new
HoodieFileGroupId(entry.getKey(), e), instant)));
} catch (HoodieIOException ex) {
if (ex.getIOException() instanceof FileNotFoundException) {
@@ -289,6 +292,114 @@ public abstract class AbstractTableFileSystemView
implements SyncableFileSystemV
*/
protected abstract void resetViewState();
+ /**
+ * Batch loads all the partitions if needed.
+ *
+ * @return A list of relative partition paths of all partitions.
+ */
+ private List<String> ensureAllPartitionsLoadedCorrectly() {
+ ValidationUtils.checkArgument(!isClosed(), "View is already closed");
+ try {
+ List<String> formattedPartitionList = getAllPartitionPaths().stream()
+ .map(this::formatPartitionKey).collect(Collectors.toList());
+ ensurePartitionsLoadedCorrectly(formattedPartitionList);
+ return formattedPartitionList;
+ } catch (IOException e) {
+ throw new HoodieIOException("Failed to get all partition paths", e);
+ }
+ }
+
+ /**
+ * Allows lazily loading the partitions if needed.
+ *
+ * @param partitionList list of partitions to be loaded if not present.
+ */
+ private void ensurePartitionsLoadedCorrectly(List<String> partitionList) {
+
+ ValidationUtils.checkArgument(!isClosed(), "View is already closed");
+
+ Set<String> partitionSet = new HashSet<>();
+ synchronized (addedPartitions) {
+ partitionList.forEach(partition -> {
+ if (!addedPartitions.containsKey(partition) &&
!isPartitionAvailableInStore(partition)) {
+ partitionSet.add(partition);
+ }
+ });
+
+ if (!partitionSet.isEmpty()) {
+ long beginTs = System.currentTimeMillis();
+ // Not loaded yet
+ try {
+ LOG.info("Building file system view for partitions " + partitionSet);
+
+ // Pairs of relative partition path and absolute partition path
+ List<Pair<String, Path>> absolutePartitionPathList =
partitionSet.stream()
+ .map(partition -> Pair.of(
+ partition,
FSUtils.getPartitionPath(metaClient.getBasePathV2(), partition)))
+ .collect(Collectors.toList());
+ long beginLsTs = System.currentTimeMillis();
+ Map<Pair<String, Path>, FileStatus[]> statusesMap =
+ listPartitions(absolutePartitionPathList);
+ long endLsTs = System.currentTimeMillis();
+ LOG.debug("Time taken to list partitions " + partitionSet + " =" +
(endLsTs - beginLsTs));
+ statusesMap.forEach((partitionPair, statuses) -> {
+ String relativePartitionStr = partitionPair.getLeft();
+ List<HoodieFileGroup> groups = addFilesToView(statuses);
+ if (groups.isEmpty()) {
+ storePartitionView(relativePartitionStr, new ArrayList<>());
+ }
+ LOG.debug("#files found in partition (" + relativePartitionStr +
") =" + statuses.length);
+ });
+ } catch (IOException e) {
+ throw new HoodieIOException("Failed to list base files in partitions
" + partitionSet, e);
+ }
+ long endTs = System.currentTimeMillis();
+ LOG.debug("Time to load partition " + partitionSet + " =" + (endTs -
beginTs));
+ }
+
+ partitionSet.forEach(partition ->
+ addedPartitions.computeIfAbsent(partition, partitionPathStr -> true)
+ );
+ }
+ }
+
+ /**
+ * @return A list of relative partition paths of all partitions.
+ * @throws IOException upon error.
+ */
+ protected List<String> getAllPartitionPaths() throws IOException {
+ throw new HoodieException("Getting all partition paths with file system
listing sequentially "
+ + "can be very slow. This should not be invoked.");
+ }
+
+ /**
+ * @param partitionPathList A list of pairs of the relative and absolute
paths of the partitions.
+ * @return all the files from the partitions.
+ * @throws IOException upon error.
+ */
+ protected Map<Pair<String, Path>, FileStatus[]> listPartitions(
+ List<Pair<String, Path>> partitionPathList) throws IOException {
+ Map<Pair<String, Path>, FileStatus[]> fileStatusMap = new HashMap<>();
+
+ for (Pair<String, Path> partitionPair : partitionPathList) {
+ Path absolutePartitionPath = partitionPair.getRight();
+ try {
+ fileStatusMap.put(partitionPair,
metaClient.getFs().listStatus(absolutePartitionPath));
+ } catch (IOException e) {
+ // Create the path if it does not exist already
+ if (!metaClient.getFs().exists(absolutePartitionPath)) {
+ metaClient.getFs().mkdirs(absolutePartitionPath);
+ fileStatusMap.put(partitionPair, new FileStatus[0]);
+ } else {
+ // in case the partition path was created by another caller
+ fileStatusMap.put(partitionPair,
metaClient.getFs().listStatus(absolutePartitionPath));
+ }
+ }
+ }
+
+ return fileStatusMap;
+ }
+
/**
* Allows lazily loading the partitions if needed.
*
@@ -537,19 +648,38 @@ public abstract class AbstractTableFileSystemView
implements SyncableFileSystemV
readLock.lock();
String partitionPath = formatPartitionKey(partitionStr);
ensurePartitionLoadedCorrectly(partitionPath);
- return fetchAllStoredFileGroups(partitionPath)
- .filter(fileGroup ->
!isFileGroupReplacedBeforeOrOn(fileGroup.getFileGroupId(), maxCommitTime))
- .map(fileGroup -> Option.fromJavaOptional(fileGroup.getAllBaseFiles()
- .filter(baseFile ->
HoodieTimeline.compareTimestamps(baseFile.getCommitTime(),
HoodieTimeline.LESSER_THAN_OR_EQUALS, maxCommitTime
- ))
- .filter(df -> !isBaseFileDueToPendingCompaction(df) &&
!isBaseFileDueToPendingClustering(df)).findFirst()))
- .filter(Option::isPresent).map(Option::get)
- .map(df -> addBootstrapBaseFileIfPresent(new
HoodieFileGroupId(partitionPath, df.getFileId()), df));
+ return getLatestBaseFilesBeforeOrOnFromCache(partitionPath,
maxCommitTime);
} finally {
readLock.unlock();
}
}
+ @Override
+ public final Map<String, Stream<HoodieBaseFile>>
getAllLatestBaseFilesBeforeOrOn(String maxCommitTime) {
+ try {
+ readLock.lock();
+
+ List<String> formattedPartitionList =
ensureAllPartitionsLoadedCorrectly();
+ return formattedPartitionList.stream().collect(Collectors.toMap(
+ Function.identity(),
+ partitionPath ->
getLatestBaseFilesBeforeOrOnFromCache(partitionPath, maxCommitTime)
+ ));
+ } finally {
+ readLock.unlock();
+ }
+ }
+
+ private Stream<HoodieBaseFile> getLatestBaseFilesBeforeOrOnFromCache(String
partitionPath, String maxCommitTime) {
+ return fetchAllStoredFileGroups(partitionPath)
+ .filter(fileGroup ->
!isFileGroupReplacedBeforeOrOn(fileGroup.getFileGroupId(), maxCommitTime))
+ .map(fileGroup -> Option.fromJavaOptional(fileGroup.getAllBaseFiles()
+ .filter(baseFile ->
HoodieTimeline.compareTimestamps(baseFile.getCommitTime(),
HoodieTimeline.LESSER_THAN_OR_EQUALS, maxCommitTime
+ ))
+ .filter(df -> !isBaseFileDueToPendingCompaction(df) &&
!isBaseFileDueToPendingClustering(df)).findFirst()))
+ .filter(Option::isPresent).map(Option::get)
+ .map(df -> addBootstrapBaseFileIfPresent(new
HoodieFileGroupId(partitionPath, df.getFileId()), df));
+ }
+
@Override
public final Option<HoodieBaseFile> getBaseFileOn(String partitionStr,
String instantTime, String fileId) {
try {
@@ -560,8 +690,8 @@ public abstract class AbstractTableFileSystemView
implements SyncableFileSystemV
return Option.empty();
} else {
return fetchHoodieFileGroup(partitionPath, fileId).map(fileGroup ->
fileGroup.getAllBaseFiles()
- .filter(baseFile ->
HoodieTimeline.compareTimestamps(baseFile.getCommitTime(),
HoodieTimeline.EQUALS,
- instantTime)).filter(df ->
!isBaseFileDueToPendingCompaction(df) &&
!isBaseFileDueToPendingClustering(df)).findFirst().orElse(null))
+ .filter(baseFile ->
HoodieTimeline.compareTimestamps(baseFile.getCommitTime(),
HoodieTimeline.EQUALS,
+ instantTime)).filter(df ->
!isBaseFileDueToPendingCompaction(df) &&
!isBaseFileDueToPendingClustering(df)).findFirst().orElse(null))
.map(df -> addBootstrapBaseFileIfPresent(new
HoodieFileGroupId(partitionPath, fileId), df));
}
} finally {
@@ -596,8 +726,8 @@ public abstract class AbstractTableFileSystemView
implements SyncableFileSystemV
return fetchAllStoredFileGroups()
.filter(fileGroup ->
!isFileGroupReplacedBeforeAny(fileGroup.getFileGroupId(), commitsToReturn))
.map(fileGroup -> Pair.of(fileGroup.getFileGroupId(),
Option.fromJavaOptional(
- fileGroup.getAllBaseFiles().filter(baseFile ->
commitsToReturn.contains(baseFile.getCommitTime())
- && !isBaseFileDueToPendingCompaction(baseFile) &&
!isBaseFileDueToPendingClustering(baseFile)).findFirst()))).filter(p ->
p.getValue().isPresent())
+ fileGroup.getAllBaseFiles().filter(baseFile ->
commitsToReturn.contains(baseFile.getCommitTime())
+ && !isBaseFileDueToPendingCompaction(baseFile) &&
!isBaseFileDueToPendingClustering(baseFile)).findFirst()))).filter(p ->
p.getValue().isPresent())
.map(p -> addBootstrapBaseFileIfPresent(p.getKey(),
p.getValue().get()));
} finally {
readLock.unlock();
@@ -627,9 +757,9 @@ public abstract class AbstractTableFileSystemView
implements SyncableFileSystemV
String partitionPath = formatPartitionKey(partitionStr);
ensurePartitionLoadedCorrectly(partitionPath);
return fetchLatestFileSlices(partitionPath)
- .filter(slice -> !isFileGroupReplaced(slice.getFileGroupId()))
- .flatMap(slice ->
this.filterBaseFileAfterPendingCompaction(slice, true))
- .map(this::addBootstrapBaseFileIfPresent);
+ .filter(slice -> !isFileGroupReplaced(slice.getFileGroupId()))
+ .flatMap(slice -> this.filterBaseFileAfterPendingCompaction(slice,
true))
+ .map(this::addBootstrapBaseFileIfPresent);
} finally {
readLock.unlock();
}
@@ -684,26 +814,26 @@ public abstract class AbstractTableFileSystemView
implements SyncableFileSystemV
@Override
public final Stream<FileSlice> getLatestFileSlicesBeforeOrOn(String
partitionStr, String maxCommitTime,
- boolean includeFileSlicesInPendingCompaction) {
+ boolean
includeFileSlicesInPendingCompaction) {
try {
readLock.lock();
String partitionPath = formatPartitionKey(partitionStr);
ensurePartitionLoadedCorrectly(partitionPath);
Stream<Stream<FileSlice>> allFileSliceStream =
fetchAllStoredFileGroups(partitionPath)
- .filter(slice ->
!isFileGroupReplacedBeforeOrOn(slice.getFileGroupId(), maxCommitTime))
- .map(fg -> fg.getAllFileSlicesBeforeOn(maxCommitTime));
+ .filter(slice ->
!isFileGroupReplacedBeforeOrOn(slice.getFileGroupId(), maxCommitTime))
+ .map(fg -> fg.getAllFileSlicesBeforeOn(maxCommitTime));
if (includeFileSlicesInPendingCompaction) {
return allFileSliceStream.map(sliceStream -> sliceStream.flatMap(slice
-> this.filterBaseFileAfterPendingCompaction(slice, false)))
- .map(sliceStream ->
Option.fromJavaOptional(sliceStream.findFirst())).filter(Option::isPresent).map(Option::get)
- .map(this::addBootstrapBaseFileIfPresent);
+ .map(sliceStream ->
Option.fromJavaOptional(sliceStream.findFirst())).filter(Option::isPresent).map(Option::get)
+ .map(this::addBootstrapBaseFileIfPresent);
} else {
return allFileSliceStream
- .map(sliceStream ->
- Option.fromJavaOptional(sliceStream
- .filter(slice ->
!isPendingCompactionScheduledForFileId(slice.getFileGroupId()))
- .filter(slice -> !slice.isEmpty())
- .findFirst()))
-
.filter(Option::isPresent).map(Option::get).map(this::addBootstrapBaseFileIfPresent);
+ .map(sliceStream ->
+ Option.fromJavaOptional(sliceStream
+ .filter(slice ->
!isPendingCompactionScheduledForFileId(slice.getFileGroupId()))
+ .filter(slice -> !slice.isEmpty())
+ .findFirst()))
+
.filter(Option::isPresent).map(Option::get).map(this::addBootstrapBaseFileIfPresent);
}
} finally {
readLock.unlock();
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/view/PriorityBasedFileSystemView.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/view/PriorityBasedFileSystemView.java
index 62edc4daa33..44332117367 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/table/view/PriorityBasedFileSystemView.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/view/PriorityBasedFileSystemView.java
@@ -39,6 +39,7 @@ import org.apache.log4j.Logger;
import java.io.Serializable;
import java.util.List;
+import java.util.Map;
import java.util.stream.Stream;
/**
@@ -145,6 +146,12 @@ public class PriorityBasedFileSystemView implements
SyncableFileSystemView, Seri
secondaryView::getLatestBaseFilesBeforeOrOn);
}
+ @Override
+ public Map<String, Stream<HoodieBaseFile>>
getAllLatestBaseFilesBeforeOrOn(String maxCommitTime) {
+ return execute(maxCommitTime,
preferredView::getAllLatestBaseFilesBeforeOrOn,
+ secondaryView::getAllLatestBaseFilesBeforeOrOn);
+ }
+
@Override
public Option<HoodieBaseFile> getLatestBaseFile(String partitionPath, String
fileId) {
return execute(partitionPath, fileId, preferredView::getLatestBaseFile,
secondaryView::getLatestBaseFile);
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/view/RemoteHoodieTableFileSystemView.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/view/RemoteHoodieTableFileSystemView.java
index 5ada3dc45b5..bffd6353aa7 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/table/view/RemoteHoodieTableFileSystemView.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/view/RemoteHoodieTableFileSystemView.java
@@ -54,6 +54,7 @@ import java.io.Serializable;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.stream.Collectors;
import java.util.stream.Stream;
/**
@@ -88,6 +89,8 @@ public class RemoteHoodieTableFileSystemView implements
SyncableFileSystemView,
String.format("%s/%s", BASE_URL, "datafiles/range/latest/");
public static final String LATEST_DATA_FILES_BEFORE_ON_INSTANT_URL =
String.format("%s/%s", BASE_URL, "datafiles/beforeoron/latest/");
+ public static final String ALL_LATEST_BASE_FILES_BEFORE_ON_INSTANT_URL =
+ String.format("%s/%s", BASE_URL, "basefiles/all/beforeoron/");
public static final String ALL_FILEGROUPS_FOR_PARTITION_URL =
String.format("%s/%s", BASE_URL, "filegroups/all/partition/");
@@ -155,17 +158,17 @@ public class RemoteHoodieTableFileSystemView implements
SyncableFileSystemView,
this.serverPort = viewConf.getRemoteViewServerPort();
this.timeoutMs = viewConf.getRemoteTimelineClientTimeoutSecs() * 1000;
if (viewConf.isRemoteTimelineClientRetryEnabled()) {
- retryHelper = new RetryHelper(
- viewConf.getRemoteTimelineClientMaxRetryIntervalMs(),
- viewConf.getRemoteTimelineClientMaxRetryNumbers(),
- viewConf.getRemoteTimelineInitialRetryIntervalMs(),
- viewConf.getRemoteTimelineClientRetryExceptions(),
- "Sending request");
+ retryHelper = new RetryHelper(
+ viewConf.getRemoteTimelineClientMaxRetryIntervalMs(),
+ viewConf.getRemoteTimelineClientMaxRetryNumbers(),
+ viewConf.getRemoteTimelineInitialRetryIntervalMs(),
+ viewConf.getRemoteTimelineClientRetryExceptions(),
+ "Sending request");
}
}
private <T> T executeRequest(String requestPath, Map<String, String>
queryParameters, TypeReference reference,
- RequestMethod method) throws IOException {
+ RequestMethod method) throws IOException {
ValidationUtils.checkArgument(!closed, "View already closed");
URIBuilder builder =
@@ -252,13 +255,36 @@ public class RemoteHoodieTableFileSystemView implements
SyncableFileSystemView,
return getLatestBaseFilesFromParams(paramsMap,
LATEST_DATA_FILES_BEFORE_ON_INSTANT_URL);
}
+ @Override
+ public Map<String, Stream<HoodieBaseFile>>
getAllLatestBaseFilesBeforeOrOn(String maxCommitTime) {
+ Map<String, String> paramsMap = new HashMap<>();
+ paramsMap.put(BASEPATH_PARAM, basePath);
+ paramsMap.put(MAX_INSTANT_PARAM, maxCommitTime);
+
+ try {
+ Map<String, List<BaseFileDTO>> dataFileMap = executeRequest(
+ ALL_LATEST_BASE_FILES_BEFORE_ON_INSTANT_URL,
+ paramsMap,
+ new TypeReference<Map<String, List<BaseFileDTO>>>() {
+ },
+ RequestMethod.GET);
+ return dataFileMap.entrySet().stream().collect(
+ Collectors.toMap(
+ Map.Entry::getKey,
+ entry ->
entry.getValue().stream().map(BaseFileDTO::toHoodieBaseFile)));
+ } catch (IOException e) {
+ throw new HoodieRemoteException(e);
+ }
+ }
+
@Override
public Option<HoodieBaseFile> getBaseFileOn(String partitionPath, String
instantTime, String fileId) {
Map<String, String> paramsMap =
getParamsWithAdditionalParams(partitionPath,
new String[] {INSTANT_PARAM, FILEID_PARAM}, new String[] {instantTime,
fileId});
try {
List<BaseFileDTO> dataFiles =
executeRequest(LATEST_DATA_FILE_ON_INSTANT_URL, paramsMap,
- new TypeReference<List<BaseFileDTO>>() {}, RequestMethod.GET);
+ new TypeReference<List<BaseFileDTO>>() {
+ }, RequestMethod.GET);
return
Option.fromJavaOptional(dataFiles.stream().map(BaseFileDTO::toHoodieBaseFile).findFirst());
} catch (IOException e) {
throw new HoodieRemoteException(e);
@@ -505,7 +531,8 @@ public class RemoteHoodieTableFileSystemView implements
SyncableFileSystemView,
Map<String, String> paramsMap =
getParamsWithAdditionalParam(partitionPath, FILEID_PARAM, fileId);
try {
List<BaseFileDTO> dataFiles =
executeRequest(LATEST_PARTITION_DATA_FILE_URL, paramsMap,
- new TypeReference<List<BaseFileDTO>>() {}, RequestMethod.GET);
+ new TypeReference<List<BaseFileDTO>>() {
+ }, RequestMethod.GET);
return
Option.fromJavaOptional(dataFiles.stream().map(BaseFileDTO::toHoodieBaseFile).findFirst());
} catch (IOException e) {
throw new HoodieRemoteException(e);
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/view/TableFileSystemView.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/view/TableFileSystemView.java
index 18c9a9af998..97b972a75d1 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/table/view/TableFileSystemView.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/view/TableFileSystemView.java
@@ -29,6 +29,7 @@ import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import java.util.List;
+import java.util.Map;
import java.util.stream.Stream;
/**
@@ -64,6 +65,16 @@ public interface TableFileSystemView {
*/
Stream<HoodieBaseFile> getLatestBaseFilesBeforeOrOn(String partitionPath,
String maxCommitTime);
+ /**
+ * Streams the latest version base files in all partitions with
precondition that
+ * commitTime(file) is before or on maxCommitTime.
+ *
+ * @param maxCommitTime The max commit time to consider.
+ * @return A {@link Map} of partition path to the latest version base
files before or on the
+ * commit time
+ */
+ Map<String, Stream<HoodieBaseFile>> getAllLatestBaseFilesBeforeOrOn(String
maxCommitTime);
+
/**
* Stream all the latest data files pass.
*/
diff --git
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataFileSystemView.java
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataFileSystemView.java
index ab5b5f6b4db..76b341609de 100644
---
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataFileSystemView.java
+++
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataFileSystemView.java
@@ -24,12 +24,17 @@ import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
+import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieException;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.stream.Collectors;
/**
* {@code HoodieTableFileSystemView} implementation that retrieved partition
listings from the Metadata Table.
@@ -65,6 +70,26 @@ public class HoodieMetadataFileSystemView extends
HoodieTableFileSystemView {
return tableMetadata.getAllFilesInPartition(partitionPath);
}
+ @Override
+ protected List<String> getAllPartitionPaths() throws IOException {
+ return tableMetadata.getAllPartitionPaths();
+ }
+
+ @Override
+ protected Map<Pair<String, Path>, FileStatus[]>
listPartitions(List<Pair<String, Path>> partitionPathList) throws IOException {
+ Map<String, Pair<String, Path>> absoluteToPairMap =
partitionPathList.stream()
+ .collect(Collectors.toMap(
+ pair -> pair.getRight().toString(),
+ Function.identity()
+ ));
+ return tableMetadata.getAllFilesInPartitions(
+ partitionPathList.stream().map(pair ->
pair.getRight().toString()).collect(Collectors.toList()))
+ .entrySet().stream().collect(Collectors.toMap(
+ entry -> absoluteToPairMap.get(entry.getKey()),
+ Map.Entry::getValue
+ ));
+ }
+
@Override
public void reset() {
super.reset();
diff --git
a/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/RequestHandler.java
b/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/RequestHandler.java
index 4c999b9ad22..7ad175c692a 100644
---
a/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/RequestHandler.java
+++
b/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/RequestHandler.java
@@ -56,6 +56,7 @@ import org.jetbrains.annotations.NotNull;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
+import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
@@ -276,6 +277,14 @@ public class RequestHandler {
writeValueAsString(ctx, dtos);
}, true));
+
app.get(RemoteHoodieTableFileSystemView.ALL_LATEST_BASE_FILES_BEFORE_ON_INSTANT_URL,
new ViewHandler(ctx -> {
+ metricsRegistry.add("ALL_LATEST_BASE_FILES_BEFORE_ON_INSTANT", 1);
+ Map<String, List<BaseFileDTO>> dtos =
dataFileHandler.getAllLatestDataFilesBeforeOrOn(
+
ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.BASEPATH_PARAM,
String.class).getOrThrow(e -> new HoodieException("Basepath is invalid")),
+
ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.MAX_INSTANT_PARAM,
String.class).getOrThrow(e -> new HoodieException("MAX_INSTANT_PARAM is
invalid")));
+ writeValueAsString(ctx, dtos);
+ }, true));
+
app.get(RemoteHoodieTableFileSystemView.LATEST_DATA_FILE_ON_INSTANT_URL,
new ViewHandler(ctx -> {
metricsRegistry.add("LATEST_DATA_FILE_ON_INSTANT", 1);
List<BaseFileDTO> dtos = dataFileHandler.getLatestDataFileOn(
diff --git
a/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/BaseFileHandler.java
b/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/BaseFileHandler.java
index 5033a86f8f2..a34b49843fa 100644
---
a/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/BaseFileHandler.java
+++
b/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/BaseFileHandler.java
@@ -29,6 +29,7 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
+import java.util.Map;
import java.util.stream.Collectors;
/**
@@ -61,6 +62,16 @@ public class BaseFileHandler extends Handler {
.map(BaseFileDTO::fromHoodieBaseFile).collect(Collectors.toList());
}
+ public Map<String, List<BaseFileDTO>> getAllLatestDataFilesBeforeOrOn(String
basePath, String maxInstantTime) {
+ return viewManager.getFileSystemView(basePath)
+ .getAllLatestBaseFilesBeforeOrOn(maxInstantTime)
+ .entrySet().stream()
+ .collect(Collectors.toMap(
+ Map.Entry::getKey,
+ entry ->
entry.getValue().map(BaseFileDTO::fromHoodieBaseFile).collect(Collectors.toList())
+ ));
+ }
+
public List<BaseFileDTO> getLatestDataFileOn(String basePath, String
partitionPath, String instantTime,
String fileId) {
List<BaseFileDTO> result = new ArrayList<>();