TarunMootala opened a new issue #4914: URL: https://github.com/apache/hudi/issues/4914
**_Tips before filing an issue_**

- Have you gone through our [FAQs](https://hudi.apache.org/learn/faq/)?
- Join the mailing list to engage in conversations and get faster support at [email protected].
- If you have triaged this as a bug, then file an [issue](https://issues.apache.org/jira/projects/HUDI/issues) directly.

**Describe the problem you faced**

My use case is to populate default values for missing fields. I am using the property `hoodie.datasource.write.reconcile.schema` to inject default values for the missing fields, but the upsert fails with the error `org.apache.avro.UnresolvedUnionException: Not in union ["null","string"]: 1`.

**To Reproduce**

Steps to reproduce the behavior:

1. Create a Hudi COW table and insert some sample data.

```
inputDF = spark.createDataFrame(
    [
        ("100", "AAA", "2015-01-01", "2015-01-01T13:51:39.340396Z"),
        ("101", "BBB", "2015-01-01", "2015-01-01T12:14:58.597216Z"),
        ("102", "CCC", "2015-01-01", "2015-01-01T13:51:40.417052Z"),
        ("103", "DDD", "2015-01-01", "2015-01-01T13:51:40.519832Z"),
        ("104", "EEE", "2015-01-01", "2015-01-01T12:15:00.512679Z"),
        ("105", "FFF", "2015-01-01", "2015-01-01T13:51:42.248818Z")
    ],
    ["id", "name", "creation_date", "last_update_time"]
)

table_name = "first_hudi_table"
table_path = f"s3://<bucket_name>/Hudi/{table_name}"

hudiOptions = {
    'hoodie.table.name': table_name,
    'hoodie.datasource.write.recordkey.field': 'id',
    'hoodie.datasource.write.precombine.field': 'last_update_time',
    'hoodie.datasource.hive_sync.enable': 'true',
    'hoodie.datasource.hive_sync.database': 'streaming_dev',
    'hoodie.datasource.hive_sync.partition_extractor_class': 'org.apache.hudi.hive.NonPartitionedExtractor'
}

print(table_name, table_path)

inputDF.write\
    .format('hudi')\
    .option('hoodie.datasource.write.operation', 'insert')\
    .options(**hudiOptions)\
    .mode('overwrite')\
    .save(table_path)
```

2. Create a Spark dataframe with fewer fields than the table schema. All mandatory fields (record key, precombine, partition path) are still present in the dataframe.

3. Enable the property `hoodie.datasource.write.reconcile.schema` and upsert the Spark dataframe into the Hudi table.

```
inputDF = spark.createDataFrame(
    [
        ("110", '2015-01-01', "2015-01-02T13:51:39.340396Z"),
    ],
    ["id", "creation_date", "last_update_time"]
)

hudiOptions = {
    'hoodie.table.name': table_name,
    'hoodie.datasource.write.recordkey.field': 'id',
    'hoodie.datasource.write.precombine.field': 'last_update_time',
    'hoodie.datasource.write.reconcile.schema': 'true',
    'hoodie.datasource.hive_sync.enable': 'true',
    'hoodie.datasource.hive_sync.database': 'streaming_dev',
    'hoodie.datasource.hive_sync.partition_extractor_class': 'org.apache.hudi.hive.NonPartitionedExtractor'
}

inputDF.write\
    .format('hudi')\
    .option('hoodie.datasource.write.operation', 'upsert')\
    .options(**hudiOptions)\
    .mode('append')\
    .save(table_path)
```

**Expected behavior**

The upsert should succeed, with default values injected for the missing fields.
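For context, the behavior I am expecting from `hoodie.datasource.write.reconcile.schema` is roughly what the sketch below does by hand: pad the incoming dataframe with the columns that exist in the table but not in the new data (`name` in this example), defaulting them to null before the upsert. This is only an illustrative, hypothetical workaround (the `missing_columns` mapping and `paddedDF` are names made up for the sketch), not part of the failing job above.

```
from pyspark.sql.functions import lit

# Hypothetical workaround sketch: explicitly add the columns that are present
# in the table schema but absent from the incoming dataframe, defaulting them
# to null. 'name' is the only field missing from the step-3 dataframe here.
missing_columns = {"name": "string"}  # column name -> Spark SQL type

paddedDF = inputDF
for col_name, col_type in missing_columns.items():
    paddedDF = paddedDF.withColumn(col_name, lit(None).cast(col_type))

# Upsert the padded dataframe with the same options; since the incoming schema
# now matches the table schema, no schema reconciliation should be needed.
paddedDF.write\
    .format('hudi')\
    .option('hoodie.datasource.write.operation', 'upsert')\
    .options(**hudiOptions)\
    .mode('append')\
    .save(table_path)
```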
**Environment Description**

Using Jupyter notebook with AWS EMR 6.5.0 and Glue Data Catalog as Hive metastore.

* Hudi version : 0.9
* Spark version : 3.1.2
* Hive version : 3.1.2
* Hadoop version : 3.2.1
* Storage (HDFS/S3/GCS..) : S3
* Running on Docker? (yes/no) : No

**Stacktrace**

```
An error was encountered:
An error occurred while calling o205.save.
: org.apache.hudi.exception.HoodieUpsertException: Failed to upsert for commit time 20220225180756
    at org.apache.hudi.table.action.commit.AbstractWriteHelper.write(AbstractWriteHelper.java:62)
    at org.apache.hudi.table.action.commit.SparkUpsertCommitActionExecutor.execute(SparkUpsertCommitActionExecutor.java:46)
    at org.apache.hudi.table.HoodieSparkCopyOnWriteTable.upsert(HoodieSparkCopyOnWriteTable.java:98)
    at org.apache.hudi.table.HoodieSparkCopyOnWriteTable.upsert(HoodieSparkCopyOnWriteTable.java:88)
    at org.apache.hudi.client.SparkRDDWriteClient.upsert(SparkRDDWriteClient.java:157)
    at org.apache.hudi.DataSourceUtils.doWriteOperation(DataSourceUtils.java:214)
    at org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:265)
    at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:169)
    at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:46)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:90)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:194)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:232)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:229)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:190)
    at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:134)
    at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:133)
    at org.apache.spark.sql.DataFrameWriter.$anonfun$runCommand$1(DataFrameWriter.scala:989)
    at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
    at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:232)
    at org.apache.spark.sql.execution.SQLExecution$.executeQuery$1(SQLExecution.scala:110)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:135)
    at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
    at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:232)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:135)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:253)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:134)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:68)
    at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:989)
    at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:438)
    at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:415)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:293)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 3 in stage 12.0 failed 4 times, most recent failure: Lost task 3.3 in stage 12.0 (TID 34) (ip-172-31-0-85.ec2.internal executor 4): java.io.IOException: Could not create payload for class: org.apache.hudi.common.model.OverwriteWithLatestAvroPayload
    at org.apache.hudi.DataSourceUtils.createPayload(DataSourceUtils.java:133)
    at org.apache.hudi.DataSourceUtils.createHoodieRecord(DataSourceUtils.java:236)
    at org.apache.hudi.HoodieSparkSqlWriter$.$anonfun$write$7(HoodieSparkSqlWriter.scala:237)
    at scala.collection.Iterator$$anon$10.next(Iterator.scala:459)
    at scala.collection.Iterator$$anon$10.next(Iterator.scala:459)
    at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:194)
    at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:64)
    at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
    at org.apache.spark.scheduler.Task.run(Task.scala:131)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.hudi.exception.HoodieException: Unable to instantiate class org.apache.hudi.common.model.OverwriteWithLatestAvroPayload
    at org.apache.hudi.common.util.ReflectionUtils.loadClass(ReflectionUtils.java:91)
    at org.apache.hudi.DataSourceUtils.createPayload(DataSourceUtils.java:130)
    ... 16 more
Caused by: java.lang.reflect.InvocationTargetException
    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
    at org.apache.hudi.common.util.ReflectionUtils.loadClass(ReflectionUtils.java:89)
    ... 17 more
Caused by: org.apache.avro.UnresolvedUnionException: Not in union ["null","string"]: 1
    at org.apache.avro.generic.GenericData.resolveUnion(GenericData.java:740)
    at org.apache.avro.generic.GenericDatumWriter.resolveUnion(GenericDatumWriter.java:205)
    at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:123)
    at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:75)
    at org.apache.avro.generic.GenericDatumWriter.writeField(GenericDatumWriter.java:166)
    at org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:156)
    at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:118)
    at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:75)
    at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:62)
    at org.apache.hudi.avro.HoodieAvroUtils.indexedRecordToBytes(HoodieAvroUtils.java:102)
    at org.apache.hudi.avro.HoodieAvroUtils.avroToBytes(HoodieAvroUtils.java:94)
    at org.apache.hudi.common.model.BaseAvroPayload.<init>(BaseAvroPayload.java:49)
    at org.apache.hudi.common.model.OverwriteWithLatestAvroPayload.<init>(OverwriteWithLatestAvroPayload.java:42)
    ... 22 more

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2470)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2419)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2418)
    at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
    at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2418)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1125)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1125)
    at scala.Option.foreach(Option.scala:407)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1125)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2684)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2626)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2615)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:914)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2241)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2262)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2281)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2306)
    at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1030)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:414)
    at org.apache.spark.rdd.RDD.collect(RDD.scala:1029)
    at org.apache.spark.rdd.PairRDDFunctions.$anonfun$countByKey$1(PairRDDFunctions.scala:366)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:414)
    at org.apache.spark.rdd.PairRDDFunctions.countByKey(PairRDDFunctions.scala:366)
    at org.apache.spark.api.java.JavaPairRDD.countByKey(JavaPairRDD.scala:314)
    at org.apache.hudi.index.bloom.SparkHoodieBloomIndex.lookupIndex(SparkHoodieBloomIndex.java:114)
    at org.apache.hudi.index.bloom.SparkHoodieBloomIndex.tagLocation(SparkHoodieBloomIndex.java:84)
    at org.apache.hudi.index.bloom.SparkHoodieBloomIndex.tagLocation(SparkHoodieBloomIndex.java:60)
    at org.apache.hudi.table.action.commit.AbstractWriteHelper.tag(AbstractWriteHelper.java:69)
    at org.apache.hudi.table.action.commit.AbstractWriteHelper.write(AbstractWriteHelper.java:51)
    ... 45 more

Traceback (most recent call last):
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/readwriter.py", line 1109, in save
    self._jwrite.save(path)
  File "/usr/lib/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1305, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 111, in deco
    return f(*a, **kw)
  File "/usr/lib/spark/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py", line 328, in get_return_value
    format(target_id, ".", name), value)
```

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
