[ https://issues.apache.org/jira/browse/HUDI-2172?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

sivabalan narayanan updated HUDI-2172:
--------------------------------------
    Labels: core-flow-ds sev:high user-support-issues  (was: user-support-issues)

> upserts failing due to _hoodie_record_key being null in the hudi table
> ----------------------------------------------------------------------
>
>                 Key: HUDI-2172
>                 URL: https://issues.apache.org/jira/browse/HUDI-2172
>             Project: Apache Hudi
>          Issue Type: Bug
>    Affects Versions: 0.6.0
>         Environment: AWS EMR emr-5.32.0 , spark version 2.4.7
>            Reporter: Varun
>            Priority: Major
>              Labels: core-flow-ds, sev:high, user-support-issues
>
> Exception:
>  
> {code:java}
> java.lang.NullPointerException
>   at org.apache.hudi.common.util.ParquetUtils.fetchRecordKeyPartitionPathFromParquet(ParquetUtils.java:146)
>   at org.apache.hudi.io.HoodieKeyLocationFetchHandle.locations(HoodieKeyLocationFetchHandle.java:53)
>   at org.apache.hudi.index.simple.HoodieSimpleIndex.lambda$fetchRecordLocations$c57f549b$1(HoodieSimpleIndex.java:179)
>   at org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$3$1.apply(JavaRDDLike.scala:143)
>   at org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$3$1.apply(JavaRDDLike.scala:143)
>   at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:435)
>   at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:441)
>   at org.apache.spark.shuffle.sort.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:188)
>   at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
>   at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
>   at org.apache.spark.scheduler.Task.run(Task.scala:123)
>   at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
>   at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1405)
>   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
>   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>   at java.lang.Thread.run(Thread.java:748)
> {code}
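>
> Our reading of this trace (an interpretation on our part, not confirmed upstream): with the SIMPLE index, Hudi reads the (_hoodie_record_key, _hoodie_partition_path) pairs back out of the base parquet files to locate existing records, so a null key that was already persisted in a base file surfaces as this NPE on the next upsert. A minimal sketch to confirm the null is physically on disk (the file path below is a placeholder):
> {code:java}
> import spark.implicits._
>
> // Hudi base files are plain parquet, so a suspect file can be read directly
> // to check whether the meta column is null on disk.
> val fileDf = spark.read.parquet(
>   "s3://myLocation/my-table/2021/07/01/<suspect-base-file>.parquet")
> fileDf.filter($"_hoodie_record_key".isNull)
>   .select($"_hoodie_record_key", $"primaryKey")
>   .show(false)
> {code}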
>  
>  
> We use Hudi as the storage engine for the output of our Spark jobs, which run
> on AWS EMR. Recently we started observing that some upsert commits leave the
> table in an inconsistent state, i.e. _hoodie_record_key is null for a record
> that was updated during that commit.
>  
> *How are we checking that _hoodie_record_key is null?*
> {code:java}
> val df = spark.read
>             .format("org.apache.hudi")
>             .load("s3://myLocation/my-table" + "/*/*/*/*")
>             
> df.filter($"_hoodie_record_key".isNull).show(false)
> // Output
> +------------------+----------------------+--------------------------+
> |_hoodie_record_key|_hoodie_partition_path|primaryKey                |
> +------------------+----------------------+--------------------------+
> |null              |2021/07/01            |xxxxxxxxxxxxxxxxxxxxxx    |
> +------------------+----------------------+--------------------------+
> {code}
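>
> To narrow the bad row down further, the other meta columns Hudi stores per record can be pulled in the same query (a sketch over the same df as above):
> {code:java}
> // Identify the base file and the commit that wrote the null key, via the
> // _hoodie_file_name and _hoodie_commit_time meta columns.
> df.filter($"_hoodie_record_key".isNull)
>   .select($"_hoodie_file_name", $"_hoodie_commit_time", $"primaryKey")
>   .show(false)
> {code}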
>  
>  
> One thing to note: the record that has a null _hoodie_record_key was already
> present in the Hudi table and was updated during the commit.
>  
> What is even stranger is that there is only a single record in the Hudi table
> with _hoodie_record_key as null; all other records are fine.
>  
> We have verified that the column used as the record key
> (RECORDKEY_FIELD_OPT_KEY) is present in the incoming records and is NOT NULL.
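>
> For reference, a sketch of that verification (inputDf and the primaryKey column name are from our job, not a Hudi API):
> {code:java}
> // Assert the incoming batch has no null or empty record keys before writing.
> val badKeys = inputDf.filter($"primaryKey".isNull || $"primaryKey" === "").count()
> require(badKeys == 0, s"found $badKeys rows with a null/empty primaryKey")
> {code}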
>  
> After rolling back the faulty commit that introduced the record, rerunning the
> job works fine, i.e. there are no records with a null _hoodie_record_key.
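>
> (We performed the rollback through the hudi-cli shell; a sketch, with the instant time left as a placeholder:)
> {code}
> hudi-> connect --path s3://mylocation/
> hudi-> commit rollback --commit <instant-time-of-faulty-commit>
> {code}
>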
> *HoodieWriter config*
> {code:java}
> // Imports as we expect them for Hudi 0.6.0.
> import org.apache.hudi.DataSourceWriteOptions._
> import org.apache.hudi.config.HoodieCompactionConfig.CLEANER_COMMITS_RETAINED_PROP
> import org.apache.hudi.config.{HoodieIndexConfig, HoodieWriteConfig}
> import org.apache.hudi.keygen.ComplexKeyGenerator
> import org.apache.spark.sql.SaveMode
>
> val hudiOptions = Map[String, String](
>   RECORDKEY_FIELD_OPT_KEY -> "primaryKey",
>   PARTITIONPATH_FIELD_OPT_KEY -> "partitionKey",
>   PRECOMBINE_FIELD_OPT_KEY -> "updateTime",
>   KEYGENERATOR_CLASS_OPT_KEY -> classOf[ComplexKeyGenerator].getName,
>   CLEANER_COMMITS_RETAINED_PROP -> "5"
> )
>
> dataframe.write.format("org.apache.hudi")
>   .option(HoodieWriteConfig.TABLE_NAME, "myTable")
>   .options(hudiOptions)
>   .option(HoodieIndexConfig.INDEX_TYPE_PROP, "SIMPLE")
>   .mode(SaveMode.Append)
>   .save("s3://mylocation/")
> {code}
>  
> We are using a custom record payload class that inherits from
> *OverwriteWithLatestAvroPayload*.
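>
> A skeleton of what that looks like (the class name and the pass-through override are illustrative only; our real class adds custom merge logic):
> {code:java}
> import java.io.IOException
> import org.apache.avro.Schema
> import org.apache.avro.generic.{GenericRecord, IndexedRecord}
> import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload
> import org.apache.hudi.common.util.{Option => HOption}
>
> // Illustrative skeleton only.
> class CustomPayload(record: GenericRecord, orderingVal: Comparable[_])
>     extends OverwriteWithLatestAvroPayload(record, orderingVal) {
>
>   @throws[IOException]
>   override def combineAndGetUpdateValue(currentValue: IndexedRecord,
>                                         schema: Schema): HOption[IndexedRecord] = {
>     // Default behavior: keep the incoming (latest) record.
>     super.combineAndGetUpdateValue(currentValue, schema)
>   }
> }
> {code}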
>  
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.1#820001)
