[
https://issues.apache.org/jira/browse/HUDI-4880?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17607682#comment-17607682
]
Teng Huo commented on HUDI-4880:
--------------------------------
I looked through the related code today and found where the marker files are
created.
In {{HoodieMergeHandle.init}}, a method {{createMarkerFile}} is called to
create a marker file for the new data file. However, this method raises an
error if the marker file already exists. I think that is the error that
occurred in HUDI-4108.
{code:java}
protected void createMarkerFile(String partitionPath, String dataFileName) {
  WriteMarkersFactory.get(config.getMarkersType(), hoodieTable, instantTime)
      .create(partitionPath, dataFileName, getIOType());
}
{code}
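To make the "fails if the marker already exists" behaviour concrete, here is a minimal sketch on a local file system. It is not Hudi's {{WriteMarkers}} implementation: the class {{MarkerCreateSketch}}, the method {{createMarker}} and the marker file suffix are hypothetical names chosen for illustration only.

```java
import java.io.IOException;
import java.nio.file.FileAlreadyExistsException;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical stand-in for a marker writer; NOT Hudi's WriteMarkers API.
class MarkerCreateSketch {

    // Creates an empty marker file for a data file.
    // Returns false when a marker from an earlier attempt is already present.
    static boolean createMarker(Path markerDir, String dataFileName) throws IOException {
        Files.createDirectories(markerDir);
        // The suffix is illustrative; the real naming scheme is an assumption here.
        Path marker = markerDir.resolve(dataFileName + ".marker.MERGE");
        try {
            Files.createFile(marker); // throws if the file already exists
            return true;
        } catch (FileAlreadyExistsException e) {
            return false;
        }
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("markers");
        System.out.println(createMarker(dir, "f1.parquet")); // first attempt
        System.out.println(createMarker(dir, "f1.parquet")); // retry of the same instant
    }
}
```

A retried compaction that reuses the same instant and file name ends up on the second path, which is the situation a leftover marker from a failed attempt would produce.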
Besides, as Danny mentioned, {{CompactionCommitSink}} performs a rollback
that deletes all data files and marker files. So {{HoodieMergeHandle}} works
fine if {{CompactionCommitSink}} receives a failed
{{CompactionCommitEvent}} and the rollback succeeds.
However, in our pipeline, the compaction tasks and the commit task were
cancelled directly because of a global failure in the Flink application, so
no rollback was performed.
Log file: [^job_manager.log]
In that case, the data and marker files generated by the previous compaction
attempt are left over in HDFS. When new compaction tasks retry the same
compaction instant, the marker files are deleted, but the data files are
not.
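The effect can be illustrated with a small in-memory sketch of the reconciliation rule described in the issue (files that have a marker but are missing from the commit metadata get deleted). The class {{ReconcileSketch}}, the method {{filesToDelete}} and the file names are hypothetical; this is a simplified model, not the code of {{reconcileAgainstMarkers}}.

```java
import java.util.HashSet;
import java.util.Set;

// Simplified model of marker-based cleanup; names are illustrative only.
class ReconcileSketch {

    // Files with a marker that are absent from the commit metadata are leftovers.
    static Set<String> filesToDelete(Set<String> markedFiles, Set<String> committedFiles) {
        Set<String> leftovers = new HashSet<>(markedFiles);
        leftovers.removeAll(committedFiles);
        return leftovers;
    }

    public static void main(String[] args) {
        Set<String> committed = Set.of("good.parquet");

        // Markers from the failed attempt are still present:
        // the partial file is detected and scheduled for deletion.
        Set<String> markersIntact = Set.of("good.parquet", "partial.parquet");
        System.out.println(filesToDelete(markersIntact, committed));

        // Markers were wiped before the retry: only the retry's marker exists,
        // so the partial file from the failed attempt is never detected.
        Set<String> markersAfterWipe = Set.of("good.parquet");
        System.out.println(filesToDelete(markersAfterWipe, committed));
    }
}
```

Under this model, once the marker for a partially written file is gone, nothing links that file to the instant any more, so it stays on HDFS.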
> Corrupted parquet file found in Hudi Flink MOR pipeline
> -------------------------------------------------------
>
> Key: HUDI-4880
> URL: https://issues.apache.org/jira/browse/HUDI-4880
> Project: Apache Hudi
> Issue Type: Bug
> Components: compaction, flink
> Reporter: Teng Huo
> Assignee: Teng Huo
> Priority: Major
> Labels: pull-request-available
> Attachments: job_manager.log
>
>
> h2. Env
> Hudi version : 0.11.1 (but I believe this issue still exists in the current
> version)
> Flink version : 1.13
> Pipeline type: MOR, online compaction
> h2. TLDR
> The marker mechanism for cleaning up corrupted parquet files is no longer
> effective in Flink MOR online compaction because of this PR:
> [https://github.com/apache/hudi/pull/5611]
> h2. Issue description
> Recently, we hit an issue where corrupted parquet files appeared in a Hudi
> table, so the table cannot be read, or compaction tasks fail
> constantly.
> e.g. a Spark application complained that this parquet file is too small.
> {code:java}
> org.apache.spark.SparkException: Job aborted due to stage failure: Task 66 in
> stage 12.0 failed 4 times, most recent failure: Lost task 66.3 in stage 12.0
> (TID 156) (executor 6): java.lang.RuntimeException:
> hdfs://.../00000012-5e09-42d0-bf4e-2823a1a4bc7b_3-32-29_20220919020324533.parquet
> is not a Parquet file (too small length: 0)
> at
> org.apache.parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:514)
> {code}
> h2. Root cause
> After troubleshooting, I believe we have found the root cause of this issue.
> Initially, this Flink MOR pipeline failed for some reason, which
> left a bunch of unfinished parquet files in this Hudi table. This is acceptable
> for Hudi because they can be cleaned up later using markers in the method
> "finalizeWrite", which calls a method named "reconcileAgainstMarkers". That
> method finds the files that are in the marker folder but not in the
> commit metadata, marks them as corrupted files, and then deletes them.
> However, I found that this part of the code didn't work as expected: the
> corrupted parquet file
> "00000012-5e09-42d0-bf4e-2823a1a4bc7b_3-32-29_20220919020324533.parquet" was
> not deleted in "20220919020324533.commit".
> Then, we found there is [a piece of
> code|https://github.com/apache/hudi/blob/13eb892081fc4ddd5e1592ef8698831972012666/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionPlanOperator.java#L134]
> deleting the marker folder at the beginning of every batch of compaction.
> This defeats the mechanism for deleting corrupted files, since
> all marker files created before the current batch are deleted.
> We also found HDFS audit logs showing that the marker folder
> "hdfs://.../.hoodie/.temp/20220919020324533" was deleted multiple times in a
> single Flink application, which confirms the current behavior of
> "CompactionPlanOperator": it deletes the marker folder every time.