rita-ihnatsyeva opened a new issue, #9838:
URL: https://github.com/apache/hudi/issues/9838

   
   - Have you gone through our [FAQs](https://hudi.apache.org/learn/faq/)?
   Yes
   
   **Describe the problem you faced**
   
   Prerequisites:
   1. I have a permanent Hudi table in AWS Glue.
   2. This Hudi table is updated from temporary views created via `df.createOrReplaceGlobalTempView()` (see the sketch after this list).
   3. Both the permanent Hudi table and the arriving temporary views contain complex (nested) data structures.
   4. The Hudi table is updated with a `MERGE INTO ...` SQL statement.
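   
   For reference, the temporary view is registered roughly like this (a minimal sketch; all names and paths are placeholders, the real ones are omitted for NDA reasons):
   
   ```python
   from pyspark.sql import SparkSession
   
   spark = SparkSession.builder.getOrCreate()
   
   # Placeholder source: the real batch arrives with the same nested schema
   # as the permanent table (array_of_structs, array_of_deleted_ids, ...).
   source_df = spark.read.format("json").load("s3://<bucket>/<incoming_batch>/")
   
   # Global temp views live in the reserved `global_temp` database, which is
   # why the MERGE statement below references the view as global_temp.<view_name>.
   source_df.createOrReplaceGlobalTempView("source_table_view")
   ```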
   
   While executing a MERGE statement on Hudi with Glue, the following error occurred (not faced before with the same approach).
   
   The update expression
   ```
   filter(array_union(coalesce(target.<array_of_structs>, array()),
                      source.<array_of_structs>),
          x -> ! array_contains(source.<array_of_deleted_ids>, x.id))
   ```
   parses and executes perfectly well in plain Spark SQL, so there should be no parsing error here.
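   
   For instance, a standalone sanity check of the same expression on literal placeholder data runs fine outside of MERGE:
   
   ```python
   from pyspark.sql import SparkSession
   
   spark = SparkSession.builder.getOrCreate()
   
   # The same filter/array_union/coalesce expression evaluated in plain
   # Spark SQL on literal placeholder data (no MERGE involved).
   spark.sql("""
       select filter(
                array_union(
                  coalesce(array(named_struct('id', 'a', 'created_at', '2023-10-09')), array()),
                  array(named_struct('id', 'b', 'created_at', '2023-10-09'))
                ),
                x -> ! array_contains(array('b'), x.id)
              ) as kept
   """).show(truncate=False)
   # Expected: a one-element array keeping only the struct with id = 'a'.
   ```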
   
   - The `<array_of_structs>` column has the following schema:
   ```
    |-- <array_of_structs>: array (nullable = false)
    |    |-- element: struct (containsNull = false)
    |    |    |-- id: string (nullable = true)
    |    |    |-- created_at: string (nullable = true)
   ```
   
   - The `<array_of_deleted_ids>` column has the following schema:
   ```
    |-- <array_of_deleted_ids>: array (nullable = false)
    |    |-- element: string (containsNull = false)
   ```
   
   """ merge into {target_table_hudi} target
                       using global_temp.{source_table_view} source
                       on target.uuid = source.uuid
                       when matched then update set
                       target.partition_time = source.partition_time,,
                       target.<array_of_structs> = 
filter(array_union(coalesce(target.<array_of_structs>, array()),
                       source.<array_of_structs>), x ->  ! 
array_contains(source.<array_of_deleted_ids>, x.id))
                       when not matched then insert *
   """
   ```
   Fails with error:
   ```
   py4j.protocol.Py4JJavaError: An error occurred while calling o95.sql.
   : org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 206.0 failed 4 times, most recent failure: Lost task 0.3 in stage 206.0 (TID 2674) (172.35.180.173 executor 1): org.apache.hudi.exception.HoodieUpsertException: Error upserting bucketType UPDATE for partition :0
   ....
   Caused by: org.apache.hudi.exception.HoodieException: org.apache.hudi.exception.HoodieException: java.util.concurrent.ExecutionException: org.apache.hudi.exception.HoodieUpsertException: Failed to combine/merge new record with old value in storage, for new record
   ```
   
   I've also noticed that `zip_with()` fails with the same generic error.
   
   Other array functions such as `array_max()`, `array_union()`, `array_except()`, etc. work as expected in the merge statement.
   
   **To Reproduce**
   
   Steps to reproduce the behavior:
   
   Hudi setup for the permanent Hudi table:
   
   ```
       "className": "org.apache.hudi",
       "hoodie.table.name": "default",
       "hoodie.datasource.write.operation": "upsert",
       "hoodie.datasource.write.recordkey.field": "uuid",
       "hoodie.datasource.write.precombine.field": "partition_time",
       "hoodie.datasource.hive_sync.enable": "true",
       "hoodie.datasource.hive_sync.table": "default",
       "hoodie.datasource.hive_sync.conditional_sync": "true",
       "hoodie.datasource.write.reconcile.schema": "true"
   ```
   
   1. Run Glue 3 in AWS.
   2. Create a Hudi table with the above parameters and the nested array fields described in the problem.
   3. Create a temporary view with the same structure.
   4. Update the permanent Hudi table with the `MERGE INTO ...` Spark SQL statement described above (a minimal sketch of steps 2-4 follows this list).
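   
   A minimal end-to-end sketch of steps 2-4 (all names, paths, and data are placeholders, assuming the table syncs to the Glue catalog as `target_table_hudi`):
   
   ```python
   from pyspark.sql import SparkSession
   
   spark = SparkSession.builder.getOrCreate()
   
   hudi_options = {
       "hoodie.table.name": "target_table_hudi",          # placeholder name
       "hoodie.datasource.write.operation": "upsert",
       "hoodie.datasource.write.recordkey.field": "uuid",
       "hoodie.datasource.write.precombine.field": "partition_time",
       "hoodie.datasource.hive_sync.enable": "true",
       "hoodie.datasource.hive_sync.table": "target_table_hudi",
       "hoodie.datasource.hive_sync.conditional_sync": "true",
       "hoodie.datasource.write.reconcile.schema": "true",
   }
   
   schema = (
       "uuid string, partition_time string, "
       "array_of_structs array<struct<id:string,created_at:string>>, "
       "array_of_deleted_ids array<string>"
   )
   
   # 2. Seed the permanent Hudi table with one placeholder row.
   target_df = spark.createDataFrame(
       [("u1", "2023-10-09 13:58:49.972", [("a", "2023-10-09")], ["b"])], schema
   )
   target_df.write.format("hudi").options(**hudi_options).mode("overwrite").save(
       "s3://<bucket>/target_table_hudi/"
   )
   
   # 3. Register an arriving batch with the same structure as a global temp view.
   source_df = spark.createDataFrame(
       [("u1", "2023-10-09 14:00:02.017", [("c", "2023-10-09")], ["a"])], schema
   )
   source_df.createOrReplaceGlobalTempView("source_table_view")
   
   # 4. Run the MERGE; the filter(...) lambda in the SET clause triggers the failure.
   spark.sql("""
       merge into target_table_hudi target
       using global_temp.source_table_view source
       on target.uuid = source.uuid
       when matched then update set
         target.partition_time = source.partition_time,
         target.array_of_structs = filter(
           array_union(coalesce(target.array_of_structs, array()), source.array_of_structs),
           x -> ! array_contains(source.array_of_deleted_ids, x.id)
         )
       when not matched then insert *
   """)
   ```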
   
   **Expected behavior**
   
   The merge should occur without any problems. Moreover, this piece of code seemed to work not so long ago, and I have no idea why it fails now.
   
   **Environment Description**
   
   * Hudi version : 0.10.1
   
   * Spark version :  3.1
   
   * Storage (HDFS/S3/GCS..) : S3
   
   *  Glue version: 3
   
   
   **Stacktrace**
   
   > The stacktrace is modified a bit so as not to show NDA info.
   
   ```
   Caused by: org.apache.hudi.exception.HoodieUpsertException: Failed to combine/merge new record with old value in storage, for new record 
{HoodieRecord{key=HoodieKey { recordKey=uuid:<some_uuid> partitionPath=}, 
currentLocation='HoodieRecordLocation {instantTime=20231009135924173, 
fileId=0bf6d2a2-9f67-44a9-b3d4-08890c3d813a-0}', 
newLocation='HoodieRecordLocation {instantTime=20231009140002017, 
fileId=0bf6d2a2-9f67-44a9-b3d4-08890c3d813a-0}'}}, old value 
{{"_hoodie_commit_time": "20231009135849616", "_hoodie_commit_seqno": 
"20231009135849616_0_9", "_hoodie_record_key": "uuid:<some_uuid>", 
"_hoodie_partition_path": "", "_hoodie_file_name": 
"0bf6d2a2-9f67-44a9-b3d4-08890c3d813a-0_0-53-886_20231009135849616.parquet", 
"uuid": "<some_uuid>", "<array_of_structs>":[{"id":"<some_id>", 
"created_date":"<some_date>"}], "array_of_deleted_ids": ["<some_id1>", 
"<some_id2>"], "partition_time": "2023-10-09 13:58:49.972", "Op": "MODIFY"}}
        at 
org.apache.hudi.io.HoodieMergeHandle.write(HoodieMergeHandle.java:344)
        at 
org.apache.hudi.table.action.commit.AbstractMergeHelper$UpdateHandler.consumeOneRecord(AbstractMergeHelper.java:122)
        at 
org.apache.hudi.table.action.commit.AbstractMergeHelper$UpdateHandler.consumeOneRecord(AbstractMergeHelper.java:112)
        at 
org.apache.hudi.common.util.queue.BoundedInMemoryQueueConsumer.consume(BoundedInMemoryQueueConsumer.java:37)
        at 
org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.lambda$null$2(BoundedInMemoryExecutor.java:121)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        ... 3 more
   Caused by: 
org.apache.hudi.spark.com.google.common.util.concurrent.UncheckedExecutionException:
 java.lang.NullPointerException
        at 
org.apache.hudi.spark.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2203)
        at 
org.apache.hudi.spark.com.google.common.cache.LocalCache.get(LocalCache.java:3937)
        at 
org.apache.hudi.spark.com.google.common.cache.LocalCache$LocalManualCache.get(LocalCache.java:4739)
        at 
org.apache.spark.sql.hudi.command.payload.ExpressionPayload$.getEvaluator(ExpressionPayload.scala:303)
        at 
org.apache.spark.sql.hudi.command.payload.ExpressionPayload.processMatchedRecord(ExpressionPayload.scala:106)
        at 
org.apache.spark.sql.hudi.command.payload.ExpressionPayload.combineAndGetUpdateValue(ExpressionPayload.scala:81)
        at 
org.apache.hudi.io.HoodieMergeHandle.write(HoodieMergeHandle.java:324)
        ... 8 more
   Caused by: java.lang.NullPointerException
        at 
org.apache.spark.sql.catalyst.expressions.codegen.Block$BlockHelper$.$anonfun$code$1(javaCode.scala:240)
        at 
org.apache.spark.sql.catalyst.expressions.codegen.Block$BlockHelper$.$anonfun$code$1$adapted(javaCode.scala:236)
        at 
scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:32)
        at 
scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:29)
        at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:37)
        at 
org.apache.spark.sql.catalyst.expressions.codegen.Block$BlockHelper$.code$extension(javaCode.scala:236)
        at 
org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback.doGenCode(CodegenFallback.scala:62)
        at 
org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback.doGenCode$(CodegenFallback.scala:28)
        at 
org.apache.spark.sql.catalyst.expressions.ZipWith.doGenCode(higherOrderFunctions.scala:1052)
        at 
org.apache.spark.sql.catalyst.expressions.Expression.$anonfun$genCode$3(Expression.scala:146)
        at scala.Option.getOrElse(Option.scala:121)
        at 
org.apache.spark.sql.catalyst.expressions.Expression.genCode(Expression.scala:141)
        at 
org.apache.spark.sql.catalyst.expressions.Alias.genCode(namedExpressions.scala:163)
        at 
org.apache.spark.sql.hudi.command.payload.ExpressionCodeGen$.$anonfun$doCodeGen$2(ExpressionCodeGen.scala:62)
        at 
scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
        at scala.collection.immutable.List.foreach(List.scala:388)
        at scala.collection.TraversableLike.map(TraversableLike.scala:233)
        at scala.collection.TraversableLike.map$(TraversableLike.scala:226)
        at scala.collection.immutable.List.map(List.scala:294)
        at 
org.apache.spark.sql.hudi.command.payload.ExpressionCodeGen$.doCodeGen(ExpressionCodeGen.scala:62)
        at 
org.apache.spark.sql.hudi.command.payload.ExpressionPayload$$anon$1.$anonfun$call$1(ExpressionPayload.scala:319)
        at 
scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
        at scala.collection.immutable.Map$Map1.foreach(Map.scala:125)
        at scala.collection.TraversableLike.map(TraversableLike.scala:233)
        at scala.collection.TraversableLike.map$(TraversableLike.scala:226)
        at scala.collection.AbstractTraversable.map(Traversable.scala:104)
        at 
org.apache.spark.sql.hudi.command.payload.ExpressionPayload$$anon$1.call(ExpressionPayload.scala:310)
        at 
org.apache.spark.sql.hudi.command.payload.ExpressionPayload$$anon$1.call(ExpressionPayload.scala:303)
        at 
org.apache.hudi.spark.com.google.common.cache.LocalCache$LocalManualCache$1.load(LocalCache.java:4742)
        at 
org.apache.hudi.spark.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3527)
        at 
org.apache.hudi.spark.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2319)
        at 
org.apache.hudi.spark.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2282)
        at 
org.apache.hudi.spark.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2197)
        ... 14 more
   
   Driver stacktrace:
        at 
org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2465)
        at 
org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2414)
        at 
org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2413)
        at 
scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:58)
        at 
scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:51)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
        at 
org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2413)
        at 
org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1124)
        at 
org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1124)
        at scala.Option.foreach(Option.scala:257)
        at 
org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1124)
        at 
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2679)
        at 
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2621)
        at 
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2610)
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
        at 
org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:914)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2238)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2259)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2278)
        at org.apache.spark.rdd.RDD.$anonfun$take$1(RDD.scala:1449)
        at 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
        at org.apache.spark.rdd.RDD.withScope(RDD.scala:414)
        at org.apache.spark.rdd.RDD.take(RDD.scala:1422)
        at org.apache.spark.rdd.RDD.$anonfun$isEmpty$1(RDD.scala:1557)
        at 
scala.runtime.java8.JFunction0$mcZ$sp.apply(JFunction0$mcZ$sp.java:12)
        at 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
        at org.apache.spark.rdd.RDD.withScope(RDD.scala:414)
        at org.apache.spark.rdd.RDD.isEmpty(RDD.scala:1557)
        at 
org.apache.hudi.HoodieSparkSqlWriter$.commitAndPerformPostOperations(HoodieSparkSqlWriter.scala:627)
        at 
org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:286)
        at 
org.apache.spark.sql.hudi.command.MergeIntoHoodieTableCommand.executeUpsert(MergeIntoHoodieTableCommand.scala:285)
        at 
org.apache.spark.sql.hudi.command.MergeIntoHoodieTableCommand.run(MergeIntoHoodieTableCommand.scala:155)
        at 
org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
        at 
org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
        at 
org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:79)
        at 
org.apache.spark.sql.Dataset.$anonfun$logicalPlan$1(Dataset.scala:229)
        at 
org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3724)
        at 
org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
        at 
org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:232)
        at 
org.apache.spark.sql.execution.SQLExecution$.executeQuery$1(SQLExecution.scala:110)
        at 
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:135)
        at 
org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
        at 
org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:232)
        at 
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:135)
        at 
org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:253)
        at 
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:134)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:772)
        at 
org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:68)
        at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3722)
        at org.apache.spark.sql.Dataset.<init>(Dataset.scala:229)
        at org.apache.spark.sql.Dataset$.$anonfun$ofRows$2(Dataset.scala:100)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:772)
        at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:97)
        at 
org.apache.spark.sql.SparkSession.$anonfun$sql$1(SparkSession.scala:615)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:772)
        at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:610)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
        at py4j.Gateway.invoke(Gateway.java:282)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.GatewayConnection.run(GatewayConnection.java:238)
        at java.lang.Thread.run(Thread.java:750)
   Caused by: org.apache.hudi.exception.HoodieUpsertException: Error upserting 
bucketType UPDATE for partition :0
        at 
org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpsertPartition(BaseSparkCommitActionExecutor.java:322)
        at 
org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.lambda$execute$ecf5068c$1(BaseSparkCommitActionExecutor.java:174)
        at 
org.apache.spark.api.java.JavaRDDLike.$anonfun$mapPartitionsWithIndex$1(JavaRDDLike.scala:102)
        at 
org.apache.spark.api.java.JavaRDDLike.$anonfun$mapPartitionsWithIndex$1$adapted(JavaRDDLike.scala:102)
        at 
org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$2(RDD.scala:915)
        at 
org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$2$adapted(RDD.scala:915)
        at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
        at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
        at org.apache.spark.rdd.RDD.$anonfun$getOrCompute$1(RDD.scala:386)
        at 
org.apache.spark.storage.BlockManager.$anonfun$doPutIterator$1(BlockManager.scala:1440)
        at 
org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$doPut(BlockManager.scala:1350)
        at 
org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1414)
        at 
org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:1237)
        at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:384)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:335)
        at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
        at org.apache.spark.scheduler.Task.run(Task.scala:131)
        at 
org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1440)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        ... 1 more
   ```
   
   

