santoshsb commented on issue #5452: URL: https://github.com/apache/hudi/issues/5452#issuecomment-1112864560
Hi @xiarixiaoyao, thanks for the code. It worked like a charm for the reduced JSON provided above. After successfully testing it with the reduced schema, we used the complete schema (https://www.hl7.org/fhir/patient.html#resource). Even though the source and target schemas match, the following error is thrown while updating a record (both schemas are provided below for reference):

`Driver stacktrace:
  at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2454)
  at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2403)
  at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2402)
  at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
  at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
  at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
  at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2402)
  at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1160)
  at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1160)
  at scala.Option.foreach(Option.scala:407)
  at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1160)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2642)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2584)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2573)
  at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
  at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:938)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:2214)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:2235)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:2254)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:2279)
  at org.apache.spark.rdd.RDD.count(RDD.scala:1253)
  at org.apache.hudi.HoodieSparkSqlWriter$.commitAndPerformPostOperations(HoodieSparkSqlWriter.scala:646)
  at org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:314)
  at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:163)
  at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45)
  at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:75)
  at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:73)
  at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:84)
  at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:110)
  at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
  at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
  at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
  at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
  at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
  at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:110)
  at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:106)
  at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:481)
  at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:82)
  at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:481)
  at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
  at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
  at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
  at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
  at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
  at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:457)
  at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:106)
  at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:93)
  at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:91)
  at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:128)
  at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:848)
  at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:382)
  at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:355)
  at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:239)
  ... 67 elided
Caused by: org.apache.hudi.exception.HoodieUpsertException: Error upserting bucketType UPDATE for partition :0
  at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpsertPartition(BaseSparkCommitActionExecutor.java:329)
  at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.lambda$mapPartitionsAsRDD$a3ab3c4$1(BaseSparkCommitActionExecutor.java:244)
  at org.apache.spark.api.java.JavaRDDLike.$anonfun$mapPartitionsWithIndex$1(JavaRDDLike.scala:102)
  at org.apache.spark.api.java.JavaRDDLike.$anonfun$mapPartitionsWithIndex$1$adapted(JavaRDDLike.scala:102)
  at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$2(RDD.scala:915)
  at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$2$adapted(RDD.scala:915)
  at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
  at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
  at org.apache.spark.rdd.RDD.$anonfun$getOrCompute$1(RDD.scala:386)
  at org.apache.spark.storage.BlockManager.$anonfun$doPutIterator$1(BlockManager.scala:1498)
  at org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$doPut(BlockManager.scala:1408)
  at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1472)
  at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:1295)
  at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:384)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:335)
  at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
  at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
  at org.apache.spark.scheduler.Task.run(Task.scala:131)
  at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
  at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1462)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
  at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.hudi.exception.HoodieException: org.apache.hudi.exception.HoodieException: java.util.concurrent.ExecutionException: org.apache.hudi.exception.HoodieException: operation has failed
  at org.apache.hudi.table.action.commit.HoodieMergeHelper.runMerge(HoodieMergeHelper.java:149)
  at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpdateInternal(BaseSparkCommitActionExecutor.java:358)
  at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpdate(BaseSparkCommitActionExecutor.java:349)
  at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpsertPartition(BaseSparkCommitActionExecutor.java:322)
  ... 28 more
Caused by: org.apache.hudi.exception.HoodieException: java.util.concurrent.ExecutionException: org.apache.hudi.exception.HoodieException: operation has failed
  at org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.execute(BoundedInMemoryExecutor.java:160)
  at org.apache.hudi.table.action.commit.HoodieMergeHelper.runMerge(HoodieMergeHelper.java:147)
  ... 31 more
Caused by: java.util.concurrent.ExecutionException: org.apache.hudi.exception.HoodieException: operation has failed
  at java.util.concurrent.FutureTask.report(FutureTask.java:122)
  at java.util.concurrent.FutureTask.get(FutureTask.java:192)
  at org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.execute(BoundedInMemoryExecutor.java:154)
  ... 32 more
Caused by: org.apache.hudi.exception.HoodieException: operation has failed
  at org.apache.hudi.common.util.queue.BoundedInMemoryQueue.throwExceptionIfFailed(BoundedInMemoryQueue.java:248)
  at org.apache.hudi.common.util.queue.BoundedInMemoryQueue.readNextRecord(BoundedInMemoryQueue.java:226)
  at org.apache.hudi.common.util.queue.BoundedInMemoryQueue.access$100(BoundedInMemoryQueue.java:52)
  at org.apache.hudi.common.util.queue.BoundedInMemoryQueue$QueueIterator.hasNext(BoundedInMemoryQueue.java:278)
  at org.apache.hudi.common.util.queue.BoundedInMemoryQueueConsumer.consume(BoundedInMemoryQueueConsumer.java:36)
  at org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.lambda$null$2(BoundedInMemoryExecutor.java:134)
  at java.util.concurrent.FutureTask.run(FutureTask.java:266)
  ... 3 more
Caused by: org.apache.hudi.exception.HoodieException: unable to read next record from parquet file
  at org.apache.hudi.common.util.ParquetReaderIterator.hasNext(ParquetReaderIterator.java:53)
  at org.apache.hudi.common.util.queue.IteratorBasedQueueProducer.produce(IteratorBasedQueueProducer.java:45)
  at org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.lambda$null$0(BoundedInMemoryExecutor.java:105)
  at java.util.concurrent.FutureTask.run(FutureTask.java:266)
  at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
  ... 4 more
Caused by: org.apache.parquet.io.InvalidRecordException: Parquet/Avro schema mismatch: Avro field 'coding' not found
  at org.apache.parquet.avro.AvroRecordConverter.getAvroField(AvroRecordConverter.java:221)
  at org.apache.parquet.avro.AvroRecordConverter.<init>(AvroRecordConverter.java:126)
  at org.apache.parquet.avro.AvroRecordConverter.newConverter(AvroRecordConverter.java:284)
  at org.apache.parquet.avro.AvroRecordConverter.newConverter(AvroRecordConverter.java:228)
  at org.apache.parquet.avro.AvroRecordConverter.access$100(AvroRecordConverter.java:74)
  at org.apache.parquet.avro.AvroRecordConverter$AvroCollectionConverter$ElementConverter.<init>(AvroRecordConverter.java:539)
  at org.apache.parquet.avro.AvroRecordConverter$AvroCollectionConverter.<init>(AvroRecordConverter.java:489)
  at org.apache.parquet.avro.AvroRecordConverter.newConverter(AvroRecordConverter.java:293)
  at org.apache.parquet.avro.AvroRecordConverter.<init>(AvroRecordConverter.java:137)
  at org.apache.parquet.avro.AvroRecordConverter.<init>(AvroRecordConverter.java:91)
  at org.apache.parquet.avro.AvroRecordMaterializer.<init>(AvroRecordMaterializer.java:33)
  at org.apache.parquet.avro.AvroReadSupport.prepareForRead(AvroReadSupport.java:142)
  at org.apache.parquet.hadoop.InternalParquetRecordReader.initialize(InternalParquetRecordReader.java:185)
  at org.apache.parquet.hadoop.ParquetReader.initReader(ParquetReader.java:156)
  at org.apache.parquet.hadoop.ParquetReader.read(ParquetReader.java:135)
  at org.apache.hudi.common.util.ParquetReaderIterator.hasNext(ParquetReaderIterator.java:48)`

The current table schema is as follows:

`scala> patienthudi.printSchema
root
 |-- _hoodie_commit_time: string (nullable = true)
 |-- _hoodie_commit_seqno: string (nullable = true)
 |-- _hoodie_record_key: string (nullable = true)
 |-- _hoodie_partition_path: string (nullable = true)
 |-- _hoodie_file_name: string (nullable = true)
 |-- address: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- city: string (nullable = true)
 |    |    |-- country: string (nullable = true)
 |    |    |-- extension: array (nullable = true)
 |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |-- extension: array (nullable = true)
 |    |    |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |    |    |-- url: string (nullable = true)
 |    |    |    |    |    |    |-- valueDecimal: double (nullable = true)
 |    |    |    |    |-- url: string (nullable = true)
 |    |    |-- line: array (nullable = true)
 |    |    |    |-- element: string (containsNull = true)
 |    |    |-- postalCode: string (nullable = true)
 |    |    |-- state: string (nullable = true)
 |-- birthDate: string (nullable = true)
 |-- communication: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- language: struct (nullable = true)
 |    |    |    |-- coding: array (nullable = true)
 |    |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |    |-- code: string (nullable = true)
 |    |    |    |    |    |-- display: string (nullable = true)
 |    |    |    |    |    |-- system: string (nullable = true)
 |    |    |    |-- text: string (nullable = true)
 |-- extension: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- extension: array (nullable = true)
 |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |-- url: string (nullable = true)
 |    |    |    |    |-- valueCoding: struct (nullable = true)
 |    |    |    |    |    |-- code: string (nullable = true)
 |    |    |    |    |    |-- display: string (nullable = true)
 |    |    |    |    |    |-- system: string (nullable = true)
 |    |    |    |    |-- valueString: string (nullable = true)
 |    |    |-- url: string (nullable = true)
 |    |    |-- valueAddress: struct (nullable = true)
 |    |    |    |-- city: string (nullable = true)
 |    |    |    |-- country: string (nullable = true)
 |    |    |    |-- state: string (nullable = true)
 |    |    |-- valueCode: string (nullable = true)
 |    |    |-- valueDecimal: double (nullable = true)
 |    |    |-- valueString: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- id: string (nullable = true)
 |-- identifier: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- system: string (nullable = true)
 |    |    |-- type: struct (nullable = true)
 |    |    |    |-- coding: array (nullable = true)
 |    |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |    |-- code: string (nullable = true)
 |    |    |    |    |    |-- display: string (nullable = true)
 |    |    |    |    |    |-- system: string (nullable = true)
 |    |    |    |-- text: string (nullable = true)
 |    |    |-- value: string (nullable = true)
 |-- lastUpdated: string (nullable = true)
 |-- managingOrganization: struct (nullable = true)
 |    |-- reference: string (nullable = true)
 |    |-- type: string (nullable = true)
 |-- maritalStatus: struct (nullable = true)
 |    |-- coding: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- code: string (nullable = true)
 |    |    |    |-- display: string (nullable = true)
 |    |    |    |-- system: string (nullable = true)
 |    |-- text: string (nullable = true)
 |-- meta: struct (nullable = true)
 |    |-- extension: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- url: string (nullable = true)
 |    |    |    |-- valueString: string (nullable = true)
 |    |-- lastUpdated: string (nullable = true)
 |    |-- source: string (nullable = true)
 |    |-- versionId: string (nullable = true)
 |-- multipleBirthBoolean: boolean (nullable = true)
 |-- multipleBirthInteger: long (nullable = true)
 |-- name: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- family: string (nullable = true)
 |    |    |-- given: array (nullable = true)
 |    |    |    |-- element: string (containsNull = true)
 |    |    |-- prefix: array (nullable = true)
 |    |    |    |-- element: string (containsNull = true)
 |    |    |-- use: string (nullable = true)
 |-- resourceType: string (nullable = true)
 |-- source: string (nullable = true)
 |-- telecom: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- system: string (nullable = true)
 |    |    |-- use: string (nullable = true)
 |    |    |-- value: string (nullable = true)
 |-- text: struct (nullable = true)
 |    |-- div: string (nullable = true)
 |    |-- status: string (nullable = true)`

The incoming/update DataFrame schema, after using the code you provided, is as follows:

`scala> updatedStringDf.printSchema
root
 |-- _hoodie_commit_time: string (nullable = true)
 |-- _hoodie_commit_seqno: string (nullable = true)
 |-- _hoodie_record_key: string (nullable = true)
 |-- _hoodie_partition_path: string (nullable = true)
 |-- _hoodie_file_name: string (nullable = true)
 |-- address: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- city: string (nullable = true)
 |    |    |-- country: string (nullable = true)
 |    |    |-- extension: array (nullable = true)
 |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |-- extension: array (nullable = true)
 |    |    |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |    |    |-- url: string (nullable = true)
 |    |    |    |    |    |    |-- valueDecimal: double (nullable = true)
 |    |    |    |    |-- url: string (nullable = true)
 |    |    |-- line: array (nullable = true)
 |    |    |    |-- element: string (containsNull = true)
 |    |    |-- postalCode: string (nullable = true)
 |    |    |-- state: string (nullable = true)
 |-- birthDate: string (nullable = true)
 |-- communication: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- language: struct (nullable = true)
 |    |    |    |-- coding: array (nullable = true)
 |    |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |    |-- code: string (nullable = true)
 |    |    |    |    |    |-- display: string (nullable = true)
 |    |    |    |    |    |-- system: string (nullable = true)
 |    |    |    |-- text: string (nullable = true)
 |-- extension: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- extension: array (nullable = true)
 |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |-- url: string (nullable = true)
 |    |    |    |    |-- valueCoding: struct (nullable = true)
 |    |    |    |    |    |-- code: string (nullable = true)
 |    |    |    |    |    |-- display: string (nullable = true)
 |    |    |    |    |    |-- system: string (nullable = true)
 |    |    |    |    |-- valueString: string (nullable = true)
 |    |    |-- url: string (nullable = true)
 |    |    |-- valueAddress: struct (nullable = true)
 |    |    |    |-- city: string (nullable = true)
 |    |    |    |-- country: string (nullable = true)
 |    |    |    |-- state: string (nullable = true)
 |    |    |-- valueCode: string (nullable = true)
 |    |    |-- valueDecimal: double (nullable = true)
 |    |    |-- valueString: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- id: string (nullable = true)
 |-- identifier: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- system: string (nullable = true)
 |    |    |-- type: struct (nullable = true)
 |    |    |    |-- coding: array (nullable = true)
 |    |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |    |-- code: string (nullable = true)
 |    |    |    |    |    |-- display: string (nullable = true)
 |    |    |    |    |    |-- system: string (nullable = true)
 |    |    |    |-- text: string (nullable = true)
 |    |    |-- value: string (nullable = true)
 |-- lastUpdated: string (nullable = true)
 |-- managingOrganization: struct (nullable = true)
 |    |-- reference: string (nullable = true)
 |    |-- type: string (nullable = true)
 |-- maritalStatus: struct (nullable = true)
 |    |-- coding: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- code: string (nullable = true)
 |    |    |    |-- display: string (nullable = true)
 |    |    |    |-- system: string (nullable = true)
 |    |-- text: string (nullable = true)
 |-- meta: struct (nullable = true)
 |    |-- extension: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- url: string (nullable = true)
 |    |    |    |-- valueString: string (nullable = true)
 |    |-- lastUpdated: string (nullable = true)
 |    |-- source: string (nullable = true)
 |    |-- versionId: string (nullable = true)
 |-- multipleBirthBoolean: boolean (nullable = true)
 |-- multipleBirthInteger: long (nullable = true)
 |-- name: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- family: string (nullable = true)
 |    |    |-- given: array (nullable = true)
 |    |    |    |-- element: string (containsNull = true)
 |    |    |-- prefix: array (nullable = true)
 |    |    |    |-- element: string (containsNull = true)
 |    |    |-- use: string (nullable = true)
 |-- resourceType: string (nullable = true)
 |-- source: string (nullable = true)
 |-- telecom: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- system: string (nullable = true)
 |    |    |-- use: string (nullable = true)
 |    |    |-- value: string (nullable = true)
 |-- text: struct (nullable = true)
 |    |-- div: string (nullable = true)
 |    |-- status: string (nullable = true)`

We have seen this issue in the troubleshooting guide, but that covers the case where there is a schema mismatch; here both schemas are identical. Any pointers would be helpful.

Thanks,
Santosh
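For a mechanical check that two `printSchema` dumps really are identical, rather than comparing them by eye, one option is to flatten each printed tree into dotted paths and diff the results. The following is a minimal sketch in plain Scala; `SchemaTreeDiff`, `flatten`, and `diff` are hypothetical names introduced here, not part of Spark or Hudi. It assumes the multi-line output that `printSchema` prints in the shell, and it only compares the printed text, so it says nothing about the schema the existing Parquet files were written with.

```scala
// Hypothetical helper, not part of Spark or Hudi: compares two printSchema dumps as text.
object SchemaTreeDiff {
  // Matches lines such as " |    |-- city: string (nullable = true)".
  private val FieldLine = """^([\s|]*)\|--\s*([^:]+):\s*(.*)$""".r

  /** Flattens a dump into entries like "address.element.city: string (nullable = true)". */
  def flatten(tree: String): Vector[String] = {
    var parents = Vector.empty[String] // names of the enclosing fields
    val out = Vector.newBuilder[String]
    tree.split("\\r?\\n").foreach {
      case FieldLine(prefix, name, tpe) =>
        val depth = prefix.count(_ == '|') // one '|' per nesting level before "|--"
        parents = parents.take(depth) :+ name.trim
        out += s"${parents.mkString(".")}: ${tpe.trim}"
      case _ => // ignore "root", the "scala>" prompt line, and blank lines
    }
    out.result()
  }

  /** Returns (entries only in left, entries only in right). */
  def diff(left: String, right: String): (Vector[String], Vector[String]) = {
    val l = flatten(left)
    val r = flatten(right)
    val leftSet: Set[String] = l.toSet
    val rightSet: Set[String] = r.toSet
    (l.filterNot(rightSet.contains), r.filterNot(leftSet.contains))
  }
}
```

In spark-shell, pasting the object and calling `SchemaTreeDiff.diff(currentDump, incomingDump)`, where both arguments are strings holding the printed trees (hypothetical variable names), returns two empty vectors when the printed schemas agree.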
