[ https://issues.apache.org/jira/browse/HUDI-7420?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Vinoth Chandar updated HUDI-7420:
---------------------------------
Sprint: Sprint 2024-03-25, Sprint 2023-04-26 (was: Sprint 2024-03-25)
> Parallelize the process of constructing `logFilesMarkerPath` in CommitMetadatautils#reconcileMetadataForMissingFiles
> --------------------------------------------------------------------------------------------------------------------
>
> Key: HUDI-7420
> URL: https://issues.apache.org/jira/browse/HUDI-7420
> Project: Apache Hudi
> Issue Type: Task
> Reporter: Sagar Sumit
> Assignee: Sagar Sumit
> Priority: Major
> Fix For: 0.15.0, 1.0.0
>
>
> This is related to HUDI-1517.
> The current logic is:
> {code:java}
> // collect the log file marker paths, skipping CDC log files
> Set<String> logFilesMarkerPath = new HashSet<>();
> allLogFilesMarkerPath.stream()
>     .filter(logFilePath -> !logFilePath.endsWith("cdc"))
>     .forEach(logFilesMarkerPath::add);
> // remove valid log files, i.e. those referenced by a write stat in the commit metadata
> // TODO: refactor based on HoodieData
> for (Map.Entry<String, List<HoodieWriteStat>> partitionAndWriteStats
>     : commitMetadata.getPartitionToWriteStats().entrySet()) {
>   for (HoodieWriteStat hoodieWriteStat : partitionAndWriteStats.getValue()) {
>     logFilesMarkerPath.remove(hoodieWriteStat.getPath());
>   }
> }
> {code}
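A minimal, self-contained sketch of the current sequential logic, using only JDK types so it can be run outside Hudi. `WriteStat` and `SequentialReconcile` are hypothetical stand-ins (for `HoodieWriteStat` and the surrounding method), and the sketch assumes marker paths and write-stat paths are expressed in the same relative form. The nested loop amounts to a set difference: what remains are the non-CDC marker paths that no write stat refers to.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

// Hypothetical stand-in for HoodieWriteStat: only the path matters here.
final class WriteStat {
  private final String path;

  WriteStat(String path) {
    this.path = path;
  }

  String getPath() {
    return path;
  }
}

final class SequentialReconcile {
  // Returns the non-CDC marker paths that no write stat refers to,
  // mirroring the nested loop in the current logic.
  static Set<String> missingLogFiles(Set<String> allLogFilesMarkerPath,
                                     Map<String, List<WriteStat>> partitionToWriteStats) {
    // Copy into a mutable HashSet, skipping paths that end with "cdc".
    Set<String> logFilesMarkerPath = allLogFilesMarkerPath.stream()
        .filter(logFilePath -> !logFilePath.endsWith("cdc"))
        .collect(Collectors.toCollection(HashSet::new));
    // Single-threaded removal of every path that a write stat accounts for.
    for (Map.Entry<String, List<WriteStat>> partitionAndWriteStats : partitionToWriteStats.entrySet()) {
      for (WriteStat writeStat : partitionAndWriteStats.getValue()) {
        logFilesMarkerPath.remove(writeStat.getPath());
      }
    }
    return logFilesMarkerPath;
  }
}
```

For example, with markers `p1/a.log`, `p1/b.log` and `p1/a-cdc` and a single write stat for `p1/a.log`, the result is `{p1/b.log}`.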
> The for loop could instead be implemented with context.parallelize, as below,
> but its thread-safety needs to be checked.
> {code:java}
> Set<String> logFilesMarkerPath = new HashSet<>();
> allLogFilesMarkerPath.stream()
>     .filter(logFilePath -> !logFilePath.endsWith("cdc"))
>     .forEach(logFilesMarkerPath::add);
> // Convert the partition and write stats to a list of log file paths to be removed
> List<String> validLogFilePaths = context
>     .parallelize(new ArrayList<>(commitMetadata.getPartitionToWriteStats().entrySet()))
>     // emit one (path, null) pair per write stat in each partition
>     .flatMapToPair(entry -> {
>       List<Pair<String, Void>> pathsToRemove = new ArrayList<>();
>       entry.getValue().forEach(hoodieWriteStat ->
>           pathsToRemove.add(Pair.of(hoodieWriteStat.getPath(), null)));
>       return pathsToRemove.iterator();
>     })
>     .map(t -> t.getLeft())
>     .collect();
> // Remove the valid log file paths from logFilesMarkerPath.
> // Parallelizing this step as well depends on the specifics of the environment
> // and HoodieEngineContext, and might need to be adapted.
> // A straightforward approach, without parallelizing the remove operation:
> validLogFilePaths.forEach(logFilesMarkerPath::remove);
> {code}
>
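The thread-safety question can be explored with a small sketch that uses Java parallel streams in place of `context.parallelize`. This is an assumption for illustration only; it is not the `HoodieEngineContext` API. Write stats are reduced to their paths, and `ParallelReconcile` is a hypothetical class. Variant A has the same shape as the proposal: the parallel stage only reads the commit metadata and emits paths, and the `HashSet` is mutated afterwards on a single thread. Variant B parallelizes the removal too, which is only safe with a concurrent set such as `ConcurrentHashMap.newKeySet()`.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;

final class ParallelReconcile {
  // Variant A: parallel read, sequential mutation. The parallel stream only
  // produces paths; the plain HashSet is touched by the calling thread alone.
  static Set<String> missingLogFiles(Set<String> allLogFilesMarkerPath,
                                     Map<String, List<String>> partitionToWriteStatPaths) {
    Set<String> logFilesMarkerPath = allLogFilesMarkerPath.stream()
        .filter(p -> !p.endsWith("cdc"))
        .collect(Collectors.toCollection(HashSet::new));
    List<String> validLogFilePaths = partitionToWriteStatPaths.values().parallelStream()
        .flatMap(List::stream)
        .collect(Collectors.toList());
    validLogFilePaths.forEach(logFilesMarkerPath::remove);
    return logFilesMarkerPath;
  }

  // Variant B: parallel mutation. remove() is called from several threads,
  // so the set must be concurrent; a plain HashSet is not safe here.
  static Set<String> missingLogFilesConcurrent(Set<String> allLogFilesMarkerPath,
                                               Map<String, List<String>> partitionToWriteStatPaths) {
    Set<String> logFilesMarkerPath = ConcurrentHashMap.newKeySet();
    allLogFilesMarkerPath.stream()
        .filter(p -> !p.endsWith("cdc"))
        .forEach(logFilesMarkerPath::add);
    partitionToWriteStatPaths.values().parallelStream()
        .flatMap(List::stream)
        .forEach(logFilesMarkerPath::remove);
    return logFilesMarkerPath;
  }
}
```

Both variants return the same set. In the proposal above, `collect()` returns a local `List`, so the final `forEach(logFilesMarkerPath::remove)` runs on the calling thread, as in Variant A; only a parallel removal step would call for a concurrent set as in Variant B.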