[
https://issues.apache.org/jira/browse/HUDI-3634?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17506916#comment-17506916
]
Hui An commented on HUDI-3634:
------------------------------
Some logs about this issue:
The upstream completed the commit at 12:14:30:
{code:java}
22/03/14 12:14:28 INFO HoodieActiveTimeline: Marking instant complete [==>20220314121317153__commit__INFLIGHT]
22/03/14 12:14:28 INFO HoodieActiveTimeline: Checking for file exists ?hdfs://R2/projects/percept/hdfs/dev/hudi/spark_application_hudi/.hoodie/20220314121317153.inflight
22/03/14 12:14:30 INFO HoodieActiveTimeline: Create new file for toInstant ?hdfs://R2/projects/percept/hdfs/dev/hudi/spark_application_hudi/.hoodie/20220314121317153.commit
22/03/14 12:14:30 INFO HoodieActiveTimeline: Completed [==>20220314121317153__commit__INFLIGHT]
{code}
But the downstream already saw the commit as COMPLETED at 12:14:28, before the data was fully written:
{code:java}
22/03/14 12:14:28 INFO HoodieActiveTimeline: Loaded instants upto : Option{val=[==>20220314121317153__commit__INFLIGHT]}
22/03/14 12:14:28 INFO HoodieActiveTimeline: Loaded instants upto : Option{val=[==>20220314121317153__commit__INFLIGHT]}
22/03/14 12:14:28 INFO HoodieActiveTimeline: Loaded instants upto : Option{val=[20220314121317153__commit__COMPLETED]}
22/03/14 12:14:28 INFO CheckpointFileManager: Writing atomically to hdfs://test/projects/analyzer/checkpoint/report/offsets/13981 using temp file hdfs://test/projects/analyzer/checkpoint/report/offsets/.13981.a5a48d1f-6c66-4b3e-a8d6-27150f8e7205.tmp
{code}
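The window between the file becoming visible and the bytes being written can be illustrated with plain {{java.io}} as a local stand-in for HDFS's FSDataOutputStream (file names here are hypothetical, mirroring a Hudi instant file):
{code:java}
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;

public class CreateThenWriteRace {
  public static void main(String[] args) throws IOException {
    // Hypothetical commit file name, standing in for a .hoodie instant file.
    File commit = File.createTempFile("20220314121317153", ".commit");
    try (FileOutputStream out = new FileOutputStream(commit)) {
      // Between create and write, the file is already visible to other
      // readers with length 0 -- this is the window in which a downstream
      // incremental reader can observe an empty commit.
      System.out.println("visible before write, length=" + commit.length()); // length=0
      out.write("{\"partitionToWriteStats\":{}}".getBytes());
    }
    System.out.println("after close, length=" + commit.length());
    commit.delete();
  }
}
{code}
On HDFS the window is larger than on a local disk, since the create RPC and the write/close happen over the network, which is why the logs above show a two-second gap.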
> Could read empty or partial HoodieCommitMetaData in downstream if using HDFS
> ----------------------------------------------------------------------------
>
> Key: HUDI-3634
> URL: https://issues.apache.org/jira/browse/HUDI-3634
> Project: Apache Hudi
> Issue Type: Bug
> Components: timeline-server
> Reporter: Hui An
> Priority: Major
> Labels: pull-request-available
>
> If we're using an incremental query to continuously read from the Hudi
> upstream, it could miss some batches.
> We use an FSDataOutputStream to create an output stream and then write the
> commit data:
> {code:java}
> // HoodieActiveTimeline
> private void createImmutableFileInPath(Path fullPath, Option<byte[]> content) {
>   FSDataOutputStream fsout = null;
>   try {
>     fsout = metaClient.getFs().create(fullPath, false);
>     if (content.isPresent()) {
>       fsout.write(content.get());
>     }
>   } catch (IOException e) {
>     throw new HoodieIOException("Failed to create file " + fullPath, e);
>   } finally {
>     try {
>       if (null != fsout) {
>         fsout.close();
>       }
>     } catch (IOException e) {
>       throw new HoodieIOException("Failed to close file " + fullPath, e);
>     }
>   }
> }
> {code}
> HDFS first creates an empty file and then returns the output stream. If the
> data has not been written yet at the moment a reader looks, the downstream
> could read empty metadata; since it cannot get the fileIds and locations
> from the metadata, it will skip this commit and return an empty dataframe.
> {code:java}
> // IncrementalRelation
> for (commit <- commitsToReturn) {
>   // As this commit is empty, HoodieCommitMetadata has nothing
>   val metadata: HoodieCommitMetadata = HoodieCommitMetadata.fromBytes(
>     commitTimeline.getInstantDetails(commit).get, classOf[HoodieCommitMetadata])
>   if (HoodieTimeline.METADATA_BOOTSTRAP_INSTANT_TS == commit.getTimestamp) {
>     metaBootstrapFileIdToFullPath ++= metadata.getFileIdAndFullPaths(basePath).toMap.filterNot {
>       case (k, v) => replacedFile.contains(k) && v.startsWith(replacedFile(k))
>     }
>   } else {
>     regularFileIdToFullPath ++= metadata.getFileIdAndFullPaths(basePath).toMap.filterNot {
>       case (k, v) => replacedFile.contains(k) && v.startsWith(replacedFile(k))
>     }
>   }
> }
> {code}
> This PR introduces a new configuration that first writes the commit data to
> a temp file, and after the write is done, moves the temp file to the commit
> file.
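> The temp-file approach can be sketched with {{java.nio}} as a stand-in for
> the HDFS FileSystem rename (method and file names here are illustrative,
> not the actual PR code): the commit path only ever appears fully written,
> because the rename is the single atomic publish step.
> {code:java}
> import java.io.IOException;
> import java.nio.charset.StandardCharsets;
> import java.nio.file.Files;
> import java.nio.file.Path;
> import java.nio.file.StandardCopyOption;
>
> public class AtomicCommitWrite {
>   // Write the full content to a hidden temp file in the same directory,
>   // then atomically rename it to the final commit path. Readers either see
>   // no commit file at all or a complete one -- never an empty/partial file.
>   static void createImmutableFile(Path fullPath, byte[] content) throws IOException {
>     Path tmp = fullPath.resolveSibling("." + fullPath.getFileName() + ".tmp");
>     Files.write(tmp, content);                  // commit path not visible yet
>     Files.move(tmp, fullPath, StandardCopyOption.ATOMIC_MOVE);
>   }
>
>   public static void main(String[] args) throws IOException {
>     Path dir = Files.createTempDirectory("hoodie");
>     createImmutableFile(dir.resolve("20220314121317153.commit"),
>         "{}".getBytes(StandardCharsets.UTF_8));
>   }
> }
> {code}
> This is the same write-then-rename pattern Spark's CheckpointFileManager
> uses in the downstream log above ("Writing atomically to ... using temp
> file ...").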
--
This message was sent by Atlassian Jira
(v8.20.1#820001)