rita-ihnatsyeva opened a new issue, #9838: URL: https://github.com/apache/hudi/issues/9838
- Have you gone through our [FAQs](https://hudi.apache.org/learn/faq/)? yes

**Describe the problem you faced**

Prerequisites:
1. I have a permanent Hudi table in AWS Glue.
2. This Hudi table is updated from temporary tables created with `df.createOrReplaceGlobalTempView()`.
3. Both the permanent Hudi table and the arriving temporary tables contain complex data structures.
4. The Hudi table is updated with a `MERGE INTO ...` SQL statement.

While executing the merge statement on Hudi with Glue, the following error occurred (not faced before, for the same approach). The expression

```
filter(array_union(coalesce(target.<array_of_structs>, array()), source.<array_of_structs>), x -> ! array_contains(source.<array_of_deleted_ids>, x.id))
```

parses and executes perfectly well with plain Spark SQL, so there should be no parsing error here.

- The `<array_of_structs>` column has the following schema:

```
|-- <array_of_structs>: array (nullable = false)
|    |-- element: struct (containsNull = false)
|    |    |-- id: string (nullable = true)
|    |    |-- created_at: string (nullable = true)
```

- The `<array_of_deleted_ids>` column has the following schema:

```
|-- <array_of_deleted_ids>: array (nullable = false)
|    |-- element: string (containsNull = false)
```

The merge statement:

```
"""
merge into {target_table_hudi} target
using global_temp.{source_table_view} source
on target.uuid = source.uuid
when matched then update set
    target.partition_time = source.partition_time,
    target.<array_of_structs> = filter(array_union(coalesce(target.<array_of_structs>, array()), source.<array_of_structs>), x -> ! array_contains(source.<array_of_deleted_ids>, x.id))
when not matched then insert *
"""
```

Fails with the error:

```
py4j.protocol.Py4JJavaError: An error occurred while calling o95.sql.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 206.0 failed 4 times, most recent failure: Lost task 0.3 in stage 206.0 (TID 2674) (172.35.180.173 executor 1): org.apache.hudi.exception.HoodieUpsertException: Error upserting bucketType UPDATE for partition :0
....
Caused by: org.apache.hudi.exception.HoodieException: org.apache.hudi.exception.HoodieException: java.util.concurrent.ExecutionException: org.apache.hudi.exception.HoodieUpsertException: Failed to combine/merge new record with old value in storage, for new record
```

I have also noticed that `zip_with` fails with the same generic error. Other array functions, such as `array_max()`, `array_union()`, `array_except()`, work as expected in the merge statement.

**To Reproduce**

Steps to reproduce the behavior:

Hudi setup for the permanent Hudi table:

```
"className": "org.apache.hudi",
"hoodie.table.name": "default",
"hoodie.datasource.write.operation": "upsert",
"hoodie.datasource.write.recordkey.field": "uuid",
"hoodie.datasource.write.precombine.field": "partition_time",
"hoodie.datasource.hive_sync.enable": "true",
"hoodie.datasource.hive_sync.table": "default",
"hoodie.datasource.hive_sync.conditional_sync": "true",
"hoodie.datasource.write.reconcile.schema": "true"
```

1. Run Glue 3 in AWS.
2. Create a Hudi table with the parameters above and nested array fields as explained in the problem.
3. Create a temporary table with the same structure.
4. Update the permanent Hudi table with the `MERGE INTO ...` Spark SQL statement as described above.

**Expected behavior**

The merge occurs without any problems; moreover, this piece of code seemed to work not so long ago, and I have no idea why it fails now.

**Environment Description**

* Hudi version : 0.10.1
* Spark version : 3.1
* Storage (HDFS/S3/GCS..)
: S3
* Glue version : 3

**Stacktrace**

> Stacktrace is modified a bit not to show NDA info

```
Caused by: org.apache.hudi.exception.HoodieUpsertException: Failed to combine/merge new record with old value in storage, for new record {HoodieRecord{key=HoodieKey { recordKey=uuid:<some_uuid> partitionPath=}, currentLocation='HoodieRecordLocation {instantTime=20231009135924173, fileId=0bf6d2a2-9f67-44a9-b3d4-08890c3d813a-0}', newLocation='HoodieRecordLocation {instantTime=20231009140002017, fileId=0bf6d2a2-9f67-44a9-b3d4-08890c3d813a-0}'}}, old value {{"_hoodie_commit_time": "20231009135849616", "_hoodie_commit_seqno": "20231009135849616_0_9", "_hoodie_record_key": "uuid:<some_uuid>", "_hoodie_partition_path": "", "_hoodie_file_name": "0bf6d2a2-9f67-44a9-b3d4-08890c3d813a-0_0-53-886_20231009135849616.parquet", "uuid": "<some_uuid>", "<array_of_structs>":[{"id":"<some_id>", "created_date":"<some_date>"}], "array_of_deleted_ids": ["<some_id1>", "<some_id2>"], "partition_time": "2023-10-09 13:58:49.972", "Op": "MODIFY"}}
	at org.apache.hudi.io.HoodieMergeHandle.write(HoodieMergeHandle.java:344)
	at org.apache.hudi.table.action.commit.AbstractMergeHelper$UpdateHandler.consumeOneRecord(AbstractMergeHelper.java:122)
	at org.apache.hudi.table.action.commit.AbstractMergeHelper$UpdateHandler.consumeOneRecord(AbstractMergeHelper.java:112)
	at org.apache.hudi.common.util.queue.BoundedInMemoryQueueConsumer.consume(BoundedInMemoryQueueConsumer.java:37)
	at org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.lambda$null$2(BoundedInMemoryExecutor.java:121)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	... 3 more
Caused by: org.apache.hudi.spark.com.google.common.util.concurrent.UncheckedExecutionException: java.lang.NullPointerException
	at org.apache.hudi.spark.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2203)
	at org.apache.hudi.spark.com.google.common.cache.LocalCache.get(LocalCache.java:3937)
	at org.apache.hudi.spark.com.google.common.cache.LocalCache$LocalManualCache.get(LocalCache.java:4739)
	at org.apache.spark.sql.hudi.command.payload.ExpressionPayload$.getEvaluator(ExpressionPayload.scala:303)
	at org.apache.spark.sql.hudi.command.payload.ExpressionPayload.processMatchedRecord(ExpressionPayload.scala:106)
	at org.apache.spark.sql.hudi.command.payload.ExpressionPayload.combineAndGetUpdateValue(ExpressionPayload.scala:81)
	at org.apache.hudi.io.HoodieMergeHandle.write(HoodieMergeHandle.java:324)
	... 8 more
Caused by: java.lang.NullPointerException
	at org.apache.spark.sql.catalyst.expressions.codegen.Block$BlockHelper$.$anonfun$code$1(javaCode.scala:240)
	at org.apache.spark.sql.catalyst.expressions.codegen.Block$BlockHelper$.$anonfun$code$1$adapted(javaCode.scala:236)
	at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:32)
	at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:29)
	at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:37)
	at org.apache.spark.sql.catalyst.expressions.codegen.Block$BlockHelper$.code$extension(javaCode.scala:236)
	at org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback.doGenCode(CodegenFallback.scala:62)
	at org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback.doGenCode$(CodegenFallback.scala:28)
	at org.apache.spark.sql.catalyst.expressions.ZipWith.doGenCode(higherOrderFunctions.scala:1052)
	at org.apache.spark.sql.catalyst.expressions.Expression.$anonfun$genCode$3(Expression.scala:146)
	at scala.Option.getOrElse(Option.scala:121)
	at org.apache.spark.sql.catalyst.expressions.Expression.genCode(Expression.scala:141)
	at org.apache.spark.sql.catalyst.expressions.Alias.genCode(namedExpressions.scala:163)
	at org.apache.spark.sql.hudi.command.payload.ExpressionCodeGen$.$anonfun$doCodeGen$2(ExpressionCodeGen.scala:62)
	at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
	at scala.collection.immutable.List.foreach(List.scala:388)
	at scala.collection.TraversableLike.map(TraversableLike.scala:233)
	at scala.collection.TraversableLike.map$(TraversableLike.scala:226)
	at scala.collection.immutable.List.map(List.scala:294)
	at org.apache.spark.sql.hudi.command.payload.ExpressionCodeGen$.doCodeGen(ExpressionCodeGen.scala:62)
	at org.apache.spark.sql.hudi.command.payload.ExpressionPayload$$anon$1.$anonfun$call$1(ExpressionPayload.scala:319)
	at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
	at scala.collection.immutable.Map$Map1.foreach(Map.scala:125)
	at scala.collection.TraversableLike.map(TraversableLike.scala:233)
	at scala.collection.TraversableLike.map$(TraversableLike.scala:226)
	at scala.collection.AbstractTraversable.map(Traversable.scala:104)
	at org.apache.spark.sql.hudi.command.payload.ExpressionPayload$$anon$1.call(ExpressionPayload.scala:310)
	at org.apache.spark.sql.hudi.command.payload.ExpressionPayload$$anon$1.call(ExpressionPayload.scala:303)
	at org.apache.hudi.spark.com.google.common.cache.LocalCache$LocalManualCache$1.load(LocalCache.java:4742)
	at org.apache.hudi.spark.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3527)
	at org.apache.hudi.spark.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2319)
	at org.apache.hudi.spark.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2282)
	at org.apache.hudi.spark.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2197)
	... 14 more

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2465)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2414)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2413)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:58)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:51)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2413)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1124)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1124)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1124)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2679)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2621)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2610)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:914)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2238)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2259)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2278)
	at org.apache.spark.rdd.RDD.$anonfun$take$1(RDD.scala:1449)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:414)
	at org.apache.spark.rdd.RDD.take(RDD.scala:1422)
	at org.apache.spark.rdd.RDD.$anonfun$isEmpty$1(RDD.scala:1557)
	at scala.runtime.java8.JFunction0$mcZ$sp.apply(JFunction0$mcZ$sp.java:12)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:414)
	at org.apache.spark.rdd.RDD.isEmpty(RDD.scala:1557)
	at org.apache.hudi.HoodieSparkSqlWriter$.commitAndPerformPostOperations(HoodieSparkSqlWriter.scala:627)
	at org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:286)
	at org.apache.spark.sql.hudi.command.MergeIntoHoodieTableCommand.executeUpsert(MergeIntoHoodieTableCommand.scala:285)
	at org.apache.spark.sql.hudi.command.MergeIntoHoodieTableCommand.run(MergeIntoHoodieTableCommand.scala:155)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:79)
	at org.apache.spark.sql.Dataset.$anonfun$logicalPlan$1(Dataset.scala:229)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3724)
	at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
	at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:232)
	at org.apache.spark.sql.execution.SQLExecution$.executeQuery$1(SQLExecution.scala:110)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:135)
	at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
	at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:232)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:135)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:253)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:134)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:772)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:68)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3722)
	at org.apache.spark.sql.Dataset.<init>(Dataset.scala:229)
	at org.apache.spark.sql.Dataset$.$anonfun$ofRows$2(Dataset.scala:100)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:772)
	at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:97)
	at org.apache.spark.sql.SparkSession.$anonfun$sql$1(SparkSession.scala:615)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:772)
	at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:610)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.hudi.exception.HoodieUpsertException: Error upserting bucketType UPDATE for partition :0
	at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpsertPartition(BaseSparkCommitActionExecutor.java:322)
	at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.lambda$execute$ecf5068c$1(BaseSparkCommitActionExecutor.java:174)
	at org.apache.spark.api.java.JavaRDDLike.$anonfun$mapPartitionsWithIndex$1(JavaRDDLike.scala:102)
	at org.apache.spark.api.java.JavaRDDLike.$anonfun$mapPartitionsWithIndex$1$adapted(JavaRDDLike.scala:102)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$2(RDD.scala:915)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$2$adapted(RDD.scala:915)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.$anonfun$getOrCompute$1(RDD.scala:386)
	at org.apache.spark.storage.BlockManager.$anonfun$doPutIterator$1(BlockManager.scala:1440)
	at org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$doPut(BlockManager.scala:1350)
	at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1414)
	at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:1237)
	at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:384)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:335)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:131)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1440)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	... 1 more
```

-- This is an automated message from the Apache Git Service.
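Since the reporter notes that the higher-order expression runs fine in plain Spark SQL and that ordinary array functions still work inside the MERGE clause, one possible workaround (a sketch, not a verified fix) is to evaluate the lambda *outside* the merge clause: enrich the source view with the already-filtered array via a regular join, then let MERGE assign a plain column. All table and column names below (`target_tbl`, `source_view`, `source_prefiltered`, `arr`, `deleted_ids`) are placeholders standing in for the redacted names in the issue:

```python
# Hypothetical workaround: move filter(...)/the lambda out of the MERGE
# clause (where Hudi's ExpressionPayload codegen throws the NPE) into an
# ordinary Spark SQL view, where the reporter says the expression works.
# These SQL strings would be run with spark.sql(...) in the Glue job.

PRE_FILTER_SQL = """
CREATE OR REPLACE GLOBAL TEMPORARY VIEW source_prefiltered AS
SELECT s.uuid,
       s.partition_time,
       filter(array_union(coalesce(t.arr, array()), s.arr),
              x -> NOT array_contains(s.deleted_ids, x.id)) AS arr,
       s.deleted_ids
FROM global_temp.source_view s
LEFT JOIN target_tbl t
  ON s.uuid = t.uuid
"""

MERGE_SQL = """
MERGE INTO target_tbl target
USING global_temp.source_prefiltered source
ON target.uuid = source.uuid
WHEN MATCHED THEN UPDATE SET
    target.partition_time = source.partition_time,
    target.arr = source.arr              -- plain column reference, no lambda
WHEN NOT MATCHED THEN INSERT *
"""

# Pure-Python model of what the failing SQL expression computes, to make
# the intended semantics concrete:
#   filter(array_union(coalesce(target.arr, array()), source.arr),
#          x -> NOT array_contains(deleted_ids, x.id))
def merge_structs(target_arr, source_arr, deleted_ids):
    base = target_arr or []                                   # coalesce(target.arr, array())
    union = base + [s for s in source_arr if s not in base]   # array_union de-duplicates
    return [x for x in union if x["id"] not in deleted_ids]   # drop deleted ids

result = merge_structs(
    [{"id": "a", "created_at": "2023-01-01"}],
    [{"id": "b", "created_at": "2023-02-02"}],
    ["a"],
)
# result == [{"id": "b", "created_at": "2023-02-02"}]: "a" is filtered out
```

The view keeps the same column set as the target so that `WHEN NOT MATCHED THEN INSERT *` still lines up; for unmatched rows the left join yields `t.arr = NULL`, so `coalesce` falls back to an empty array and the inserted value is just the filtered source array.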
