ergunbaris opened a new issue, #11718:
URL: https://github.com/apache/hudi/issues/11718
**Description**
We have a Spark application in our Golden Zone (medallion architecture) that aggregates input data and runs in batches to write out daily partitions. Within the same Spark application, each daily partition is overwritten using Hudi's INSERT_OVERWRITE mode. Since we cannot compute all the daily partition aggregations in a single batch, we run sequential batches, each producing a number of daily partitions.
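For illustration, here is a minimal sketch of the batching pattern described above; `writeBatches`, `dailyBatches`, and `writePath` are hypothetical names, not our production code:
```scala
import org.apache.spark.sql.{DataFrame, SaveMode}

// Illustrative sketch only: each batch covers a group of daily partitions
// and is written with INSERT_OVERWRITE, all within one long-running
// Spark application.
def writeBatches(dailyBatches: Seq[DataFrame], writePath: String,
                 hudiOptions: Map[String, String]): Unit =
  dailyBatches.foreach { batch =>
    batch.write
      .format("org.apache.hudi")
      .options(hudiOptions) // includes OPERATION -> insert_overwrite
      .mode(SaveMode.Append) // Append is the usual save mode for Hudi writers
      .save(writePath)
  }
```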
Recently we had to fix data for daily partitions spanning a two-year period, which ran to completion in dozens of batches (in a single Spark application, without any intervention). The first batches ran successfully: they generated new parquet files for each daily partition and unreferenced the old parquet files in the Hudi metadata, which were then removed by an async Hudi cleaner job. For the final x batches this behaviour shifted: the old parquet files were no longer unreferenced in the Hudi metadata, so the cleaner (as expected) did not delete them, and we ended up with duplicate data. Again, this happened within a single, uninterrupted Spark application.
For security reasons I cannot share the archived partition metadata that references the old and the new parquet files. However, on the problematic daily partitions I observed with hudi-cli that both the old and the new parquet files were listed in the partition metadata. And when I inspected the related clean job, I saw that for the days where overwrite worked it deleted the old parquet files, whereas for the days towards the end of the application run none of the old parquet files were deleted, since they were never unreferenced.
Once the problem started, it occurred for all of the remaining sequential batches. That is, it did not happen for one arbitrary batch in isolation; from the first affected batch onward, every subsequent batch exhibited the same behaviour.
Here is the anonymised code for the Hudi options used during insert overwrite. When the "force" argument is passed from the top-level application, it applies to all batches in the same way.
```scala
Map[String, String](
  DataSourceWriteOptions.RECORDKEY_FIELD.key -> "some_id",
  HoodieWriteConfig.TBL_NAME.key -> "some_table_name",
  DataSourceWriteOptions.OPERATION.key ->
    (if (overWrite) DataSourceWriteOptions.INSERT_OVERWRITE_OPERATION_OPT_VAL
     else DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL),
  DataSourceWriteOptions.TABLE_TYPE.key -> DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL,
  DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "date",
  DataSourceWriteOptions.HIVE_STYLE_PARTITIONING.key -> "true",
  DataSourceWriteOptions.PRECOMBINE_FIELD.key -> "some_id",
  "hoodie.copyonwrite.record.size.estimate" -> "512",
  "hoodie.combine.before.upsert" -> "false",
  "hoodie.clean.automatic" -> "false", // a separate Airflow job runs HoodieCleaner
  "hoodie.metadata.enable" -> arguments.isHudiMetadataEnabled.toString
)
```
And the Spark write code:
```scala
requests.write
  .format("org.apache.hudi")
  .options(hudiReplayOptions(overWrite = overWrite, arguments = arguments) ++ additionalHudiOptions)
  .mode(SaveMode.Append)
  .save(writePath)
```
These are the additionalHudiOptions:
```scala
Map[String, String](
  "hoodie.write.concurrency.mode" -> "optimistic_concurrency_control",
  "hoodie.write.lock.provider" ->
    "org.apache.hudi.client.transaction.lock.FileSystemBasedLockProvider",
  "hoodie.write.lock.filesystem.expire" -> arguments.hudiLockExpiryDuration.toString
)
```
We are in the process of replacing the Hudi lock mechanism to move away from S3-based locking, since Hudi support later announced that it should not be relied on. There is another concurrent incremental Hudi job running during the partition-fix job, although it is highly unlikely that both processes tried to acquire the lock at the exact same moment. Even if that were a suspect, recall that once the problem started for one batch it happened for all subsequent batches as well; it is implausible for a lock race to recur sequentially that many times. Moreover, no Hudi metadata failure or corruption occurred during the operation.
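For reference, a minimal sketch of the lock configuration we are evaluating as a replacement, using Hudi's DynamoDB-based lock provider from the hudi-aws module; the table name, partition key, and region below are hypothetical placeholders:
```scala
// Sketch: OCC with the DynamoDB-based lock provider instead of the
// FileSystem-based one. All values below are illustrative placeholders.
val lockOptions = Map[String, String](
  "hoodie.write.concurrency.mode" -> "optimistic_concurrency_control",
  "hoodie.cleaner.policy.failed.writes" -> "LAZY", // required for multi-writer OCC
  "hoodie.write.lock.provider" ->
    "org.apache.hudi.aws.transaction.lock.DynamoDBBasedLockProvider",
  "hoodie.write.lock.dynamodb.table" -> "hudi-locks",             // hypothetical
  "hoodie.write.lock.dynamodb.partition_key" -> "some_table_name", // hypothetical
  "hoodie.write.lock.dynamodb.region" -> "us-east-1"               // hypothetical
)
```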
**To Reproduce**
Steps to reproduce the behavior:
1. Unfortunately we could not reproduce the same outcome with the existing production code, and we did not run the same process again with the same data, since recreating the situation would be very costly.
**Expected behavior**
We expect INSERT_OVERWRITE to unreference the replaced parquet files for every sequential batch within the same Spark application.
**Environment Description**
* Hudi version : 0.12.3
* Spark version : 3.3.0
* Hive version : 3.1.3
* Hadoop version : 3.3.3
* EMR Version: emr-6.9.0
* Storage (HDFS/S3/GCS..) : S3
* Running on Docker? (yes/no) : no
**Stacktrace**
```No stacktrace, since there was no failure.```