Yes, from the pasted schema, there is no default set for the newly added column:

  { "name" : "_hoodie_is_soft_deleted", "type" : [ "string", "null" ] } ]

If you can fix that and give it a try, it should work.
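For illustration, a declaration with "null" listed first in the union and an explicit null default, along the lines suggested here and in Pratyaksh's reply below, might look like the fragment that follows. How the write schema is generated from the dataframe is not shown in this thread, so treat this as a sketch of the Avro side of the change rather than a verified fix:

  {
    "name" : "_hoodie_is_soft_deleted",
    "type" : [ "null", "string" ],
    "default" : null
  }

(In Avro, a union field's default value must match the first branch of the union, which is why "null" has to come first for a null default to be accepted.)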
{ "name" : "*_hoodie_is_soft_deleted*", "type" : [ "string", "null" ] } ] If you can fix that and give it a try, it should work. On Sat, 16 Jul 2022 at 03:13, Pratyaksh Sharma <pratyaks...@gmail.com> wrote: > Hi Aakash, > > For the field to behave as a nullable extra field, you need to add default > value as null to the schema and make "null" as the first type in your union > schema for `_hoodie_is_soft_deleted`.Hope that helps. > > On Fri, Jul 15, 2022 at 8:01 PM aakash aakash <email2aak...@gmail.com> > wrote: > >> Thanks for the response Pratyaksh! >> >> We add this column to the Spark dataframe before calling the hudi upsert >> and delete. And this should work like an extra nullable column in the >> schema but it's not behaving like that, so wondering if we remove any >> column with the prefix *'_hoodie' * in Hudi code. We wanted to this to be >> part of the platform so every team does not have to add an extra field in >> their prod schema since it is not supposed to be visible to everyone. >> >> >> Here is an excerpt of the code : >> >> object SoftDeleteColInfo { >> val softDeleteHudiMetaCol = "_hoodie_is_soft_deleted" >> val softDeleteStrVal = "true" >> >> val softDeletedUDF = udf(softDeleted) >> >> def softDeleted() = (arg: String) => arg >> } >> >> sparkSession.udf.register("softDeletedUDF", >> SoftDeleteColInfo.softDeletedUDF) >> >> *df.withColumn(softDeleteHudiMetaCol, functions.callUDF("softDeletedUDF", >> lit("true")))* >> and the excerpt of the schema of dataframe before calling hudi operation : >> }, { >> "name" : "end_time_utc", >> "type" : [ { >> "type" : "long", >> "logicalType" : "timestamp-micros" >> }, "null" ] >> }, { >> "name" : "date_created_utc", >> "type" : [ { >> "type" : "long", >> "logicalType" : "timestamp-micros" >> }, "null" ] >> }, { >> "name" : "date_updated_utc", >> "type" : [ { >> "type" : "long", >> "logicalType" : "timestamp-micros" >> }, "null" ] >> }, { >> "name" : "*_hoodie_is_soft_deleted*", >> "type" : [ "string", "null" ] >> } ] >> } >> >> On Fri, Jul 15, 2022 at 12:03 AM Pratyaksh Sharma <pratyaks...@gmail.com> >> wrote: >> >> > Hi, >> > >> > Hudi is complaining because '_hoodie_is_soft_deleted' is present in the >> > parquet file's schema but is not present in your incoming schema. >> > >> > From my experience, I would say it is a standard practice to add an >> extra >> > field which acts as a marker for soft deletion and needs to be persisted >> > with every record. So I would suggest adding an extra field in the >> schema >> > and solve your use case. >> > >> > @Sivabalan <n.siv...@gmail.com> can probably add more here. >> > >> > On Fri, Jul 15, 2022 at 11:21 AM aakash aakash <email2aak...@gmail.com> >> > wrote: >> > >> > > Hi, >> > > >> > > We have a use case to perform soft delete over some record keys where >> we >> > > nullify non-key fields and ignore any update for this record later on. >> > We >> > > thought of using a hudi meta field: "_hoodie_is_soft_deleted" as hudi >> > hard >> > > delete (_hoodie_is_deleted) does to make it simple to identify if the >> > > platform perform any soft delete but I am getting avro field not found >> > > exception when we perform another soft delete on the same index, >> please >> > let >> > > me know if you have any advise how to fix it or if this is a wrong >> > > approach, we wanted to avoid adding any extra field in the customer >> > schema >> > > and behind the scene filter the soft delete record as done for hard >> > delete >> > > but still keep the record in the system. 
On Fri, Jul 15, 2022 at 12:03 AM Pratyaksh Sharma <pratyaks...@gmail.com> wrote:

Hi,

Hudi is complaining because '_hoodie_is_soft_deleted' is present in the parquet file's schema but is not present in your incoming schema.

From my experience, I would say it is standard practice to add an extra field that acts as a marker for soft deletion and is persisted with every record. So I would suggest adding an extra field to the schema to solve your use case.

@Sivabalan <n.siv...@gmail.com> can probably add more here.

On Fri, Jul 15, 2022 at 11:21 AM aakash aakash <email2aak...@gmail.com> wrote:

Hi,

We have a use case to perform a soft delete over some record keys, where we nullify the non-key fields and ignore any later update for those records. We thought of using a Hudi-style meta field, "_hoodie_is_soft_deleted" (analogous to what Hudi hard delete does with _hoodie_is_deleted), to make it simple to identify whether the platform performed a soft delete. However, I am getting an "Avro field not found" exception when we perform another soft delete on the same index. Please let me know if you have any advice on how to fix this, or whether this is the wrong approach. We wanted to avoid adding an extra field to the customer schema and instead filter the soft-deleted records behind the scenes, as is done for hard deletes, while still keeping the records in the system.

Hudi: 0.8.0
Exception stacktrace:

  2/07/14 22:08:21 WARN TaskSetManager: Lost task 5.0 in stage 93.0 (TID 33283, 172.25.31.77, executor 3): org.apache.hudi.exception.HoodieUpsertException: Error upserting bucketType UPDATE for partition :5
    at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpsertPartition(BaseSparkCommitActionExecutor.java:288)
    at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.lambda$execute$ecf5068c$1(BaseSparkCommitActionExecutor.java:139)
    at org.apache.spark.api.java.JavaRDDLike$$anonfun$mapPartitionsWithIndex$1.apply(JavaRDDLike.scala:102)
    at org.apache.spark.api.java.JavaRDDLike$$anonfun$mapPartitionsWithIndex$1.apply(JavaRDDLike.scala:102)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1$$anonfun$apply$25.apply(RDD.scala:853)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1$$anonfun$apply$25.apply(RDD.scala:853)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD$$anonfun$7.apply(RDD.scala:337)
    at org.apache.spark.rdd.RDD$$anonfun$7.apply(RDD.scala:335)
    at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1182)
    at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1156)
    at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:1091)
    at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1156)
    at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:882)
    at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:335)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:286)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:123)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
  Caused by: org.apache.hudi.exception.HoodieException: org.apache.hudi.exception.HoodieException: java.util.concurrent.ExecutionException: org.apache.hudi.exception.HoodieException: operation has failed
    at org.apache.hudi.table.action.commit.SparkMergeHelper.runMerge(SparkMergeHelper.java:102)
    at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpdateInternal(BaseSparkCommitActionExecutor.java:317)
    at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpdate(BaseSparkCommitActionExecutor.java:308)
    at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpsertPartition(BaseSparkCommitActionExecutor.java:281)
    ... 30 more
  Caused by: org.apache.hudi.exception.HoodieException: java.util.concurrent.ExecutionException: org.apache.hudi.exception.HoodieException: operation has failed
    at org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.execute(BoundedInMemoryExecutor.java:143)
    at org.apache.hudi.table.action.commit.SparkMergeHelper.runMerge(SparkMergeHelper.java:100)
    ... 33 more
  Caused by: java.util.concurrent.ExecutionException: org.apache.hudi.exception.HoodieException: operation has failed
    at java.util.concurrent.FutureTask.report(FutureTask.java:122)
    at java.util.concurrent.FutureTask.get(FutureTask.java:192)
    at org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.execute(BoundedInMemoryExecutor.java:141)
    ... 34 more
  Caused by: org.apache.hudi.exception.HoodieException: operation has failed
    at org.apache.hudi.common.util.queue.BoundedInMemoryQueue.throwExceptionIfFailed(BoundedInMemoryQueue.java:247)
    at org.apache.hudi.common.util.queue.BoundedInMemoryQueue.readNextRecord(BoundedInMemoryQueue.java:226)
    at org.apache.hudi.common.util.queue.BoundedInMemoryQueue.access$100(BoundedInMemoryQueue.java:52)
    at org.apache.hudi.common.util.queue.BoundedInMemoryQueue$QueueIterator.hasNext(BoundedInMemoryQueue.java:277)
    at org.apache.hudi.common.util.queue.BoundedInMemoryQueueConsumer.consume(BoundedInMemoryQueueConsumer.java:36)
    at org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.lambda$null$2(BoundedInMemoryExecutor.java:121)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    ... 3 more
  Caused by: org.apache.parquet.io.InvalidRecordException: Parquet/Avro schema mismatch: Avro field '_hoodie_is_soft_deleted' not found
    at org.apache.parquet.avro.AvroRecordConverter.getAvroField(AvroRecordConverter.java:225)
    at org.apache.parquet.avro.AvroRecordConverter.<init>(AvroRecordConverter.java:130)
    at org.apache.parquet.avro.AvroRecordConverter.<init>(AvroRecordConverter.java:95)
    at org.apache.parquet.avro.AvroRecordMaterializer.<init>(AvroRecordMaterializer.java:33)
    at org.apache.parquet.avro.AvroReadSupport.prepareForRead(AvroReadSupport.java:138)
    at org.apache.parquet.hadoop.InternalParquetRecordReader.initialize(InternalParquetRecordReader.java:183)
    at org.apache.parquet.hadoop.ParquetReader.initReader(ParquetReader.java:156)
    at org.apache.parquet.hadoop.ParquetReader.read(ParquetReader.java:135)
    at org.apache.hudi.common.util.ParquetReaderIterator.hasNext(ParquetReaderIterator.java:49)
    at org.apache.hudi.common.util.queue.IteratorBasedQueueProducer.produce(IteratorBasedQueueProducer.java:45)
    at org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.lambda$null$0(BoundedInMemoryExecutor.java:92)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    ... 4 more

How we add this column to the Spark dataframe:

  object SoftDeleteColInfo {
    val softDeleteHudiMetaCol = "_hoodie_is_soft_deleted"
    val softDeleteStrVal = "true"

    val softDeletedUDF = udf(softDeleted)

    def softDeleted() = (arg: String) => arg
  }

  sparkSession.udf.register("softDeletedUDF", SoftDeleteColInfo.softDeletedUDF)
  df.withColumn(softDeleteHudiMetaCol, functions.callUDF("softDeletedUDF", lit("true")))

--
Regards,
-Sivabalan