hudi-bot opened a new issue, #16282:
URL: https://github.com/apache/hudi/issues/16282

   ## Testing scenario
   
   1. Postgres CDC flow (Debezium) with the Confluent Kafka Schema Registry, **Compatibility mode:** FORWARD
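
   For reference, a subject's compatibility mode can be set through the Schema Registry REST API. A minimal sketch, assuming a local registry and the default TopicNameStrategy (subject = `<topic>-value`); the URL is hypothetical:

   ```python
   import requests

   # Hypothetical registry URL; the subject name assumes the default
   # TopicNameStrategy for the Debezium topic of this table.
   REGISTRY_URL = "http://localhost:8081"
   SUBJECT = "oh_debezium_570cd9cb.public.bulkinsert_table1-value"

   # PUT /config/{subject} sets the compatibility level for one subject.
   resp = requests.put(
       f"{REGISTRY_URL}/config/{SUBJECT}",
       json={"compatibility": "FORWARD"},
   )
   resp.raise_for_status()
   print(resp.json())  # expected: {"compatibility": "FORWARD"}
   ```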
   
   
   2. Starting schema
   
   ```python
   create_table_query = '''
       CREATE TABLE IF NOT EXISTS bulkinsert_table (
           id UUID PRIMARY KEY,
           name VARCHAR(255) NOT NULL,
           age INT
       )
   '''
   ```

   After which we ingest a batch of records, as sketched below.
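
   A minimal ingestion sketch (the connection parameters are hypothetical; it reuses `create_table_query` from above):

   ```python
   import uuid
   import psycopg2

   # Hypothetical connection parameters for the source Postgres instance.
   conn = psycopg2.connect(host="localhost", dbname="postgres",
                           user="postgres", password="postgres")
   with conn, conn.cursor() as cur:
       cur.execute(create_table_query)
       # Insert a small batch; Debezium picks these rows up from the WAL.
       for i in range(10):
           cur.execute(
               "INSERT INTO bulkinsert_table (id, name, age) VALUES (%s, %s, %s)",
               (str(uuid.uuid4()), f"user_{i}", 20 + i),
           )
   conn.close()
   ```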
   
    
   
   3. Add a column to the end
   
   ```sql
   ALTER TABLE bulkinsert_table ADD notes VARCHAR(255) NOT NULL
   ```
   
   After which we ingest a batch of records
   
   4. Drop the notes column
   
   ```sql
   ALTER TABLE bulkinsert_table DROP COLUMN notes
   ```
   
   After which we ingest a batch of records
   
   When we check the schema registry after dropping the column, the drop is not reflected: no new schema version was registered for this change.
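
   One way to check is to fetch the latest registered version of the value subject. A minimal sketch, assuming confluent-kafka is installed; the registry URL is hypothetical and the subject name assumes the default TopicNameStrategy:

   ```python
   from confluent_kafka.schema_registry import SchemaRegistryClient

   client = SchemaRegistryClient({"url": "http://localhost:8081"})  # hypothetical URL
   latest = client.get_latest_version(
       "oh_debezium_570cd9cb.public.bulkinsert_table1-value")
   print(latest.version)
   # After the DROP this still prints True, i.e. the field was not removed:
   print('"notes"' in latest.schema.schema_str)
   ```

   The latest registered value schema still carries the nullable notes field: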
   
   ```json
   {
     "connect.name": "oh_debezium_570cd9cb.public.bulkinsert_table1.Envelope",
     "fields": [
       {
         "default": null,
         "name": "before",
         "type": [
           "null",
           {
             "connect.name": 
"oh_debezium_570cd9cb.public.bulkinsert_table1.Value",
             "fields": [
               {
                 "name": "id",
                 "type": {
                   "connect.name": "io.debezium.data.Uuid",
                   "connect.version": 1,
                   "type": "string"
                 }
               },
               {
                 "name": "name",
                 "type": "string"
               },
               {
                 "default": null,
                 "name": "age",
                 "type": [
                   "null",
                   "int"
                 ]
               },
               {
                 "default": null,
                 "name": "notes",
                 "type": [
                   "null",
                   "string"
                 ]
               }
             ],
             "name": "Value",
             "type": "record"
           }
         ]
       },
       {
         "default": null,
         "name": "after",
         "type": [
           "null",
           "Value"
         ]
       },
       {
         "name": "source",
         "type": {
           "connect.name": "io.debezium.connector.postgresql.Source",
           "fields": [
             {
               "name": "version",
               "type": "string"
             },
             {
               "name": "connector",
               "type": "string"
             },
             {
               "name": "name",
               "type": "string"
             },
             {
               "name": "ts_ms",
               "type": "long"
             },
             {
               "default": "false",
               "name": "snapshot",
               "type": [
                 {
                   "connect.default": "false",
                   "connect.name": "io.debezium.data.Enum",
                   "connect.parameters": {
                     "allowed": "true,last,false,incremental"
                   },
                   "connect.version": 1,
                   "type": "string"
                 },
                 "null"
               ]
             },
             {
               "name": "db",
               "type": "string"
             },
             {
               "default": null,
               "name": "sequence",
               "type": [
                 "null",
                 "string"
               ]
             },
             {
               "name": "schema",
               "type": "string"
             },
             {
               "name": "table",
               "type": "string"
             },
             {
               "default": null,
               "name": "txId",
               "type": [
                 "null",
                 "long"
               ]
             },
             {
               "default": null,
               "name": "lsn",
               "type": [
                 "null",
                 "long"
               ]
             },
             {
               "default": null,
               "name": "xmin",
               "type": [
                 "null",
                 "long"
               ]
             }
           ],
           "name": "Source",
           "namespace": "io.debezium.connector.postgresql",
           "type": "record"
         }
       },
       {
         "name": "op",
         "type": "string"
       },
       {
         "default": null,
         "name": "ts_ms",
         "type": [
           "null",
           "long"
         ]
       },
       {
         "default": null,
         "name": "transaction",
         "type": [
           "null",
           {
             "fields": [
               {
                 "name": "id",
                 "type": "string"
               },
               {
                 "name": "total_order",
                 "type": "long"
               },
               {
                 "name": "data_collection_order",
                 "type": "long"
               }
             ],
             "name": "ConnectDefault",
             "namespace": "io.confluent.connect.avro",
             "type": "record"
           }
         ]
       }
     ],
     "name": "Envelope",
     "namespace": "oh_debezium_570cd9cb.public.bulkinsert_table1",
     "type": "record"
   }
   ```
   
   Now, after the drop, when we ingest the next batch of data with just 3 columns, we hit the following exception:
   ```
java.lang.ArrayIndexOutOfBoundsException: 3
  at org.apache.avro.generic.GenericData$Record.get(GenericData.java:263)
  at org.apache.hudi.org.apache.spark.sql.avro.AvroDeserializer.$anonfun$getRecordWriter$2(AvroDeserializer.scala:384)
  at org.apache.hudi.org.apache.spark.sql.avro.AvroDeserializer.$anonfun$getRecordWriter$2$adapted(AvroDeserializer.scala:380)
  at org.apache.hudi.org.apache.spark.sql.avro.AvroDeserializer.$anonfun$newWriter$18(AvroDeserializer.scala:213)
  at org.apache.hudi.org.apache.spark.sql.avro.AvroDeserializer.$anonfun$newWriter$18$adapted(AvroDeserializer.scala:211)
  at org.apache.hudi.org.apache.spark.sql.avro.AvroDeserializer.$anonfun$getRecordWriter$1(AvroDeserializer.scala:366)
  at org.apache.hudi.org.apache.spark.sql.avro.AvroDeserializer.$anonfun$getRecordWriter$1$adapted(AvroDeserializer.scala:362)
  at org.apache.hudi.org.apache.spark.sql.avro.AvroDeserializer.$anonfun$getRecordWriter$2(AvroDeserializer.scala:384)
  at org.apache.hudi.org.apache.spark.sql.avro.AvroDeserializer.$anonfun$getRecordWriter$2$adapted(AvroDeserializer.scala:380)
  at org.apache.hudi.org.apache.spark.sql.avro.AvroDeserializer.$anonfun$converter$4(AvroDeserializer.scala:88)
  at org.apache.hudi.org.apache.spark.sql.avro.AvroDeserializer.deserialize(AvroDeserializer.scala:106)
  at org.apache.hudi.org.apache.spark.sql.avro.HoodieSpark3_2AvroDeserializer.deserialize(HoodieSpark3_2AvroDeserializer.scala:31)
  at org.apache.hudi.AvroConversionUtils$.$anonfun$createAvroToInternalRowConverter$1(AvroConversionUtils.scala:46)
  at org.apache.hudi.AvroConversionUtils$.$anonfun$createConverterToRow$1(AvroConversionUtils.scala:74)
  at org.apache.hudi.AvroConversionUtils$.$anonfun$createDataFrame$2(AvroConversionUtils.scala:108)
  at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
  at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
  at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
  at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
  at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:759)
  at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
  at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
  at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
  at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
  at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
  at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:759)
  at scala.collection.Iterator.isEmpty(Iterator.scala:387)
  at scala.collection.Iterator.isEmpty$(Iterator.scala:387)
  at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.isEmpty(WholeStageCodegenExec.scala:757)
  at org.apache.hudi.HoodieSparkUtils$.$anonfun$createRdd$2(HoodieSparkUtils.scala:108)
  at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2(RDD.scala:863)
  at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2$adapted(RDD.scala:863)
  at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
  at org.apache.spark.sql.execution.SQLConfInjectingRDD.$anonfun$compute$1(SQLConfInjectingRDD.scala:55)
  at org.apache.spark.sql.internal.SQLConf$.withExistingConf(SQLConf.scala:158)
  at org.apache.spark.sql.execution.SQLConfInjectingRDD.compute(SQLConfInjectingRDD.scala:55)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
  at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
  at org.apache.spark.scheduler.Task.run(Task.scala:131)
  at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
  at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1491)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
  at java.lang.Thread.run(Thread.java:750)

Driver stacktrace:
  at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2454)
  at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2403)
  at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2402)
  at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
  at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
  at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
  at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2402)
  at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1160)
  at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1160)
  at scala.Option.foreach(Option.scala:407)
  at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1160)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2642)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2584)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2573)
  at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
  at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:938)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:2214)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:2235)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:2254)
  at org.apache.spark.rdd.RDD.$anonfun$take$1(RDD.scala:1449)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
  at org.apache.spark.rdd.RDD.withScope(RDD.scala:414)
  at org.apache.spark.rdd.RDD.take(RDD.scala:1422)
  at org.apache.spark.rdd.RDD.$anonfun$isEmpty$1(RDD.scala:1557)
  at scala.runtime.java8.JFunction0$mcZ$sp.apply(JFunction0$mcZ$sp.java:23)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
  at org.apache.spark.rdd.RDD.withScope(RDD.scala:414)
  at org.apache.spark.rdd.RDD.isEmpty(RDD.scala:1557)
  at org.apache.spark.api.java.JavaRDDLike.isEmpty(JavaRDDLike.scala:545)
  at org.apache.spark.api.java.JavaRDDLike.isEmpty$(JavaRDDLike.scala:545)
  at org.apache.spark.api.java.AbstractJavaRDDLike.isEmpty(JavaRDDLike.scala:45)
  at org.apache.hudi.utilities.streamer.StreamSync.fetchFromSource(StreamSync.java:607)
  at org.apache.hudi.utilities.streamer.StreamSync.readFromSource(StreamSync.java:506)
  at org.apache.hudi.utilities.streamer.StreamSync.syncOnce(StreamSync.java:411)
  at org.apache.hudi.utilities.streamer.HoodieStreamer$StreamSyncService.ingestOnce(HoodieStreamer.java:850)
  at org.apache.hudi.common.util.Option.ifPresent(Option.java:97)
  at com.onehouse.hudi.OnehouseDeltaStreamer$MultiTableSyncService.lambda$null$1(OnehouseDeltaStreamer.java:319)
  at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
  at java.lang.Thread.run(Thread.java:750)
Caused by: java.lang.ArrayIndexOutOfBoundsException: 3
  at org.apache.avro.generic.GenericData$Record.get(GenericData.java:263)
  at org.apache.hudi.org.apache.spark.sql.avro.AvroDeserializer.$anonfun$getRecordWriter$2(AvroDeserializer.scala:384)
  at org.apache.hudi.org.apache.spark.sql.avro.AvroDeserializer.$anonfun$getRecordWriter$2$adapted(AvroDeserializer.scala:380)
  at org.apache.hudi.org.apache.spark.sql.avro.AvroDeserializer.$anonfun$newWriter$18(AvroDeserializer.scala:213)
  at org.apache.hudi.org.apache.spark.sql.avro.AvroDeserializer.$anonfun$newWriter$18$adapted(AvroDeserializer.scala:211)
  at org.apache.hudi.org.apache.spark.sql.avro.AvroDeserializer.$anonfun$getRecordWriter$1(AvroDeserializer.scala:366)
  at org.apache.hudi.org.apache.spark.sql.avro.AvroDeserializer.$anonfun$getRecordWriter$1$adapted(AvroDeserializer.scala:362)
  at org.apache.hudi.org.apache.spark.sql.avro.AvroDeserializer.$anonfun$getRecordWriter$2(AvroDeserializer.scala:384)
  at org.apache.hudi.org.apache.spark.sql.avro.AvroDeserializer.$anonfun$getRecordWriter$2$adapted(AvroDeserializer.scala:380)
  at org.apache.hudi.org.apache.spark.sql.avro.AvroDeserializer.$anonfun$converter$4(AvroDeserializer.scala:88)
  at org.apache.hudi.org.apache.spark.sql.avro.AvroDeserializer.deserialize(AvroDeserializer.scala:106)
  at org.apache.hudi.org.apache.spark.sql.avro.HoodieSpark3_2AvroDeserializer.deserialize(HoodieSpark3_2AvroDeserializer.scala:31)
  at org.apache.hudi.AvroConversionUtils$.$anonfun$createAvroToInternalRowConverter$1(AvroConversionUtils.scala:46)
  at org.apache.hudi.AvroConversionUtils$.$anonfun$createConverterToRow$1(AvroConversionUtils.scala:74)
  at org.apache.hudi.AvroConversionUtils$.$anonfun$createDataFrame$2(AvroConversionUtils.scala:108)
  at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
  at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
  at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
  at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
  at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:759)
  at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
  at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
  at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
  at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
  at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
  at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:759)
  at scala.collection.Iterator.isEmpty(Iterator.scala:387)
  at scala.collection.Iterator.isEmpty$(Iterator.scala:387)
  at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.isEmpty(WholeStageCodegenExec.scala:757)
  at org.apache.hudi.HoodieSparkUtils$.$anonfun$createRdd$2(HoodieSparkUtils.scala:108)
  at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2(RDD.scala:863)
  at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2$adapted(RDD.scala:863)
  at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
  at org.apache.spark.sql.execution.SQLConfInjectingRDD.$anonfun$compute$1(SQLConfInjectingRDD.scala:55)
  at org.apache.spark.sql.internal.SQLConf$.withExistingConf(SQLConf.scala:158)
  at org.apache.spark.sql.execution.SQLConfInjectingRDD.compute(SQLConfInjectingRDD.scala:55)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
  at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
  at org.apache.spark.scheduler.Task.run(Task.scala:131)
  at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
  at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1491)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
  ... 3 more
   ```
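
   The index in the exception matches the position of the dropped notes field (field 3 in the still-registered 4-field schema). A toy Python illustration of the positional mismatch — not Hudi code; it only mimics how `GenericData.Record.get(pos)` reads values by field position against the stale 4-field schema:

   ```python
   # Toy illustration only: Spark's AvroDeserializer walks the schema it
   # resolved earlier (4 fields) and reads record values by position.
   expected_fields = ["id", "name", "age", "notes"]  # schema still in the registry
   incoming_values = ["some-uuid", "alice", 30]      # record produced after the DROP

   for position, field_name in enumerate(expected_fields):
       # Position 3 has no value in the incoming record; this IndexError is
       # analogous to the ArrayIndexOutOfBoundsException: 3 above.
       value = incoming_values[position]
   ```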
   
   ## JIRA info
   
   - Link: https://issues.apache.org/jira/browse/HUDI-6996
   - Type: Bug

