BalaMahesh opened a new issue, #7219: URL: https://github.com/apache/hudi/issues/7219
**_Tips before filing an issue_**

- Have you gone through our [FAQs](https://hudi.apache.org/learn/faq/)? Yes.
- Join the mailing list to engage in conversations and get faster support at [email protected].
- If you have triaged this as a bug, then file an [issue](https://issues.apache.org/jira/projects/HUDI/issues) directly.

**Describe the problem you faced**

We have had Hudi running stably for a while on a PostgresDebeziumSource. We made a schema update to the source table, and Debezium registered the latest schema in the Confluent Schema Registry. Hudi is configured to use the latest schema version from the registry. There are still records in Kafka that were written with the old schema, and while processing those records Hudi fails with the error below.

**To Reproduce**

Steps to reproduce the behavior:

1. Use Debezium to publish data to Kafka, and configure Hudi with PostgresDebeziumSource, PostgresDebeziumAvroPayload, and the Confluent Schema Registry, with the schema URL pointing to the latest version of the schema.
2. Stop Hudi (make sure unconsumed records with the old schema remain in Kafka) and update the schema of the Postgres table (add new columns).
3. Start Hudi again so it processes the unconsumed records (with the old schema) while fetching the latest schema from the schema registry.
4. The pipeline fails while converting the input records to Dataset<Row>.

**Expected behavior**

We expect Hudi to set default values for the fields that are not present in the old schema, or to process the records as-is and store them in the filesystem (table).

**Environment Description**

* Hudi version : 0.12.0
* Spark version : 3.2.2
* Hive version : 2.3.5
* Hadoop version : 2.7.7
* Storage (HDFS/S3/GCS..) : GCS
* Running on Docker? (yes/no) : no

**Additional context**
```java
private Dataset<Row> toDataset(OffsetRange[] offsetRanges, KafkaOffsetGen offsetGen, String schemaStr) {
  AvroConvertor convertor = new AvroConvertor(schemaStr);
  Dataset<Row> kafkaData;
  if (deserializerClassName.equals(StringDeserializer.class.getName())) {
    kafkaData = AvroConversionUtils.createDataFrame(
        KafkaUtils.<String, String>createRDD(sparkContext, offsetGen.getKafkaParams(), offsetRanges, LocationStrategies.PreferConsistent())
            .map(obj -> convertor.fromJson(obj.value()))
            .rdd(), schemaStr, sparkSession);
  } else {
    kafkaData = AvroConversionUtils.createDataFrame(
        KafkaUtils.createRDD(sparkContext, offsetGen.getKafkaParams(), offsetRanges, LocationStrategies.PreferConsistent())
            .map(obj -> (GenericRecord) obj.value())
            .rdd(), schemaStr, sparkSession);
  }
  // Flatten debezium payload, specific to each DB type (postgres/ mysql/ etc..)
  Dataset<Row> debeziumDataset = processDataset(kafkaData);
  // Some required transformations to ensure debezium data types are converted to spark supported types.
  return convertArrayColumnsToString(convertColumnToNullable(sparkSession,
      convertDateColumns(debeziumDataset, new Schema.Parser().parse(schemaStr))));
}
```

The code above creates the DataFrame with the latest schema fetched from the schema registry; ideally it should also rewrite the RDD to set default values for the fields that are present in the latest schema but missing from the old data fetched from Kafka. The pipeline finally fails during the conversion in:

```scala
@Deprecated
def createConverterToRow(sourceAvroSchema: Schema, targetSqlType: StructType): GenericRecord => Row = {
  val encoder = RowEncoder.apply(targetSqlType).resolveAndBind()
  val serde = sparkAdapter.createSparkRowSerDe(encoder)
  val converter = AvroConversionUtils.createAvroToInternalRowConverter(sourceAvroSchema, targetSqlType)
  avro => converter.apply(avro).map(serde.deserializeRow).get
}
```

with the stack trace below. The old version of the data has only 72 fields; a new field was added in the latest schema version.
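As a sketch of that suggested rewrite: before handing the RDD to `AvroConversionUtils.createDataFrame`, each record deserialized with the old schema could be projected onto the latest schema, with missing fields filled from their declared defaults (in real Avro code this would walk the reader schema's fields, and Avro's `GenericDatumReader` can perform this resolution itself when given both writer and reader schemas). The snippet below is a stdlib-only toy illustration of the idea, not Hudi code; the field names and defaults are hypothetical:

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Stdlib-only sketch: project a record written with the old schema onto the
// latest schema, filling missing fields with their declared defaults. Here the
// "schema" is just an ordered field -> default-value map standing in for
// Avro's Schema#getFields() / Schema.Field#defaultVal().
public class DefaultFillSketch {

  public static Map<String, Object> projectToLatest(Map<String, Object> oldRecord,
                                                    Map<String, Object> latestSchemaDefaults) {
    Map<String, Object> projected = new LinkedHashMap<>();
    for (Map.Entry<String, Object> field : latestSchemaDefaults.entrySet()) {
      // Keep the old value when present, otherwise fall back to the default.
      projected.put(field.getKey(),
          oldRecord.containsKey(field.getKey()) ? oldRecord.get(field.getKey())
                                                : field.getValue());
    }
    return projected;
  }
}
```

With this kind of projection, a 72-field old record would come out with all 73 fields of the latest schema, and the downstream Avro-to-Row conversion would no longer read past the end of the record.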
Hudi then tries to read a field position from the old records in the RDD that they do not contain, and fails with the following error.

**Stacktrace**

```
22/11/16 13:04:49 WARN task-result-getter-1 TaskSetManager: Lost task 0.0 in stage 1.0 (TID 1) (192.168.0.188 executor driver): java.lang.ArrayIndexOutOfBoundsException: 73
	at org.apache.avro.generic.GenericData$Record.get(GenericData.java:263)
	at org.apache.hudi.org.apache.spark.sql.avro.AvroDeserializer.$anonfun$getRecordWriter$2(AvroDeserializer.scala:383)
	at org.apache.hudi.org.apache.spark.sql.avro.AvroDeserializer.$anonfun$getRecordWriter$2$adapted(AvroDeserializer.scala:379)
	at org.apache.hudi.org.apache.spark.sql.avro.AvroDeserializer.$anonfun$newWriter$18(AvroDeserializer.scala:212)
	at org.apache.hudi.org.apache.spark.sql.avro.AvroDeserializer.$anonfun$newWriter$18$adapted(AvroDeserializer.scala:210)
	at org.apache.hudi.org.apache.spark.sql.avro.AvroDeserializer.$anonfun$getRecordWriter$1(AvroDeserializer.scala:365)
	at org.apache.hudi.org.apache.spark.sql.avro.AvroDeserializer.$anonfun$getRecordWriter$1$adapted(AvroDeserializer.scala:361)
	at org.apache.hudi.org.apache.spark.sql.avro.AvroDeserializer.$anonfun$getRecordWriter$2(AvroDeserializer.scala:383)
	at org.apache.hudi.org.apache.spark.sql.avro.AvroDeserializer.$anonfun$getRecordWriter$2$adapted(AvroDeserializer.scala:379)
	at org.apache.hudi.org.apache.spark.sql.avro.AvroDeserializer.$anonfun$converter$4(AvroDeserializer.scala:88)
	at org.apache.hudi.org.apache.spark.sql.avro.AvroDeserializer.deserialize(AvroDeserializer.scala:106)
	at org.apache.hudi.org.apache.spark.sql.avro.HoodieSpark3_2AvroDeserializer.deserialize(HoodieSpark3_2AvroDeserializer.scala:30)
	at org.apache.hudi.AvroConversionUtils$.$anonfun$createAvroToInternalRowConverter$1(AvroConversionUtils.scala:68)
	at org.apache.hudi.AvroConversionUtils$.$anonfun$createConverterToRow$1(AvroConversionUtils.scala:97)
	at org.apache.hudi.AvroConversionUtils$.$anonfun$createDataFrame$2(AvroConversionUtils.scala:132)
	at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
	at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
	at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
	at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:514)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:759)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:350)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:898)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:898)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:131)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1491)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)
```
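For reference, the `ArrayIndexOutOfBoundsException` above can be modeled with a small stdlib-only sketch (the field counts are illustrative, taken from the 72-field/73-field description in this report, and this is not Hudi or Avro code): the deserializer is generated from the latest schema and reads field values by position, but records written with the old schema carry fewer values, so the first position beyond the old field count throws.

```java
// Toy model of the failure: a deserializer generated from the latest schema
// reads fields by position, but old records carry fewer values than the
// latest schema declares.
public class SchemaMismatchSketch {

  // Stand-in for GenericData.Record.get(int): values are indexed by field position.
  public static Object getField(Object[] recordValues, int position) {
    return recordValues[position]; // throws ArrayIndexOutOfBoundsException past the last value
  }

  public static void main(String[] args) {
    Object[] oldRecord = new Object[72]; // record written with the old 72-field schema
    int latestFieldCount = 73;           // deserializer built from the latest schema
    try {
      for (int pos = 0; pos < latestFieldCount; pos++) {
        getField(oldRecord, pos);
      }
    } catch (ArrayIndexOutOfBoundsException e) {
      // Fails on the first position the old record does not have.
      System.out.println("reading a position past the old record's 72 values failed");
    }
  }
}
```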