Lokesh Lingarajan created HUDI-6996:
---------------------------------------

             Summary: Avro source reader fails with Postgres CDC transformer
                 Key: HUDI-6996
                 URL: https://issues.apache.org/jira/browse/HUDI-6996
             Project: Apache Hudi
          Issue Type: Bug
            Reporter: Lokesh Lingarajan


Testing scenario

1. Postgres CDC flow using the Confluent Kafka Schema Registry, with 
{*}Compatibility mode:{*} FORWARD


2. Initial table schema:

create_table_query = '''
    CREATE TABLE IF NOT EXISTS bulkinsert_table (
        id UUID PRIMARY KEY,
        name VARCHAR(255) NOT NULL,
        age INT
    )
'''

After this, we ingest a batch of records.

 

3. Add a column at the end of the table:

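ALTER TABLE bulkinsert_table ADD notes VARCHAR(255) NOT NULL

(Note: PostgreSQL rejects adding a NOT NULL column without a DEFAULT to a table 
that already contains rows, and the registered schema below shows `notes` as 
nullable with a null default, so the column was presumably added without 
NOT NULL — please confirm the exact DDL used.)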

Then we ingest another batch of records.

4. Drop the `notes` column:

ALTER TABLE bulkinsert_table DROP COLUMN notes

Then we ingest another batch of records.

When we check the schema registry after dropping the column, it does not 
reflect the drop: the latest registered schema (below) still contains the 
`notes` field:

{
  "connect.name": "oh_debezium_570cd9cb.public.bulkinsert_table1.Envelope",
  "fields": [
    {
      "default": null,
      "name": "before",
      "type": [
        "null",
        {
          "connect.name": "oh_debezium_570cd9cb.public.bulkinsert_table1.Value",
          "fields": [
            {
              "name": "id",
              "type": {
                "connect.name": "io.debezium.data.Uuid",
                "connect.version": 1,
                "type": "string"
              }
            },
            {
              "name": "name",
              "type": "string"
            },
            {
              "default": null,
              "name": "age",
              "type": [
                "null",
                "int"
              ]
            },
            {
              "default": null,
              "name": "notes",
              "type": [
                "null",
                "string"
              ]
            }
          ],
          "name": "Value",
          "type": "record"
        }
      ]
    },
    {
      "default": null,
      "name": "after",
      "type": [
        "null",
        "Value"
      ]
    },
    {
      "name": "source",
      "type": {
        "connect.name": "io.debezium.connector.postgresql.Source",
        "fields": [
          {
            "name": "version",
            "type": "string"
          },
          {
            "name": "connector",
            "type": "string"
          },
          {
            "name": "name",
            "type": "string"
          },
          {
            "name": "ts_ms",
            "type": "long"
          },
          {
            "default": "false",
            "name": "snapshot",
            "type": [
              {
                "connect.default": "false",
                "connect.name": "io.debezium.data.Enum",
                "connect.parameters": {
                  "allowed": "true,last,false,incremental"
                },
                "connect.version": 1,
                "type": "string"
              },
              "null"
            ]
          },
          {
            "name": "db",
            "type": "string"
          },
          {
            "default": null,
            "name": "sequence",
            "type": [
              "null",
              "string"
            ]
          },
          {
            "name": "schema",
            "type": "string"
          },
          {
            "name": "table",
            "type": "string"
          },
          {
            "default": null,
            "name": "txId",
            "type": [
              "null",
              "long"
            ]
          },
          {
            "default": null,
            "name": "lsn",
            "type": [
              "null",
              "long"
            ]
          },
          {
            "default": null,
            "name": "xmin",
            "type": [
              "null",
              "long"
            ]
          }
        ],
        "name": "Source",
        "namespace": "io.debezium.connector.postgresql",
        "type": "record"
      }
    },
    {
      "name": "op",
      "type": "string"
    },
    {
      "default": null,
      "name": "ts_ms",
      "type": [
        "null",
        "long"
      ]
    },
    {
      "default": null,
      "name": "transaction",
      "type": [
        "null",
        {
          "fields": [
            {
              "name": "id",
              "type": "string"
            },
            {
              "name": "total_order",
              "type": "long"
            },
            {
              "name": "data_collection_order",
              "type": "long"
            }
          ],
          "name": "ConnectDefault",
          "namespace": "io.confluent.connect.avro",
          "type": "record"
        }
      ]
    }
  ],
  "name": "Envelope",
  "namespace": "oh_debezium_570cd9cb.public.bulkinsert_table1",
  "type": "record"
}

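After the drop, when we ingest the next batch of data (whose records now carry 
only 3 columns: id, name, age), we hit the following exception. Index 3 
corresponds to `notes`, the fourth field of the `Value` record in the registered 
schema above. This suggests the deserializer was built from the stale 4-field 
schema while the incoming Avro records have only 3 fields: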




java.lang.ArrayIndexOutOfBoundsException: 3 at 
org.apache.avro.generic.GenericData$Record.get(GenericData.java:263) at 
org.apache.hudi.org.apache.spark.sql.avro.AvroDeserializer.$anonfun$getRecordWriter$2(AvroDeserializer.scala:384)
 at 
org.apache.hudi.org.apache.spark.sql.avro.AvroDeserializer.$anonfun$getRecordWriter$2$adapted(AvroDeserializer.scala:380)
 at 
org.apache.hudi.org.apache.spark.sql.avro.AvroDeserializer.$anonfun$newWriter$18(AvroDeserializer.scala:213)
 at 
org.apache.hudi.org.apache.spark.sql.avro.AvroDeserializer.$anonfun$newWriter$18$adapted(AvroDeserializer.scala:211)
 at 
org.apache.hudi.org.apache.spark.sql.avro.AvroDeserializer.$anonfun$getRecordWriter$1(AvroDeserializer.scala:366)
 at 
org.apache.hudi.org.apache.spark.sql.avro.AvroDeserializer.$anonfun$getRecordWriter$1$adapted(AvroDeserializer.scala:362)
 at 
org.apache.hudi.org.apache.spark.sql.avro.AvroDeserializer.$anonfun$getRecordWriter$2(AvroDeserializer.scala:384)
 at 
org.apache.hudi.org.apache.spark.sql.avro.AvroDeserializer.$anonfun$getRecordWriter$2$adapted(AvroDeserializer.scala:380)
 at 
org.apache.hudi.org.apache.spark.sql.avro.AvroDeserializer.$anonfun$converter$4(AvroDeserializer.scala:88)
 at 
org.apache.hudi.org.apache.spark.sql.avro.AvroDeserializer.deserialize(AvroDeserializer.scala:106)
 at 
org.apache.hudi.org.apache.spark.sql.avro.HoodieSpark3_2AvroDeserializer.deserialize(HoodieSpark3_2AvroDeserializer.scala:31)
 at 
org.apache.hudi.AvroConversionUtils$.$anonfun$createAvroToInternalRowConverter$1(AvroConversionUtils.scala:46)
 at 
org.apache.hudi.AvroConversionUtils$.$anonfun$createConverterToRow$1(AvroConversionUtils.scala:74)
 at 
org.apache.hudi.AvroConversionUtils$.$anonfun$createDataFrame$2(AvroConversionUtils.scala:108)
 at scala.collection.Iterator$$anon$10.next(Iterator.scala:461) at 
scala.collection.Iterator$$anon$10.next(Iterator.scala:461) at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown
 Source) at 
org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
 at 
org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:759)
 at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460) at 
scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460) at 
scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460) at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown
 Source) at 
org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
 at 
org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:759)
 at scala.collection.Iterator.isEmpty(Iterator.scala:387) at 
scala.collection.Iterator.isEmpty$(Iterator.scala:387) at 
org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.isEmpty(WholeStageCodegenExec.scala:757)
 at 
org.apache.hudi.HoodieSparkUtils$.$anonfun$createRdd$2(HoodieSparkUtils.scala:108)
 at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2(RDD.scala:863) at 
org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2$adapted(RDD.scala:863) at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) at 
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373) at 
org.apache.spark.rdd.RDD.iterator(RDD.scala:337) at 
org.apache.spark.sql.execution.SQLConfInjectingRDD.$anonfun$compute$1(SQLConfInjectingRDD.scala:55)
 at org.apache.spark.sql.internal.SQLConf$.withExistingConf(SQLConf.scala:158) 
at 
org.apache.spark.sql.execution.SQLConfInjectingRDD.compute(SQLConfInjectingRDD.scala:55)
 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373) at 
org.apache.spark.rdd.RDD.iterator(RDD.scala:337) at 
org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90) at 
org.apache.spark.scheduler.Task.run(Task.scala:131) at 
org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
 at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1491) at 
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509) at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) 
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) 
at java.lang.Thread.run(Thread.java:750) Driver stacktrace: at 
org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2454)
 at 
org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2403)
 at 
org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2402)
 at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62) at 
scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55) at 
scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49) at 
org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2402) at 
org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1160)
 at 
org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1160)
 at scala.Option.foreach(Option.scala:407) at 
org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1160)
 at 
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2642)
 at 
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2584)
 at 
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2573)
 at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49) at 
org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:938) at 
org.apache.spark.SparkContext.runJob(SparkContext.scala:2214) at 
org.apache.spark.SparkContext.runJob(SparkContext.scala:2235) at 
org.apache.spark.SparkContext.runJob(SparkContext.scala:2254) at 
org.apache.spark.rdd.RDD.$anonfun$take$1(RDD.scala:1449) at 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) 
at 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112) 
at org.apache.spark.rdd.RDD.withScope(RDD.scala:414) at 
org.apache.spark.rdd.RDD.take(RDD.scala:1422) at 
org.apache.spark.rdd.RDD.$anonfun$isEmpty$1(RDD.scala:1557) at 
scala.runtime.java8.JFunction0$mcZ$sp.apply(JFunction0$mcZ$sp.java:23) at 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) 
at 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112) 
at org.apache.spark.rdd.RDD.withScope(RDD.scala:414) at 
org.apache.spark.rdd.RDD.isEmpty(RDD.scala:1557) at 
org.apache.spark.api.java.JavaRDDLike.isEmpty(JavaRDDLike.scala:545) at 
org.apache.spark.api.java.JavaRDDLike.isEmpty$(JavaRDDLike.scala:545) at 
org.apache.spark.api.java.AbstractJavaRDDLike.isEmpty(JavaRDDLike.scala:45) at 
org.apache.hudi.utilities.streamer.StreamSync.fetchFromSource(StreamSync.java:607)
 at 
org.apache.hudi.utilities.streamer.StreamSync.readFromSource(StreamSync.java:506)
 at org.apache.hudi.utilities.streamer.StreamSync.syncOnce(StreamSync.java:411) 
at 
org.apache.hudi.utilities.streamer.HoodieStreamer$StreamSyncService.ingestOnce(HoodieStreamer.java:850)
 at org.apache.hudi.common.util.Option.ifPresent(Option.java:97) at 
com.onehouse.hudi.OnehouseDeltaStreamer$MultiTableSyncService.lambda$null$1(OnehouseDeltaStreamer.java:319)
 at 
java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604)
 at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) 
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) 
at java.lang.Thread.run(Thread.java:750) Caused by: 
java.lang.ArrayIndexOutOfBoundsException: 3 at 
org.apache.avro.generic.GenericData$Record.get(GenericData.java:263) at 
org.apache.hudi.org.apache.spark.sql.avro.AvroDeserializer.$anonfun$getRecordWriter$2(AvroDeserializer.scala:384)
 at 
org.apache.hudi.org.apache.spark.sql.avro.AvroDeserializer.$anonfun$getRecordWriter$2$adapted(AvroDeserializer.scala:380)
 at 
org.apache.hudi.org.apache.spark.sql.avro.AvroDeserializer.$anonfun$newWriter$18(AvroDeserializer.scala:213)
 at 
org.apache.hudi.org.apache.spark.sql.avro.AvroDeserializer.$anonfun$newWriter$18$adapted(AvroDeserializer.scala:211)
 at 
org.apache.hudi.org.apache.spark.sql.avro.AvroDeserializer.$anonfun$getRecordWriter$1(AvroDeserializer.scala:366)
 at 
org.apache.hudi.org.apache.spark.sql.avro.AvroDeserializer.$anonfun$getRecordWriter$1$adapted(AvroDeserializer.scala:362)
 at 
org.apache.hudi.org.apache.spark.sql.avro.AvroDeserializer.$anonfun$getRecordWriter$2(AvroDeserializer.scala:384)
 at 
org.apache.hudi.org.apache.spark.sql.avro.AvroDeserializer.$anonfun$getRecordWriter$2$adapted(AvroDeserializer.scala:380)
 at 
org.apache.hudi.org.apache.spark.sql.avro.AvroDeserializer.$anonfun$converter$4(AvroDeserializer.scala:88)
 at 
org.apache.hudi.org.apache.spark.sql.avro.AvroDeserializer.deserialize(AvroDeserializer.scala:106)
 at 
org.apache.hudi.org.apache.spark.sql.avro.HoodieSpark3_2AvroDeserializer.deserialize(HoodieSpark3_2AvroDeserializer.scala:31)
 at 
org.apache.hudi.AvroConversionUtils$.$anonfun$createAvroToInternalRowConverter$1(AvroConversionUtils.scala:46)
 at 
org.apache.hudi.AvroConversionUtils$.$anonfun$createConverterToRow$1(AvroConversionUtils.scala:74)
 at 
org.apache.hudi.AvroConversionUtils$.$anonfun$createDataFrame$2(AvroConversionUtils.scala:108)
 at scala.collection.Iterator$$anon$10.next(Iterator.scala:461) at 
scala.collection.Iterator$$anon$10.next(Iterator.scala:461) at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown
 Source) at 
org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
 at 
org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:759)
 at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460) at 
scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460) at 
scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460) at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown
 Source) at 
org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
 at 
org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:759)
 at scala.collection.Iterator.isEmpty(Iterator.scala:387) at 
scala.collection.Iterator.isEmpty$(Iterator.scala:387) at 
org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.isEmpty(WholeStageCodegenExec.scala:757)
 at 
org.apache.hudi.HoodieSparkUtils$.$anonfun$createRdd$2(HoodieSparkUtils.scala:108)
 at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2(RDD.scala:863) at 
org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2$adapted(RDD.scala:863) at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) at 
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373) at 
org.apache.spark.rdd.RDD.iterator(RDD.scala:337) at 
org.apache.spark.sql.execution.SQLConfInjectingRDD.$anonfun$compute$1(SQLConfInjectingRDD.scala:55)
 at org.apache.spark.sql.internal.SQLConf$.withExistingConf(SQLConf.scala:158) 
at 
org.apache.spark.sql.execution.SQLConfInjectingRDD.compute(SQLConfInjectingRDD.scala:55)
 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373) at 
org.apache.spark.rdd.RDD.iterator(RDD.scala:337) at 
org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90) at 
org.apache.spark.scheduler.Task.run(Task.scala:131) at 
org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
 at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1491) at 
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509) ... 3 more



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to