[
https://issues.apache.org/jira/browse/HUDI-4308?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
aidendong updated HUDI-4308:
----------------------------
Description:
Problem:
In HoodieTableQueryType.READ_OPTIMIZED query mode, the corresponding FileGroup
has no readable base file while a compaction is in progress, and if that
compaction fails, the base file stays unreadable until the compaction recovers.
demo:
{code:java}
// Demo: read-optimized query against a MOR table
Dataset<Row> table = session.read()
    .format("org.apache.hudi")
    .option("hoodie.datasource.query.type", "read_optimized")
    .load("/tmp/hudi_test2/hudi_test_demo"); {code}
----
Reason:
{{BaseHoodieTableFileIndex}} is the base class used when reading a table; it
loads the FileSlices in doRefresh():
{code:java}
if ((tableType.equals(HoodieTableType.MERGE_ON_READ) &&
    queryType.equals(HoodieTableQueryType.SNAPSHOT))) {
  cachedAllInputFileSlices = partitionFiles.keySet().stream()
      .collect(Collectors.toMap(
          Function.identity(),
          partitionPath ->
              queryInstant.map(instant ->
                  fileSystemView.getLatestMergedFileSlicesBeforeOrOn(partitionPath.path, queryInstant.get())
                      .collect(Collectors.toList())
              )
              .orElse(Collections.emptyList())
      ));
} else {
  cachedAllInputFileSlices = partitionFiles.keySet().stream()
      .collect(Collectors.toMap(
          Function.identity(),
          partitionPath ->
              queryInstant.map(instant ->
                  fileSystemView.getLatestFileSlicesBeforeOrOn(partitionPath.path, instant, true)
              )
              .orElse(fileSystemView.getLatestFileSlices(partitionPath.path))
              .collect(Collectors.toList())
      ));
} {code}
However, {{fileSystemView.getLatestFileSlicesBeforeOrOn}} has a problem: the
latest readable base file may belong to the newest file slice created before
the compaction was scheduled, but this method does not return it:
{code:java}
@Override
public final Stream<FileSlice> getLatestFileSlicesBeforeOrOn(String partitionStr, String maxCommitTime,
    boolean includeFileSlicesInPendingCompaction) {
  ...
  Stream<FileSlice> fileSliceStream =
      fetchLatestFileSlicesBeforeOrOn(partitionPath, maxCommitTime)
          .filter(slice -> !isFileGroupReplacedBeforeOrOn(slice.getFileGroupId(), maxCommitTime));
  if (includeFileSlicesInPendingCompaction) {
    return fileSliceStream.map(this::filterBaseFileAfterPendingCompaction)
        .map(this::addBootstrapBaseFileIfPresent);
  ...
}

protected FileSlice filterBaseFileAfterPendingCompaction(FileSlice fileSlice) {
  ...
  // The transformed slice carries over the log files but not the base file.
  FileSlice transformed =
      new FileSlice(fileSlice.getPartitionPath(), fileSlice.getBaseInstantTime(), fileSlice.getFileId());
  fileSlice.getLogFiles().forEach(transformed::addLogFile);
  ....
}{code}
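To make the effect concrete, here is a small self-contained sketch (using simplified stand-in classes, not the real Hudi {{FileSlice}} / base-file types) of what filterBaseFileAfterPendingCompaction does to a slice under pending compaction: the log files are carried over, the base file is not, so a read-optimized query, which reads only base files, sees nothing for this file group.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

public class PendingCompactionDemo {
    // Simplified, hypothetical stand-in for Hudi's FileSlice: an optional
    // base (parquet) file plus a list of delta log files.
    static final class Slice {
        final Optional<String> baseFile;
        final List<String> logFiles;
        Slice(String baseFile, List<String> logFiles) {
            this.baseFile = Optional.ofNullable(baseFile);
            this.logFiles = logFiles;
        }
    }

    // Mirrors filterBaseFileAfterPendingCompaction: copy the log files,
    // drop the base file.
    static Slice filterBaseFileAfterPendingCompaction(Slice s) {
        return new Slice(null, new ArrayList<>(s.logFiles));
    }

    public static void main(String[] args) {
        Slice latest = new Slice("f1_v2.parquet", List.of("log.1", "log.2"));
        Slice filtered = filterBaseFileAfterPendingCompaction(latest);
        // A read-optimized query reads only base files, so this slice now
        // contributes no data at all.
        System.out.println(filtered.baseFile.isPresent()); // false
    }
}
```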
This function drops the base file as soon as a compaction starts, and if the
compaction fails, the base file in this file slice stays unreadable until the
compaction recovers.
I think we should instead read the file slice with the largest available
version in the file group.
!image-2022-06-24-11-07-36-638.png!
Spark relies on this in {{HoodieFileIndex.allFiles()}}, which collects all
base files from {{cachedAllInputFileSlices}}:
{code:java}
def allFiles: Seq[FileStatus] = {
  cachedAllInputFileSlices.values.asScala.flatMap(_.asScala)
    .filter(_.getBaseFile.isPresent)
    .map(_.getBaseFile.get().getFileStatus)
    .toSeq
}{code}
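The fix proposed above, reading the newest slice in the file group that still has a readable base file, could be sketched as follows (again with a simplified, hypothetical stand-in for {{FileSlice}}; this illustrates the idea only, not the actual Hudi patch):

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

public class LatestReadableSliceDemo {
    // Hypothetical stand-in for a Hudi FileSlice: an instant time plus an
    // optional base file.
    static final class Slice {
        final String instant;
        final Optional<String> baseFile;
        Slice(String instant, String baseFile) {
            this.instant = instant;
            this.baseFile = Optional.ofNullable(baseFile);
        }
    }

    // Proposed behavior: given one file group's slices, newest first, return
    // the base file of the newest slice that actually has one, instead of
    // returning only the latest slice (whose base file may be missing while
    // a compaction is pending or has failed).
    static Optional<String> latestReadableBaseFile(List<Slice> slicesNewestFirst) {
        return slicesNewestFirst.stream()
            .filter(s -> s.baseFile.isPresent())
            .findFirst()
            .flatMap(s -> s.baseFile);
    }

    public static void main(String[] args) {
        // Pending (or failed) compaction: the newest slice has no base file
        // yet, but the previous slice's base file is still valid and readable.
        List<Slice> group = Arrays.asList(
            new Slice("20220624110736", null),
            new Slice("20220623100000", "f1_20220623.parquet"));
        System.out.println(latestReadableBaseFile(group).orElse("none")); // f1_20220623.parquet
    }
}
```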
was:
In HoodieTableQueryType.READ_OPTIMIZED read mode
The corresponding FileGroup has no readable base file when compaction working.
and if this compaction failed, The corresponding base file has been unreadable.
> READ_OPTIMIZED read mode will temporary loss of data when compaction
> --------------------------------------------------------------------
>
> Key: HUDI-4308
> URL: https://issues.apache.org/jira/browse/HUDI-4308
> Project: Apache Hudi
> Issue Type: Bug
> Reporter: aidendong
> Assignee: aidendong
> Priority: Major
> Labels: ReadFile, pull-request-available
> Attachments: image-2022-06-24-11-07-36-638.png
>
> Original Estimate: 168h
> Remaining Estimate: 168h
>
--
This message was sent by Atlassian Jira
(v8.20.7#820007)