ehurheap opened a new issue, #9079:
URL: https://github.com/apache/hudi/issues/9079

   **Describe the problem you faced**
   
   We wish to delete records matching specified record keys. This works for 
records written with `ComplexKeyGenerator`, but fails for records written with 
`UuidKeyGenerator`.
   
   In both cases the record key is a combination of two fields, env_id and user_id.

   With `ComplexKeyGenerator`, the flow succeeds: we query Hudi to find all records that match, then delete them using the datasource API. The expected records are deleted.

   With `UuidKeyGenerator`, the same query finds the same matching records, but the subsequent datasource delete removes nothing. To be clear: we did confirm that the records to delete are found. They are just not deleted.
   
   
   **To Reproduce**
   
   1. Successful case using ComplexKeyGenerator
   ```
       writeToHudi(records, classOf[ComplexKeyGenerator].getName)
       val written1 = spark.read.format("hudi").load(s"file://${tablePath}")
       assertEquals(written1.collect().length, 6)
       val deleteRecs1 = spark.read.format("hudi")
         .option(HoodieMetadataConfig.ENABLE.key(), "false")
         .load(tablePath)
         .where(col("env_id") === deleteEnvId)
         .where(col("user_id").isInCollection(deleteUserIDs))
   
    assertEquals(deleteRecs1.collect().length, 3) // confirms the 3 matching records to delete were found
       deleteRecs1.write
         .format("hudi")
         .options(writeOptionsDelete(classOf[ComplexKeyGenerator].getName))
         .mode("append")
         .save(tablePath)
   
       val snapshot1 = spark.read
         .format("hudi")
         .load(s"file://${tablePath}")
   
    assertEquals(snapshot1.collect().length, 3) // confirms 3 records were deleted and 3 remain, as expected
       assertEquals(
         snapshot1.select(col("user_id")).collect().map(_.getAs[Long](0)).toSet,
         Set(11L, 21L, 22L)
       )
   ```
   2. Failing case using UuidKeyGenerator
   ```
       writeToHudi(records, classOf[UuidKeyGenerator].getName)
       val written2 = spark.read.format("hudi").load(s"file://${tablePath}")
       assertEquals(written2.collect().length, 6)
       val deleteRecs2 = spark.read.format("hudi")
         .option(HoodieMetadataConfig.ENABLE.key(), "false")
         .load(tablePath)
         .where(col("env_id") === deleteEnvId)
         .where(col("user_id").isInCollection(deleteUserIDs))
   
    assertEquals(deleteRecs2.collect().length, 3) // confirms the 3 matching records to delete were found
       deleteRecs2.write
         .format("hudi")
         .options(writeOptionsDelete(classOf[UuidKeyGenerator].getName))
         .mode("append")
         .save(tablePath)
   
       val snapshot2 = spark.read
         .format("hudi")
         .load(s"file://${tablePath}")
   
    assertEquals(snapshot2.collect().length, 3) // FAILS: expected 3, but this finds 6 records; none were deleted
   ```
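
   A possible explanation (my assumption, not confirmed from the Hudi sources): `UuidKeyGenerator` seems to assign a fresh random UUID to every incoming record rather than deriving the key from the configured record-key fields. If so, the delete batch is keyed with brand-new UUIDs that can never match any stored `_hoodie_record_key`, so the delete becomes a no-op. The mismatch can be sketched without Hudi (the key functions below are hypothetical stand-ins for the two generators):
   ```scala
   import java.util.UUID

   case class Record(envId: Long, userId: Long)

   // Deterministic key: derived from the record's fields, so the delete
   // payload produces the same key that was stored at write time.
   def complexKey(r: Record): String = s"env_id:${r.envId},user_id:${r.userId}"

   // Random key: ignores the record entirely, so every call yields a new key.
   def uuidKey(r: Record): String = UUID.randomUUID().toString

   val stored = Record(1L, 11L)
   assert(complexKey(stored) == complexKey(stored)) // delete key matches stored key
   assert(uuidKey(stored) != uuidKey(stored))       // delete key can never match
   ```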
   3. Supporting functions with write options
   ```
  def writeToHudi(properties: Seq[UserProperties], keyGeneratorClass: String): Unit = {
       val df = spark.createDataFrame(properties)
       df.write
         .format("hudi")
         .options(writeOptionsSave(keyGeneratorClass))
         .mode("append")
         .save(tablePath)
     }
   
     def writeOptionsSave(keyGeneratorClass: String) =  Map[String, String](
       "hoodie.table.name" -> "users_changes",
       "hoodie.datasource.write.keygenerator.class" -> keyGeneratorClass,
       "hoodie.datasource.write.recordkey.field" -> "env_id,user_id",
       "hoodie.datasource.write.partitionpath.field" -> "env_id,week",
       "hoodie.datasource.write.precombine.field" -> "schematized_at",
       "hoodie.datasource.write.operation" -> BULK_INSERT_OPERATION_OPT_VAL,
       "hoodie.datasource.write.table.type" -> MOR_TABLE_TYPE_OPT_VAL,
       "hoodie.datasource.write.hive_style_partitioning" -> "true",
       "hoodie.datasource.write.row.writer.enable" -> "false",
       "hoodie.metadata.enable" -> "false",
       "hoodie.bulkinsert.shuffle.parallelism" -> "2",
       "hoodie.datasource.compaction.async.enable" -> "false",
       "hoodie.compact.schedule.inline" -> "false",
       "hoodie.compact.inline" -> "false",
       "hoodie.bulkinsert.sort.mode" -> "NONE",
       "hoodie.combine.before.insert" -> "false",
       "hoodie.clean.automatic" -> "false",
       "hoodie.archive.automatic" -> "false"
     )
     def writeOptionsDelete(keyGeneratorClass: String) =  Map[String, String](
       "hoodie.table.name" -> "users_changes",
       "hoodie.datasource.write.keygenerator.class" -> keyGeneratorClass,
       "hoodie.datasource.write.recordkey.field" -> "env_id,user_id",
       "hoodie.datasource.write.partitionpath.field" -> "env_id,week",
       "hoodie.datasource.write.precombine.field" -> "schematized_at",
       "hoodie.datasource.write.operation" -> DELETE_OPERATION_OPT_VAL,
       "hoodie.datasource.write.table.type" -> MOR_TABLE_TYPE_OPT_VAL,
       "hoodie.datasource.write.hive_style_partitioning" -> "true",
       "hoodie.datasource.write.row.writer.enable" -> "false",
       "hoodie.metadata.enable" -> "false",
       "hoodie.bulkinsert.shuffle.parallelism" -> "2",
       "hoodie.datasource.compaction.async.enable" -> "false",
    "hoodie.compact.schedule.inline" -> "false",
       "hoodie.compact.inline" -> "true",
       "hoodie.compact.inline.max.delta.commits" -> "1",
       "hoodie.clean.automatic" -> "false",
       "hoodie.archive.automatic" -> "false"
     )
   ```
   
   **Expected behavior**
   Deletes using the datasource API should work regardless of keyGeneratorClass.
   
   **Environment Description**
   * Hudi version : 0.13.0
   * Spark version : 3.3
   * Storage (HDFS/S3/GCS..) : local storage (this is an integration test)
   * Running on Docker? (yes/no) : no
   
   
   **Additional context**
   I also tried deleting the data with a `JavaRDD<HoodieKey>`, as described [here](https://hudi.apache.org/blog/2020/01/15/delete-support-in-hudi/#delete-using-rdd-level-apis). This also failed, but with an exception:
   ```
       writeToHudi(records, classOf[UuidKeyGenerator].getName)
       val written4 = spark.read.format("hudi").load(s"file://${tablePath}")
    assertEquals(written4.collect().length, 6)
   
       val deleteRecs4: Array[Row] = spark.read.format("hudi")
         .option(HoodieMetadataConfig.ENABLE.key(), "false")
         .load(tablePath)
         .where(col("env_id") === deleteEnvId)
         .where(col("user_id").isInCollection(deleteUserIDs))
      .select("_hoodie_record_key", "_hoodie_partition_path", "env_id", "user_id", "week")
         .collect()
    val deleteRecsHoodieKeys4 = deleteRecs4.map { row =>
      val recordKey = row.getAs[String](0)
      val partitionPath = row.getAs[String](1)
      new org.apache.hudi.common.model.HoodieKey(recordKey, partitionPath)
    }
   
    assertEquals(deleteRecsHoodieKeys4.length, 3) // confirms the 3 records to delete were found
   
    // requires: import scala.collection.JavaConverters._
    val uuidKeysRDD: JavaRDD[HoodieKey] =
      JavaSparkContext.fromSparkContext(spark.sparkContext).parallelize(deleteRecsHoodieKeys4.toList.asJava)
       val writeClient = getWriteClient()
    val deleteInstant: String = getDeleteInstant() // a newly generated Hudi instant value
   
    // This fails with:
    //   org.apache.hudi.exception.HoodieUpsertException: Failed to delete for commit time 20230628180821617
    // caused by an IllegalArgumentException at
    //   org.apache.hudi.common.util.ValidationUtils.checkArgument(ValidationUtils.java:31)
    // because no requested instant is found for this commit time. What should deleteInstant be set to?
    val statuses = writeClient.delete(uuidKeysRDD, deleteInstant)
   ```
   (it is not clear what the `commitTime` should be)
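
   The exception may not be specific to the key generator: `transitionRequestedToInflight` fails when no requested instant exists on the timeline for the supplied commit time. A possible fix (an assumption on my part about the 0.13 write-client API, not verified): obtain the instant from `startCommit()` on the write client, which registers a requested instant on the timeline, instead of generating one externally:
   ```
       val writeClient = getWriteClient()
       // Let the client create and register the instant, then delete under it.
       val deleteInstant: String = writeClient.startCommit()
       val statuses = writeClient.delete(uuidKeysRDD, deleteInstant)
   ```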
   
   **Stacktrace**
   
   ```
   org.apache.hudi.exception.HoodieUpsertException: Failed to delete for commit time 20230628180821617

        at org.apache.hudi.table.action.commit.HoodieDeleteHelper.execute(HoodieDeleteHelper.java:117)
        at org.apache.hudi.table.action.deltacommit.SparkDeleteDeltaCommitActionExecutor.execute(SparkDeleteDeltaCommitActionExecutor.java:45)
        at org.apache.hudi.table.HoodieSparkMergeOnReadTable.delete(HoodieSparkMergeOnReadTable.java:105)
        at org.apache.hudi.table.HoodieSparkMergeOnReadTable.delete(HoodieSparkMergeOnReadTable.java:80)
        at org.apache.hudi.client.SparkRDDWriteClient.delete(SparkRDDWriteClient.java:243)
        at com.heap.datalake.delete.DeleteVariationsSuite.$anonfun$new$1(DeleteVariationsSuite.scala:166)
        at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
   Caused by: java.lang.IllegalArgumentException
        at org.apache.hudi.common.util.ValidationUtils.checkArgument(ValidationUtils.java:31)
        at org.apache.hudi.common.table.timeline.HoodieActiveTimeline.transitionState(HoodieActiveTimeline.java:633)
        at org.apache.hudi.common.table.timeline.HoodieActiveTimeline.transitionRequestedToInflight(HoodieActiveTimeline.java:698)
        at org.apache.hudi.table.action.commit.BaseCommitActionExecutor.saveWorkloadProfileMetadataToInflight(BaseCommitActionExecutor.java:147)
        at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.execute(BaseSparkCommitActionExecutor.java:172)
        at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.execute(BaseSparkCommitActionExecutor.java:83)
        at org.apache.hudi.table.action.commit.HoodieDeleteHelper.execute(HoodieDeleteHelper.java:103)
        ... 6 more
   ```
   
   

