[
https://issues.apache.org/jira/browse/HUDI-6675?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Danny Chen closed HUDI-6675.
----------------------------
Resolution: Fixed
Fixed via master branch: a5e5c7ba1e2db1b3d8d8fd6da441998d54e57a80
> Clean action will delete the whole table
> ----------------------------------------
>
> Key: HUDI-6675
> URL: https://issues.apache.org/jira/browse/HUDI-6675
> Project: Apache Hudi
> Issue Type: Bug
> Components: cleaning
> Affects Versions: 0.11.1, 0.13.0
>         Environment: hudi 0.11 and 0.13.
> spark 3.4
> Reporter: sanqingleo
> Priority: Major
> Labels: pull-request-available
> Fix For: 0.14.0
>
> Attachments: image-2023-08-10-10-35-02-798.png,
> image-2023-08-10-10-37-05-339.png
>
>
> h1. Abstract
> when I use the insert_overwrite feature, both in Spark SQL and via the API, it will clean
> the whole table when the table is not partitioned,
> and then it throws this exception:
> !image-2023-08-10-10-37-05-339.png!
> h1. Version
>  # hudi 0.11 and 0.13.
>  # spark 3.4
> h1. Bug Position
> org.apache.hudi.table.action.clean.CleanActionExecutor#deleteFileAndGetResult
> !image-2023-08-10-10-35-02-798.png!
> h1. How to reproduce
> Run the job 4 times; the fourth run will trigger the clean action.
> On 0.11, reproducible via both SQL and the API.
> On 0.13, reproducible via the API only.
>
> {code:java}
> import org.apache.hudi.DataSourceWriteOptions
> import org.apache.hudi.DataSourceWriteOptions._
> import org.apache.spark.sql.types.{DataTypes, StructField, StructType}
> import org.apache.spark.sql.{DataFrame, Row, SaveMode, SparkSession}
> object InsertOverwriteTest {
> def main(array: Array[String]): Unit = {
> val spark = SparkSession.builder()
> .appName("TestInsertOverwrite")
> .master("local[4]")
> .config("spark.sql.extensions",
> "org.apache.spark.sql.hudi.HoodieSparkSessionExtension")
> .config("spark.serializer",
> "org.apache.spark.serializer.KryoSerializer")
> .config("spark.sql.catalog.spark_catalog"
> ,"org.apache.spark.sql.hudi.catalog.HoodieCatalog")
> .getOrCreate()
> spark.conf.set("hoodie.index.type", "BUCKET")
> spark.conf.set("hoodie.storage.layout.type", "BUCKET")
> spark.conf.set("HADOOP_USER_NAME", "parallels")
> System.setProperty("HADOOP_USER_NAME", "parallels")
> var seq = List(
> Row("uuid_01", "27", "2022-09-23", "par_01"),
> Row("uuid_02", "21", "2022-09-23", "par_02"),
> Row("uuid_03", "23", "2022-09-23", "par_04"),
> Row("uuid_04", "24", "2022-09-23", "par_02"),
> Row("uuid_05", "26", "2022-09-23", "par_01"),
> Row("uuid_06", "20", "2022-09-23", "par_03"),
> )
> var rdd = spark.sparkContext.parallelize(seq)
> var structType: StructType = StructType(Array(
> StructField("uuid", DataTypes.StringType, nullable = true),
> StructField("age", DataTypes.StringType, nullable = true),
> StructField("ts", DataTypes.StringType, nullable = true),
> StructField("par", DataTypes.StringType, nullable = true)
> ))
> var df1 = spark.createDataFrame(rdd, structType)
> .createOrReplaceTempView("compact_test_num")
> var df: DataFrame = spark.sql(" select uuid, age, ts, par from
> compact_test_num limit 10")
> df.write.format("org.apache.hudi")
> .option(RECORDKEY_FIELD.key, "uuid")
> .option(PRECOMBINE_FIELD.key, "ts")
> // .option(PARTITIONPATH_FIELD.key(), "par")
> .option("hoodie.table.keygenerator.class",
> "org.apache.hudi.keygen.NonpartitionedKeyGenerator")
> .option(KEYGENERATOR_CLASS_NAME.key,
> "org.apache.hudi.keygen.NonpartitionedKeyGenerator")
> // .option(KEYGENERATOR_CLASS_NAME.key,
> "org.apache.hudi.keygen.ComplexKeyGenerator")
> .option(OPERATION.key, INSERT_OVERWRITE_OPERATION_OPT_VAL)
> .option(TABLE_TYPE.key, COW_TABLE_TYPE_OPT_VAL)
> .option("hoodie.metadata.enable", "false")
> .option("hoodie.index.type", "BUCKET")
> .option("hoodie.bucket.index.hash.field", "uuid")
> .option("hoodie.bucket.index.num.buckets", "2")
> .option("hoodie.storage.layout.type", "BUCKET")
> .option("hoodie.storage.layout.partitioner.class",
> "org.apache.hudi.table.action.commit.SparkBucketIndexPartitioner")
> .option("hoodie.table.name", "cow_20230801_012")
> .option("hoodie.upsert.shuffle.parallelism", "2")
> .option("hoodie.insert.shuffle.parallelism", "2")
> .option("hoodie.delete.shuffle.parallelism", "2")
> .option("hoodie.clean.max.commits", "2")
> .option("hoodie.cleaner.commits.retained", "2")
> .option("hoodie.datasource.write.hive_style_partitioning", "true")
> .mode(SaveMode.Append)
> .save("hdfs://bigdata01:9000/hudi_test/cow_20230801_012")
> }
> }
> {code}
--
This message was sent by Atlassian Jira
(v8.20.10#820010)