satishkotha commented on issue #4109:
URL: https://github.com/apache/hudi/issues/4109#issuecomment-978407363


   The `ConcurrentModificationException` seems to be coming from here:
   
https://github.com/apache/hudi/blob/master/hudi-common/src/main/java/org/apache/hudi/common/table/view/HoodieTablePreCommitFileSystemView.java#L83
   
   We need to redo this logic to avoid the `newFilesWrittenForPartition.remove(...)` call.
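
   As context for why the `remove(...)` call is problematic, here is a minimal standalone sketch (with plain `String` values standing in for `HoodieBaseFile`; the map contents are hypothetical) showing how removing entries from a `HashMap` while a stream over it is active trips the fail-fast check:

   ```java
   import java.util.ConcurrentModificationException;
   import java.util.HashMap;
   import java.util.Map;

   public class CmeDemo {
       public static void main(String[] args) {
           // Hypothetical stand-in for newFilesWrittenForPartition.
           Map<String, String> newFiles = new HashMap<>();
           newFiles.put("file-1", "a");
           newFiles.put("file-2", "b");
           try {
               // Removing entries from the map while a stream over its key set
               // is active invalidates the spliterator's modCount check.
               newFiles.keySet().stream().forEach(id -> newFiles.remove(id));
               System.out.println("no exception");
           } catch (ConcurrentModificationException e) {
               System.out.println("ConcurrentModificationException");
           }
       }
   }
   ```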
 
   
   A simple option to try out: replace line 72 with

   ```java
   Map<String, HoodieBaseFile> newFilesWrittenForPartition = new ConcurrentHashMap<>(filesWritten.stream()
           .filter(file -> partitionStr.equals(file.getPartitionPath()))
           .collect(Collectors.toMap(HoodieWriteStat::getFileId, writeStat ->
               new HoodieBaseFile(new Path(tableMetaClient.getBasePath(), writeStat.getPath()).toString()))));
   ```
   
   A probably better option is to group by `fileId`, i.e., replace lines 78-88 with the snippet below. This needs some more testing; I can send a PR next week.

   ```java
   Map<String, HoodieBaseFile> baseFilesForCommittedFileIds = committedBaseFiles
           // Remove files replaced by the current inflight commit
           .filter(baseFile -> !replacedFileIdsForPartition.contains(baseFile.getFileId()))
           .collect(Collectors.toMap(HoodieBaseFile::getFileId, baseFile -> baseFile));

   baseFilesForCommittedFileIds.putAll(newFilesWrittenForPartition);
   return baseFilesForCommittedFileIds.values().stream();
   ```
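
   To illustrate the intent of that grouping approach, here is a simplified, runnable sketch with plain `String` values standing in for `HoodieBaseFile` (all ids and values below are hypothetical): committed files minus replaced ones are collected into a fresh map, and the inflight commit's new files are overlaid with `putAll`, so no entry is ever removed during iteration.

   ```java
   import java.util.Map;
   import java.util.Set;
   import java.util.TreeMap;
   import java.util.stream.Collectors;
   import java.util.stream.Stream;

   public class MergeSketch {
       public static void main(String[] args) {
           // Hypothetical stand-ins for committedBaseFiles, replacedFileIdsForPartition,
           // and newFilesWrittenForPartition.
           Stream<String> committedFileIds = Stream.of("f1", "f2", "f3");
           Set<String> replacedFileIds = Set.of("f2");
           Map<String, String> newFilesWritten = Map.of("f4", "new-f4", "f1", "new-f1");

           // Drop replaced file ids, collect survivors into a fresh mutable map.
           Map<String, String> merged = committedFileIds
                   .filter(id -> !replacedFileIds.contains(id))
                   .collect(Collectors.toMap(id -> id, id -> "committed-" + id));

           // Overlay the files written by the current inflight commit.
           merged.putAll(newFilesWritten);

           // TreeMap only for deterministic printing order.
           System.out.println(new TreeMap<>(merged));
       }
   }
   ```

   Since the merge happens on a map that is local to this method, no shared state is mutated mid-iteration, which is what avoids the `ConcurrentModificationException`.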

