[ 
https://issues.apache.org/jira/browse/HUDI-4308?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

aidendong updated HUDI-4308:
----------------------------
    Description: 
Problem:

In HoodieTableQueryType.READ_OPTIMIZED query mode, the corresponding FileGroup has no readable base file while a compaction is running on it.

And if that compaction fails, the base file stays unreadable until the compaction is recovered.

 

Demo:

 
{code:java}
//demo
Dataset<Row> table = session.read()
    .format("org.apache.hudi")
    .option("hoodie.datasource.query.type", "read_optimized")
    .load("/tmp/hudi_test2/hudi_test_demo"); {code}
 

 
----
Reason:

_BaseHoodieTableFileIndex_ is the base class used when reading a table, and it loads the FileSlices in doRefresh().

 
{code:java}
if (tableType.equals(HoodieTableType.MERGE_ON_READ)
    && queryType.equals(HoodieTableQueryType.SNAPSHOT)) {
  cachedAllInputFileSlices = partitionFiles.keySet().stream()
      .collect(Collectors.toMap(
          Function.identity(),
          partitionPath ->
              queryInstant.map(instant ->
                  fileSystemView.getLatestMergedFileSlicesBeforeOrOn(partitionPath.path, queryInstant.get())
                      .collect(Collectors.toList()))
              .orElse(Collections.emptyList())));
} else {
  cachedAllInputFileSlices = partitionFiles.keySet().stream()
      .collect(Collectors.toMap(
          Function.identity(),
          partitionPath ->
              queryInstant.map(instant ->
                  fileSystemView.getLatestFileSlicesBeforeOrOn(partitionPath.path, instant, true))
                  .orElse(fileSystemView.getLatestFileSlices(partitionPath.path))
                  .collect(Collectors.toList())));
} {code}
But _fileSystemView.getLatestFileSlicesBeforeOrOn_ has a problem: the valid base file may come from the highest-version FileSlice created before the compaction, and _getLatestFileSlicesBeforeOrOn_ cannot return it.

 
{code:java}
@Override
public final Stream<FileSlice> getLatestFileSlicesBeforeOrOn(String partitionStr, String maxCommitTime,
    boolean includeFileSlicesInPendingCompaction) {
  ...
  Stream<FileSlice> fileSliceStream =
      fetchLatestFileSlicesBeforeOrOn(partitionPath, maxCommitTime)
          .filter(slice -> !isFileGroupReplacedBeforeOrOn(slice.getFileGroupId(), maxCommitTime));
  if (includeFileSlicesInPendingCompaction) {
    return fileSliceStream.map(this::filterBaseFileAfterPendingCompaction)
        .map(this::addBootstrapBaseFileIfPresent);
  }
  ...
}

protected FileSlice filterBaseFileAfterPendingCompaction(FileSlice fileSlice) {
  ...
  // The transformed slice is created without the base file; only log files are carried over.
  FileSlice transformed =
      new FileSlice(fileSlice.getPartitionPath(), fileSlice.getBaseInstantTime(), fileSlice.getFileId());
  fileSlice.getLogFiles().forEach(transformed::addLogFile);
  ...
}{code}
This function drops the base file as soon as a compaction is scheduled for the file group, and if that compaction fails, the base file in this FileSlice remains unreadable until the compaction is recovered.
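The effect can be illustrated with a toy model. The classes below are hypothetical stand-ins, not Hudi's real FileSlice API; they only show that the rebuilt slice carries log files but no base file:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

// Toy model of filterBaseFileAfterPendingCompaction: the slice under
// pending compaction is rebuilt carrying only its log files, so its
// base file disappears from the reader's view until compaction completes.
public class PendingCompactionFilter {
  public static class FileSlice {
    final Optional<String> baseFile;
    final List<String> logFiles = new ArrayList<>();

    public FileSlice(Optional<String> baseFile) {
      this.baseFile = baseFile;
    }
  }

  public static FileSlice filterBaseFile(FileSlice slice) {
    // The new slice starts with no base file; only log files carry over.
    FileSlice transformed = new FileSlice(Optional.empty());
    transformed.logFiles.addAll(slice.logFiles);
    return transformed;
  }
}
```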

I think we can instead read the FileSlice with the largest available version in this FileGroup.
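A minimal sketch of that fallback, under a simplified model (the Slice class and pick() helper are hypothetical, not existing Hudi code): slices within a file group are ordered newest-first, and the reader walks back to the newest slice whose base file is still readable:

```java
import java.util.List;
import java.util.Optional;

// Sketch of the proposed behavior: instead of exposing only the latest
// slice, whose base file may be hidden by a pending compaction, fall
// back to the newest slice in the file group with a readable base file.
public class LatestReadableSlice {
  public static class Slice {
    final String baseInstant;
    final boolean baseFilePresent;

    public Slice(String baseInstant, boolean baseFilePresent) {
      this.baseInstant = baseInstant;
      this.baseFilePresent = baseFilePresent;
    }
  }

  // Slices are ordered newest-first within a file group.
  public static Optional<Slice> pick(List<Slice> newestFirst) {
    return newestFirst.stream()
        .filter(s -> s.baseFilePresent)
        .findFirst();
  }
}
```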

 

!image-2022-06-24-11-07-36-638.png!

Spark uses this index in _HoodieFileIndex.allFiles()_, which collects every base file from _cachedAllInputFileSlices_:
{code:java}
def allFiles: Seq[FileStatus] = {
  cachedAllInputFileSlices.values.asScala.flatMap(_.asScala)
    .filter(_.getBaseFile.isPresent)
    .map(_.getBaseFile.get().getFileStatus)
    .toSeq
}{code}
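The danger of that filter can be shown with a small self-contained model. The names below (Slice, AllFilesModel) are hypothetical, not Hudi's API; the point is only that a slice whose base file is hidden by a pending compaction drops out of the file listing entirely, which is the temporary data loss this issue describes:

```java
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;

// Simplified model of the filter in allFiles(): any cached slice
// without a present base file is silently dropped, so a file group
// under pending compaction vanishes from the listing.
public class AllFilesModel {
  public static class Slice {
    final Optional<String> baseFile;

    public Slice(Optional<String> baseFile) {
      this.baseFile = baseFile;
    }
  }

  public static List<String> allFiles(List<Slice> cachedSlices) {
    return cachedSlices.stream()
        .filter(s -> s.baseFile.isPresent())
        .map(s -> s.baseFile.get())
        .collect(Collectors.toList());
  }
}
```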
 

 


  was:
In HoodieTableQueryType.READ_OPTIMIZED read mode

The corresponding FileGroup has no readable base file when compaction working.

and if this compaction failed, The corresponding base file has been unreadable.

 

 


> READ_OPTIMIZED read mode will temporarily lose data during compaction
> ----------------------------------------------------------------------
>
>                 Key: HUDI-4308
>                 URL: https://issues.apache.org/jira/browse/HUDI-4308
>             Project: Apache Hudi
>          Issue Type: Bug
>            Reporter: aidendong
>            Assignee: aidendong
>            Priority: Major
>              Labels: ReadFile, pull-request-available
>         Attachments: image-2022-06-24-11-07-36-638.png
>
>   Original Estimate: 168h
>  Remaining Estimate: 168h
>



--
This message was sent by Atlassian Jira
(v8.20.7#820007)
