Varun created HUDI-2172:
---------------------------
Summary: upserts failing due to _hoodie_record_key being null in
the hudi table
Key: HUDI-2172
URL: https://issues.apache.org/jira/browse/HUDI-2172
Project: Apache Hudi
Issue Type: Bug
Affects Versions: 0.6.0
Environment: AWS EMR emr-5.32.0 , spark version 2.4.7
Reporter: Varun
Exception:
{code:java}
java.lang.NullPointerException
at
org.apache.hudi.common.util.ParquetUtils.fetchRecordKeyPartitionPathFromParquet(ParquetUtils.java:146)
at
org.apache.hudi.io.HoodieKeyLocationFetchHandle.locations(HoodieKeyLocationFetchHandle.java:53)
at
org.apache.hudi.index.simple.HoodieSimpleIndex.lambda$fetchRecordLocations$c57f549b$1(HoodieSimpleIndex.java:179)
at
org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$3$1.apply(JavaRDDLike.scala:143)
at
org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$3$1.apply(JavaRDDLike.scala:143)
at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:435)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:441)
at
org.apache.spark.shuffle.sort.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:188)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
at org.apache.spark.scheduler.Task.run(Task.scala:123)
at
org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1405)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
{code}
We are using hudi as our storage engine for the output of our Spark jobs. We
use AWS EMR to run the jobs. Recently we started observing that some of the
upsert commits are leaving the table in an inconsistent state i.e.
_hoodie_record_key is observed to be null for a record which is updated during
that commit.
*How are we checking that __hoodie_record__key is null?*
{code:java}
val df = spark.read
.format("org.apache.hudi")
.load("s3://myLocation/my-table" + "/*/*/*/*")
df.filter($"_hoodie_record_key".isNull).show(false)
// Output
+------------------+----------------------+--------------------------+
|_hoodie_record_key|_hoodie_partition_path|primaryKey |
+------------------+----------------------+--------------------------+
|null |2021/07/01 |xxxxxxxxxxxxxxxxxxxxxx |
+------------------+----------------------+--------------------------+
{code}
One thing to note here is that the record which has null for _hoodie_record_key
was already present in the hudi table and was updated during the commit
What is even weird for us is that there is only a single record in the hudi
table with _hoodie_record_key as null, and all other records are fine
We have verified that the column that is used as _hoodie_record_key
(RECORDKEY_FIELD_OPT_KEY) is present in the record and is NOT NULL
After rolling back the faulty commit which introduced that record, rerunning
the job works fine .i.e there are no records with __hoodie_record_key null_
_HoodieWriter Config_
__
{code:java}
val hudiOptions = Map[String, String](
RECORDKEY_FIELD_OPT_KEY -> "primaryKey",
PARTITIONPATH_FIELD_OPT_KEY -> "partitionKey",
PRECOMBINE_FIELD_OPT_KEY -> "updateTime",
KEYGENERATOR_CLASS_OPT_KEY -> classOf[ComplexKeyGenerator].getName,
CLEANER_COMMITS_RETAINED_PROP -> "5"
)dataframe.write.format("org.apache.hudi")
.option(HoodieWriteConfig.TABLE_NAME, "myTable")
.options(hudiOptions)
.option(HoodieIndexConfig.INDEX_TYPE_PROP,"SIMPLE")
.mode(SaveMode.Append)
.save("s3://mylocation/")
{code}
We are using a custom RecordPayload class which inherits from
*OverwriteWithLatestAvroPayload*
--
This message was sent by Atlassian Jira
(v8.3.4#803005)