yes, from the pasted schema, there is no default set for the newly added
column.

{
    "name" : "*_hoodie_is_soft_deleted*",
    "type" : [ "string", "null" ]
  } ]

If you can fix that and give it a try, it should work.


On Sat, 16 Jul 2022 at 03:13, Pratyaksh Sharma <pratyaks...@gmail.com>
wrote:

> Hi Aakash,
>
> For the field to behave as a nullable extra field, you need to add default
> value as null to the schema and make "null" as the first type in your union
> schema for `_hoodie_is_soft_deleted`.Hope that helps.
>
> On Fri, Jul 15, 2022 at 8:01 PM aakash aakash <email2aak...@gmail.com>
> wrote:
>
>> Thanks for the response Pratyaksh!
>>
>> We add this column to the Spark dataframe before calling the hudi upsert
>> and delete. And this should work like an extra nullable column in the
>> schema but it's not behaving like that, so wondering if we remove any
>> column with the prefix *'_hoodie' * in Hudi code.  We wanted to this to be
>> part of the platform so every team does not have to add an extra field in
>> their prod schema since it is not supposed to be visible to everyone.
>>
>>
>> Here is an excerpt of the code :
>>
>> object SoftDeleteColInfo {
>>   val softDeleteHudiMetaCol = "_hoodie_is_soft_deleted"
>>   val softDeleteStrVal = "true"
>>
>>   val softDeletedUDF = udf(softDeleted)
>>
>>   def softDeleted() = (arg: String) => arg
>> }
>>
>> sparkSession.udf.register("softDeletedUDF",
>> SoftDeleteColInfo.softDeletedUDF)
>>
>> *df.withColumn(softDeleteHudiMetaCol, functions.callUDF("softDeletedUDF",
>> lit("true")))*
>> and the excerpt of the schema of dataframe before calling hudi operation :
>> }, {
>>     "name" : "end_time_utc",
>>     "type" : [ {
>>       "type" : "long",
>>       "logicalType" : "timestamp-micros"
>>     }, "null" ]
>>   }, {
>>     "name" : "date_created_utc",
>>     "type" : [ {
>>       "type" : "long",
>>       "logicalType" : "timestamp-micros"
>>     }, "null" ]
>>   }, {
>>     "name" : "date_updated_utc",
>>     "type" : [ {
>>       "type" : "long",
>>       "logicalType" : "timestamp-micros"
>>     }, "null" ]
>>   }, {
>>     "name" : "*_hoodie_is_soft_deleted*",
>>     "type" : [ "string", "null" ]
>>   } ]
>> }
>>
>> On Fri, Jul 15, 2022 at 12:03 AM Pratyaksh Sharma <pratyaks...@gmail.com>
>> wrote:
>>
>> > Hi,
>> >
>> > Hudi is complaining because '_hoodie_is_soft_deleted' is present in the
>> > parquet file's schema but is not present in your incoming schema.
>> >
>> > From my experience, I would say it is a standard practice to add an
>> extra
>> > field which acts as a marker for soft deletion and needs to be persisted
>> > with every record. So I would suggest adding an extra field in the
>> schema
>> > and solve your use case.
>> >
>> > @Sivabalan <n.siv...@gmail.com> can probably add more here.
>> >
>> > On Fri, Jul 15, 2022 at 11:21 AM aakash aakash <email2aak...@gmail.com>
>> > wrote:
>> >
>> > > Hi,
>> > >
>> > > We have a use case to perform soft delete over some record keys where
>> we
>> > > nullify non-key fields and ignore any update for this record later on.
>> > We
>> > > thought of using a hudi meta field: "_hoodie_is_soft_deleted" as hudi
>> > hard
>> > > delete (_hoodie_is_deleted) does to make it simple to identify if the
>> > > platform perform any soft delete but I am getting avro field not found
>> > > exception when we perform another soft delete on the same index,
>> please
>> > let
>> > > me know if you have any advise how to fix it or if this is a wrong
>> > > approach, we wanted to avoid adding any extra field in the customer
>> > schema
>> > > and behind the scene filter the soft delete record as done for hard
>> > delete
>> > > but still keep the record in the system.
>> > >
>> > >
>> > > Hudi : 0.8.0
>> > > Exception stacktrace:
>> > >
>> > > 2/07/14 22:08:21 WARN TaskSetManager: Lost task 5.0 in stage 93.0 (TID
>> > > 33283, 172.25.31.77, executor 3):
>> > > org.apache.hudi.exception.HoodieUpsertException: Error upserting
>> > bucketType
>> > > UPDATE for partition :5
>> > >   at
>> > >
>> > >
>> >
>> org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpsertPartition(BaseSparkCommitActionExecutor.java:288)
>> > >   at
>> > >
>> > >
>> >
>> org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.lambda$execute$ecf5068c$1(BaseSparkCommitActionExecutor.java:139)
>> > >   at
>> > >
>> > >
>> >
>> org.apache.spark.api.java.JavaRDDLike$$anonfun$mapPartitionsWithIndex$1.apply(JavaRDDLike.scala:102)
>> > >   at
>> > >
>> > >
>> >
>> org.apache.spark.api.java.JavaRDDLike$$anonfun$mapPartitionsWithIndex$1.apply(JavaRDDLike.scala:102)
>> > >   at
>> > >
>> > >
>> >
>> org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1$$anonfun$apply$25.apply(RDD.scala:853)
>> > >   at
>> > >
>> > >
>> >
>> org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1$$anonfun$apply$25.apply(RDD.scala:853)
>> > >   at
>> > >
>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>> > >   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
>> > >   at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
>> > >   at
>> > >
>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>> > >   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
>> > >   at org.apache.spark.rdd.RDD$$anonfun$7.apply(RDD.scala:337)
>> > >   at org.apache.spark.rdd.RDD$$anonfun$7.apply(RDD.scala:335)
>> > >   at
>> > >
>> > >
>> >
>> org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1182)
>> > >   at
>> > >
>> > >
>> >
>> org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1156)
>> > >   at
>> org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:1091)
>> > >   at
>> > >
>> > >
>> >
>> org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1156)
>> > >   at
>> > >
>> > >
>> >
>> org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:882)
>> > >   at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:335)
>> > >   at org.apache.spark.rdd.RDD.iterator(RDD.scala:286)
>> > >   at
>> > >
>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>> > >   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
>> > >   at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
>> > >   at
>> org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
>> > >   at org.apache.spark.scheduler.Task.run(Task.scala:123)
>> > >   at
>> > >
>> > >
>> >
>> org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
>> > >   at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
>> > >   at
>> > org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
>> > >   at
>> > >
>> > >
>> >
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>> > >   at
>> > >
>> > >
>> >
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>> > >   at java.lang.Thread.run(Thread.java:748)
>> > > Caused by: org.apache.hudi.exception.HoodieException:
>> > > org.apache.hudi.exception.HoodieException:
>> > > java.util.concurrent.ExecutionException:
>> > > org.apache.hudi.exception.HoodieException: operation has failed
>> > >   at
>> > >
>> > >
>> >
>> org.apache.hudi.table.action.commit.SparkMergeHelper.runMerge(SparkMergeHelper.java:102)
>> > >   at
>> > >
>> > >
>> >
>> org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpdateInternal(BaseSparkCommitActionExecutor.java:317)
>> > >   at
>> > >
>> > >
>> >
>> org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpdate(BaseSparkCommitActionExecutor.java:308)
>> > >   at
>> > >
>> > >
>> >
>> org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpsertPartition(BaseSparkCommitActionExecutor.java:281)
>> > >   ... 30 more
>> > > Caused by: org.apache.hudi.exception.HoodieException:
>> > > java.util.concurrent.ExecutionException:
>> > > org.apache.hudi.exception.HoodieException: operation has failed
>> > >   at
>> > >
>> > >
>> >
>> org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.execute(BoundedInMemoryExecutor.java:143)
>> > >   at
>> > >
>> > >
>> >
>> org.apache.hudi.table.action.commit.SparkMergeHelper.runMerge(SparkMergeHelper.java:100)
>> > >   ... 33 more
>> > > Caused by: java.util.concurrent.ExecutionException:
>> > > org.apache.hudi.exception.HoodieException: operation has failed
>> > >   at java.util.concurrent.FutureTask.report(FutureTask.java:122)
>> > >   at java.util.concurrent.FutureTask.get(FutureTask.java:192)
>> > >   at
>> > >
>> > >
>> >
>> org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.execute(BoundedInMemoryExecutor.java:141)
>> > >   ... 34 more
>> > > Caused by: org.apache.hudi.exception.HoodieException: operation has
>> > failed
>> > >   at
>> > >
>> > >
>> >
>> org.apache.hudi.common.util.queue.BoundedInMemoryQueue.throwExceptionIfFailed(BoundedInMemoryQueue.java:247)
>> > >   at
>> > >
>> > >
>> >
>> org.apache.hudi.common.util.queue.BoundedInMemoryQueue.readNextRecord(BoundedInMemoryQueue.java:226)
>> > >   at
>> > >
>> > >
>> >
>> org.apache.hudi.common.util.queue.BoundedInMemoryQueue.access$100(BoundedInMemoryQueue.java:52)
>> > >   at
>> > >
>> > >
>> >
>> org.apache.hudi.common.util.queue.BoundedInMemoryQueue$QueueIterator.hasNext(BoundedInMemoryQueue.java:277)
>> > >   at
>> > >
>> > >
>> >
>> org.apache.hudi.common.util.queue.BoundedInMemoryQueueConsumer.consume(BoundedInMemoryQueueConsumer.java:36)
>> > >   at
>> > >
>> > >
>> >
>> org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.lambda$null$2(BoundedInMemoryExecutor.java:121)
>> > >   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>> > >   ... 3 more
>> > > Caused by: org.apache.parquet.io.InvalidRecordException: Parquet/Avro
>> > > schema mismatch: Avro field '_hoodie_is_soft_deleted' not found
>> > >   at
>> > >
>> > >
>> >
>> org.apache.parquet.avro.AvroRecordConverter.getAvroField(AvroRecordConverter.java:225)
>> > >   at
>> > >
>> > >
>> >
>> org.apache.parquet.avro.AvroRecordConverter.<init>(AvroRecordConverter.java:130)
>> > >   at
>> > >
>> > >
>> >
>> org.apache.parquet.avro.AvroRecordConverter.<init>(AvroRecordConverter.java:95)
>> > >   at
>> > >
>> > >
>> >
>> org.apache.parquet.avro.AvroRecordMaterializer.<init>(AvroRecordMaterializer.java:33)
>> > >   at
>> > >
>> > >
>> >
>> org.apache.parquet.avro.AvroReadSupport.prepareForRead(AvroReadSupport.java:138)
>> > >   at
>> > >
>> > >
>> >
>> org.apache.parquet.hadoop.InternalParquetRecordReader.initialize(InternalParquetRecordReader.java:183)
>> > >   at
>> > >
>> >
>> org.apache.parquet.hadoop.ParquetReader.initReader(ParquetReader.java:156)
>> > >   at
>> org.apache.parquet.hadoop.ParquetReader.read(ParquetReader.java:135)
>> > >   at
>> > >
>> > >
>> >
>> org.apache.hudi.common.util.ParquetReaderIterator.hasNext(ParquetReaderIterator.java:49)
>> > >   at
>> > >
>> > >
>> >
>> org.apache.hudi.common.util.queue.IteratorBasedQueueProducer.produce(IteratorBasedQueueProducer.java:45)
>> > >   at
>> > >
>> > >
>> >
>> org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.lambda$null$0(BoundedInMemoryExecutor.java:92)
>> > >   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>> > >   at
>> > >
>> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>> > >   ... 4 more
>> > >
>> > >
>> > >
>> > >
>> > >
>> > >
>> > >
>> > > How we add this column to the Spark dataframe :
>> > >
>> > > object SoftDeleteColInfo {
>> > >   val softDeleteHudiMetaCol = "_hoodie_is_soft_deleted"
>> > >   val softDeleteStrVal = "true"
>> > >
>> > >   val softDeletedUDF = udf(softDeleted)
>> > >
>> > >   def softDeleted() = (arg: String) => arg
>> > > }
>> > >
>> > > sparkSession.udf.register("softDeletedUDF",
>> > > SoftDeleteColInfo.softDeletedUDF)
>> > > df.withColumn(softDeleteHudiMetaCol,
>> > > functions.callUDF("softDeletedUDF", lit("true")))
>> > >
>> >
>>
>

-- 
Regards,
-Sivabalan

Reply via email to