Sagar Sumit created HUDI-7420:
---------------------------------

             Summary: Parallelize the process of constructing `logFilesMarkerPath` in CommitMetadataUtils#reconcileMetadataForMissingFiles
                 Key: HUDI-7420
                 URL: https://issues.apache.org/jira/browse/HUDI-7420
             Project: Apache Hudi
          Issue Type: Task
            Reporter: Sagar Sumit
             Fix For: 0.15.0, 1.0.0


This is related to HUDI-1517.
The current logic is:
{code:java}
// Start from all log file marker paths, excluding CDC log files
Set<String> logFilesMarkerPath = new HashSet<>();
allLogFilesMarkerPath.stream().filter(logFilePath -> !logFilePath.endsWith("cdc")).forEach(logFilesMarkerPath::add);

// remove valid log files
// TODO: refactor based on HoodieData
for (Map.Entry<String, List<HoodieWriteStat>> partitionAndWriteStats : commitMetadata.getPartitionToWriteStats().entrySet()) {
  for (HoodieWriteStat hoodieWriteStat : partitionAndWriteStats.getValue()) {
    logFilesMarkerPath.remove(hoodieWriteStat.getPath());
  }
}
{code}
The for loop could instead be implemented with context.parallelize, as below, but thread-safety still needs to be checked.
{code:java}
Set<String> logFilesMarkerPath = new HashSet<>();
allLogFilesMarkerPath.stream().filter(logFilePath -> !logFilePath.endsWith("cdc")).forEach(logFilesMarkerPath::add);

// Collect the paths of all valid log files, one task per partition's write stats.
// Parallelize over the map values rather than the Map.Entry objects: the partition
// key is not needed, and the entries of a HashMap are not Serializable.
List<String> validLogFilePaths = context.parallelize(new ArrayList<>(commitMetadata.getPartitionToWriteStats().values()))
    .flatMap(writeStats -> writeStats.stream().map(HoodieWriteStat::getPath).iterator())
    .collectAsList();

// The removal runs on the driver, sequentially, so a plain HashSet is safe here.
// Removing in parallel instead would require a thread-safe set; depending on the
// HoodieEngineContext in use, this might need to be adapted.
validLogFilePaths.forEach(logFilesMarkerPath::remove);
{code}
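To illustrate the thread-safety question, here is a minimal, Hudi-free sketch in plain Java. It assumes write stats can be modeled as path strings grouped by partition (a hypothetical Map<String, List<String>> standing in for getPartitionToWriteStats() and HoodieWriteStat#getPath()); the class and method names are hypothetical. It removes paths from several threads at once, which is only safe because the set is a concurrent one created with ConcurrentHashMap.newKeySet().

{code:java}
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch: partitionToWritePaths stands in for
// commitMetadata.getPartitionToWriteStats() mapped through HoodieWriteStat#getPath().
final class MarkerReconcileSketch {

  /** Returns the non-CDC marker paths that no write stat accounts for. */
  static Set<String> missingLogFiles(Collection<String> allLogFilesMarkerPath,
                                     Map<String, List<String>> partitionToWritePaths) {
    // A concurrent set tolerates remove() calls from multiple threads;
    // a plain HashSet would not be safe under concurrent mutation.
    Set<String> logFilesMarkerPath = ConcurrentHashMap.newKeySet();
    allLogFilesMarkerPath.stream()
        .filter(path -> !path.endsWith("cdc"))
        .forEach(logFilesMarkerPath::add);

    // Remove valid log file paths in parallel across partitions.
    partitionToWritePaths.values().parallelStream()
        .flatMap(List::stream)
        .forEach(logFilesMarkerPath::remove);
    return logFilesMarkerPath;
  }
}
{code}

Collecting the valid paths first and removing them sequentially, as in the snippet above, avoids the need for a concurrent set. Note also that with a distributed engine such as Spark, a set captured in a task closure is serialized to the executors, so removals made there would not be reflected in the driver's copy; this in-process pattern only fits a local engine context.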
 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
