ranjani1993 opened a new issue, #7693:
URL: https://github.com/apache/hudi/issues/7693
**Describe the problem you faced**
Hudi file cleanup does not work as expected when we run it alongside data ingestion.
**Config used:**

```scala
df.write.format("hudi").
  option("hoodie.datasource.write.table.type", "COPY_ON_WRITE").
  option("hoodie.datasource.write.recordkey.field", "item_nbr").
  option("hoodie.datasource.write.precombine.field", "ts_created").
  option("hoodie.table.name", "stg.td_item_hudi").
  option("hoodie.datasource.write.operation", "upsert").
  option("hoodie.datasource.write.hive_style_partitioning", "true").
  option("hoodie.datasource.write.partitionpath.field", "ts_created").
  option("hoodie.clean.automatic", "true").
  option("hoodie.clean.async", "true").
  option("hoodie.cleaner.policy", "KEEP_LATEST_FILE_VERSIONS").
  option("hoodie.cleaner.fileversions.retained", "0").
  mode(Append).
  save(basePath)
```
If we keep ("hoodie.cleaner.fileversions.retained","1") --> we see 2 or 3
versions of file is maintained
If we keep ("hoodie.cleaner.fileversions.retained","0") --> we get an "File
not found exception"
**Note**: Cleanup is working file if we run it as a independent process
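For reference, this is roughly how we run the clean as a standalone step, in the same spark-shell session. This is a minimal sketch assuming the Hudi 0.11.x write-client API; the class and builder names below are our reading of that API, not copied from our production job:

```scala
// Sketch only, assuming the Hudi 0.11.x write-client API: build a write
// config carrying the cleaner settings, then run one clean cycle
// synchronously, independent of any ingestion job.
import scala.collection.JavaConverters._
import org.apache.hudi.client.SparkRDDWriteClient
import org.apache.hudi.client.common.HoodieSparkEngineContext
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.spark.api.java.JavaSparkContext

val cleanConfig = HoodieWriteConfig.newBuilder().
  withPath(basePath).                  // same basePath as the ingestion job
  forTable("stg.td_item_hudi").
  withProps(Map(
    "hoodie.cleaner.policy" -> "KEEP_LATEST_FILE_VERSIONS",
    "hoodie.cleaner.fileversions.retained" -> "1"
  ).asJava).
  build()

val engineCtx = new HoodieSparkEngineContext(
  JavaSparkContext.fromSparkContext(spark.sparkContext))
val client = new SparkRDDWriteClient(engineCtx, cleanConfig)
client.clean()   // one synchronous clean cycle
client.close()
```

Run this way, with no concurrent upsert, the old file versions are removed as expected.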
Steps to reproduce the behavior:

Scenario 1) `("hoodie.cleaner.fileversions.retained", "0")`
Step 1) Create the Hudi external table:

```sql
-- in beeline
add jar gs://xxx/hudi-hadoop-mr-bundle-0.11.1.jar;
set hive.input.format=org.apache.hudi.hadoop.HoodieParquetInputFormat;

CREATE EXTERNAL TABLE `stg.td_item_hudi`(
  `item_nbr` int,
  `dept_nbr` int,
  `fineline_nbr` int,
  `upc_nbr` int,
  `product_nbr` int,
  `consumer_item_nbr` int,
  `item_status_code` char(1),
  `assortment_type_cd` int)
PARTITIONED BY (ts_created string)
ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
STORED AS INPUTFORMAT 'com.uber.hoodie.hadoop.HoodieInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
LOCATION 'gs://xxx/stg.db/td_item_hudi';
```
Step 2) Upsert data
spark-shell \
--packages
org.apache.hudi:hudi-spark-bundle_2.12:0.11.1,org.apache.spark:spark-avro_2.12:2.4.7
\
--conf "spark.serializer=org.apache.spark.serializer.KryoSerializer" \
--conf
'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension' \
--conf "spark.sql.hive.convertMetastoreParquet=false" \
--conf "spark.sql.parquet.writeLegacyFormat=true" \
--conf "spark.sql.parquet.enableVectorizedReader=false"
```scala
import org.apache.spark.sql.DataFrame
import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.spark.sql.SaveMode._
import org.apache.hudi.DataSourceReadOptions
import org.apache.hudi.DataSourceWriteOptions

val tableName = "stg.td_item_hudi"
val basePath = "gs://xxx/stg.db/td_item_hudi"

val df: DataFrame = Seq(
  (1, 1000, 100, 111111, 1, 1, "A", 0, "2022-01-01"),
  (2, 1000, 100, 111111, 1, 1, "A", 1, "2022-01-01"),
  (3, 1000, 100, 111111, 1, 1, "A", 2, "2022-01-01"),
  (4, 1000, 100, 111111, 1, 1, "A", 0, "2022-01-01"),
  (5, 1000, 100, 111111, 1, 1, "A", 1, "2022-01-01")
).toDF("item_nbr", "dept_nbr", "fineline_nbr", "upc_nbr", "product_nbr",
  "consumer_item_nbr", "item_status_code", "assortment_type_cd", "ts_created")

df.write.format("hudi").
  option("hoodie.datasource.write.table.type", "COPY_ON_WRITE").
  option("hoodie.datasource.write.recordkey.field", "item_nbr").
  option("hoodie.datasource.write.precombine.field", "ts_created").
  option("hoodie.table.name", "stg.td_item_hudi").
  option("hoodie.datasource.write.operation", "upsert").
  option("hoodie.datasource.write.hive_style_partitioning", "true").
  option("hoodie.datasource.write.partitionpath.field", "ts_created").
  option("hoodie.clean.automatic", "true").
  option("hoodie.clean.async", "true").
  option("hoodie.cleaner.policy", "KEEP_LATEST_FILE_VERSIONS").
  option("hoodie.cleaner.fileversions.retained", "0").
  mode(Append).
  save(basePath)

val df: DataFrame = Seq(
  (1, 1000, 100, 111111, 11, 1, "A", 0, "2022-01-01")
).toDF("item_nbr", "dept_nbr", "fineline_nbr", "upc_nbr", "product_nbr",
  "consumer_item_nbr", "item_status_code", "assortment_type_cd", "ts_created")

df.write.format("hudi").
  option("hoodie.datasource.write.table.type", "COPY_ON_WRITE").
  option("hoodie.datasource.write.recordkey.field", "item_nbr").
  option("hoodie.datasource.write.precombine.field", "ts_created").
  option("hoodie.table.name", "stg.td_item_hudi").
  option("hoodie.datasource.write.operation", "upsert").
  option("hoodie.datasource.write.hive_style_partitioning", "true").
  option("hoodie.datasource.write.partitionpath.field", "ts_created").
  option("hoodie.clean.automatic", "true").
  option("hoodie.clean.async", "true").
  option("hoodie.cleaner.policy", "KEEP_LATEST_FILE_VERSIONS").
  option("hoodie.cleaner.fileversions.retained", "0").
  mode(Append).
  save(basePath)
```
The second upsert fails with:

```
23/01/18 07:34:56 ERROR org.apache.spark.scheduler.TaskSetManager: Task 0 in stage 63.0 failed 10 times; aborting job
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 63.0 failed 10 times, most recent failure: Lost task 0.9 in stage 63.0 (TID 2504, fd-persistent-sw-jbnh.c.wmt-bfdms-mdsefdprd.internal, executor 2): org.apache.hudi.exception.HoodieUpsertException: Error upserting bucketType UPDATE for partition :0
	at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpsertPartition(BaseSparkCommitActionExecutor.java:329)
	at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.lambda$mapPartitionsAsRDD$a3ab3c4$1(BaseSparkCommitActionExecutor.java:244)
	at org.apache.spark.api.java.JavaRDDLike.$anonfun$mapPartitionsWithIndex$1(JavaRDDLike.scala:102)
	at org.apache.spark.api.java.JavaRDDLike.$anonfun$mapPartitionsWithIndex$1$adapted(JavaRDDLike.scala:102)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$2(RDD.scala:875)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$2$adapted(RDD.scala:875)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
	at org.apache.spark.rdd.RDD.$anonfun$getOrCompute$1(RDD.scala:359)
	at org.apache.spark.storage.BlockManager.$anonfun$doPutIterator$1(BlockManager.scala:1182)
	at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:1091)
	at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1156)
	at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:882)
	at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:357)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:308)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:123)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:414)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:417)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.hudi.exception.HoodieException: java.io.FileNotFoundException: File not found: gs://xxx/stg.db/td_item_hudi/ts_created=2022-01-01/b9d8357c-b096-4910-ac44-461314d4b6b6-0_0-23-1262_20230118073254752.parquet
	at org.apache.hudi.table.action.commit.HoodieMergeHelper.runMerge(HoodieMergeHelper.java:149)
	at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpdateInternal(BaseSparkCommitActionExecutor.java:358)
	at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpdate(BaseSparkCommitActionExecutor.java:349)
	at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpsertPartition(BaseSparkCommitActionExecutor.java:322)
	... 28 more
Caused by: java.io.FileNotFoundException: File not found: gs://xxx/stg.db/td_item_hudi/ts_created=2022-01-01/b9d8357c-b096-4910-ac44-461314d4b6b6-0_0-23-1262_20230118073254752.parquet
	at com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemBase.getFileStatus(GoogleHadoopFileSystemBase.java:1082)
	at org.apache.parquet.hadoop.ParquetReader$Builder.build(ParquetReader.java:300)
	at org.apache.hudi.io.storage.HoodieParquetReader.getRecordIterator(HoodieParquetReader.java:70)
	at org.apache.hudi.table.action.commit.HoodieMergeHelper.runMerge(HoodieMergeHelper.java:134)
	... 31 more
```
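To confirm it is the async clean that removes the base file the merge was planned against, we list the completed clean instants on the timeline. A diagnostic sketch, assuming the Hudi 0.11.x `HoodieTableMetaClient` API:

```scala
// Sketch, assuming the Hudi 0.11.x timeline API. With
// hoodie.cleaner.fileversions.retained = 0 the cleaner may delete even the
// latest base file, so the next upsert's merge cannot open the parquet it
// was planned against (the FileNotFoundException above).
import org.apache.hudi.common.table.HoodieTableMetaClient

val metaClient = HoodieTableMetaClient.builder().
  setConf(spark.sparkContext.hadoopConfiguration).
  setBasePath(basePath).
  build()

// Print every completed clean action; each should line up with a parquet
// file that disappeared from the partition listing.
metaClient.getActiveTimeline.getCleanerTimeline.
  filterCompletedInstants().
  getInstants.forEach(instant => println(instant))
```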
Scenario 2) `("hoodie.cleaner.fileversions.retained", "1")`

With `hoodie.cleaner.fileversions.retained` set to `1`, we still see 2 or 3 file versions maintained.

Steps to reproduce:
```sh
spark-shell \
  --packages org.apache.hudi:hudi-spark-bundle_2.12:0.11.1,org.apache.spark:spark-avro_2.12:2.4.7 \
  --conf "spark.serializer=org.apache.spark.serializer.KryoSerializer" \
  --conf "spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension" \
  --conf "spark.sql.hive.convertMetastoreParquet=false" \
  --conf "spark.sql.parquet.writeLegacyFormat=true" \
  --conf "spark.sql.parquet.enableVectorizedReader=false"
```
```scala
import org.apache.spark.sql.DataFrame
import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.spark.sql.SaveMode._
import org.apache.hudi.DataSourceReadOptions
import org.apache.hudi.DataSourceWriteOptions

val tableName = "stg.td_item_hudi"
val basePath = "gs://xxx/stg.db/td_item_hudi"

val df: DataFrame = Seq(
  (1, 1000, 100, 111111, 1, 1, "A", 0, "2022-01-01"),
  (2, 1000, 100, 111111, 1, 1, "A", 1, "2022-01-01"),
  (3, 1000, 100, 111111, 1, 1, "A", 2, "2022-01-01"),
  (4, 1000, 100, 111111, 1, 1, "A", 0, "2022-01-01"),
  (5, 1000, 100, 111111, 1, 1, "A", 1, "2022-01-01")
).toDF("item_nbr", "dept_nbr", "fineline_nbr", "upc_nbr", "product_nbr",
  "consumer_item_nbr", "item_status_code", "assortment_type_cd", "ts_created")

df.write.format("hudi").
  option("hoodie.datasource.write.table.type", "COPY_ON_WRITE").
  option("hoodie.datasource.write.recordkey.field", "item_nbr").
  option("hoodie.datasource.write.precombine.field", "ts_created").
  option("hoodie.table.name", "stg.td_item_hudi").
  option("hoodie.datasource.write.operation", "upsert").
  option("hoodie.datasource.write.hive_style_partitioning", "true").
  option("hoodie.datasource.write.partitionpath.field", "ts_created").
  option("hoodie.clean.automatic", "true").
  option("hoodie.clean.async", "true").
  option("hoodie.cleaner.policy", "KEEP_LATEST_FILE_VERSIONS").
  option("hoodie.cleaner.fileversions.retained", "1").
  mode(Append).
  save(basePath)
```
```
hdfs dfs -ls gs://xxx/stg.db/td_item_hudi/ts_created=2022-01-01
Found 2 items
-rwxrwxrwx   3 svcmdsefddat svcmdsefddat     96 2023-01-18 06:41 gs://xxx/stg.db/td_item_hudi/ts_created=2022-01-01/.hoodie_partition_metadata
-rwxrwxrwx   3 svcmdsefddat svcmdsefddat 436612 2023-01-18 06:42 gs://xxx/stg.db/td_item_hudi/ts_created=2022-01-01/cdeab847-7ee2-4fd8-b9d4-dbc5aa433dc3-0_0-27-1219_20230118064106358.parquet
```
```scala
val df: DataFrame = Seq(
  (1, 1000, 100, 111111, 11, 1, "A", 0, "2022-01-01")
).toDF("item_nbr", "dept_nbr", "fineline_nbr", "upc_nbr", "product_nbr",
  "consumer_item_nbr", "item_status_code", "assortment_type_cd", "ts_created")

df.write.format("hudi").
  option("hoodie.datasource.write.table.type", "COPY_ON_WRITE").
  option("hoodie.datasource.write.recordkey.field", "item_nbr").
  option("hoodie.datasource.write.precombine.field", "ts_created").
  option("hoodie.table.name", "stg.td_item_hudi").
  option("hoodie.datasource.write.operation", "upsert").
  option("hoodie.datasource.write.hive_style_partitioning", "true").
  option("hoodie.datasource.write.partitionpath.field", "ts_created").
  option("hoodie.clean.automatic", "true").
  option("hoodie.clean.async", "true").
  option("hoodie.cleaner.policy", "KEEP_LATEST_FILE_VERSIONS").
  option("hoodie.cleaner.fileversions.retained", "1").
  mode(Append).
  save(basePath)
```
```
hdfs dfs -ls gs://xxx/stg.db/td_item_hudi/ts_created=2022-01-01
Found 3 items
-rwxrwxrwx   3 svcmdsefddat svcmdsefddat     96 2023-01-18 06:41 gs://xxx/stg.db/td_item_hudi/ts_created=2022-01-01/.hoodie_partition_metadata
-rwxrwxrwx   3 svcmdsefddat svcmdsefddat 436612 2023-01-18 06:42 gs://xxx/stg.db/td_item_hudi/ts_created=2022-01-01/cdeab847-7ee2-4fd8-b9d4-dbc5aa433dc3-0_0-27-1219_20230118064106358.parquet
-rwxrwxrwx   3 svcmdsefddat svcmdsefddat 436557 2023-01-18 06:49 gs://xxx/stg.db/td_item_hudi/ts_created=2022-01-01/cdeab847-7ee2-4fd8-b9d4-dbc5aa433dc3-0_0-62-2439_20230118064856978.parquet
```
```scala
val df: DataFrame = Seq(
  (1, 1000, 100, 111111, 111, 1, "A", 0, "2022-01-01")
).toDF("item_nbr", "dept_nbr", "fineline_nbr", "upc_nbr", "product_nbr",
  "consumer_item_nbr", "item_status_code", "assortment_type_cd", "ts_created")

df.write.format("hudi").
  option("hoodie.datasource.write.table.type", "COPY_ON_WRITE").
  option("hoodie.datasource.write.recordkey.field", "item_nbr").
  option("hoodie.datasource.write.precombine.field", "ts_created").
  option("hoodie.table.name", "stg.td_item_hudi").
  option("hoodie.datasource.write.operation", "upsert").
  option("hoodie.datasource.write.hive_style_partitioning", "true").
  option("hoodie.datasource.write.partitionpath.field", "ts_created").
  option("hoodie.clean.automatic", "true").
  option("hoodie.clean.async", "true").
  option("hoodie.cleaner.policy", "KEEP_LATEST_FILE_VERSIONS").
  option("hoodie.cleaner.fileversions.retained", "1").
  mode(Append).
  save(basePath)
```
```
hdfs dfs -ls gs://xxx/stg.db/td_item_hudi/ts_created=2022-01-01
Found 3 items
-rwxrwxrwx   3 svcmdsefddat svcmdsefddat     96 2023-01-18 06:41 gs://xxx/stg.db/td_item_hudi/ts_created=2022-01-01/.hoodie_partition_metadata
-rwxrwxrwx   3 svcmdsefddat svcmdsefddat 436557 2023-01-18 06:49 gs://xxx/stg.db/td_item_hudi/ts_created=2022-01-01/cdeab847-7ee2-4fd8-b9d4-dbc5aa433dc3-0_0-62-2439_20230118064856978.parquet
-rwxrwxrwx   3 svcmdsefddat svcmdsefddat 436560 2023-01-18 06:52 gs://xxx/stg.db/td_item_hudi/ts_created=2022-01-01/cdeab847-7ee2-4fd8-b9d4-dbc5aa433dc3-0_0-99-3661_20230118065243463.parquet
```
```scala
val df: DataFrame = Seq(
  (1, 1000, 100, 111111, 1111, 1, "A", 0, "2022-01-01")
).toDF("item_nbr", "dept_nbr", "fineline_nbr", "upc_nbr", "product_nbr",
  "consumer_item_nbr", "item_status_code", "assortment_type_cd", "ts_created")

df.write.format("hudi").
  option("hoodie.datasource.write.table.type", "COPY_ON_WRITE").
  option("hoodie.datasource.write.recordkey.field", "item_nbr").
  option("hoodie.datasource.write.precombine.field", "ts_created").
  option("hoodie.table.name", "stg.td_item_hudi").
  option("hoodie.datasource.write.operation", "upsert").
  option("hoodie.datasource.write.hive_style_partitioning", "true").
  option("hoodie.datasource.write.partitionpath.field", "ts_created").
  option("hoodie.clean.automatic", "true").
  option("hoodie.clean.async", "true").
  option("hoodie.cleaner.policy", "KEEP_LATEST_FILE_VERSIONS").
  option("hoodie.cleaner.fileversions.retained", "1").
  mode(Append).
  save(basePath)
```
```
hdfs dfs -ls gs://xxx/stg.db/td_item_hudi/ts_created=2022-01-01
-rwxrwxrwx   3 svcmdsefddat svcmdsefddat     96 2023-01-18 06:41 gs://xxx/stg.db/td_item_hudi/ts_created=2022-01-01/.hoodie_partition_metadata
-rwxrwxrwx   3 svcmdsefddat svcmdsefddat 436571 2023-01-18 06:58 gs://xxx/stg.db/td_item_hudi/ts_created=2022-01-01/cdeab847-7ee2-4fd8-b9d4-dbc5aa433dc3-0_0-145-4890_20230118065755261.parquet
-rwxrwxrwx   3 svcmdsefddat svcmdsefddat 436557 2023-01-18 06:49 gs://xxx/stg.db/td_item_hudi/ts_created=2022-01-01/cdeab847-7ee2-4fd8-b9d4-dbc5aa433dc3-0_0-62-2439_20230118064856978.parquet
-rwxrwxrwx   3 svcmdsefddat svcmdsefddat 436560 2023-01-18 06:52 gs://xxx/stg.db/td_item_hudi/ts_created=2022-01-01/cdeab847-7ee2-4fd8-b9d4-dbc5aa433dc3-0_0-99-3661_20230118065243463.parquet
```
```scala
val df: DataFrame = Seq(
  (1, 1000, 100, 111111, 11111, 1, "A", 0, "2022-01-01")
).toDF("item_nbr", "dept_nbr", "fineline_nbr", "upc_nbr", "product_nbr",
  "consumer_item_nbr", "item_status_code", "assortment_type_cd", "ts_created")

df.write.format("hudi").
  option("hoodie.datasource.write.table.type", "COPY_ON_WRITE").
  option("hoodie.datasource.write.recordkey.field", "item_nbr").
  option("hoodie.datasource.write.precombine.field", "ts_created").
  option("hoodie.table.name", "stg.td_item_hudi").
  option("hoodie.datasource.write.operation", "upsert").
  option("hoodie.datasource.write.hive_style_partitioning", "true").
  option("hoodie.datasource.write.partitionpath.field", "ts_created").
  option("hoodie.clean.automatic", "true").
  option("hoodie.clean.async", "true").
  option("hoodie.cleaner.policy", "KEEP_LATEST_FILE_VERSIONS").
  option("hoodie.cleaner.fileversions.retained", "1").
  mode(Append).
  save(basePath)
```
```
hdfs dfs -ls gs://xxx/stg.db/td_item_hudi/ts_created=2022-01-01
Found 3 items
-rwxrwxrwx   3 svcmdsefddat svcmdsefddat     96 2023-01-18 06:41 gs://xxx/stg.db/td_item_hudi/ts_created=2022-01-01/.hoodie_partition_metadata
-rwxrwxrwx   3 svcmdsefddat svcmdsefddat 436571 2023-01-18 06:58 gs://xxx/stg.db/td_item_hudi/ts_created=2022-01-01/cdeab847-7ee2-4fd8-b9d4-dbc5aa433dc3-0_0-145-4890_20230118065755261.parquet
-rwxrwxrwx   3 svcmdsefddat svcmdsefddat 436570 2023-01-18 07:00 gs://xxx/stg.db/td_item_hudi/ts_created=2022-01-01/cdeab847-7ee2-4fd8-b9d4-dbc5aa433dc3-0_0-185-6115_20230118070030142.parquet
```
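Even with the extra versions left on GCS, a snapshot query still serves only the latest file slice. A quick sanity check (standard Hudi datasource read; nothing here is specific to this issue):

```scala
// Snapshot read: should return one row per record key (5 rows for the data
// above) regardless of how many parquet versions the cleaner left behind,
// since Hudi exposes only the latest file slice per file group.
val snapshot = spark.read.format("hudi").load(basePath)
snapshot.select("item_nbr", "product_nbr").orderBy("item_nbr").show(false)
```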
**Environment Description**
* Hudi version : 0.11.1
* Spark version : 2.4.8
* Hive version : 2.3.7
* Hadoop version : 2.10.1
* Storage (HDFS/S3/GCS..) : GCS
* Running on Docker? (yes/no) : no