silly-carbon opened a new issue, #9555:
URL: https://github.com/apache/hudi/issues/9555
**Describe the problem you faced**
We have two tables, simplified as below:
```
create table temp_db.merge_target (
id int,
name string,
price double,
ts bigint
) using hudi
tblproperties (
primaryKey = 'id'
);
```
```
insert into temp_db.merge_target select 1, 'old_a1', 20, 1000;
```
```
create table temp_db.merge_source_no_id
(name string, price double, ts bigint)
using orc;
```
```
insert into temp_db.merge_source_no_id select 'old_a1', 20.5, 1001;
```
When using Hudi MERGE INTO statement like this:
```
merge into temp_db.merge_target as target
using temp_db.merge_source_no_id as source
on target.name = source.name
when matched then update set target.price = source.price;
```
It throws an exception claiming the id column is null, which it is not: as the SQL statements above show, the target table has a single row with id = 1:
```
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 88.0 failed 4 times, most recent failure: Lost task 0.3 in stage 88.0 (TID 1537, gsc-bigdataprod3.synnex.org, executor 1):
org.apache.hudi.exception.HoodieKeyException: recordKey values: "id:__null__" for fields: [id] cannot be entirely null or empty.
    at org.apache.hudi.keygen.KeyGenUtils.getRecordKey(KeyGenUtils.java:109)
    at org.apache.hudi.keygen.ComplexAvroKeyGenerator.getRecordKey(ComplexAvroKeyGenerator.java:43)
    at org.apache.hudi.keygen.ComplexKeyGenerator.getRecordKey(ComplexKeyGenerator.java:49)
    at org.apache.hudi.keygen.BaseKeyGenerator.getKey(BaseKeyGenerator.java:65)
    at org.apache.spark.sql.hudi.command.SqlKeyGenerator.getRecordKey(SqlKeyGenerator.scala:64)
    ...
```
(The full stacktrace is included in the **Stacktrace** section below.)
I've checked the Hudi source code, and it seems that: *for MERGE to work successfully, the primary key columns of the target table must also exist in the source table. They need not be primary keys of the source table, but they must at least be present (e.g., temp_db.merge_source_no_id must have an id field).*
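Based on the stacktrace, the check that fires appears to be in `KeyGenUtils.getRecordKey`: the record key is built from the merged output row, and when the source has no `id` column that slot is null. Below is a minimal Python sketch of this failure mode (an illustration modeled on the error message, not Hudi's actual implementation):

```python
# Simplified sketch of the record-key check (NOT Hudi's actual code),
# showing why a source row without `id` triggers HoodieKeyException.

NULL_RECORDKEY_PLACEHOLDER = "__null__"


class HoodieKeyException(Exception):
    pass


def get_record_key(record: dict, key_fields: list) -> str:
    """Build a composite record key like "id:1" from the incoming row."""
    parts = []
    all_null = True
    for field in key_fields:
        value = record.get(field)  # a row built without `id` yields None
        if value is None or value == "":
            parts.append(f"{field}:{NULL_RECORDKEY_PLACEHOLDER}")
        else:
            parts.append(f"{field}:{value}")
            all_null = False
    key = ",".join(parts)
    if all_null:
        # Matches the message seen in the stacktrace above
        raise HoodieKeyException(
            f'recordKey values: "{key}" for fields: {key_fields} '
            "cannot be entirely null or empty."
        )
    return key


# A row that carries the target's key works:
print(get_record_key({"id": 1, "name": "old_a1"}, ["id"]))  # id:1

# A row assembled from merge_source_no_id has no id value and fails:
try:
    get_record_key({"name": "old_a1", "price": 20.5}, ["id"])
except HoodieKeyException as e:
    print(e)
```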
Proof:
Step 1: Create a new table from the source, adding an id column fetched from the target:
```
create table temp_db.merge_source_with_id using orc
as
select target.id, source.name, source.price, source.ts from
temp_db.merge_target as target
join temp_db.merge_source_no_id as source
on target.name = source.name;
```
Step 2: Re-run the MERGE INTO statement from above with a single difference: the source table is now merge_source_with_id, which carries the extra id column:
```
merge into temp_db.merge_target as target
using temp_db.merge_source_with_id as source
on target.name = source.name
when matched then update set target.price = source.price;
```
This runs successfully, and the target row's price is updated to 20.5.
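If the id requirement is working as intended, the intermediate table from Step 1 can probably be avoided by joining the target's id in directly inside the USING clause. An untested sketch against the tables defined above:

```
merge into temp_db.merge_target as target
using (
  -- pull the target's primary key into the source rows
  select t.id, s.name, s.price, s.ts
  from temp_db.merge_source_no_id as s
  join temp_db.merge_target as t
    on t.name = s.name
) as source
on target.id = source.id
when matched then update set target.price = source.price;
```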

**To Reproduce**
Steps to reproduce the behavior: run the SQL statements above.
**Expected behavior**
*The target table should be updated successfully.*
Hudi currently cannot reuse the `id` column from the matching rows of the target table, but doing so seems like the correct behavior. We should still be able to update the target table using only columns from the source table: we are updating existing rows of the target, and those rows already have non-null `id` values.
**Environment Description**
* Hudi version : 0.10.1
* Spark version : 3.0.2
* Hive version : 2.3.7
* Hadoop version : 3.1.0.0
* Storage (HDFS/S3/GCS..) : HDFS
* Running on Docker? (yes/no) : no
**Stacktrace**
```
org.apache.hudi.exception.HoodieUpsertException: Failed to upsert for commit time 20230828080701752
    at org.apache.hudi.table.action.commit.AbstractWriteHelper.write(AbstractWriteHelper.java:63)
    at org.apache.hudi.table.action.commit.SparkInsertCommitActionExecutor.execute(SparkInsertCommitActionExecutor.java:46)
    at org.apache.hudi.table.HoodieSparkCopyOnWriteTable.insert(HoodieSparkCopyOnWriteTable.java:124)
    at org.apache.hudi.table.HoodieSparkCopyOnWriteTable.insert(HoodieSparkCopyOnWriteTable.java:103)
    at org.apache.hudi.client.SparkRDDWriteClient.insert(SparkRDDWriteClient.java:183)
    at org.apache.hudi.DataSourceUtils.doWriteOperation(DataSourceUtils.java:215)
    at org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:277)
    at org.apache.spark.sql.hudi.command.MergeIntoHoodieTableCommand.executeUpsert(MergeIntoHoodieTableCommand.scala:285)
    at org.apache.spark.sql.hudi.command.MergeIntoHoodieTableCommand.run(MergeIntoHoodieTableCommand.scala:155)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:79)
    at org.apache.spark.sql.Dataset.$anonfun$logicalPlan$1(Dataset.scala:229)
    at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3618)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:100)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:160)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:87)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:773)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
    at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3616)
    at org.apache.spark.sql.Dataset.<init>(Dataset.scala:229)
    at org.apache.spark.sql.SparkSession.$anonfun$sql$4(SparkSession.scala:614)
    at com.synnex.bigdata.spark.sql.metrics.SparkQueryMetricsListener.onSqlBegin(SparkQueryMetricsListener.scala:135)
    at com.synnex.bigdata.spark.sql.metrics.SparkQueryMetricsListener.onSqlBegin$(SparkQueryMetricsListener.scala:114)
    at com.synnex.bigdata.spark.sql.listener.BigDataSparkListener$$anon$1.onSqlBegin(BigDataSparkListener.scala:46)
    at com.synnex.bigdata.spark.sql.listener.BigDataSparkListener$.$anonfun$init$2(BigDataSparkListener.scala:52)
    at org.apache.spark.sql.SparkSession.$anonfun$sql$1(SparkSession.scala:614)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:773)
    at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:605)
    ... 52 elided
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 88.0 failed 4 times, most recent failure: Lost task 0.3 in stage 88.0 (TID 1537, gsc-bigdataprod3.synnex.org, executor 1):
org.apache.hudi.exception.HoodieKeyException: recordKey values: "id:__null__" for fields: [id] cannot be entirely null or empty.
    at org.apache.hudi.keygen.KeyGenUtils.getRecordKey(KeyGenUtils.java:109)
    at org.apache.hudi.keygen.ComplexAvroKeyGenerator.getRecordKey(ComplexAvroKeyGenerator.java:43)
    at org.apache.hudi.keygen.ComplexKeyGenerator.getRecordKey(ComplexKeyGenerator.java:49)
    at org.apache.hudi.keygen.BaseKeyGenerator.getKey(BaseKeyGenerator.java:65)
    at org.apache.spark.sql.hudi.command.SqlKeyGenerator.getRecordKey(SqlKeyGenerator.scala:64)
    at org.apache.hudi.keygen.BaseKeyGenerator.getKey(BaseKeyGenerator.java:65)
    at org.apache.hudi.HoodieSparkSqlWriter$.$anonfun$write$9(HoodieSparkSqlWriter.scala:251)
    at scala.collection.Iterator$$anon$10.next(Iterator.scala:459)
    at org.apache.spark.storage.memory.MemoryStore.putIterator(MemoryStore.scala:222)
    at org.apache.spark.storage.memory.MemoryStore.putIteratorAsBytes(MemoryStore.scala:349)
    at org.apache.spark.storage.BlockManager.$anonfun$doPutIterator$1(BlockManager.scala:1388)
    at org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$doPut(BlockManager.scala:1298)
    at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1362)
    at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:1186)
    at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:360)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:311)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
    at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
    at org.apache.spark.scheduler.Task.run(Task.scala:127)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:462)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:465)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:750)
Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2059)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2008)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2007)
    at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
    at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2007)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:973)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:973)
    at scala.Option.foreach(Option.scala:407)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:973)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2239)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2188)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2177)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:775)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2125)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2146)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2165)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2190)
    at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1004)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:388)
    at org.apache.spark.rdd.RDD.collect(RDD.scala:1003)
    at org.apache.spark.rdd.PairRDDFunctions.$anonfun$countByKey$1(PairRDDFunctions.scala:366)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:388)
    at org.apache.spark.rdd.PairRDDFunctions.countByKey(PairRDDFunctions.scala:366)
    at org.apache.spark.api.java.JavaPairRDD.countByKey(JavaPairRDD.scala:313)
    at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.buildProfile(BaseSparkCommitActionExecutor.java:193)
    at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.execute(BaseSparkCommitActionExecutor.java:160)
    at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.execute(BaseSparkCommitActionExecutor.java:82)
    at org.apache.hudi.table.action.commit.AbstractWriteHelper.write(AbstractWriteHelper.java:56)
    ... 80 more
Caused by: org.apache.hudi.exception.HoodieKeyException: recordKey values: "id:__null__" for fields: [id] cannot be entirely null or empty.
    at org.apache.hudi.keygen.KeyGenUtils.getRecordKey(KeyGenUtils.java:109)
    at org.apache.hudi.keygen.ComplexAvroKeyGenerator.getRecordKey(ComplexAvroKeyGenerator.java:43)
    at org.apache.hudi.keygen.ComplexKeyGenerator.getRecordKey(ComplexKeyGenerator.java:49)
    at org.apache.hudi.keygen.BaseKeyGenerator.getKey(BaseKeyGenerator.java:65)
    at org.apache.spark.sql.hudi.command.SqlKeyGenerator.getRecordKey(SqlKeyGenerator.scala:64)
    at org.apache.hudi.keygen.BaseKeyGenerator.getKey(BaseKeyGenerator.java:65)
    at org.apache.hudi.HoodieSparkSqlWriter$.$anonfun$write$9(HoodieSparkSqlWriter.scala:251)
    at scala.collection.Iterator$$anon$10.next(Iterator.scala:459)
    at org.apache.spark.storage.memory.MemoryStore.putIterator(MemoryStore.scala:222)
    at org.apache.spark.storage.memory.MemoryStore.putIteratorAsBytes(MemoryStore.scala:349)
    at org.apache.spark.storage.BlockManager.$anonfun$doPutIterator$1(BlockManager.scala:1388)
    at org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$doPut(BlockManager.scala:1298)
    at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1362)
    at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:1186)
    at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:360)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:311)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
    at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
    at org.apache.spark.scheduler.Task.run(Task.scala:127)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:462)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:465)
    ... 3 more
```