BalaMahesh opened a new issue, #7219:
URL: https://github.com/apache/hudi/issues/7219

   **_Tips before filing an issue_**
   
   - Have you gone through our [FAQs](https://hudi.apache.org/learn/faq/)? Yes.
   
   - Join the mailing list to engage in conversations and get faster support at [email protected].
   
   - If you have triaged this as a bug, then file an [issue](https://issues.apache.org/jira/projects/HUDI/issues) directly.
   **Describe the problem you faced**
   
   We have had Hudi running stably for a while on a PostgresDebeziumSource. We recently updated the schema of the source Postgres table, and Debezium registered the new schema version in the Confluent Schema Registry. Hudi is configured to use the latest schema version from the registry.
   
   Some records written with the old schema are still in Kafka, and Hudi fails with an error while processing them.
   
   **To Reproduce**
   
   Steps to reproduce the behavior:
   
   1. Use Debezium to publish data to Kafka, and configure Hudi to use PostgresDebeziumSource, PostgresDebeziumAvroPayload, and the Confluent Schema Registry, with the schema URL pointing to the latest schema version.
   2. Stop Hudi (making sure unconsumed records with the old schema remain in Kafka) and update the schema of the Postgres table by adding new columns.
   3. Start Hudi again so it processes the unconsumed records (written with the old schema) while fetching the latest schema from the schema registry.
   4. The pipeline fails while converting the input records to `Dataset<Row>`.
   
   **Expected behavior**
   
   We expect Hudi to set the default value for fields that are not present in the old schema, or to process the records as-is and store them in the filesystem (table).
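   Avro's own schema-resolution rules already describe this expected behavior: a field present in the reader (latest) schema but absent from the writer (old) schema takes its declared default. A minimal stdlib-only sketch of that rule (field names and defaults here are made up for illustration):

```python
# Sketch of Avro's schema-resolution rule for reader-only fields:
# a field in the reader schema that is missing from the writer schema
# takes its declared default. (Names/defaults are hypothetical.)

writer_fields = {"id", "amount"}                # old schema's field names
reader_fields = {"id": None,
                 "amount": None,
                 "new_column": "unknown"}       # new schema, one added field

def resolve(record: dict) -> dict:
    out = {}
    for name, default in reader_fields.items():
        if name in writer_fields:
            out[name] = record[name]            # value written under old schema
        else:
            out[name] = default                 # reader-only field -> default
    return out

print(resolve({"id": 7, "amount": 3.5}))
# -> {'id': 7, 'amount': 3.5, 'new_column': 'unknown'}
```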
   
   **Environment Description**
   
   * Hudi version : 0.12.0
   
   * Spark version : 3.2.2
   
   * Hive version : 2.3.5
   
   * Hadoop version : 2.7.7
   
   * Storage (HDFS/S3/GCS..) : GCS
   
   * Running on Docker? (yes/no) : no
   
   
   **Additional context**
   
   
   ```java
   private Dataset<Row> toDataset(OffsetRange[] offsetRanges, KafkaOffsetGen offsetGen, String schemaStr) {
     AvroConvertor convertor = new AvroConvertor(schemaStr);
     Dataset<Row> kafkaData;
     if (deserializerClassName.equals(StringDeserializer.class.getName())) {
       kafkaData = AvroConversionUtils.createDataFrame(
           KafkaUtils.<String, String>createRDD(sparkContext, offsetGen.getKafkaParams(), offsetRanges, LocationStrategies.PreferConsistent())
               .map(obj -> convertor.fromJson(obj.value()))
               .rdd(), schemaStr, sparkSession);
     } else {
       kafkaData = AvroConversionUtils.createDataFrame(   // <-- builds the DataFrame with the latest schema only
           KafkaUtils.createRDD(sparkContext, offsetGen.getKafkaParams(), offsetRanges, LocationStrategies.PreferConsistent())
               .map(obj -> (GenericRecord) obj.value())
               .rdd(), schemaStr, sparkSession);
     }
   
     // Flatten debezium payload, specific to each DB type (postgres/ mysql/ etc..)
     Dataset<Row> debeziumDataset = processDataset(kafkaData);
   
     // Some required transformations to ensure debezium data types are converted to spark supported types.
     return convertArrayColumnsToString(convertColumnToNullable(sparkSession,
         convertDateColumns(debeziumDataset, new Schema.Parser().parse(schemaStr))));
   }
   ```
   
   The piece of code above creates the DataFrame with the latest schema fetched from the schema registry. Ideally, it should also rewrite the RDD to set default values for fields that are present in the latest schema but missing from the old data fetched from Kafka.
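   That rewrite can be sketched as follows (a plain-Python stand-in for the Spark/Java pipeline; the field names, defaults, and `to_dataset` helper are hypothetical): project every consumed record onto the latest schema, filling defaults for missing fields, before the DataFrame/Row conversion runs.

```python
# Hypothetical pipeline sketch: records arrive written under assorted
# schema versions; pad each one to the latest schema BEFORE the
# Row-conversion step, so no field lookup can run out of bounds.

latest_schema = [("id", None), ("name", None), ("new_column", "N/A")]

def to_latest(record: dict) -> dict:
    # Missing fields fall back to the latest schema's default value.
    return {name: record.get(name, default) for name, default in latest_schema}

def to_dataset(records):
    # Stand-in for AvroConversionUtils.createDataFrame: here the "rows"
    # are just dicts that all conform to the latest schema.
    return [to_latest(r) for r in records]

mixed = [
    {"id": 1, "name": "old"},                     # old-schema record
    {"id": 2, "name": "new", "new_column": "x"},  # new-schema record
]
rows = to_dataset(mixed)
print(rows[0]["new_column"])  # -> N/A
```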
   
   Finally, the pipeline fails during the conversion below.
   
   ```scala
   @Deprecated
   def createConverterToRow(sourceAvroSchema: Schema,
                            targetSqlType: StructType): GenericRecord => Row = {
     val encoder = RowEncoder.apply(targetSqlType).resolveAndBind()
     val serde = sparkAdapter.createSparkRowSerDe(encoder)
     val converter = AvroConversionUtils.createAvroToInternalRowConverter(sourceAvroSchema, targetSqlType)
   
     avro => converter.apply(avro).map(serde.deserializeRow).get
   }
   ```
   
   It fails with the stack trace below.
   
   My old data has only 72 fields, and a new field was added in the latest schema version. Hudi tries to access the new field on the old records in the RDD and hits the error.
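   The out-of-bounds access is easy to mimic outside Spark: the deserializer walks the latest schema's field positions, while the decoded record only carries the old schema's field count (the counts below are illustrative, not taken from the real schemas):

```python
# Sketch of the crash: positional field access against a record that
# was decoded with fewer fields than the reader schema expects.
# (Counts are illustrative; the report mentions 72 old fields.)

decoded_old_record = [f"v{i}" for i in range(72)]  # old schema: 72 fields
latest_field_positions = range(73)                 # new schema: one extra field

def deserialize(record, positions):
    # Analogous to AvroDeserializer walking GenericData.Record.get(i).
    return [record[i] for i in positions]

try:
    deserialize(decoded_old_record, latest_field_positions)
except IndexError:
    print("IndexError at position 72 -- the Java side surfaces this as "
          "ArrayIndexOutOfBoundsException")
```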
   
   **Stacktrace**
   
   ```
   22/11/16 13:04:49 WARN task-result-getter-1 TaskSetManager: Lost task 0.0 in stage 1.0 (TID 1) (192.168.0.188 executor driver): java.lang.ArrayIndexOutOfBoundsException: 73
        at org.apache.avro.generic.GenericData$Record.get(GenericData.java:263)
        at org.apache.hudi.org.apache.spark.sql.avro.AvroDeserializer.$anonfun$getRecordWriter$2(AvroDeserializer.scala:383)
        at org.apache.hudi.org.apache.spark.sql.avro.AvroDeserializer.$anonfun$getRecordWriter$2$adapted(AvroDeserializer.scala:379)
        at org.apache.hudi.org.apache.spark.sql.avro.AvroDeserializer.$anonfun$newWriter$18(AvroDeserializer.scala:212)
        at org.apache.hudi.org.apache.spark.sql.avro.AvroDeserializer.$anonfun$newWriter$18$adapted(AvroDeserializer.scala:210)
        at org.apache.hudi.org.apache.spark.sql.avro.AvroDeserializer.$anonfun$getRecordWriter$1(AvroDeserializer.scala:365)
        at org.apache.hudi.org.apache.spark.sql.avro.AvroDeserializer.$anonfun$getRecordWriter$1$adapted(AvroDeserializer.scala:361)
        at org.apache.hudi.org.apache.spark.sql.avro.AvroDeserializer.$anonfun$getRecordWriter$2(AvroDeserializer.scala:383)
        at org.apache.hudi.org.apache.spark.sql.avro.AvroDeserializer.$anonfun$getRecordWriter$2$adapted(AvroDeserializer.scala:379)
        at org.apache.hudi.org.apache.spark.sql.avro.AvroDeserializer.$anonfun$converter$4(AvroDeserializer.scala:88)
        at org.apache.hudi.org.apache.spark.sql.avro.AvroDeserializer.deserialize(AvroDeserializer.scala:106)
        at org.apache.hudi.org.apache.spark.sql.avro.HoodieSpark3_2AvroDeserializer.deserialize(HoodieSpark3_2AvroDeserializer.scala:30)
        at org.apache.hudi.AvroConversionUtils$.$anonfun$createAvroToInternalRowConverter$1(AvroConversionUtils.scala:68)
        at org.apache.hudi.AvroConversionUtils$.$anonfun$createConverterToRow$1(AvroConversionUtils.scala:97)
        at org.apache.hudi.AvroConversionUtils$.$anonfun$createDataFrame$2(AvroConversionUtils.scala:132)
        at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
        at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
        at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
        at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:514)
        at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
        at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
        at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
        at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
        at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
        at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:759)
        at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:350)
        at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:898)
        at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:898)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
        at org.apache.spark.scheduler.Task.run(Task.scala:131)
        at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1491)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:750)
   ```
   
   

