Thanks for the response Pratyaksh! We add this column to the Spark dataframe before calling the hudi upsert and delete, so it should behave like an extra nullable column in the schema, but it is not behaving that way. I am wondering whether Hudi's code drops any column with the prefix *'_hoodie'*. We wanted this to be part of the platform so that every team does not have to add an extra field to their prod schema, since the marker is not supposed to be visible to everyone.
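For context, the intent on the platform side looks roughly like the sketch below. This is a simplified illustration, not our exact code; markSoftDeleted and hideSoftDeleted are placeholder names: the marker is stamped just before the Hudi write, and stripped again on the read path so the column never shows up in any team's schema.

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, lit}

val softDeleteCol = "_hoodie_is_soft_deleted"

// Write path: tag the records that are being soft deleted in this batch.
def markSoftDeleted(df: DataFrame): DataFrame =
  df.withColumn(softDeleteCol, lit("true"))

// Read path: drop tombstoned rows and hide the marker column from consumers.
def hideSoftDeleted(df: DataFrame): DataFrame =
  if (df.columns.contains(softDeleteCol))
    df.filter(col(softDeleteCol).isNull).drop(softDeleteCol)
  else df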
Here is an excerpt of the code:

object SoftDeleteColInfo {
  val softDeleteHudiMetaCol = "_hoodie_is_soft_deleted"
  val softDeleteStrVal = "true"

  val softDeletedUDF = udf(softDeleted)

  def softDeleted() = (arg: String) => arg
}

sparkSession.udf.register("softDeletedUDF", SoftDeleteColInfo.softDeletedUDF)
df.withColumn(softDeleteHudiMetaCol, functions.callUDF("softDeletedUDF", lit("true")))

and an excerpt of the dataframe schema before calling the hudi operation:

}, {
  "name" : "end_time_utc",
  "type" : [ { "type" : "long", "logicalType" : "timestamp-micros" }, "null" ]
}, {
  "name" : "date_created_utc",
  "type" : [ { "type" : "long", "logicalType" : "timestamp-micros" }, "null" ]
}, {
  "name" : "date_updated_utc",
  "type" : [ { "type" : "long", "logicalType" : "timestamp-micros" }, "null" ]
}, {
  "name" : "_hoodie_is_soft_deleted",
  "type" : [ "string", "null" ]
} ]
}

On Fri, Jul 15, 2022 at 12:03 AM Pratyaksh Sharma <pratyaks...@gmail.com> wrote:

> Hi,
>
> Hudi is complaining because '_hoodie_is_soft_deleted' is present in the
> parquet file's schema but is not present in your incoming schema.
>
> From my experience, I would say it is a standard practice to add an extra
> field which acts as a marker for soft deletion and needs to be persisted
> with every record. So I would suggest adding an extra field in the schema
> to solve your use case.
>
> @Sivabalan <n.siv...@gmail.com> can probably add more here.
>
> On Fri, Jul 15, 2022 at 11:21 AM aakash aakash <email2aak...@gmail.com> wrote:
>
> > Hi,
> >
> > We have a use case to perform soft delete over some record keys, where we
> > nullify the non-key fields and ignore any later update for that record. We
> > thought of using a hudi meta field, "_hoodie_is_soft_deleted", the way hudi
> > hard delete (_hoodie_is_deleted) does, to make it simple to identify whether
> > the platform performed any soft delete. However, I am getting an avro
> > "field not found" exception when we perform another soft delete on the same
> > index. Please let me know if you have any advice on how to fix it, or
> > whether this is a wrong approach. We wanted to avoid adding any extra field
> > to the customer schema and instead filter the soft-deleted records behind
> > the scenes, as is done for hard delete, while still keeping the record in
> > the system.
> >
> > Hudi : 0.8.0
> > Exception stacktrace:
> >
> > 2/07/14 22:08:21 WARN TaskSetManager: Lost task 5.0 in stage 93.0 (TID 33283, 172.25.31.77, executor 3):
> > org.apache.hudi.exception.HoodieUpsertException: Error upserting bucketType UPDATE for partition :5
> >   at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpsertPartition(BaseSparkCommitActionExecutor.java:288)
> >   at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.lambda$execute$ecf5068c$1(BaseSparkCommitActionExecutor.java:139)
> >   at org.apache.spark.api.java.JavaRDDLike$$anonfun$mapPartitionsWithIndex$1.apply(JavaRDDLike.scala:102)
> >   at org.apache.spark.api.java.JavaRDDLike$$anonfun$mapPartitionsWithIndex$1.apply(JavaRDDLike.scala:102)
> >   at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1$$anonfun$apply$25.apply(RDD.scala:853)
> >   at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1$$anonfun$apply$25.apply(RDD.scala:853)
> >   at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
> >   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
> >   at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
> >   at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
> >   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
> >   at org.apache.spark.rdd.RDD$$anonfun$7.apply(RDD.scala:337)
> >   at org.apache.spark.rdd.RDD$$anonfun$7.apply(RDD.scala:335)
> >   at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1182)
> >   at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1156)
> >   at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:1091)
> >   at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1156)
> >   at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:882)
> >   at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:335)
> >   at org.apache.spark.rdd.RDD.iterator(RDD.scala:286)
> >   at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
> >   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
> >   at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
> >   at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
> >   at org.apache.spark.scheduler.Task.run(Task.scala:123)
> >   at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
> >   at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
> >   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
> >   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> >   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> >   at java.lang.Thread.run(Thread.java:748)
> > Caused by: org.apache.hudi.exception.HoodieException: org.apache.hudi.exception.HoodieException: java.util.concurrent.ExecutionException: org.apache.hudi.exception.HoodieException: operation has failed
> >   at org.apache.hudi.table.action.commit.SparkMergeHelper.runMerge(SparkMergeHelper.java:102)
> >   at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpdateInternal(BaseSparkCommitActionExecutor.java:317)
> >   at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpdate(BaseSparkCommitActionExecutor.java:308)
> >   at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpsertPartition(BaseSparkCommitActionExecutor.java:281)
> >   ... 30 more
> > Caused by: org.apache.hudi.exception.HoodieException: java.util.concurrent.ExecutionException: org.apache.hudi.exception.HoodieException: operation has failed
> >   at org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.execute(BoundedInMemoryExecutor.java:143)
> >   at org.apache.hudi.table.action.commit.SparkMergeHelper.runMerge(SparkMergeHelper.java:100)
> >   ... 33 more
> > Caused by: java.util.concurrent.ExecutionException: org.apache.hudi.exception.HoodieException: operation has failed
> >   at java.util.concurrent.FutureTask.report(FutureTask.java:122)
> >   at java.util.concurrent.FutureTask.get(FutureTask.java:192)
> >   at org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.execute(BoundedInMemoryExecutor.java:141)
> >   ... 34 more
> > Caused by: org.apache.hudi.exception.HoodieException: operation has failed
> >   at org.apache.hudi.common.util.queue.BoundedInMemoryQueue.throwExceptionIfFailed(BoundedInMemoryQueue.java:247)
> >   at org.apache.hudi.common.util.queue.BoundedInMemoryQueue.readNextRecord(BoundedInMemoryQueue.java:226)
> >   at org.apache.hudi.common.util.queue.BoundedInMemoryQueue.access$100(BoundedInMemoryQueue.java:52)
> >   at org.apache.hudi.common.util.queue.BoundedInMemoryQueue$QueueIterator.hasNext(BoundedInMemoryQueue.java:277)
> >   at org.apache.hudi.common.util.queue.BoundedInMemoryQueueConsumer.consume(BoundedInMemoryQueueConsumer.java:36)
> >   at org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.lambda$null$2(BoundedInMemoryExecutor.java:121)
> >   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> >   ... 3 more
> > Caused by: org.apache.parquet.io.InvalidRecordException: Parquet/Avro schema mismatch: Avro field '_hoodie_is_soft_deleted' not found
> >   at org.apache.parquet.avro.AvroRecordConverter.getAvroField(AvroRecordConverter.java:225)
> >   at org.apache.parquet.avro.AvroRecordConverter.<init>(AvroRecordConverter.java:130)
> >   at org.apache.parquet.avro.AvroRecordConverter.<init>(AvroRecordConverter.java:95)
> >   at org.apache.parquet.avro.AvroRecordMaterializer.<init>(AvroRecordMaterializer.java:33)
> >   at org.apache.parquet.avro.AvroReadSupport.prepareForRead(AvroReadSupport.java:138)
> >   at org.apache.parquet.hadoop.InternalParquetRecordReader.initialize(InternalParquetRecordReader.java:183)
> >   at org.apache.parquet.hadoop.ParquetReader.initReader(ParquetReader.java:156)
> >   at org.apache.parquet.hadoop.ParquetReader.read(ParquetReader.java:135)
> >   at org.apache.hudi.common.util.ParquetReaderIterator.hasNext(ParquetReaderIterator.java:49)
> >   at org.apache.hudi.common.util.queue.IteratorBasedQueueProducer.produce(IteratorBasedQueueProducer.java:45)
> >   at org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.lambda$null$0(BoundedInMemoryExecutor.java:92)
> >   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> >   at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> >   ... 4 more
> >
> > How we add this column to the Spark dataframe:
> >
> > object SoftDeleteColInfo {
> >   val softDeleteHudiMetaCol = "_hoodie_is_soft_deleted"
> >   val softDeleteStrVal = "true"
> >
> >   val softDeletedUDF = udf(softDeleted)
> >
> >   def softDeleted() = (arg: String) => arg
> > }
> >
> > sparkSession.udf.register("softDeletedUDF", SoftDeleteColInfo.softDeletedUDF)
> > df.withColumn(softDeleteHudiMetaCol, functions.callUDF("softDeletedUDF", lit("true")))
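
Following up on Pratyaksh's point above: if the problem is simply that the second write's incoming schema no longer carries the column that an earlier commit already persisted into the parquet files, one workaround we are considering is to have the platform stamp the column on every batch, defaulting it to null when nothing is being soft deleted, so the incoming schema always matches what is on disk. A rough sketch (ensureSoftDeleteCol is a placeholder name on my side, not existing Hudi or Spark API):

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.lit
import org.apache.spark.sql.types.StringType

val softDeleteCol = "_hoodie_is_soft_deleted"

// Always project the marker column before any Hudi write, so the incoming
// schema never "loses" a field that earlier commits already wrote to parquet.
def ensureSoftDeleteCol(df: DataFrame): DataFrame =
  if (df.columns.contains(softDeleteCol)) df
  else df.withColumn(softDeleteCol, lit(null).cast(StringType)) // nullable, stays null for normal upserts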