[
https://issues.apache.org/jira/browse/HUDI-3634?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Sagar Sumit closed HUDI-3634.
-----------------------------
Fix Version/s: 0.12.0
Resolution: Fixed
> Could read empty or partial HoodieCommitMetaData in downstream if using HDFS
> ----------------------------------------------------------------------------
>
> Key: HUDI-3634
> URL: https://issues.apache.org/jira/browse/HUDI-3634
> Project: Apache Hudi
> Issue Type: Bug
> Components: timeline-server
> Reporter: Hui An
> Priority: Major
> Labels: pull-request-available
> Fix For: 0.12.0
>
>
> If we use an incremental query to continuously read from the Hudi upstream,
> it can miss some batches.
> The writer obtains an FSDataOutputStream for the commit file and then writes
> the commit data into it:
> {code:java}
> // HoodieActiveTimeline
> private void createImmutableFileInPath(Path fullPath, Option<byte[]> content) {
>   FSDataOutputStream fsout = null;
>   try {
>     fsout = metaClient.getFs().create(fullPath, false);
>     if (content.isPresent()) {
>       fsout.write(content.get());
>     }
>   } catch (IOException e) {
>     throw new HoodieIOException("Failed to create file " + fullPath, e);
>   } finally {
>     try {
>       if (null != fsout) {
>         fsout.close();
>       }
>     } catch (IOException e) {
>       throw new HoodieIOException("Failed to close file " + fullPath, e);
>     }
>   }
> }
> {code}
> HDFS first creates an empty file and then returns the output stream. If a
> downstream reader opens the file at this moment, before the data has been
> written, it reads empty metadata; because it cannot get the fileId and file
> locations from that metadata, it skips this commit and returns an empty
> dataframe:
> {code:java}
> // IncrementalRelation
> for (commit <- commitsToReturn) {
>   // As this commit is empty, HoodieCommitMetadata has nothing
>   val metadata: HoodieCommitMetadata = HoodieCommitMetadata.fromBytes(
>     commitTimeline.getInstantDetails(commit).get, classOf[HoodieCommitMetadata])
>   if (HoodieTimeline.METADATA_BOOTSTRAP_INSTANT_TS == commit.getTimestamp) {
>     metaBootstrapFileIdToFullPath ++= metadata.getFileIdAndFullPaths(basePath).toMap.filterNot {
>       case (k, v) => replacedFile.contains(k) && v.startsWith(replacedFile(k))
>     }
>   } else {
>     regularFileIdToFullPath ++= metadata.getFileIdAndFullPaths(basePath).toMap.filterNot {
>       case (k, v) => replacedFile.contains(k) && v.startsWith(replacedFile(k))
>     }
>   }
> }
> {code}
> This PR introduces a new configuration that writes the commit data to a temp
> file first and, once the write has completed, renames the temp file to the
> final commit file.
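The write-then-rename pattern can be sketched with plain java.nio (a hypothetical standalone illustration, not the actual Hudi patch, which goes through the Hadoop FileSystem API): the bytes go to a temporary sibling file first, and an atomic rename publishes the finished file, so a concurrent reader sees either the complete commit file or no file at all.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

public class AtomicCommitWrite {

    // Hypothetical sketch: write content to a temp file, then atomically
    // rename it into place, so the final path is never visible while empty.
    static void createImmutableFile(Path fullPath, byte[] content) throws IOException {
        Path tmp = fullPath.resolveSibling(fullPath.getFileName() + ".tmp");
        Files.write(tmp, content);
        // Atomic within the same directory/filesystem.
        Files.move(tmp, fullPath, StandardCopyOption.ATOMIC_MOVE);
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("hudi-demo");
        Path commit = dir.resolve("001.commit");
        createImmutableFile(commit, "metadata".getBytes(StandardCharsets.UTF_8));
        String readBack = new String(Files.readAllBytes(commit), StandardCharsets.UTF_8);
        // The published file exists and holds the full content.
        System.out.println(Files.exists(commit) && readBack.equals("metadata"));
    }
}
```

On HDFS the same idea applies via FileSystem.rename(), which is atomic within a directory, which is what makes the temp-file approach safe for incremental readers polling the timeline.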
--
This message was sent by Atlassian Jira
(v8.20.10#820010)