Varun created HUDI-2172:
---------------------------

             Summary: upserts failing due to _hoodie_record_key being null in 
the hudi table
                 Key: HUDI-2172
                 URL: https://issues.apache.org/jira/browse/HUDI-2172
             Project: Apache Hudi
          Issue Type: Bug
    Affects Versions: 0.6.0
         Environment: AWS EMR emr-5.32.0 , spark version 2.4.7
            Reporter: Varun


Exception:

 
{code:java}
java.lang.NullPointerException  
at 
org.apache.hudi.common.util.ParquetUtils.fetchRecordKeyPartitionPathFromParquet(ParquetUtils.java:146)
  
at 
org.apache.hudi.io.HoodieKeyLocationFetchHandle.locations(HoodieKeyLocationFetchHandle.java:53)
  
at 
org.apache.hudi.index.simple.HoodieSimpleIndex.lambda$fetchRecordLocations$c57f549b$1(HoodieSimpleIndex.java:179)
  
at 
org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$3$1.apply(JavaRDDLike.scala:143)
  
at 
org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$3$1.apply(JavaRDDLike.scala:143)
  
at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:435)  
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:441)  
at 
org.apache.spark.shuffle.sort.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:188)
  
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)  
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)  
at org.apache.spark.scheduler.Task.run(Task.scala:123)  
at 
org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
  
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1405)  
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)  
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) 
 
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) 
 
at java.lang.Thread.run(Thread.java:748)
{code}
 

 

We are using hudi as our storage engine for the output of our Spark jobs. We 
use AWS EMR to run the jobs. Recently we started observing that some of the 
upsert commits are leaving the table in an inconsistent state i.e. 
_hoodie_record_key is observed to be null for a record which is updated during 
that commit.

 

*How are we checking that __hoodie_record__key is null?*
{code:java}
val df = spark.read
            .format("org.apache.hudi")
            .load("s3://myLocation/my-table" + "/*/*/*/*")
            
df.filter($"_hoodie_record_key".isNull).show(false)

// Output
+------------------+----------------------+--------------------------+
|_hoodie_record_key|_hoodie_partition_path|primaryKey                |
+------------------+----------------------+--------------------------+
|null              |2021/07/01            |xxxxxxxxxxxxxxxxxxxxxx    |
+------------------+----------------------+--------------------------+
{code}
 

 

One thing to note here is that the record which has null for _hoodie_record_key 
was already present in the hudi table and was updated during the commit
 
 What is even weird for us is that there is only a single record in the hudi 
table with _hoodie_record_key as null, and all other records are fine
 
 We have verified that the column that is used as _hoodie_record_key 
(RECORDKEY_FIELD_OPT_KEY) is present in the record and is NOT NULL
 
 After rolling back the faulty commit which introduced that record, rerunning 
the job works fine .i.e there are no records with __hoodie_record_key null_

_HoodieWriter Config_
__
{code:java}
val hudiOptions = Map[String, String](  
       RECORDKEY_FIELD_OPT_KEY -> "primaryKey",  
     PARTITIONPATH_FIELD_OPT_KEY -> "partitionKey",  
     PRECOMBINE_FIELD_OPT_KEY -> "updateTime",  
     KEYGENERATOR_CLASS_OPT_KEY -> classOf[ComplexKeyGenerator].getName,  
     CLEANER_COMMITS_RETAINED_PROP -> "5"
)dataframe.write.format("org.apache.hudi")  
  .option(HoodieWriteConfig.TABLE_NAME, "myTable")  
  .options(hudiOptions)  
  .option(HoodieIndexConfig.INDEX_TYPE_PROP,"SIMPLE")  
  .mode(SaveMode.Append)  
  .save("s3://mylocation/")
{code}
 

We are using a custom RecordPayload class which inherits from 
*OverwriteWithLatestAvroPayload*

 

 

 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to