[
https://issues.apache.org/jira/browse/HUDI-2172?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
sivabalan narayanan updated HUDI-2172:
--------------------------------------
Labels: core-flow-ds sev:high user-support-issues (was: user-support-issues)
> upserts failing due to _hoodie_record_key being null in the hudi table
> ----------------------------------------------------------------------
>
> Key: HUDI-2172
> URL: https://issues.apache.org/jira/browse/HUDI-2172
> Project: Apache Hudi
> Issue Type: Bug
> Affects Versions: 0.6.0
> Environment: AWS EMR emr-5.32.0 , spark version 2.4.7
> Reporter: Varun
> Priority: Major
> Labels: core-flow-ds, sev:high, user-support-issues
>
> Exception:
>
> {code:java}
> java.lang.NullPointerException
>   at org.apache.hudi.common.util.ParquetUtils.fetchRecordKeyPartitionPathFromParquet(ParquetUtils.java:146)
>   at org.apache.hudi.io.HoodieKeyLocationFetchHandle.locations(HoodieKeyLocationFetchHandle.java:53)
>   at org.apache.hudi.index.simple.HoodieSimpleIndex.lambda$fetchRecordLocations$c57f549b$1(HoodieSimpleIndex.java:179)
>   at org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$3$1.apply(JavaRDDLike.scala:143)
>   at org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$3$1.apply(JavaRDDLike.scala:143)
>   at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:435)
>   at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:441)
>   at org.apache.spark.shuffle.sort.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:188)
>   at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
>   at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
>   at org.apache.spark.scheduler.Task.run(Task.scala:123)
>   at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
>   at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1405)
>   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
>   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>   at java.lang.Thread.run(Thread.java:748)
> {code}
>
>
> We are using Hudi as the storage engine for the output of our Spark jobs,
> which we run on AWS EMR. Recently we started observing that some upsert
> commits leave the table in an inconsistent state, i.e. _hoodie_record_key is
> null for a record that was updated during that commit.
>
> *How are we checking that _hoodie_record_key is null?*
> {code:java}
> val df = spark.read
>   .format("org.apache.hudi")
>   .load("s3://myLocation/my-table" + "/*/*/*/*")
>
> df.filter($"_hoodie_record_key".isNull).show(false)
> // Output
> +------------------+----------------------+--------------------------+
> |_hoodie_record_key|_hoodie_partition_path|primaryKey |
> +------------------+----------------------+--------------------------+
> |null |2021/07/01 |xxxxxxxxxxxxxxxxxxxxxx |
> +------------------+----------------------+--------------------------+
> {code}
>
>
> One thing to note is that the record which now has a null
> _hoodie_record_key was already present in the Hudi table and was updated
> during that commit.
>
> What is even stranger is that there is only a single record in the Hudi
> table with _hoodie_record_key null; all other records are fine.
>
> We have verified that the column used as _hoodie_record_key
> (RECORDKEY_FIELD_OPT_KEY) is present in the incoming records and is NOT NULL.
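> For reference, that verification can be sketched as follows (a hypothetical
> snippet; {{sourceDf}} is an assumed name for the dataframe being upserted):
>
> {code:java}
> import org.apache.spark.sql.functions.col
>
> // Hypothetical check on the incoming data (sourceDf is an assumed name):
> // the record-key column must be present and must never be null.
> assert(sourceDf.columns.contains("primaryKey"))
> val nullKeys = sourceDf.filter(col("primaryKey").isNull).count()
> assert(nullKeys == 0, s"found $nullKeys records with a null primaryKey")
> {code}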
>
> After rolling back the faulty commit which introduced that record, rerunning
> the job works fine, i.e. there are no records with _hoodie_record_key null.
>
> *HoodieWriter Config*
> {code:java}
> val hudiOptions = Map[String, String](
>   RECORDKEY_FIELD_OPT_KEY -> "primaryKey",
>   PARTITIONPATH_FIELD_OPT_KEY -> "partitionKey",
>   PRECOMBINE_FIELD_OPT_KEY -> "updateTime",
>   KEYGENERATOR_CLASS_OPT_KEY -> classOf[ComplexKeyGenerator].getName,
>   CLEANER_COMMITS_RETAINED_PROP -> "5"
> )
>
> dataframe.write.format("org.apache.hudi")
>   .option(HoodieWriteConfig.TABLE_NAME, "myTable")
>   .options(hudiOptions)
>   .option(HoodieIndexConfig.INDEX_TYPE_PROP, "SIMPLE")
>   .mode(SaveMode.Append)
>   .save("s3://mylocation/")
> {code}
>
> We are using a custom RecordPayload class which inherits from
> *OverwriteWithLatestAvroPayload*.
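> For context, a minimal payload of that shape looks roughly like this (the
> class name is invented, and the constructor signature is our reading of the
> Hudi 0.6.0 API, so treat it as a sketch rather than our exact class):
>
> {code:java}
> import org.apache.avro.generic.GenericRecord
> import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload
>
> // Hypothetical sketch of our payload class (name invented for
> // illustration); as written it defers entirely to the parent, which keeps
> // the record with the latest precombine (updateTime) value.
> class MyRecordPayload(record: GenericRecord, orderingVal: Comparable[_])
>     extends OverwriteWithLatestAvroPayload(record, orderingVal) {
>   // Our custom merge logic overrides combineAndGetUpdateValue here.
> }
> {code}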
>
>
>
--
This message was sent by Atlassian Jira
(v8.20.1#820001)