weiming created HUDI-6500:
-----------------------------
Summary: Using the nvl function in the merge into matched
condition reports an error
Key: HUDI-6500
URL: https://issues.apache.org/jira/browse/HUDI-6500
Project: Apache Hudi
Issue Type: Bug
Reporter: weiming
step1:
create table and write some data
create table test_table (
id string,
name string,
age int,
address array<string>,
ts long,
dt string
) using hudi
tblproperties (
type = 'cow',
primaryKey = 'id',
preCombineField = 'ts'
)
partitioned by (dt);
insert overwrite test_table partition(dt = '2023-03-01') select "01",
"zhangsan01", 17, array('bj','sh'), 1688376626321;
step2: use merge into with nvl
merge into test_table as target
using (
select '01' as id, 'zhangsan01_new' as name, 18 as age, array('bj1','sh1','gz1')
as address, 1688376626322 as ts, '2023-03-01' as dt
) source
on target.id = source.id and target.dt = source.dt
when matched then
update set
target.name = source.name,
target.age = if(nvl(source.age, 0) <> nvl(target.age, 0), source.age, target.age),
target.address = source.address,
target.ts = source.ts
when not matched then insert * ;
error log:
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in
stage 26.0 failed 4 times, most recent failure: Lost task 0.3 in stage 26.0
(TID 20) (bigdata-test.com executor 2):
org.apache.hudi.exception.HoodieUpsertException: Error upserting bucketType
UPDATE for partition :0
at
org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpsertPartition(BaseSparkCommitActionExecutor.java:349)
at
org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.lambda$mapPartitionsAsRDD$a3ab3c4$1(BaseSparkCommitActionExecutor.java:264)
at
org.apache.spark.api.java.JavaRDDLike.$anonfun$mapPartitionsWithIndex$1(JavaRDDLike.scala:102)
at
org.apache.spark.api.java.JavaRDDLike.$anonfun$mapPartitionsWithIndex$1$adapted(JavaRDDLike.scala:102)
at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$2(RDD.scala:911)
at
org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$2$adapted(RDD.scala:911)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:369)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:333)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:369)
at org.apache.spark.rdd.RDD.$anonfun$getOrCompute$1(RDD.scala:382)
at
org.apache.spark.storage.BlockManager.$anonfun$doPutIterator$1(BlockManager.scala:1498)
at
org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$doPut(BlockManager.scala:1408)
at
org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1472)
at
org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:1295)
at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:380)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:369)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:333)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:131)
at
org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1463)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.hudi.exception.HoodieException:
org.apache.hudi.exception.HoodieException:
org.apache.hudi.exception.HoodieUpsertException: Failed to combine/merge new
record with old value in storage, for new record \{HoodieRecord{key=HoodieKey {
recordKey=01 partitionPath=dt=2023-03-01},
currentLocation='HoodieRecordLocation \{instantTime=20230705190339617,
fileId=56fda6a9-85e2-4f5b-b49a-1b4e924c5183-0}',
newLocation='HoodieRecordLocation \{instantTime=20230705191354955,
fileId=56fda6a9-85e2-4f5b-b49a-1b4e924c5183-0}'}}, old value
\{HoodieRecord{key=null, currentLocation='null', newLocation='null'}}
at
org.apache.hudi.table.action.commit.HoodieMergeHelper.runMerge(HoodieMergeHelper.java:156)
at
org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpdateInternal(BaseSparkCommitActionExecutor.java:385)
at
org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpdate(BaseSparkCommitActionExecutor.java:376)
at
org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpsertPartition(BaseSparkCommitActionExecutor.java:342)
... 28 more
Caused by: org.apache.hudi.exception.HoodieException:
org.apache.hudi.exception.HoodieUpsertException: Failed to combine/merge new
record with old value in storage, for new record \{HoodieRecord{key=HoodieKey {
recordKey=01 partitionPath=dt=2023-03-01},
currentLocation='HoodieRecordLocation \{instantTime=20230705190339617,
fileId=56fda6a9-85e2-4f5b-b49a-1b4e924c5183-0}',
newLocation='HoodieRecordLocation \{instantTime=20230705191354955,
fileId=56fda6a9-85e2-4f5b-b49a-1b4e924c5183-0}'}}, old value
\{HoodieRecord{key=null, currentLocation='null', newLocation='null'}}
at
org.apache.hudi.common.util.queue.SimpleExecutor.execute(SimpleExecutor.java:73)
at
org.apache.hudi.table.action.commit.HoodieMergeHelper.runMerge(HoodieMergeHelper.java:154)
... 31 more
Caused by: org.apache.hudi.exception.HoodieUpsertException: Failed to
combine/merge new record with old value in storage, for new record
\{HoodieRecord{key=HoodieKey { recordKey=01 partitionPath=dt=2023-03-01},
currentLocation='HoodieRecordLocation \{instantTime=20230705190339617,
fileId=56fda6a9-85e2-4f5b-b49a-1b4e924c5183-0}',
newLocation='HoodieRecordLocation \{instantTime=20230705191354955,
fileId=56fda6a9-85e2-4f5b-b49a-1b4e924c5183-0}'}}, old value
\{HoodieRecord{key=null, currentLocation='null', newLocation='null'}}
at org.apache.hudi.io.HoodieMergeHandle.write(HoodieMergeHandle.java:358)
at
org.apache.hudi.table.action.commit.BaseMergeHelper$UpdateHandler.consume(BaseMergeHelper.java:54)
at
org.apache.hudi.table.action.commit.BaseMergeHelper$UpdateHandler.consume(BaseMergeHelper.java:44)
at
org.apache.hudi.common.util.queue.SimpleExecutor.execute(SimpleExecutor.java:67)
... 32 more
Caused by: java.lang.UnsupportedOperationException: Cannot evaluate expression:
nvl(age#24, 0)
at
org.apache.spark.sql.errors.QueryExecutionErrors$.cannotEvaluateExpressionError(QueryExecutionErrors.scala:79)
at
org.apache.spark.sql.catalyst.expressions.Unevaluable.eval(Expression.scala:309)
at
org.apache.spark.sql.catalyst.expressions.Unevaluable.eval$(Expression.scala:308)
at
org.apache.spark.sql.catalyst.expressions.Nvl.eval(nullExpressions.scala:185)
at
org.apache.spark.sql.catalyst.expressions.ExpressionProxy.proxyEval(SubExprEvaluationRuntime.scala:132)
at
org.apache.spark.sql.catalyst.expressions.SubExprEvaluationRuntime$$anon$1.load(SubExprEvaluationRuntime.scala:46)
at
org.apache.spark.sql.catalyst.expressions.SubExprEvaluationRuntime$$anon$1.load(SubExprEvaluationRuntime.scala:44)
at
org.sparkproject.guava.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3599)
at
org.sparkproject.guava.cache.LocalCache$Segment.loadSync(LocalCache.java:2379)
at
org.sparkproject.guava.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2342)
at org.sparkproject.guava.cache.LocalCache$Segment.get(LocalCache.java:2257)
at org.sparkproject.guava.cache.LocalCache.get(LocalCache.java:4000)
at org.sparkproject.guava.cache.LocalCache.getOrLoad(LocalCache.java:4004)
at
org.sparkproject.guava.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4874)
at
org.apache.spark.sql.catalyst.expressions.SubExprEvaluationRuntime.getEval(SubExprEvaluationRuntime.scala:53)
at
org.apache.spark.sql.catalyst.expressions.ExpressionProxy.eval(SubExprEvaluationRuntime.scala:134)
at
org.apache.spark.sql.catalyst.expressions.BinaryExpression.eval(Expression.scala:566)
at
org.apache.spark.sql.catalyst.expressions.UnaryExpression.eval(Expression.scala:477)
at
org.apache.spark.sql.catalyst.expressions.If.eval(conditionalExpressions.scala:68)
at
org.apache.spark.sql.catalyst.expressions.Alias.eval(namedExpressions.scala:168)
at
org.apache.spark.sql.catalyst.expressions.InterpretedSafeProjection.apply(InterpretedSafeProjection.scala:117)
at
org.apache.spark.sql.catalyst.expressions.InterpretedSafeProjection.apply(InterpretedSafeProjection.scala:32)
at
org.apache.spark.sql.hudi.command.payload.ExpressionPayload.$anonfun$processMatchedRecord$3(ExpressionPayload.scala:130)
at
org.apache.spark.sql.hudi.command.payload.ExpressionPayload.$anonfun$processMatchedRecord$3$adapted(ExpressionPayload.scala:120)
at
scala.collection.TraversableLike$WithFilter.$anonfun$foreach$1(TraversableLike.scala:985)
at scala.collection.Iterator.foreach(Iterator.scala:943)
at scala.collection.Iterator.foreach$(Iterator.scala:943)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
at scala.collection.IterableLike.foreach(IterableLike.scala:74)
at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
at
scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:984)
at
org.apache.spark.sql.hudi.command.payload.ExpressionPayload.processMatchedRecord(ExpressionPayload.scala:120)
at
org.apache.spark.sql.hudi.command.payload.ExpressionPayload.combineAndGetUpdateValue(ExpressionPayload.scala:88)
at
org.apache.hudi.common.model.HoodieAvroRecordMerger.combineAndGetUpdateValue(HoodieAvroRecordMerger.java:84)
at
org.apache.hudi.common.model.HoodieAvroRecordMerger.merge(HoodieAvroRecordMerger.java:60)
at org.apache.hudi.io.HoodieMergeHandle.write(HoodieMergeHandle.java:340)
... 35 more
--
This message was sent by Atlassian Jira
(v8.20.10#820010)