kasured opened a new issue, #5298:
URL: https://github.com/apache/hudi/issues/5298
**Describe the problem you faced**
When inline compaction is turned on and the compaction plan completes, the commit file references a file that was deleted during the compaction process. Later, this causes the reader to fail with FileNotFoundException.
**To Reproduce**
I managed to reproduce the issue consistently. After the first compaction action completes, all subsequent reads fail, because the commit file references a parquet file that has already been deleted from the file system. Please see the Additional Context section for more details. The issue can only be reproduced when multiple tables are used within the same SparkSession.
**Expected behavior**
After inline compaction, the commit files in the .hoodie folder are in sync with the files on the file system, and no files are deleted during the compaction.
**Environment Description**
* EMR version: 6.5.0
* Hudi version : 0.9.0-amzn-1
* Spark version : 3.1.2
* Hadoop version : 3.2.1
* Storage (HDFS/S3/GCS..) : S3
* Running on Docker? (yes/no) : no
**Additional context**
We are using Spark streaming with Kafka topics as a source: topic -> foreachBatch -> DataFrame write -> Hudi MOR table. For each table we are using the following related configuration options (a rough sketch of the write path follows the configuration block):
```
"hoodie.datasource.write.table.type" = "MERGE_ON_READ"
"hoodie.datasource.write.hive_style_partitioning" = "true"
"hoodie.finalize.write.parallelism" = "4"
"hoodie.upsert.shuffle.parallelism" = "4"
"hoodie.compact.inline" = "true"
"hoodie.compact.inline.max.delta.seconds" = "3600"
"hoodie.compact.inline.trigger.strategy" = "TIME_ELAPSED"
"hoodie.clean.automatic" = "true"
"hoodie.cleaner.policy" = "KEEP_LATEST_COMMITS"
"hoodie.cleaner.commits.retained" = "18"
"hoodie.metadata.cleaner.commits.retained" = "18"
"hoodie.keep.min.commits" = "36"
"hoodie.keep.max.commits" = "72"
"hoodie.clustering.inline" = "false"
"hoodie.clustering.inline.max.commits" = "4"
"hoodie.clustering.plan.strategy.target.file.max.bytes" =
"1073741824"
"hoodie.clustering.plan.strategy.small.file.limit" = "629145600"
"hoodie.metadata.enable" = "false"
"hoodie.metadata.keep.min.commits" = "36"
"hoodie.metadata.keep.max.commits" = "72"
"hoodie.datasource.compaction.async.enable" = "true"
```
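For context, here is a minimal sketch of the write path described above (the names `kafkaStreamDf`, `toTableA`/`toTableB`, `hudiOptsA`/`hudiOptsB` and the table paths are illustrative placeholders, not our actual code); the important part is that two or more MOR tables are written from the same SparkSession in every micro-batch:
```scala
import org.apache.spark.sql.{DataFrame, SaveMode}

// Illustrative sketch only. hudiOptsA / hudiOptsB hold the options listed above
// plus the usual per-table settings (hoodie.table.name, record key, precombine
// field, partition path).
def writeHudi(df: DataFrame, path: String, opts: Map[String, String]): Unit =
  df.write.format("hudi").options(opts).mode(SaveMode.Append).save(path)

kafkaStreamDf.writeStream
  .foreachBatch { (batch: DataFrame, batchId: Long) =>
    // Multiple MOR tables are written from the same SparkSession in each
    // micro-batch; with a single table the problem does not reproduce.
    writeHudi(toTableA(batch), "s3://some-bucket/landing-zone/table_a", hudiOptsA)
    writeHudi(toTableB(batch), "s3://some-bucket/landing-zone/table_b", hudiOptsB)
  }
  .start()
```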
**Course of Events**
Let us take the file the reader later tries to find, 4ce009e6-5622-4874-bb5d-a11e3bb9eaa3-0_1-1177-14259_20220411202305.parquet, and show how it changes.
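For reference, the base-file names below follow Hudi's `<fileId>_<writeToken>_<instantTime>.parquet` convention; a tiny illustrative snippet (the splitting logic is just for this write-up, not part of our job) that pulls the name apart:
```scala
// Hudi base-file naming: <fileId>_<writeToken>_<instantTime>.parquet
val name = "4ce009e6-5622-4874-bb5d-a11e3bb9eaa3-0_1-1177-14259_20220411202305.parquet"
val Array(fileId, writeToken, rest) = name.split("_", 3) // fileId contains no underscores
val instantTime = rest.stripSuffix(".parquet")
// fileId      = 4ce009e6-5622-4874-bb5d-a11e3bb9eaa3-0
// writeToken  = 1-1177-14259
// instantTime = 20220411202305
```
Keeping this naming in mind makes the observations below easier to follow.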
* Compaction completed, and there are no cleans yet that would have deleted the old files:
```
╔═════════════════════════╤═══════════╤═══════════════════════════════╗
║ Compaction Instant Time │ State     │ Total FileIds to be Compacted ║
╠═════════════════════════╪═══════════╪═══════════════════════════════╣
║ 20220411202305          │ COMPLETED │ 3                             ║
╚═════════════════════════╧═══════════╧═══════════════════════════════╝

╔═══════════╤═════════════════════════╤═════════════════════╤══════════════════╗
║ CleanTime │ EarliestCommandRetained │ Total Files Deleted │ Total Time Taken ║
╠═══════════╧═════════════════════════╧═════════════════════╧══════════════════╣
║ (empty)                                                                      ║
╚══════════════════════════════════════════════════════════════════════════════╝
```
* On S3 we can see the following timeline for the compaction process. Please note the modification times:
```
20220411202305.commit                  commit      April 11, 2022, 22:23:55 (UTC+02:00)
20220411202305.compaction.inflight     inflight    April 11, 2022, 22:23:08 (UTC+02:00)
20220411202305.compaction.requested    requested   April 11, 2022, 22:23:07 (UTC+02:00)
```
* On S3 we can see the following data files:
```
4ce009e6-5622-4874-bb5d-a11e3bb9eaa3-0_1-1177-14259_20220411202305.parquet   Delete marker   April 11, 2022, 22:23:55 (UTC+02:00)
4ce009e6-5622-4874-bb5d-a11e3bb9eaa3-0_1-1177-14259_20220411202305.parquet   parquet         April 11, 2022, 22:23:28 (UTC+02:00)
4ce009e6-5622-4874-bb5d-a11e3bb9eaa3-0_1-1198-14280_20220411202305.parquet   parquet         April 11, 2022, 22:23:54 (UTC+02:00)
4ce009e6-5622-4874-bb5d-a11e3bb9eaa3-0_2-75-1434_20220411191603.parquet      parquet         April 11, 2022, 21:19:15 (UTC+02:00)
```
Note that the file under consideration was deleted (the delete marker was placed) at the same time the compaction commit happened, i.e. 22:23:55. Also note that the only thing that changed between the old file and the new one is the writeToken. After that moment there is a new file, 4ce009e6-5622-4874-bb5d-a11e3bb9eaa3-0_1-1198-14280_20220411202305.parquet. However, this new file is not reflected in 20220411202305.commit, as can be seen below:
```
"fileId" : "4ce009e6-5622-4874-bb5d-a11e3bb9eaa3-0",
"path" :
"cluster=96/shard=14377/4ce009e6-5622-4874-bb5d-a11e3bb9eaa3-0_1-1177-14259_20220411202305.parquet",
"prevCommit" : "20220411191603",
"numWrites" : 122486,
"numDeletes" : 0,
"numUpdateWrites" : 122457,
"numInserts" : 0,
"totalWriteBytes" : 7528604,
"totalWriteErrors" : 0,
"tempPath" : null,
"partitionPath" : "cluster=96/shard=14377",
"totalLogRecords" : 846489,
"totalLogFilesCompacted" : 7,
"totalLogSizeCompacted" : 325539587,
"totalUpdatedRecordsCompacted" : 122457,
"totalLogBlocks" : 7,
"totalCorruptLogBlock" : 0,
"totalRollbackBlocks" : 0,
"fileSizeInBytes" : 7528604,
"minEventTime" : null,
"maxEventTime" : null
"fileIdAndRelativePaths" : {
"4ce009e6-5622-4874-bb5d-a11e3bb9eaa3-0" :
"cluster=96/shard=14377/4ce009e6-5622-4874-bb5d-a11e3bb9eaa3-0_1-1177-14259_20220411202305.parquet",
"0139f10d-7a88-481b-b5df-6516500076b0-0" :
"cluster=96/shard=14377/0139f10d-7a88-481b-b5df-6516500076b0-0_0-1177-14258_20220411202305.parquet",
"21000940-a573-4c46-8ad5-79003ac9daf5-0" :
"cluster=96/shard=14377/21000940-a573-4c46-8ad5-79003ac9daf5-0_2-1177-14260_20220411202305.parquet"
},
"totalRecordsDeleted" : 0,
"totalLogRecordsCompacted" : 2543875,
"totalLogFilesCompacted" : 21,
"totalCompactedRecordsUpdated" : 368137,
"totalLogFilesSize" : 978467842,
"totalScanTime" : 45421,
```
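As a quick cross-check of the mismatch, here is a minimal sketch (the base path matches the table path shown in the fsview output below; the Jackson/Hadoop usage is an illustrative assumption, not something we run in production) that verifies whether each path listed under fileIdAndRelativePaths in the commit file still exists on S3:
```scala
import com.fasterxml.jackson.databind.ObjectMapper
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import scala.collection.JavaConverters._

val basePath   = "s3://some-bucket/landing-zone/some_table"
val commitFile = new Path(s"$basePath/.hoodie/20220411202305.commit")

val fs   = FileSystem.get(new java.net.URI(basePath), new Configuration())
val json = new ObjectMapper().readTree(fs.open(commitFile))

// For every fileId -> relative path recorded in the commit, check that the
// referenced base file is still present; for the fileId above it is not.
json.get("fileIdAndRelativePaths").fields().asScala.foreach { entry =>
  val file = new Path(basePath, entry.getValue.asText())
  println(s"${entry.getKey} -> exists=${fs.exists(file)} ($file)")
}
```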
* Now when I check the file system view with the command `show fsview latest`, Hudi shows the new file, not the deleted one:
```
Partition                                    : cluster=96/shard=14377/
FileId                                       : 4ce009e6-5622-4874-bb5d-a11e3bb9eaa3-0
Base-Instant                                 : 20220411202305
Data-File                                    : s3://some-bucket/landing-zone/some_table/cluster=96/shard=14377/4ce009e6-5622-4874-bb5d-a11e3bb9eaa3-0_1-1198-14280_20220411202305.parquet
Data-File Size                               : 7.2 MB
Num Delta Files                              : 4
Total Delta Size                             : 178.2 MB
Delta Size - compaction scheduled            : 178.2 MB
Delta Size - compaction unscheduled          : 0.0 B
Delta To Base Ratio - compaction scheduled   : 24.814273985207016
Delta To Base Ratio - compaction unscheduled : 0.0
Delta Files - compaction scheduled           :
  HoodieLogFile{pathStr='s3://some-bucket/landing-zone/some_table/cluster=96/shard=14377/.4ce009e6-5622-4874-bb5d-a11e3bb9eaa3-0_20220411202305.log.4_0-1758-20808', fileLen=46785516}
  HoodieLogFile{pathStr='s3://some-bucket/landing-zone/some_table/cluster=96/shard=14377/.4ce009e6-5622-4874-bb5d-a11e3bb9eaa3-0_20220411202305.log.3_1-1610-19092', fileLen=46440363}
  HoodieLogFile{pathStr='s3://some-bucket/landing-zone/some_table/cluster=96/shard=14377/.4ce009e6-5622-4874-bb5d-a11e3bb9eaa3-0_20220411202305.log.2_0-1450-17202', fileLen=47068201}
  HoodieLogFile{pathStr='s3://some-bucket/landing-zone/some_table/cluster=96/shard=14377/.4ce009e6-5622-4874-bb5d-a11e3bb9eaa3-0_20220411202305.log.1_0-1297-15405', fileLen=46545393}
Delta Files - compaction unscheduled         : []
```
**Tried options**
* Turned off file sizing by setting hoodie.parquet.small.file.limit to 0 to make sure the file is not deleted
* With a single table, inline compaction works as expected
**Stacktrace**
```
Lost task 0.0 in stage 1.0 (TID 1) (ip.ec2.internal executor 2): java.io.FileNotFoundException: No such file or directory 's3://some-bucket/landing-zone/some_table/cluster=96/shard=14377/4ce009e6-5622-4874-bb5d-a11e3bb9eaa3-0_1-1177-14259_20220411202305.parquet'
    at com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem.getFileStatus(S3NativeFileSystem.java:521)
    at com.amazon.ws.emr.hadoop.fs.EmrFileSystem.getFileStatus(EmrFileSystem.java:694)
    at org.apache.parquet.hadoop.util.HadoopInputFile.fromPath(HadoopInputFile.java:61)
    at org.apache.parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:456)
    at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat.footerFileMetaData$lzycompute$1(ParquetFileFormat.scala:318)
    at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat.footerFileMetaData$1(ParquetFileFormat.scala:317)
    at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat.$anonfun$buildReaderWithPartitionValues$2(ParquetFileFormat.scala:319)
    at org.apache.hudi.HoodieMergeOnReadRDD.read(HoodieMergeOnReadRDD.scala:105)
    at org.apache.hudi.HoodieMergeOnReadRDD.compute(HoodieMergeOnReadRDD.scala:77)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:131)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:750)
```