sbernauer opened a new issue #1845: URL: https://github.com/apache/hudi/issues/1845
Hi Hudi team, In https://cwiki.apache.org/confluence/display/HUDI/FAQ#FAQ-What'sHudi'sschemaevolutionstory you describe, that "As long as the schema passed to Hudi [...] is backwards compatible [...] Hudi will seamlessly handle read/write of old and new data and also keep the Hive schema up-to date." We need and try to use this mechanism but are failing. **Steps to reproduce:** 1. We have some old schema and events. 2. We update the schema with a new, optional union field and restart the DeltaStreamer 3. We ingest new events 4. We ingest old events again (there are some upserts). In production we would have both event versions until all producers have changed to the new version. At this step the Deltastream crashes with ``` 210545 [pool-28-thread-1] ERROR org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer - Shutting down delta-sync due to exception org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 22.0 failed 4 times, most recent failure: Lost task 0.3 in stage 22.0 (TID 1162, 100.65.150.166, executor 1): org.apache.hudi.exception.HoodieUpsertException: Error upserting bucketType UPDATE for partition :0 at org.apache.hudi.table.action.commit.BaseCommitActionExecutor.handleUpsertPartition(BaseCommitActionExecutor.java:253) at org.apache.hudi.table.action.commit.BaseCommitActionExecutor.lambda$execute$caffe4c4$1(BaseCommitActionExecutor.java:102) at org.apache.spark.api.java.JavaRDDLike.$anonfun$mapPartitionsWithIndex$1(JavaRDDLike.scala:102) at org.apache.spark.api.java.JavaRDDLike.$anonfun$mapPartitionsWithIndex$1$adapted(JavaRDDLike.scala:102) at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$2(RDD.scala:889) at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$2$adapted(RDD.scala:889) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349) at org.apache.spark.rdd.RDD.iterator(RDD.scala:313) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349) at org.apache.spark.rdd.RDD.$anonfun$getOrCompute$1(RDD.scala:362) at org.apache.spark.storage.BlockManager.$anonfun$doPutIterator$1(BlockManager.scala:1388) at org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$doPut(BlockManager.scala:1298) at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1362) at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:1186) at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:360) at org.apache.spark.rdd.RDD.iterator(RDD.scala:311) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349) at org.apache.spark.rdd.RDD.iterator(RDD.scala:313) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90) at org.apache.spark.scheduler.Task.run(Task.scala:127) at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:444) at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:447) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) Caused by: org.apache.hudi.exception.HoodieUpsertException: Failed to close UpdateHandle at org.apache.hudi.io.HoodieMergeHandle.close(HoodieMergeHandle.java:302) at org.apache.hudi.table.action.commit.CommitActionExecutor.handleUpdateInternal(CommitActionExecutor.java:101) at org.apache.hudi.table.action.commit.CommitActionExecutor.handleUpdate(CommitActionExecutor.java:74) at org.apache.hudi.table.action.deltacommit.DeltaCommitActionExecutor.handleUpdate(DeltaCommitActionExecutor.java:73) at org.apache.hudi.table.action.commit.BaseCommitActionExecutor.handleUpsertPartition(BaseCommitActionExecutor.java:246) ... 28 more Caused by: java.io.EOFException at org.apache.avro.io.BinaryDecoder.ensureBounds(BinaryDecoder.java:473) at org.apache.avro.io.BinaryDecoder.readInt(BinaryDecoder.java:128) at org.apache.avro.io.BinaryDecoder.readIndex(BinaryDecoder.java:423) at org.apache.avro.io.ResolvingDecoder.doAction(ResolvingDecoder.java:290) at org.apache.avro.io.parsing.Parser.advance(Parser.java:88) at org.apache.avro.io.ResolvingDecoder.readIndex(ResolvingDecoder.java:267) at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:179) at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153) at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:232) at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:222) at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175) at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153) at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:145) at org.apache.hudi.avro.HoodieAvroUtils.bytesToAvro(HoodieAvroUtils.java:108) at org.apache.hudi.common.model.OverwriteWithLatestAvroPayload.getInsertValue(OverwriteWithLatestAvroPayload.java:80) at org.apache.hudi.io.HoodieMergeHandle.close(HoodieMergeHandle.java:271) ... 32 more ``` I've created a test to reproduce this behaviour here: https://github.com/apache/hudi/pull/1844. I hope the test is helpful. The test cause the following exception, but i think they are quite related. ``` Job aborted due to stage failure: Task 0 in stage 16.0 failed 1 times, most recent failure: Lost task 0.0 in stage 16.0 (TID 23, localhost, executor driver): java.lang.ArrayIndexOutOfBoundsException: 19 at org.apache.avro.generic.GenericData$Record.get(GenericData.java:212) at org.apache.hudi.AvroConversionHelper$.$anonfun$createConverterToRow$9(AvroConversionHelper.scala:170) at org.apache.hudi.AvroConversionUtils$.$anonfun$createDataFrame$2(AvroConversionUtils.scala:66) at scala.collection.Iterator$$anon$10.next(Iterator.scala:459) at scala.collection.Iterator$$anon$10.next(Iterator.scala:459) at scala.collection.Iterator$$anon$10.next(Iterator.scala:459) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source) at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$2.hasNext(WholeStageCodegenExec.scala:636) at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458) at scala.collection.Iterator.isEmpty(Iterator.scala:385) at scala.collection.Iterator.isEmpty$(Iterator.scala:385) at scala.collection.AbstractIterator.isEmpty(Iterator.scala:1429) at org.apache.hudi.AvroConversionUtils$.$anonfun$createRdd$2(AvroConversionUtils.scala:44) at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2(RDD.scala:801) at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2$adapted(RDD.scala:801) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324) at org.apache.spark.rdd.RDD.iterator(RDD.scala:288) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90) at org.apache.spark.scheduler.Task.run(Task.scala:123) at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:411) at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) Driver stacktrace: at org.apache.hudi.utilities.functional.TestHoodieDeltaStreamer.testSchemaEvolution(TestHoodieDeltaStreamer.java:491) Caused by: java.lang.ArrayIndexOutOfBoundsException: 19 ``` I think the mailing list entry from avro could help a lot: http://apache-avro.679487.n3.nabble.com/AVRO-schema-evolution-adding-optional-column-with-default-fails-deserialization-td4043025.html The corresponding place in code is here: https://github.com/apache/hudi/blob/bf1d36fa639cae558aa388d8d547e58ad2e67aba/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java#L107 Thanks a lot in advance! Cheers, Sebastian **Expected behavior** As stated in FAQ schema evolution should not crash the DeltaStreamer **Environment Description** * Hudi version : current master branch * Spark version : 3.0.0 * Hive version : 3.1.2 * Hadoop version : 3.2.1 * Storage (HDFS/S3/GCS..) : S3 * Running on Docker? (yes/no) : yes, running on kubernetes ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [email protected]
