ehurheap opened a new issue, #9079:
URL: https://github.com/apache/hudi/issues/9079
**Describe the problem you faced**
We wish to delete records matching specified record keys. This works for
records written with `ComplexKeyGenerator`, but fails for records written with
`UuidKeyGenerator`.
With `ComplexKeyGenerator` the flow succeeds. The record key is a
combination of two fields, `env_id` and `user_id`. We query Hudi to find all
matching records, then delete them using the datasource API. Success: the
expected records are deleted.
We try the same thing for records written with `UuidKeyGenerator`. The
record key for the table is defined the same way: the combination of `env_id`
and `user_id`. We query Hudi as before and, as before, find all matching
records. We then try to delete them using the datasource API. No records are
deleted.
To be clear: we did confirm that the records to delete are found. They are
just not deleted.
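A plausible explanation (our assumption, not confirmed against Hudi's code): on a delete write, the configured key generator is re-run over the incoming rows to rebuild the keys to delete. A field-derived generator reproduces the key that was originally written, while a UUID-based generator mints a fresh random key on every call, which matches nothing. The toy sketch below only illustrates this mechanism; `complexKey` and `uuidKey` are illustrative stand-ins, not Hudi's actual `KeyGenerator` API.

```scala
import java.util.UUID

case class Record(envId: Long, userId: Long)

// Field-derived key: deterministic, so re-running it on the same row
// reproduces the key that was written (ComplexKeyGenerator-like).
def complexKey(r: Record): String = s"env_id:${r.envId},user_id:${r.userId}"

// UUID key: a fresh random key on every call (UuidKeyGenerator-like).
def uuidKey(r: Record): String = UUID.randomUUID().toString

val row = Record(1L, 11L)

// Keys as generated at write time.
val writtenComplex = complexKey(row)
val writtenUuid    = uuidKey(row)

// Keys regenerated at delete time from the same row.
val deleteComplex = complexKey(row)
val deleteUuid    = uuidKey(row)

assert(writtenComplex == deleteComplex) // delete key matches: record is deleted
assert(writtenUuid != deleteUuid)       // delete key never matches: delete is a no-op
```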
**To Reproduce**
1. Successful case using ComplexKeyGenerator
```
writeToHudi(records, classOf[ComplexKeyGenerator].getName)

val written1 = spark.read.format("hudi").load(s"file://${tablePath}")
assertEquals(written1.collect().length, 6)

val deleteRecs1 = spark.read.format("hudi")
  .option(HoodieMetadataConfig.ENABLE.key(), "false")
  .load(tablePath)
  .where(col("env_id") === deleteEnvId)
  .where(col("user_id").isInCollection(deleteUserIDs))
assertEquals(deleteRecs1.collect().length, 3) // confirms the 3 matching records to delete were found

deleteRecs1.write
  .format("hudi")
  .options(writeOptionsDelete(classOf[ComplexKeyGenerator].getName))
  .mode("append")
  .save(tablePath)

val snapshot1 = spark.read
  .format("hudi")
  .load(s"file://${tablePath}")
assertEquals(snapshot1.collect().length, 3) // confirms 3 records were deleted and 3 remain, as expected
assertEquals(
  snapshot1.select(col("user_id")).collect().map(_.getAs[Long](0)).toSet,
  Set(11L, 21L, 22L)
)
```
2. Failing case using UuidKeyGenerator
```
writeToHudi(records, classOf[UuidKeyGenerator].getName)

val written2 = spark.read.format("hudi").load(s"file://${tablePath}")
assertEquals(written2.collect().length, 6)

val deleteRecs2 = spark.read.format("hudi")
  .option(HoodieMetadataConfig.ENABLE.key(), "false")
  .load(tablePath)
  .where(col("env_id") === deleteEnvId)
  .where(col("user_id").isInCollection(deleteUserIDs))
assertEquals(deleteRecs2.collect().length, 3) // confirms the 3 matching records to delete were found

deleteRecs2.write
  .format("hudi")
  .options(writeOptionsDelete(classOf[UuidKeyGenerator].getName))
  .mode("append")
  .save(tablePath)

val snapshot2 = spark.read
  .format("hudi")
  .load(s"file://${tablePath}")
assertEquals(snapshot2.collect().length, 3) // FAILS! Should be 3, but this finds 6 records. None were deleted.
```
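One workaround we have not verified (the extra column name `existing_key` and the use of `SimpleKeyGenerator` below are our assumptions, not a documented recipe): copy the stored `_hoodie_record_key` into a plain column and point the delete write's record key at that column, so no fresh UUID is generated at delete time. Whether the datasource delete path honors this override is exactly what this issue asks about. A sketch:

```scala
import org.apache.spark.sql.functions.col

// Preserve the key that Hudi actually stored, rather than re-deriving one.
val toDelete = deleteRecs2.withColumn("existing_key", col("_hoodie_record_key"))

toDelete.write
  .format("hudi")
  .options(writeOptionsDelete(classOf[org.apache.hudi.keygen.SimpleKeyGenerator].getName) ++ Map(
    // Key off the preserved column; partition fields stay as configured,
    // since hive-style partitioning re-derives the same partition path.
    "hoodie.datasource.write.recordkey.field" -> "existing_key"
  ))
  .mode("append")
  .save(tablePath)
```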
3. Supporting functions with write options
```
def writeToHudi(properties: Seq[UserProperties], keyGeneratorClass: String): Unit = {
  val df = spark.createDataFrame(properties)
  df.write
    .format("hudi")
    .options(writeOptionsSave(keyGeneratorClass))
    .mode("append")
    .save(tablePath)
}

def writeOptionsSave(keyGeneratorClass: String) = Map[String, String](
  "hoodie.table.name" -> "users_changes",
  "hoodie.datasource.write.keygenerator.class" -> keyGeneratorClass,
  "hoodie.datasource.write.recordkey.field" -> "env_id,user_id",
  "hoodie.datasource.write.partitionpath.field" -> "env_id,week",
  "hoodie.datasource.write.precombine.field" -> "schematized_at",
  "hoodie.datasource.write.operation" -> BULK_INSERT_OPERATION_OPT_VAL,
  "hoodie.datasource.write.table.type" -> MOR_TABLE_TYPE_OPT_VAL,
  "hoodie.datasource.write.hive_style_partitioning" -> "true",
  "hoodie.datasource.write.row.writer.enable" -> "false",
  "hoodie.metadata.enable" -> "false",
  "hoodie.bulkinsert.shuffle.parallelism" -> "2",
  "hoodie.datasource.compaction.async.enable" -> "false",
  "hoodie.compact.schedule.inline" -> "false",
  "hoodie.compact.inline" -> "false",
  "hoodie.bulkinsert.sort.mode" -> "NONE",
  "hoodie.combine.before.insert" -> "false",
  "hoodie.clean.automatic" -> "false",
  "hoodie.archive.automatic" -> "false"
)

def writeOptionsDelete(keyGeneratorClass: String) = Map[String, String](
  "hoodie.table.name" -> "users_changes",
  "hoodie.datasource.write.keygenerator.class" -> keyGeneratorClass,
  "hoodie.datasource.write.recordkey.field" -> "env_id,user_id",
  "hoodie.datasource.write.partitionpath.field" -> "env_id,week",
  "hoodie.datasource.write.precombine.field" -> "schematized_at",
  "hoodie.datasource.write.operation" -> DELETE_OPERATION_OPT_VAL,
  "hoodie.datasource.write.table.type" -> MOR_TABLE_TYPE_OPT_VAL,
  "hoodie.datasource.write.hive_style_partitioning" -> "true",
  "hoodie.datasource.write.row.writer.enable" -> "false",
  "hoodie.metadata.enable" -> "false",
  "hoodie.bulkinsert.shuffle.parallelism" -> "2",
  "hoodie.datasource.compaction.async.enable" -> "false",
  "hoodie.compact.schedule.inline" -> "false",
  "hoodie.compact.inline" -> "true",
  "hoodie.compact.inline.max.delta.commits" -> "1",
  "hoodie.clean.automatic" -> "false",
  "hoodie.archive.automatic" -> "false"
)
```
**Expected behavior**
Deletes using the datasource API should work regardless of keyGeneratorClass.
**Environment Description**
* Hudi version : 0.13.0
* Spark version : 3.3
* Storage (HDFS/S3/GCS..) : local storage (this is an integration test)
* Running on Docker? (yes/no) : no
**Additional context**
I also tried using a `JavaRDD<HoodieKey>` to delete the data, as described
[here](https://hudi.apache.org/blog/2020/01/15/delete-support-in-hudi/#delete-using-rdd-level-apis).
This also failed, but with an exception.
```
writeToHudi(records, classOf[UuidKeyGenerator].getName)

val written4 = spark.read.format("hudi").load(s"file://${tablePath}")
assertEquals(written4.collect().length, 6)

val deleteRecs4: Array[Row] = spark.read.format("hudi")
  .option(HoodieMetadataConfig.ENABLE.key(), "false")
  .load(tablePath)
  .where(col("env_id") === deleteEnvId)
  .where(col("user_id").isInCollection(deleteUserIDs))
  .select("_hoodie_record_key", "_hoodie_partition_path", "env_id", "user_id", "week")
  .collect()

val deleteRecsHoodieKeys4 = deleteRecs4.map { row =>
  val recordKey = row.getAs[String](0)
  val partitionPath = row.getAs[String](1)
  new org.apache.hudi.common.model.HoodieKey(recordKey, partitionPath)
}
assertEquals(deleteRecsHoodieKeys4.length, 3) // confirms that 3 records were found to delete

val uuidKeysRDD: RDD[HoodieKey] =
  JavaSparkContext.fromSparkContext(spark.sparkContext).parallelize(deleteRecsHoodieKeys4.toList)
val writeClient = getWriteClient()
val deleteInstant: String = getDeleteInstant() // this is just a new hudi instant value

// TODO: this fails with
// org.apache.hudi.exception.HoodieUpsertException: Failed to delete for commit time 20230628180821617... caused by...
// IllegalArgumentException at org.apache.hudi.common.util.ValidationUtils.checkArgument(ValidationUtils.java:31)
// because it does not find any requested file for this instant. What should deleteInstant be set to?
val statuses = writeClient.delete(uuidKeysRDD, deleteInstant)
```
(it is not clear what the `commitTime` should be)
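The `IllegalArgumentException` surfaces in `transitionRequestedToInflight`, which suggests the instant passed to `delete()` was never registered as a requested instant on the timeline. Assuming the 0.13.x write-client API, the commit time is normally obtained from the client itself via `startCommit()`, which creates the requested instant before the delete runs. A sketch, not verified against this table:

```scala
// Let the write client mint the instant so the "requested" instant exists
// on the timeline before delete() tries to transition it to "inflight".
val writeClient = getWriteClient()
val deleteInstant: String = writeClient.startCommit()
val statuses = writeClient.delete(uuidKeysRDD, deleteInstant)
```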
**Stacktrace**
```
org.apache.hudi.exception.HoodieUpsertException: Failed to delete for commit time 20230628180821617
    at org.apache.hudi.table.action.commit.HoodieDeleteHelper.execute(HoodieDeleteHelper.java:117)
    at org.apache.hudi.table.action.deltacommit.SparkDeleteDeltaCommitActionExecutor.execute(SparkDeleteDeltaCommitActionExecutor.java:45)
    at org.apache.hudi.table.HoodieSparkMergeOnReadTable.delete(HoodieSparkMergeOnReadTable.java:105)
    at org.apache.hudi.table.HoodieSparkMergeOnReadTable.delete(HoodieSparkMergeOnReadTable.java:80)
    at org.apache.hudi.client.SparkRDDWriteClient.delete(SparkRDDWriteClient.java:243)
    at com.heap.datalake.delete.DeleteVariationsSuite.$anonfun$new$1(DeleteVariationsSuite.scala:166)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
Caused by: java.lang.IllegalArgumentException
    at org.apache.hudi.common.util.ValidationUtils.checkArgument(ValidationUtils.java:31)
    at org.apache.hudi.common.table.timeline.HoodieActiveTimeline.transitionState(HoodieActiveTimeline.java:633)
    at org.apache.hudi.common.table.timeline.HoodieActiveTimeline.transitionRequestedToInflight(HoodieActiveTimeline.java:698)
    at org.apache.hudi.table.action.commit.BaseCommitActionExecutor.saveWorkloadProfileMetadataToInflight(BaseCommitActionExecutor.java:147)
    at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.execute(BaseSparkCommitActionExecutor.java:172)
    at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.execute(BaseSparkCommitActionExecutor.java:83)
    at org.apache.hudi.table.action.commit.HoodieDeleteHelper.execute(HoodieDeleteHelper.java:103)
    ... 6 more
```