[
https://issues.apache.org/jira/browse/HUDI-6500?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Danny Chen closed HUDI-6500.
----------------------------
Resolution: Fixed
Fixed via master branch: cb8ff26792139c64aa3fb1db088798f0d2adf7e7
> Using the RuntimeReplaceable function in the merge into matched condition
> reports an error
> ------------------------------------------------------------------------------------------
>
> Key: HUDI-6500
> URL: https://issues.apache.org/jira/browse/HUDI-6500
> Project: Apache Hudi
> Issue Type: Bug
> Reporter: weiming
> Assignee: weiming
> Priority: Major
> Labels: pull-request-available
> Fix For: 0.14.0
>
> Attachments: image-2023-07-12-17-51-55-790.png
>
>
> step1:
> create table and write some data
> create table test_table (
> id string,
> name string,
> age int,
> address array<string>,
> ts long,
> dt string
> ) using hudi
> tblproperties (
> type = 'cow',
> primaryKey = 'id',
> preCombineField = 'ts'
> )
> partitioned by (dt);
> insert overwrite test_table partition(dt = '2023-03-01') select "01",
> "zhangsan01", 17, array('bj','sh'), 1688376626321;
> step2: Use some RuntimeReplaceable functions like nvl, etc.
> merge into test_table as target
> using (
> select '01' as id, 'zhangsan01_new' as name, 18 as age, array('bj1','sh1','gz1')
> as address, 1688376626322 as ts, '2023-03-01' as dt
> ) source
> on target.id = source.id and target.dt = source.dt
> when matched then
> update set
> target.name = source.name,
> target.age = if(nvl(source.age,0) <> nvl(target.age,0), source.age, target.age),
> target.address = source.address,
> target.ts = source.ts
> when not matched then insert * ;
>
> !image-2023-07-12-17-51-55-790.png!
>
> error log:
>
> org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in
> stage 26.0 failed 4 times, most recent failure: Lost task 0.3 in stage 26.0
> (TID 20) (bigdata-test.com executor 2):
> org.apache.hudi.exception.HoodieUpsertException: Error upserting bucketType
> UPDATE for partition :0
> at
> org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpsertPartition(BaseSparkCommitActionExecutor.java:349)
> at
> org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.lambda$mapPartitionsAsRDD$a3ab3c4$1(BaseSparkCommitActionExecutor.java:264)
> at
> org.apache.spark.api.java.JavaRDDLike.$anonfun$mapPartitionsWithIndex$1(JavaRDDLike.scala:102)
> at
> org.apache.spark.api.java.JavaRDDLike.$anonfun$mapPartitionsWithIndex$1$adapted(JavaRDDLike.scala:102)
> at
> org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$2(RDD.scala:911)
> at
> org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$2$adapted(RDD.scala:911)
> at
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:369)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:333)
> at
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:369)
> at org.apache.spark.rdd.RDD.$anonfun$getOrCompute$1(RDD.scala:382)
> at
> org.apache.spark.storage.BlockManager.$anonfun$doPutIterator$1(BlockManager.scala:1498)
> at
> org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$doPut(BlockManager.scala:1408)
> at
> org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1472)
> at
> org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:1295)
> at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:380)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
> at
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:369)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:333)
> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
> at org.apache.spark.scheduler.Task.run(Task.scala:131)
> at
> org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
> at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1463)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: org.apache.hudi.exception.HoodieException:
> org.apache.hudi.exception.HoodieException:
> org.apache.hudi.exception.HoodieUpsertException: Failed to combine/merge new
> record with old value in storage, for new record {HoodieRecord{key=HoodieKey
> { recordKey=01 partitionPath=dt=2023-03-01},
> currentLocation='HoodieRecordLocation \{instantTime=20230705190339617,
> fileId=56fda6a9-85e2-4f5b-b49a-1b4e924c5183-0}',
> newLocation='HoodieRecordLocation \{instantTime=20230705191354955,
> fileId=56fda6a9-85e2-4f5b-b49a-1b4e924c5183-0}'}}, old value
> \{HoodieRecord{key=null, currentLocation='null', newLocation='null'}}
> at
> org.apache.hudi.table.action.commit.HoodieMergeHelper.runMerge(HoodieMergeHelper.java:156)
> at
> org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpdateInternal(BaseSparkCommitActionExecutor.java:385)
> at
> org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpdate(BaseSparkCommitActionExecutor.java:376)
> at
> org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpsertPartition(BaseSparkCommitActionExecutor.java:342)
> ... 28 more
> Caused by: org.apache.hudi.exception.HoodieException:
> org.apache.hudi.exception.HoodieUpsertException: Failed to combine/merge new
> record with old value in storage, for new record \{HoodieRecord{key=HoodieKey
> { recordKey=01 partitionPath=dt=2023-03-01}
> , currentLocation='HoodieRecordLocation \{instantTime=20230705190339617,
> fileId=56fda6a9-85e2-4f5b-b49a-1b4e924c5183-0}',
> newLocation='HoodieRecordLocation \{instantTime=20230705191354955,
> fileId=56fda6a9-85e2-4f5b-b49a-1b4e924c5183-0}'}}, old value
> \{HoodieRecord{key=null, currentLocation='null', newLocation='null'}}
> at
> org.apache.hudi.common.util.queue.SimpleExecutor.execute(SimpleExecutor.java:73)
> at
> org.apache.hudi.table.action.commit.HoodieMergeHelper.runMerge(HoodieMergeHelper.java:154)
> ... 31 more
> Caused by: org.apache.hudi.exception.HoodieUpsertException: Failed to
> combine/merge new record with old value in storage, for new record
> {HoodieRecord{key=HoodieKey
> { recordKey=01 partitionPath=dt=2023-03-01}
> , currentLocation='HoodieRecordLocation \{instantTime=20230705190339617,
> fileId=56fda6a9-85e2-4f5b-b49a-1b4e924c5183-0}',
> newLocation='HoodieRecordLocation \{instantTime=20230705191354955,
> fileId=56fda6a9-85e2-4f5b-b49a-1b4e924c5183-0}'}}, old value
> \{HoodieRecord{key=null, currentLocation='null', newLocation='null'}}
> at org.apache.hudi.io.HoodieMergeHandle.write(HoodieMergeHandle.java:358)
> at
> org.apache.hudi.table.action.commit.BaseMergeHelper$UpdateHandler.consume(BaseMergeHelper.java:54)
> at
> org.apache.hudi.table.action.commit.BaseMergeHelper$UpdateHandler.consume(BaseMergeHelper.java:44)
> at
> org.apache.hudi.common.util.queue.SimpleExecutor.execute(SimpleExecutor.java:67)
> ... 32 more
> {color:#FF0000}*Caused by: java.lang.UnsupportedOperationException: Cannot
> evaluate expression: nvl(age#24, 0)*{color}
> at
> org.apache.spark.sql.errors.QueryExecutionErrors$.cannotEvaluateExpressionError(QueryExecutionErrors.scala:79)
> at
> org.apache.spark.sql.catalyst.expressions.Unevaluable.eval(Expression.scala:309)
> at
> org.apache.spark.sql.catalyst.expressions.Unevaluable.eval$(Expression.scala:308)
> at
> org.apache.spark.sql.catalyst.expressions.Nvl.eval(nullExpressions.scala:185)
> at
> org.apache.spark.sql.catalyst.expressions.ExpressionProxy.proxyEval(SubExprEvaluationRuntime.scala:132)
> at
> org.apache.spark.sql.catalyst.expressions.SubExprEvaluationRuntime$$anon$1.load(SubExprEvaluationRuntime.scala:46)
> at
> org.apache.spark.sql.catalyst.expressions.SubExprEvaluationRuntime$$anon$1.load(SubExprEvaluationRuntime.scala:44)
> at
> org.sparkproject.guava.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3599)
> at
> org.sparkproject.guava.cache.LocalCache$Segment.loadSync(LocalCache.java:2379)
> at
> org.sparkproject.guava.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2342)
> at
> org.sparkproject.guava.cache.LocalCache$Segment.get(LocalCache.java:2257)
> at org.sparkproject.guava.cache.LocalCache.get(LocalCache.java:4000)
> at org.sparkproject.guava.cache.LocalCache.getOrLoad(LocalCache.java:4004)
> at
> org.sparkproject.guava.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4874)
> at
> org.apache.spark.sql.catalyst.expressions.SubExprEvaluationRuntime.getEval(SubExprEvaluationRuntime.scala:53)
> at
> org.apache.spark.sql.catalyst.expressions.ExpressionProxy.eval(SubExprEvaluationRuntime.scala:134)
> at
> org.apache.spark.sql.catalyst.expressions.BinaryExpression.eval(Expression.scala:566)
> at
> org.apache.spark.sql.catalyst.expressions.UnaryExpression.eval(Expression.scala:477)
> at
> org.apache.spark.sql.catalyst.expressions.If.eval(conditionalExpressions.scala:68)
> at
> org.apache.spark.sql.catalyst.expressions.Alias.eval(namedExpressions.scala:168)
> at
> org.apache.spark.sql.catalyst.expressions.InterpretedSafeProjection.apply(InterpretedSafeProjection.scala:117)
> at
> org.apache.spark.sql.catalyst.expressions.InterpretedSafeProjection.apply(InterpretedSafeProjection.scala:32)
> at
> org.apache.spark.sql.hudi.command.payload.ExpressionPayload.$anonfun$processMatchedRecord$3(ExpressionPayload.scala:130)
> at
> org.apache.spark.sql.hudi.command.payload.ExpressionPayload.$anonfun$processMatchedRecord$3$adapted(ExpressionPayload.scala:120)
> at
> scala.collection.TraversableLike$WithFilter.$anonfun$foreach$1(TraversableLike.scala:985)
> at scala.collection.Iterator.foreach(Iterator.scala:943)
> at scala.collection.Iterator.foreach$(Iterator.scala:943)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
> at scala.collection.IterableLike.foreach(IterableLike.scala:74)
> at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
> at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
> at
> scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:984)
> at
> org.apache.spark.sql.hudi.command.payload.ExpressionPayload.processMatchedRecord(ExpressionPayload.scala:120)
> at
> org.apache.spark.sql.hudi.command.payload.ExpressionPayload.combineAndGetUpdateValue(ExpressionPayload.scala:88)
> at
> org.apache.hudi.common.model.HoodieAvroRecordMerger.combineAndGetUpdateValue(HoodieAvroRecordMerger.java:84)
> at
> org.apache.hudi.common.model.HoodieAvroRecordMerger.merge(HoodieAvroRecordMerger.java:60)
> at org.apache.hudi.io.HoodieMergeHandle.write(HoodieMergeHandle.java:340)
> ... 35 more
--
This message was sent by Atlassian Jira
(v8.20.10#820010)