nsivabalan commented on code in PR #7690:
URL: https://github.com/apache/hudi/pull/7690#discussion_r1084652972
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/savepoint/SavepointActionExecutor.java:
##########
@@ -84,15 +84,32 @@ public HoodieSavepointMetadata execute() {
"Could not savepoint commit " + instantTime + " as this is beyond the lookup window " + lastCommitRetained);
context.setJobStatus(this.getClass().getSimpleName(), "Collecting latest files for savepoint " + instantTime + " " + table.getConfig().getTableName());
- List<String> partitions = FSUtils.getAllPartitionPaths(context, config.getMetadataConfig(), table.getMetaClient().getBasePath());
- Map<String, List<String>> latestFilesMap = context.mapToPair(partitions, partitionPath -> {
- // Scan all partitions files with this commit time
- LOG.info("Collecting latest files in partition path " + partitionPath);
- TableFileSystemView.BaseFileOnlyView view = table.getBaseFileOnlyView();
- List<String> latestFiles = view.getLatestBaseFilesBeforeOrOn(partitionPath, instantTime)
- .map(HoodieBaseFile::getFileName).collect(Collectors.toList());
- return new ImmutablePair<>(partitionPath, latestFiles);
- }, null);
+ TableFileSystemView.BaseFileOnlyView view = table.getBaseFileOnlyView();
+
+ Map<String, List<String>> latestFilesMap;
+ // NOTE: for performance, we have to use different logic here for listing the latest files
+ // before or on the given instant:
+ // (1) using metadata-table-based file listing: instead of parallelizing the partition
+ // listing which incurs unnecessary metadata table reads, we directly read the metadata
+ // table once in a batch manner through the timeline server;
+ // (2) using direct file system listing: we parallelize the partition listing so that
+ // each partition can be listed on the file system concurrently through Spark.
+ if (config.getMetadataConfig().enabled()) {
Review Comment:
Likely there are other code paths in Hudi where we can employ similar optimizations. Can we file a follow-up ticket to triage and tackle them?
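The NOTE in the hunk above describes two listing strategies. A minimal sketch of that dispatch, under simplified assumptions: `PartitionSource` and `SavepointListing` are hypothetical names, not Hudi types, and a Java parallel stream stands in for Spark's `context.mapToPair`.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Function;
import java.util.stream.Collectors;

// Hypothetical, simplified stand-in for the listing capabilities the savepoint
// executor relies on; this is not a Hudi interface.
interface PartitionSource {
  List<String> allPartitions();

  // Lists one partition (direct file system listing in the hunk above).
  List<String> latestFiles(String partition, String instantTime);

  // Lists every partition in one call (metadata table via timeline server in the hunk above).
  Map<String, List<String>> allLatestFiles(String instantTime);
}

final class SavepointListing {
  private SavepointListing() {
  }

  static Map<String, List<String>> collectLatestFiles(
      PartitionSource source, boolean metadataEnabled, String instantTime) {
    if (metadataEnabled) {
      // One batched read instead of one metadata table lookup per partition.
      return new TreeMap<>(source.allLatestFiles(instantTime));
    }
    // Fan out one listing per partition; a parallel stream stands in for Spark here.
    Map<String, List<String>> listed = source.allPartitions().parallelStream()
        .collect(Collectors.toConcurrentMap(
            Function.identity(), partition -> source.latestFiles(partition, instantTime)));
    return new TreeMap<>(listed);
  }
}
```

Both paths should produce the same map; the difference is one batched read versus one listing call per partition.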
##########
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestBase.java:
##########
@@ -157,8 +156,8 @@ public HoodieWriteConfig.Builder getConfigBuilder(String schemaStr, IndexType in
.withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(indexType).build())
.withEmbeddedTimelineServerEnabled(true).withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder()
.withEnableBackupForRemoteFileSystemView(false) // Fail test if problem connecting to timeline-server
- .withRemoteServerPort(timelineServicePort)
- .withStorageType(FileSystemViewStorageType.EMBEDDED_KV_STORE).build());
+ .withRemoteServerPort(timelineServicePort).build());
+ //.withStorageType(FileSystemViewStorageType.EMBEDDED_KV_STORE)
Review Comment:
Why is this commented out?
##########
hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java:
##########
@@ -81,6 +86,8 @@ public abstract class AbstractTableFileSystemView implements SyncableFileSystemV
private static final Logger LOG = LogManager.getLogger(AbstractTableFileSystemView.class);
protected HoodieTableMetaClient metaClient;
+ // TODO: to pass in the actual config
+ protected boolean assumeDatePartitioning = false;
Review Comment:
Please file a tracking ticket for this.
##########
hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java:
##########
@@ -550,6 +674,28 @@ public final Stream<HoodieBaseFile> getLatestBaseFilesBeforeOrOn(String partitio
}
}
+ @Override
Review Comment:
Should we override this from within MetadataFileSystemView and throw from within AbstractTableFileSystemView?
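A minimal sketch of the split suggested here, using hypothetical stand-in classes (`BaseView`, `FsBackedView` and `MetadataBackedView` are not Hudi's actual hierarchy): the base class rejects the batch API, and only the metadata-backed view overrides it. The filtering is deliberately simplified and keeps every base file at or before `maxCommitTime`.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

// Hypothetical stand-in for AbstractTableFileSystemView: by default the batch
// "all partitions" API is rejected, so FS-based views cannot call it by mistake.
abstract class BaseView {
  public Map<String, List<String>> getAllLatestBaseFilesBeforeOrOn(String maxCommitTime) {
    throw new UnsupportedOperationException(
        "Batch listing of all partitions requires a metadata-table-backed view");
  }
}

// Hypothetical stand-in for a file-system-backed view: inherits the rejecting default.
class FsBackedView extends BaseView {
}

// Hypothetical stand-in for MetadataFileSystemView: overrides the batch API because
// it can read all partitions at once.
class MetadataBackedView extends BaseView {
  // partition -> commit times of its base files (simplified: no file IDs)
  private final Map<String, List<String>> commitsByPartition;

  MetadataBackedView(Map<String, List<String>> commitsByPartition) {
    this.commitsByPartition = commitsByPartition;
  }

  @Override
  public Map<String, List<String>> getAllLatestBaseFilesBeforeOrOn(String maxCommitTime) {
    Map<String, List<String>> result = new TreeMap<>();
    commitsByPartition.forEach((partition, commits) -> result.put(partition,
        commits.stream()
            // Assumes fixed-width timestamp strings, so string order matches time order.
            .filter(commit -> commit.compareTo(maxCommitTime) <= 0)
            .collect(Collectors.toList())));
    return result;
  }
}
```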
##########
hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java:
##########
@@ -289,6 +296,123 @@ private void clear() {
*/
protected abstract void resetViewState();
+ /**
+ * Batch loading all the partitions if needed.
+ *
+ * @return A list of relative partition paths of all partitions.
+ */
+ private List<String> ensureAllPartitionsLoadedCorrectly() {
Review Comment:
Left a message below. We should keep this within MetadataFSV so that FS-based listing does not call into these by mistake.
##########
hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java:
##########
@@ -319,6 +320,68 @@ public static List<String> getAllPartitionPaths(HoodieEngineContext engineContex
}
}
+ /**
+ * Gets all partition paths in a table, based on a relative partition path prefix.
+ *
+ * @param engineContext {@link HoodieEngineContext} instance for file listing.
+ * @param hadoopConf Hadoop configuration.
+ * @param basePath Base path of the table.
+ * @param prefix Relative partition path prefix for listing.
+ * @param parallelism Listing parallelism to use if the engine context supports it.
+ * @return A list of all partition paths with the prefix.
+ * @throws IOException upon error.
+ */
+ public static List<String> getPartitionPathsWithPrefix(HoodieEngineContext engineContext,
Review Comment:
We already have `FSUtils.getAllPartitionPaths(engineContext, basePath, false, cfg.assumeDatePartitioning)`. I don't think calling a prefix-based API is the right thing to do here.
##########
hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java:
##########
@@ -289,6 +296,123 @@ private void clear() {
*/
protected abstract void resetViewState();
+ /**
+ * Batch loading all the partitions if needed.
+ *
+ * @return A list of relative partition paths of all partitions.
+ */
+ private List<String> ensureAllPartitionsLoadedCorrectly() {
+ ValidationUtils.checkArgument(!isClosed(), "View is already closed");
+ try {
+ List<String> formattedPartitionList = getAllPartitionPaths().stream()
+ .map(this::formatPartitionKey).collect(Collectors.toList());
+ ensurePartitionsLoadedCorrectly(formattedPartitionList);
+ return formattedPartitionList;
+ } catch (IOException e) {
+ throw new HoodieIOException("Failed to get all partition paths", e);
+ }
+ }
+
+ /**
+ * Allows lazily loading the partitions if needed.
+ *
+ * @param partitionList list of partitions to be loaded if not present.
+ */
+ private void ensurePartitionsLoadedCorrectly(List<String> partitionList) {
+
+ ValidationUtils.checkArgument(!isClosed(), "View is already closed");
+
+ Set<String> partitionSet = new HashSet<>();
+ partitionList.forEach(partition ->
+ addedPartitions.computeIfAbsent(partition, partitionPathStr -> {
+ if (!isPartitionAvailableInStore(partitionPathStr)) {
+ partitionSet.add(partitionPathStr);
+ }
+ return true;
+ })
+ );
+
+ if (!partitionSet.isEmpty()) {
+ long beginTs = System.currentTimeMillis();
+ // Not loaded yet
+ try {
+ LOG.info("Building file system view for partitions " + partitionSet);
+
+ // Pairs of relative partition path and absolute partition path
+ List<Pair<String, Path>> absolutePartitionPathList = partitionSet.stream()
+ .map(partition -> Pair.of(
+ partition, FSUtils.getPartitionPath(metaClient.getBasePathV2(), partition)))
+ .collect(Collectors.toList());
+ long beginLsTs = System.currentTimeMillis();
+ Map<Pair<String, Path>, FileStatus[]> statusesMap =
+ listPartitions(absolutePartitionPathList);
+ long endLsTs = System.currentTimeMillis();
+ LOG.debug("Time taken to list partitions " + partitionSet + " =" + (endLsTs - beginLsTs));
+ statusesMap.forEach((partitionPair, statuses) -> {
+ String relativePartitionStr = partitionPair.getLeft();
+ List<HoodieFileGroup> groups = addFilesToView(statuses);
+ if (groups.isEmpty()) {
+ storePartitionView(relativePartitionStr, new ArrayList<>());
+ }
+ LOG.debug("#files found in partition (" + relativePartitionStr + ") =" + statuses.length);
+ });
+ } catch (IOException e) {
+ throw new HoodieIOException("Failed to list base files in partitions " + partitionSet, e);
+ }
+ long endTs = System.currentTimeMillis();
+ LOG.debug("Time to load partition " + partitionSet + " =" + (endTs - beginTs));
+ }
+ }
+
+ /***
+ * @return A list of relative partition paths of all partitions.
+ * @throws IOException upon error.
+ */
+ protected List<String> getAllPartitionPaths() throws IOException {
Review Comment:
Let's not support these APIs for FS-based listing.
##########
hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java:
##########
@@ -550,6 +674,28 @@ public final Stream<HoodieBaseFile> getLatestBaseFilesBeforeOrOn(String partitio
}
}
+ @Override
+ public final Map<String, Stream<HoodieBaseFile>> getAllLatestBaseFilesBeforeOrOn(String maxCommitTime) {
+ try {
+ readLock.lock();
+
+ List<String> formattedPartitionList = ensureAllPartitionsLoadedCorrectly();
+ return formattedPartitionList.stream().collect(Collectors.toMap(
+ Function.identity(),
+ partitionPath -> fetchAllStoredFileGroups(partitionPath)
+ .filter(fileGroup -> !isFileGroupReplacedBeforeOrOn(fileGroup.getFileGroupId(), maxCommitTime))
Review Comment:
Is it possible to reuse `getLatestBaseFilesBeforeOrOn(String partitionStr, String maxCommitTime)` to avoid code duplication? In case we make a bug fix in the other method, we might forget to fix it here.
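A sketch of the reuse being asked for, on a hypothetical simplified model (`SimpleBaseFile` and `SimpleView` are illustrative names; the real views also exclude replaced file groups and base files from pending compaction or clustering): the batch method only iterates partitions and delegates all filtering to the per-partition method.

```java
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical simplified base file: a file group ID plus the commit that wrote it.
final class SimpleBaseFile {
  final String fileId;
  final String commitTime;

  SimpleBaseFile(String fileId, String commitTime) {
    this.fileId = fileId;
    this.commitTime = commitTime;
  }

  @Override
  public String toString() {
    return fileId + "@" + commitTime;
  }
}

class SimpleView {
  private final Map<String, List<SimpleBaseFile>> filesByPartition;

  SimpleView(Map<String, List<SimpleBaseFile>> filesByPartition) {
    this.filesByPartition = filesByPartition;
  }

  // Single home for the "latest base file per file group at or before maxCommitTime" rule.
  Stream<SimpleBaseFile> getLatestBaseFilesBeforeOrOn(String partition, String maxCommitTime) {
    Map<String, SimpleBaseFile> latestPerFileId =
        filesByPartition.getOrDefault(partition, Collections.emptyList()).stream()
            .filter(f -> f.commitTime.compareTo(maxCommitTime) <= 0)
            // On a file ID collision, keep the base file with the later commit time.
            .collect(Collectors.toMap(f -> f.fileId, f -> f,
                (a, b) -> a.commitTime.compareTo(b.commitTime) >= 0 ? a : b));
    return latestPerFileId.values().stream()
        .sorted(Comparator.comparing((SimpleBaseFile f) -> f.fileId));
  }

  // The batch variant delegates per partition, so a fix to the rule above applies to both.
  Map<String, List<SimpleBaseFile>> getAllLatestBaseFilesBeforeOrOn(String maxCommitTime) {
    Map<String, List<SimpleBaseFile>> result = new TreeMap<>();
    for (String partition : filesByPartition.keySet()) {
      result.put(partition,
          getLatestBaseFilesBeforeOrOn(partition, maxCommitTime).collect(Collectors.toList()));
    }
    return result;
  }
}
```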
##########
hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java:
##########
@@ -596,8 +742,8 @@ public final Stream<HoodieBaseFile> getLatestBaseFilesInRange(List<String> commi
return fetchAllStoredFileGroups()
.filter(fileGroup -> !isFileGroupReplacedBeforeAny(fileGroup.getFileGroupId(), commitsToReturn))
.map(fileGroup -> Pair.of(fileGroup.getFileGroupId(), Option.fromJavaOptional(
- fileGroup.getAllBaseFiles().filter(baseFile -> commitsToReturn.contains(baseFile.getCommitTime())
- && !isBaseFileDueToPendingCompaction(baseFile) && !isBaseFileDueToPendingClustering(baseFile)).findFirst()))).filter(p -> p.getValue().isPresent())
+ fileGroup.getAllBaseFiles().filter(baseFile -> commitsToReturn.contains(baseFile.getCommitTime())
Review Comment:
Can you please revert the unintentional changes? That will make this easier to review.
##########
hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java:
##########
@@ -289,6 +296,123 @@ private void clear() {
*/
protected abstract void resetViewState();
+ /**
+ * Batch loading all the partitions if needed.
+ *
+ * @return A list of relative partition paths of all partitions.
+ */
+ private List<String> ensureAllPartitionsLoadedCorrectly() {
+ ValidationUtils.checkArgument(!isClosed(), "View is already closed");
+ try {
+ List<String> formattedPartitionList = getAllPartitionPaths().stream()
+ .map(this::formatPartitionKey).collect(Collectors.toList());
+ ensurePartitionsLoadedCorrectly(formattedPartitionList);
+ return formattedPartitionList;
+ } catch (IOException e) {
+ throw new HoodieIOException("Failed to get all partition paths", e);
+ }
+ }
+
+ /**
+ * Allows lazily loading the partitions if needed.
+ *
+ * @param partitionList list of partitions to be loaded if not present.
+ */
+ private void ensurePartitionsLoadedCorrectly(List<String> partitionList) {
+
+ ValidationUtils.checkArgument(!isClosed(), "View is already closed");
+
+ Set<String> partitionSet = new HashSet<>();
+ partitionList.forEach(partition ->
+ addedPartitions.computeIfAbsent(partition, partitionPathStr -> {
Review Comment:
We should add entries to `addedPartitions` only once the file groups pertaining to a particular partition are already loaded. There could be concurrent access to this map (`addedPartitions`), but in this implementation we first add the partitions to the map and only later list the files and populate the file groups. We might need to fix that.
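One possible ordering that avoids publishing a partition before it is loaded, sketched with hypothetical names (`BatchPartitionLoader`, with a `Function` standing in for `listPartitions(...)`): find which partitions are missing, list them in one batch, and only then record them in a `ConcurrentHashMap`.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Hypothetical sketch: a partition is recorded as loaded only after its files are listed.
class BatchPartitionLoader {
  // partition -> listed file names; an entry exists only once listing has finished.
  private final Map<String, List<String>> loadedPartitions = new ConcurrentHashMap<>();
  // Stand-in for the batched listing call.
  private final Function<Set<String>, Map<String, List<String>>> batchLister;

  BatchPartitionLoader(Function<Set<String>, Map<String, List<String>>> batchLister) {
    this.batchLister = batchLister;
  }

  void ensurePartitionsLoaded(List<String> partitions) {
    // 1. Only read the map to find what is missing; do not mark anything yet.
    Set<String> missing = new HashSet<>();
    for (String partition : partitions) {
      if (!loadedPartitions.containsKey(partition)) {
        missing.add(partition);
      }
    }
    if (missing.isEmpty()) {
      return;
    }
    // 2. List all missing partitions in one batch.
    Map<String, List<String>> listed = batchLister.apply(missing);
    // 3. Publish each partition only now; partitions without files get an empty list.
    for (String partition : missing) {
      List<String> files = listed.getOrDefault(partition, Collections.emptyList());
      loadedPartitions.putIfAbsent(partition, Collections.unmodifiableList(new ArrayList<>(files)));
    }
  }

  boolean isLoaded(String partition) {
    return loadedPartitions.containsKey(partition);
  }

  List<String> filesIn(String partition) {
    return loadedPartitions.get(partition);
  }
}
```

The trade-off: two threads racing on the same missing partition may both list it, and `putIfAbsent` keeps the first result, but in this sketch no reader sees a partition marked as loaded before its files are stored.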
##########
hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java:
##########
@@ -289,6 +296,123 @@ private void clear() {
*/
protected abstract void resetViewState();
+ /**
+ * Batch loading all the partitions if needed.
+ *
+ * @return A list of relative partition paths of all partitions.
+ */
+ private List<String> ensureAllPartitionsLoadedCorrectly() {
+ ValidationUtils.checkArgument(!isClosed(), "View is already closed");
+ try {
+ List<String> formattedPartitionList = getAllPartitionPaths().stream()
+ .map(this::formatPartitionKey).collect(Collectors.toList());
+ ensurePartitionsLoadedCorrectly(formattedPartitionList);
+ return formattedPartitionList;
+ } catch (IOException e) {
+ throw new HoodieIOException("Failed to get all partition paths", e);
+ }
+ }
+
+ /**
+ * Allows lazily loading the partitions if needed.
+ *
+ * @param partitionList list of partitions to be loaded if not present.
+ */
+ private void ensurePartitionsLoadedCorrectly(List<String> partitionList) {
+
+ ValidationUtils.checkArgument(!isClosed(), "View is already closed");
+
+ Set<String> partitionSet = new HashSet<>();
+ partitionList.forEach(partition ->
+ addedPartitions.computeIfAbsent(partition, partitionPathStr -> {
+ if (!isPartitionAvailableInStore(partitionPathStr)) {
+ partitionSet.add(partitionPathStr);
+ }
+ return true;
+ })
+ );
+
+ if (!partitionSet.isEmpty()) {
+ long beginTs = System.currentTimeMillis();
+ // Not loaded yet
+ try {
+ LOG.info("Building file system view for partitions " + partitionSet);
+
+ // Pairs of relative partition path and absolute partition path
+ List<Pair<String, Path>> absolutePartitionPathList = partitionSet.stream()
+ .map(partition -> Pair.of(
+ partition, FSUtils.getPartitionPath(metaClient.getBasePathV2(), partition)))
+ .collect(Collectors.toList());
+ long beginLsTs = System.currentTimeMillis();
+ Map<Pair<String, Path>, FileStatus[]> statusesMap =
+ listPartitions(absolutePartitionPathList);
+ long endLsTs = System.currentTimeMillis();
+ LOG.debug("Time taken to list partitions " + partitionSet + " =" + (endLsTs - beginLsTs));
+ statusesMap.forEach((partitionPair, statuses) -> {
+ String relativePartitionStr = partitionPair.getLeft();
+ List<HoodieFileGroup> groups = addFilesToView(statuses);
+ if (groups.isEmpty()) {
+ storePartitionView(relativePartitionStr, new ArrayList<>());
+ }
+ LOG.debug("#files found in partition (" + relativePartitionStr + ") =" + statuses.length);
+ });
+ } catch (IOException e) {
+ throw new HoodieIOException("Failed to list base files in partitions " + partitionSet, e);
+ }
+ long endTs = System.currentTimeMillis();
+ LOG.debug("Time to load partition " + partitionSet + " =" + (endTs - beginTs));
+ }
+ }
+
+ /***
+ * @return A list of relative partition paths of all partitions.
+ * @throws IOException upon error.
+ */
+ protected List<String> getAllPartitionPaths() throws IOException {
+ // TODO: integrate the direct FS listing with the actual engine context
+ LOG.warn("Getting all partition paths with file system listing sequentially can be very slow. "
+ + "This should not be invoked.");
+ String bathPath = metaClient.getBasePathV2().toString();
+ FileSystem fs = new Path(bathPath).getFileSystem(metaClient.getHadoopConf());
+ if (assumeDatePartitioning) {
+ return FSUtils.getAllPartitionFoldersThreeLevelsDown(fs, bathPath);
+ }
+ return FSUtils.getPartitionPathsWithPrefix(
Review Comment:
Calling prefix-based APIs to fetch all partition paths does not look right. Let's see how we can fix this properly.
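For comparison, listing all partitions needs no prefix at all. A rough sketch with `java.nio.file` rather than Hadoop's `FileSystem`, assuming two simplified rules: with date partitioning, a partition is a directory exactly three levels below the base path (as `getAllPartitionFoldersThreeLevelsDown` suggests); otherwise, it is any directory containing a partition marker file, whose name `.hoodie_partition_metadata` is an assumption here. `PartitionLister` is a hypothetical name.

```java
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical sketch of listing every partition without a prefix-based API.
final class PartitionLister {
  // Assumed name of the per-partition marker file; treat as an assumption.
  static final String PARTITION_MARKER = ".hoodie_partition_metadata";

  private PartitionLister() {
  }

  /** Returns sorted relative partition paths, using '/' as the separator. */
  static List<String> listAllPartitions(Path basePath, boolean assumeDatePartitioning)
      throws IOException {
    try (Stream<Path> dirs = Files.walk(basePath)) {
      return dirs
          .filter(Files::isDirectory)
          .filter(dir -> !dir.equals(basePath))
          .filter(dir -> !isHidden(basePath.relativize(dir)))
          .filter(dir -> assumeDatePartitioning
              // yyyy/mm/dd layout: partitions sit exactly three levels below the base path.
              ? basePath.relativize(dir).getNameCount() == 3
              // Otherwise a directory is a partition if it holds the marker file.
              : Files.exists(dir.resolve(PARTITION_MARKER)))
          .map(dir -> basePath.relativize(dir).toString().replace(File.separatorChar, '/'))
          .sorted()
          .collect(Collectors.toList());
    }
  }

  // Skips the .hoodie metadata folder and any other dot-directories.
  private static boolean isHidden(Path relativePath) {
    for (Path part : relativePath) {
      if (part.toString().startsWith(".")) {
        return true;
      }
    }
    return false;
  }
}
```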