Sagar Sumit created HUDI-7420:
---------------------------------
Summary: Parallelize the process of constructing
`logFilesMarkerPath` in CommitMetadataUtils#reconcileMetadataForMissingFiles
Key: HUDI-7420
URL: https://issues.apache.org/jira/browse/HUDI-7420
Project: Apache Hudi
Issue Type: Task
Reporter: Sagar Sumit
Fix For: 0.15.0, 1.0.0
This is related to HUDI-1517.
Current logic is:
{code:java}
Set<String> logFilesMarkerPath = new HashSet<>();
allLogFilesMarkerPath.stream()
    .filter(logFilePath -> !logFilePath.endsWith("cdc"))
    .forEach(logFilesMarkerPath::add);
// remove valid log files
// TODO: refactor based on HoodieData
for (Map.Entry<String, List<HoodieWriteStat>> partitionAndWriteStats : commitMetadata.getPartitionToWriteStats().entrySet()) {
  for (HoodieWriteStat hoodieWriteStat : partitionAndWriteStats.getValue()) {
    logFilesMarkerPath.remove(hoodieWriteStat.getPath());
  }
}
{code}
The for loop can be replaced with context.parallelize as shown below, but
the thread-safety of this approach still needs to be checked.
{code:java}
Set<String> logFilesMarkerPath = new HashSet<>();
allLogFilesMarkerPath.stream()
    .filter(logFilePath -> !logFilePath.endsWith("cdc"))
    .forEach(logFilesMarkerPath::add);
// Convert the partition and write stats to a list of log file paths to be removed
List<String> validLogFilePaths = context.parallelize(new ArrayList<>(commitMetadata.getPartitionToWriteStats().entrySet()))
    .flatMapToPair((SerializablePairFunction<Map.Entry<String, List<HoodieWriteStat>>, String, Void>) entry -> {
      List<Pair<String, Void>> pathsToRemove = new ArrayList<>();
      entry.getValue().forEach(hoodieWriteStat -> pathsToRemove.add(Pair.of(hoodieWriteStat.getPath(), null)));
      return pathsToRemove.iterator();
    })
    .map(t -> t.getLeft())
    .collect();
// Remove the valid log file paths from logFilesMarkerPath in a parallel manner.
// Depending on the specifics of your environment and HoodieEngineContext, this
// might need to be adapted.
// For a straightforward approach without parallelization of the remove operation:
validLogFilePaths.forEach(logFilesMarkerPath::remove);
{code}
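On the thread-safety question, one possible shape is sketched below in plain Java, with no Hudi dependency. It assumes that a parallel stream can stand in for context.parallelize, and the class MarkerReconcileSketch and its nested WriteStat type are hypothetical stand-ins for the real Hudi classes. The idea is to keep the parallel phase read-only (it only collects the valid paths) and to mutate the plain HashSet on a single thread afterwards, so no concurrent set is required.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

public class MarkerReconcileSketch {

  // Hypothetical stand-in for HoodieWriteStat; only the path matters here.
  public static final class WriteStat {
    private final String path;

    public WriteStat(String path) {
      this.path = path;
    }

    public String getPath() {
      return path;
    }
  }

  public static Set<String> reconcile(Set<String> allLogFilesMarkerPath,
                                      Map<String, List<WriteStat>> partitionToWriteStats) {
    // Parallel, read-only phase: gather every path referenced by a write stat.
    // The collector merges per-thread results, so no shared set is mutated.
    Set<String> validLogFilePaths = partitionToWriteStats.values().parallelStream()
        .flatMap(List::stream)
        .map(WriteStat::getPath)
        .collect(Collectors.toSet());

    // Single-threaded phase: build the marker set (skipping "cdc" files) and
    // remove the valid paths. A plain HashSet is safe because only this thread
    // touches it.
    Set<String> logFilesMarkerPath = allLogFilesMarkerPath.stream()
        .filter(logFilePath -> !logFilePath.endsWith("cdc"))
        .collect(Collectors.toCollection(HashSet::new));
    logFilesMarkerPath.removeAll(validLogFilePaths);
    return logFilesMarkerPath;
  }

  public static void main(String[] args) {
    Set<String> markers = Set.of("p1/.f1.log.1", "p1/.f2.log.1", "p2/.f3.log.1.cdc");
    Map<String, List<WriteStat>> stats =
        Map.of("p1", List.of(new WriteStat("p1/.f1.log.1")));
    System.out.println(reconcile(markers, stats)); // prints [p1/.f2.log.1]
  }
}
```

If the removal itself had to run in parallel, the HashSet would need to be swapped for a concurrent set such as ConcurrentHashMap.newKeySet(); whether that pays off here is an open question.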