gtwuser opened a new issue, #6797: URL: https://github.com/apache/hudi/issues/6797
**_Tips before filing an issue_** - Have you gone through our [FAQs](https://hudi.apache.org/learn/faq/)? - Join the mailing list to engage in conversations and get faster support at [email protected]. - If you have triaged this as a bug, then file an [issue](https://issues.apache.org/jira/projects/HUDI/issues) directly. **Describe the problem you faced** When RecordKey is set as nested field the hudi job isn't picking it up correctly: ``` org.apache.hudi.exception.HoodieException: Cannot find a record at part value :metadata at org.apache.hudi.avro.HoodieAvroUtils.getNestedFieldVal(HoodieAvroUtils.java:513) ``` A clear and concise description of the problem. Dataframe created using below mentioned json body: ```bash { "version": "1.0", "metadata": { "topic": "ASA.telemetry", "contentType": "application/json", "msgID": "44162" ## this msgID is expected to be set as recordKey } } ``` commonConfig used: ```bash commonConfig = { 'className': 'org.apache.hudi', 'hoodie.datasource.hive_sync.use_jdbc': 'false', 'hoodie.datasource.write.precombine.field': 'payload.recordedAt', ## this is set for a nested field using dot notation 'hoodie.datasource.write.recordkey.field': 'metadata.msgID', ## this is set for a nested field using dot notation 'hoodie.table.name': 'sse', .... ...... } ``` But while saving to hudi table it throws below error : ``` org.apache.hudi.exception.HoodieException: Cannot find a record at part value :metadata at org.apache.hudi.avro.HoodieAvroUtils.getNestedFieldVal(HoodieAvroUtils.java:513) ``` **To Reproduce** Steps to reproduce the behavior: 1. create a dataframe using json body this json body ```bash { "version": "1.0", "metadata": { "topic": "ASA.telemetry", "contentType": "application/json", "msgID": "44162" } } ``` 2. using these commonConfig for hudi ```bash commonConfig = { 'className': 'org.apache.hudi', 'hoodie.datasource.hive_sync.use_jdbc': 'false', 'hoodie.datasource.write.precombine.field': 'payload.recordedAt', ## this is set for a nested field using dot notation 'hoodie.datasource.write.recordkey.field': 'metadata.msgID', ## this is set for a nested field using dot notation 'hoodie.table.name': 'sse', # 'hoodie.consistency.check.enabled': 'true', 'hoodie.datasource.hive_sync.database': args['database_name'], 'hoodie.datasource.write.reconcile.schema': 'true', 'hoodie.datasource.hive_sync.table': f'sse_{"_".join(prefix.split("/")[-7:-5])}'.lower(), 'hoodie.datasource.hive_sync.enable': 'true', 'path': 's3://' + args['curated_bucket'] + '/merged/sse-native', # 1,024 * 1,024 * 128 = 134,217,728 (134 MB) 'hoodie.parquet.small.file.limit': '307200', 'hoodie.parquet.max.file.size': '128000000' } initLoadConfig = {'hoodie.bulkinsert.shuffle.parallelism': 68, 'hoodie.datasource.write.operation': 'bulk_insert'} partitionDataConfig = { 'hoodie.datasource.hive_sync.partition_extractor_class': 'org.apache.hudi.hive.MultiPartKeysValueExtractor', 'hoodie.datasource.write.keygenerator.class': 'org.apache.hudi.keygen.CustomKeyGenerator', 'hoodie.datasource.write.partitionpath.field': 'year:SIMPLE, month:SIMPLE, day:SIMPLE, hour:SIMPLE, device_id:SIMPLE', 'hoodie.datasource.hive_sync.partition_fields': 'year, month, day, hour, device_id', 'hoodie.datasource.write.hive_style_partitioning': 'true' } combinedConf = {**commonConfig, ** partitionDataConfig, **initLoadConfig} ``` 3. Save this input dataframe using above confs ```bash inputDf.write \ .format('org.apache.hudi') \ .option("spark.hadoop.parquet.avro.write-old-list-structure", "false") \ .option("parquet.avro.write-old-list-structure", "false") \ .option("spark.hadoop.parquet.avro.add-list-element-records", "false") \ .option("parquet.avro.add-list-element-records", "false") \ .option("hoodie.parquet.avro.write-old-list-structure", "false") \ .option("hoodie.datasource.write.reconcile.schema", "true") \ .options(**combinedConf) \ .mode('append') \ .save() ``` **Expected behavior** A clear and concise description of what you expected to happen. It is failing while reading `recordKey` as nested value, we expect it to save the data using given recordKey successfully **Environment Description** * Hudi version : 0.11.1 * Spark version : 3.1 * Storage (HDFS/S3/GCS..) : S3 * Running on Docker? (yes/no) : no **Additional context** Add any other context about the problem here. We are running this hudi script under a glue context as a glue job using pyspark **Stacktrace** ```Add the stacktrace of the error.``` ```bash 2022-09-26 11:29:24,934 ERROR [main] glue.ProcessLauncher (Logging.scala:logError(73)): Error from Python:Traceback (most recent call last): File "/tmp/second-delete-upsert.py", line 297, in <module> startMerging(df_prefix_map_list) File "/tmp/second-delete-upsert.py", line 234, in startMerging .mode('append') \ File "/opt/amazon/spark/python/lib/pyspark.zip/pyspark/sql/readwriter.py", line 1107, in save self._jwrite.save() File "/opt/amazon/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1305, in __call__ answer, self.gateway_client, self.target_id, self.name) File "/opt/amazon/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 111, in deco return f(*a, **kw) File "/opt/amazon/spark/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py", line 328, in get_return_value format(target_id, ".", name), value) py4j.protocol.Py4JJavaError: An error occurred while calling o10765.save. : org.apache.spark.SparkException: Writing job aborted. at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.writeWithV2(WriteToDataSourceV2Exec.scala:388) at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.writeWithV2$(WriteToDataSourceV2Exec.scala:336) at org.apache.spark.sql.execution.datasources.v2.AppendDataExec.writeWithV2(WriteToDataSourceV2Exec.scala:218) at org.apache.spark.sql.execution.datasources.v2.AppendDataExec.run(WriteToDataSourceV2Exec.scala:225) at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result$lzycompute(V2CommandExec.scala:40) at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result(V2CommandExec.scala:40) at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.doExecute(V2CommandExec.scala:55) at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:185) at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:223) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:220) at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:181) at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:134) at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:133) at org.apache.spark.sql.DataFrameWriter.$anonfun$runCommand$1(DataFrameWriter.scala:989) at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107) at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:232) at org.apache.spark.sql.execution.SQLExecution$.executeQuery$1(SQLExecution.scala:110) at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:135) at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107) at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:232) at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:135) at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:253) at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:134) at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:772) at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:68) at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:989) at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:370) at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:301) at org.apache.hudi.HoodieSparkSqlWriter$.bulkInsertAsRow(HoodieSparkSqlWriter.scala:586) at org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:178) at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:184) at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:46) at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70) at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68) at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:90) at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:185) at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:223) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:220) at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:181) at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:134) at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:133) at org.apache.spark.sql.DataFrameWriter.$anonfun$runCommand$1(DataFrameWriter.scala:989) at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107) at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:232) at org.apache.spark.sql.execution.SQLExecution$.executeQuery$1(SQLExecution.scala:110) at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:135) at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107) at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:232) at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:135) at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:253) at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:134) at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:772) at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:68) at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:989) at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:438) at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:415) at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:301) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244) at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357) at py4j.Gateway.invoke(Gateway.java:282) at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132) at py4j.commands.CallCommand.execute(CallCommand.java:79) at py4j.GatewayConnection.run(GatewayConnection.java:238) at java.lang.Thread.run(Thread.java:750) Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 324.0 failed 4 times, most recent failure: Lost task 0.3 in stage 324.0 (TID 326) (172.34.28.240 executor 1): org.apache.hudi.exception.HoodieException: Cannot find a record at part value :metadata at org.apache.hudi.avro.HoodieAvroUtils.getNestedFieldVal(HoodieAvroUtils.java:513) at org.apache.hudi.avro.HoodieAvroUtils.getNestedFieldValAsString(HoodieAvroUtils.java:487) at org.apache.hudi.keygen.KeyGenUtils.getRecordKey(KeyGenUtils.java:140) at org.apache.hudi.keygen.SimpleAvroKeyGenerator.getRecordKey(SimpleAvroKeyGenerator.java:50) at org.apache.hudi.keygen.CustomAvroKeyGenerator.getRecordKey(CustomAvroKeyGenerator.java:107) at org.apache.hudi.keygen.CustomKeyGenerator.getRecordKey(CustomKeyGenerator.java:78) at org.apache.hudi.keygen.BuiltinKeyGenerator.getRecordKey(BuiltinKeyGenerator.java:103) at org.apache.hudi.HoodieDatasetBulkInsertHelper$.$anonfun$prepareForBulkInsert$2(HoodieDatasetBulkInsertHelper.scala:70) at scala.collection.Iterator$$anon$10.next(Iterator.scala:455) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source) at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:755) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:484) at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.$anonfun$run$1(WriteToDataSourceV2Exec.scala:413) at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1473) at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.run(WriteToDataSourceV2Exec.scala:452) at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.$anonfun$writeWithV2$2(WriteToDataSourceV2Exec.scala:360) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90) at org.apache.spark.scheduler.Task.run(Task.scala:131) at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497) at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:750) Driver stacktrace: at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2465) at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2414) at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2413) at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:58) at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:51) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2413) at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1124) at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1124) at scala.Option.foreach(Option.scala:257) at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1124) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2679) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2621) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2610) at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49) at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:914) at org.apache.spark.SparkContext.runJob(SparkContext.scala:2238) at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.writeWithV2(WriteToDataSourceV2Exec.scala:357) ... 69 more Caused by: org.apache.hudi.exception.HoodieException: Cannot find a record at part value :metadata at org.apache.hudi.avro.HoodieAvroUtils.getNestedFieldVal(HoodieAvroUtils.java:513) at org.apache.hudi.avro.HoodieAvroUtils.getNestedFieldValAsString(HoodieAvroUtils.java:487) at org.apache.hudi.keygen.KeyGenUtils.getRecordKey(KeyGenUtils.java:140) at org.apache.hudi.keygen.SimpleAvroKeyGenerator.getRecordKey(SimpleAvroKeyGenerator.java:50) at org.apache.hudi.keygen.CustomAvroKeyGenerator.getRecordKey(CustomAvroKeyGenerator.java:107) at org.apache.hudi.keygen.CustomKeyGenerator.getRecordKey(CustomKeyGenerator.java:78) at org.apache.hudi.keygen.BuiltinKeyGenerator.getRecordKey(BuiltinKeyGenerator.java:103) at org.apache.hudi.HoodieDatasetBulkInsertHelper$.$anonfun$prepareForBulkInsert$2(HoodieDatasetBulkInsertHelper.scala:70) at scala.collection.Iterator$$anon$10.next(Iterator.scala:455) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source) at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:755) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:484) at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.$anonfun$run$1(WriteToDataSourceV2Exec.scala:413) at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1473) at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.run(WriteToDataSourceV2Exec.scala:452) at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.$anonfun$writeWithV2$2(WriteToDataSourceV2Exec.scala:360) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90) at org.apache.spark.scheduler.Task.run(Task.scala:131) at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497) at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ... 1 more ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
