yihua commented on code in PR #7690:
URL: https://github.com/apache/hudi/pull/7690#discussion_r1086092057
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/savepoint/SavepointActionExecutor.java:
##########
@@ -84,15 +84,32 @@ public HoodieSavepointMetadata execute() {
           "Could not savepoint commit " + instantTime + " as this is beyond the lookup window " + lastCommitRetained);
       context.setJobStatus(this.getClass().getSimpleName(), "Collecting latest files for savepoint " + instantTime + " " + table.getConfig().getTableName());
-      List<String> partitions = FSUtils.getAllPartitionPaths(context, config.getMetadataConfig(), table.getMetaClient().getBasePath());
-      Map<String, List<String>> latestFilesMap = context.mapToPair(partitions, partitionPath -> {
-        // Scan all partitions files with this commit time
-        LOG.info("Collecting latest files in partition path " + partitionPath);
-        TableFileSystemView.BaseFileOnlyView view = table.getBaseFileOnlyView();
-        List<String> latestFiles = view.getLatestBaseFilesBeforeOrOn(partitionPath, instantTime)
-            .map(HoodieBaseFile::getFileName).collect(Collectors.toList());
-        return new ImmutablePair<>(partitionPath, latestFiles);
-      }, null);
+      TableFileSystemView.BaseFileOnlyView view = table.getBaseFileOnlyView();
+
+      Map<String, List<String>> latestFilesMap;
+      // NOTE: for performance, we have to use different logic here for listing the latest files
+      // before or on the given instant:
+      // (1) using metadata-table-based file listing: instead of parallelizing the partition
+      //     listing which incurs unnecessary metadata table reads, we directly read the metadata
+      //     table once in a batch manner through the timeline server;
+      // (2) using direct file system listing: we parallelize the partition listing so that
+      //     each partition can be listed on the file system concurrently through Spark.
+      if (config.getMetadataConfig().enabled()) {
Review Comment:
This ticket tracks the follow-up:
[HUDI-5611](https://issues.apache.org/jira/browse/HUDI-5611).
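For illustration, the two strategies described in the NOTE above can be sketched as a toy model. Every class and method name below (`ListingStrategySketch`, `listAllBatch`, `listPerPartition`) is an illustrative stand-in, not a Hudi API; the real code goes through the timeline server and the Spark engine context.

```java
import java.util.*;
import java.util.stream.*;

// Toy model of the two listing strategies: one batch read of all partitions
// (mimicking a single metadata-table read) versus listing each partition
// individually (mimicking a parallelized file-system listing).
public class ListingStrategySketch {
  // Pretend file-system view: partition -> latest base file names.
  static final Map<String, List<String>> FILES = Map.of(
      "2023/01/01", List.of("f1.parquet", "f2.parquet"),
      "2023/01/02", List.of("f3.parquet"));

  // Strategy (1): one batch call returning every partition's latest files.
  static Map<String, List<String>> listAllBatch() {
    return new HashMap<>(FILES);
  }

  // Strategy (2): list one partition at a time, as each Spark task would.
  static Map<String, List<String>> listPerPartition(List<String> partitions) {
    return partitions.stream()
        .collect(Collectors.toMap(p -> p, p -> FILES.getOrDefault(p, List.of())));
  }

  // The branching point the diff introduces: both strategies produce the same
  // map, but with very different numbers of underlying reads.
  static Map<String, List<String>> collectLatestFiles(boolean metadataTableEnabled) {
    return metadataTableEnabled
        ? listAllBatch()
        : listPerPartition(new ArrayList<>(FILES.keySet()));
  }

  public static void main(String[] args) {
    System.out.println(collectLatestFiles(true));
  }
}
```

The point of the branch is not correctness (both paths agree) but read amplification: parallelizing over partitions when the metadata table is enabled would issue one metadata-table lookup per partition instead of one batch read.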
##########
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestBase.java:
##########
@@ -157,8 +156,8 @@ public HoodieWriteConfig.Builder getConfigBuilder(String schemaStr, IndexType in
         .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(indexType).build())
         .withEmbeddedTimelineServerEnabled(true).withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder()
             .withEnableBackupForRemoteFileSystemView(false) // Fail test if problem connecting to timeline-server
-            .withRemoteServerPort(timelineServicePort)
-            .withStorageType(FileSystemViewStorageType.EMBEDDED_KV_STORE).build());
+            .withRemoteServerPort(timelineServicePort).build());
+            //.withStorageType(FileSystemViewStorageType.EMBEDDED_KV_STORE)
Review Comment:
This is only for temporarily testing the default storage type. Will revert this.
##########
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestBase.java:
##########
@@ -157,8 +156,8 @@ public HoodieWriteConfig.Builder getConfigBuilder(String schemaStr, IndexType in
         .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(indexType).build())
         .withEmbeddedTimelineServerEnabled(true).withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder()
             .withEnableBackupForRemoteFileSystemView(false) // Fail test if problem connecting to timeline-server
-            .withRemoteServerPort(timelineServicePort)
-            .withStorageType(FileSystemViewStorageType.EMBEDDED_KV_STORE).build());
+            .withRemoteServerPort(timelineServicePort).build());
+            //.withStorageType(FileSystemViewStorageType.EMBEDDED_KV_STORE)
Review Comment:
Through this testing, I discovered that `SpillableMapBasedFileSystemView` and `RocksDbBasedFileSystemView` are not integrated with metadata-table-based file listing. Filed a ticket to track this: [HUDI-5612](https://issues.apache.org/jira/browse/HUDI-5612).
##########
hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java:
##########
@@ -596,8 +742,8 @@ public final Stream<HoodieBaseFile> getLatestBaseFilesInRange(List<String> commi
     return fetchAllStoredFileGroups()
         .filter(fileGroup -> !isFileGroupReplacedBeforeAny(fileGroup.getFileGroupId(), commitsToReturn))
         .map(fileGroup -> Pair.of(fileGroup.getFileGroupId(), Option.fromJavaOptional(
-            fileGroup.getAllBaseFiles().filter(baseFile -> commitsToReturn.contains(baseFile.getCommitTime())
-                && !isBaseFileDueToPendingCompaction(baseFile) && !isBaseFileDueToPendingClustering(baseFile)).findFirst()))).filter(p -> p.getValue().isPresent())
+            fileGroup.getAllBaseFiles().filter(baseFile -> commitsToReturn.contains(baseFile.getCommitTime())
Review Comment:
This is automatically changed due to the IDE formatter.
##########
hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java:
##########
@@ -550,6 +674,28 @@ public final Stream<HoodieBaseFile> getLatestBaseFilesBeforeOrOn(String partitio
     }
   }
+  @Override
+  public final Map<String, Stream<HoodieBaseFile>> getAllLatestBaseFilesBeforeOrOn(String maxCommitTime) {
+    try {
+      readLock.lock();
+
+      List<String> formattedPartitionList = ensureAllPartitionsLoadedCorrectly();
+      return formattedPartitionList.stream().collect(Collectors.toMap(
+          Function.identity(),
+          partitionPath -> fetchAllStoredFileGroups(partitionPath)
+              .filter(fileGroup -> !isFileGroupReplacedBeforeOrOn(fileGroup.getFileGroupId(), maxCommitTime))
Review Comment:
We can extract the core logic of filtering file groups. We cannot directly reuse `getLatestBaseFilesBeforeOrOn()` because it triggers a metadata table lookup per partition, which is exactly what this PR is trying to fix.
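To make the shape of the new method concrete, here is a simplified sketch of building the partition-to-latest-files map in one pass over pre-loaded file groups. The `BaseFile` record and the in-memory `GROUPS` map are stand-ins for Hudi's file groups; only the filtering idea mirrors the diff.

```java
import java.util.*;
import java.util.function.Function;
import java.util.stream.*;

// Sketch: compute the latest base file at or before maxCommitTime for every
// partition at once, instead of one per-partition lookup per call.
public class BatchLatestFilesSketch {
  record BaseFile(String name, String commitTime) {}

  // Pretend pre-loaded file groups: partition -> all base files.
  static final Map<String, List<BaseFile>> GROUPS = Map.of(
      "p1", List.of(new BaseFile("a", "001"), new BaseFile("b", "003")),
      "p2", List.of(new BaseFile("c", "002")));

  // Extracted core filter: latest file whose commit time is <= maxCommitTime.
  static Optional<BaseFile> latestBeforeOrOn(List<BaseFile> files, String maxCommitTime) {
    return files.stream()
        .filter(f -> f.commitTime().compareTo(maxCommitTime) <= 0)
        .max(Comparator.comparing(BaseFile::commitTime));
  }

  // Batch variant over every partition, analogous to
  // getAllLatestBaseFilesBeforeOrOn() in the diff.
  static Map<String, List<BaseFile>> allLatestBeforeOrOn(String maxCommitTime) {
    return GROUPS.keySet().stream().collect(Collectors.toMap(
        Function.identity(),
        partition -> latestBeforeOrOn(GROUPS.get(partition), maxCommitTime)
            .map(List::of).orElse(List.of())));
  }

  public static void main(String[] args) {
    System.out.println(allLatestBeforeOrOn("002"));
  }
}
```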
##########
hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java:
##########
@@ -289,6 +296,123 @@ private void clear() {
    */
   protected abstract void resetViewState();
+  /**
+   * Batch loading all the partitions if needed.
+   *
+   * @return A list of relative partition paths of all partitions.
+   */
+  private List<String> ensureAllPartitionsLoadedCorrectly() {
+    ValidationUtils.checkArgument(!isClosed(), "View is already closed");
+    try {
+      List<String> formattedPartitionList = getAllPartitionPaths().stream()
+          .map(this::formatPartitionKey).collect(Collectors.toList());
+      ensurePartitionsLoadedCorrectly(formattedPartitionList);
+      return formattedPartitionList;
+    } catch (IOException e) {
+      throw new HoodieIOException("Failed to get all partition paths", e);
+    }
+  }
+
+  /**
+   * Allows lazily loading the partitions if needed.
+   *
+   * @param partitionList list of partitions to be loaded if not present.
+   */
+  private void ensurePartitionsLoadedCorrectly(List<String> partitionList) {
+
+    ValidationUtils.checkArgument(!isClosed(), "View is already closed");
+
+    Set<String> partitionSet = new HashSet<>();
+    partitionList.forEach(partition ->
+        addedPartitions.computeIfAbsent(partition, partitionPathStr -> {
+          if (!isPartitionAvailableInStore(partitionPathStr)) {
+            partitionSet.add(partitionPathStr);
+          }
+          return true;
+        })
+    );
+
+    if (!partitionSet.isEmpty()) {
+      long beginTs = System.currentTimeMillis();
+      // Not loaded yet
+      try {
+        LOG.info("Building file system view for partitions " + partitionSet);
+
+        // Pairs of relative partition path and absolute partition path
+        List<Pair<String, Path>> absolutePartitionPathList = partitionSet.stream()
+            .map(partition -> Pair.of(partition, FSUtils.getPartitionPath(metaClient.getBasePathV2(), partition)))
+            .collect(Collectors.toList());
+        long beginLsTs = System.currentTimeMillis();
+        Map<Pair<String, Path>, FileStatus[]> statusesMap = listPartitions(absolutePartitionPathList);
+        long endLsTs = System.currentTimeMillis();
+        LOG.debug("Time taken to list partitions " + partitionSet + " =" + (endLsTs - beginLsTs));
+        statusesMap.forEach((partitionPair, statuses) -> {
+          String relativePartitionStr = partitionPair.getLeft();
+          List<HoodieFileGroup> groups = addFilesToView(statuses);
+          if (groups.isEmpty()) {
+            storePartitionView(relativePartitionStr, new ArrayList<>());
+          }
+          LOG.debug("#files found in partition (" + relativePartitionStr + ") =" + statuses.length);
+        });
+      } catch (IOException e) {
+        throw new HoodieIOException("Failed to list base files in partitions " + partitionSet, e);
+      }
+      long endTs = System.currentTimeMillis();
+      LOG.debug("Time to load partition " + partitionSet + " =" + (endTs - beginTs));
+    }
+  }
+
+  /***
+   * @return A list of relative partition paths of all partitions.
+   * @throws IOException upon error.
+   */
+  protected List<String> getAllPartitionPaths() throws IOException {
+    // TODO: integrate the direct FS listing with the actual engine context
+    LOG.warn("Getting all partition paths with file system listing sequentially can be very slow. "
+        + "This should not be invoked.");
+    String bathPath = metaClient.getBasePathV2().toString();
+    FileSystem fs = new Path(bathPath).getFileSystem(metaClient.getHadoopConf());
+    if (assumeDatePartitioning) {
+      return FSUtils.getAllPartitionFoldersThreeLevelsDown(fs, bathPath);
+    }
+    return FSUtils.getPartitionPathsWithPrefix(
Review Comment:
This is the same logic used by `FileSystemBackedTableMetadata`.
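The load-once bookkeeping in `ensurePartitionsLoadedCorrectly()` above can be reduced to a small pattern: `computeIfAbsent` marks each partition as handled exactly once, only partitions missing from the backing store are collected, and the whole set is then listed in one batch. The sketch below models just that pattern; the map, set, and counter are toy stand-ins, not Hudi classes.

```java
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;

// Sketch of the "load each partition at most once, list the missing ones in
// one batch" bookkeeping used by the file-system view.
public class LoadOnceSketch {
  // Partitions we have already handled (computeIfAbsent gates re-entry).
  static final Map<String, Boolean> addedPartitions = new ConcurrentHashMap<>();
  // Partitions whose file listing is already cached.
  static final Set<String> store = ConcurrentHashMap.newKeySet();
  // How many batch listing calls we actually issued.
  static int listCalls = 0;

  static void ensureLoaded(List<String> partitions) {
    Set<String> toLoad = new HashSet<>();
    partitions.forEach(p -> addedPartitions.computeIfAbsent(p, key -> {
      if (!store.contains(key)) {
        toLoad.add(key); // only partitions missing from the store
      }
      return true;
    }));
    if (!toLoad.isEmpty()) {
      listCalls++; // one batch listing for the whole missing set
      store.addAll(toLoad);
    }
  }

  public static void main(String[] args) {
    ensureLoaded(List.of("p1", "p2"));
    ensureLoaded(List.of("p1", "p2")); // second call is a no-op
    System.out.println("batch listings: " + listCalls);
  }
}
```

Repeated calls for the same partitions trigger no further listing, which is what makes the batch `getAllLatestBaseFilesBeforeOrOn()` path cheap after the first load.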
##########
hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java:
##########
@@ -319,6 +320,68 @@ public static List<String> getAllPartitionPaths(HoodieEngineContext engineContex
     }
   }
+  /**
+   * Gets all partition paths in a table, based on a relative partition path prefix.
+   *
+   * @param engineContext {@link HoodieEngineContext} instance for file listing.
+   * @param hadoopConf    Hadoop configuration.
+   * @param basePath      Base path of the table.
+   * @param prefix        Relative partition path prefix for listing.
+   * @param parallelism   Listing parallelism to use if the engine context supports it.
+   * @return A list of all partition paths with the prefix.
+   * @throws IOException upon error.
+   */
+  public static List<String> getPartitionPathsWithPrefix(HoodieEngineContext engineContext,
Review Comment:
This method is extracted from `FileSystemBackedTableMetadata` for reuse.
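As a rough illustration of what prefix-scoped listing buys callers, here is a toy version that filters an in-memory list of relative partition paths by prefix. The real helper walks the file system (optionally in parallel via the engine context); the class and the in-memory list here are purely illustrative.

```java
import java.util.*;
import java.util.stream.*;

// Sketch: restrict partition listing to paths under a relative prefix,
// so callers need not enumerate the whole table.
public class PrefixListingSketch {
  static List<String> getPartitionPathsWithPrefix(List<String> allPaths, String prefix) {
    return allPaths.stream()
        .filter(path -> path.startsWith(prefix))
        .collect(Collectors.toList());
  }

  public static void main(String[] args) {
    System.out.println(getPartitionPathsWithPrefix(
        List.of("2023/01/01", "2023/01/02", "2023/02/01"), "2023/01"));
  }
}
```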
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]