Lokesh Lingarajan created HUDI-6996:
---------------------------------------
Summary: Avro source reader failing with postgres cdc transformer
Key: HUDI-6996
URL: https://issues.apache.org/jira/browse/HUDI-6996
Project: Apache Hudi
Issue Type: Bug
Reporter: Lokesh Lingarajan
Testing scenario:

1. Postgres CDC flow, with the Confluent Kafka schema registry set to compatibility mode: Forward (see the sketch after these steps).

2. Starting schema:
CREATE TABLE IF NOT EXISTS bulkinsert_table (
    id UUID PRIMARY KEY,
    name VARCHAR(255) NOT NULL,
    age INT
);
After which we ingest a batch of records.

3. Add a column to the end:

ALTER TABLE bulkinsert_table ADD notes VARCHAR(255) NOT NULL;

After which we ingest a batch of records.

4. Drop the notes column:

ALTER TABLE bulkinsert_table DROP COLUMN notes;

After which we ingest a batch of records.
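For reference, a minimal sketch of how the subject's compatibility mode from step 1 can be set and verified with the Confluent schema registry Java client; the registry URL and subject name below are illustrative assumptions, not taken from the actual setup:

import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;

public class CompatibilitySetup {
    public static void main(String[] args) throws Exception {
        // Hypothetical registry URL and subject name, for illustration only.
        String registryUrl = "http://localhost:8081";
        String subject = "oh_debezium_570cd9cb.public.bulkinsert_table1-value";

        SchemaRegistryClient client = new CachedSchemaRegistryClient(registryUrl, 100);

        // Put the subject into Forward compatibility, as in step 1.
        client.updateCompatibility(subject, "FORWARD");

        // Confirm the effective compatibility level for the subject.
        System.out.println(client.getCompatibility(subject));
    }
}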
When we check the schema registry after dropping the column, the registry does not reflect the drop: the latest registered schema still contains the notes field:
{
  "connect.name": "oh_debezium_570cd9cb.public.bulkinsert_table1.Envelope",
  "fields": [
    {
      "default": null,
      "name": "before",
      "type": [
        "null",
        {
          "connect.name": "oh_debezium_570cd9cb.public.bulkinsert_table1.Value",
          "fields": [
            {
              "name": "id",
              "type": {
                "connect.name": "io.debezium.data.Uuid",
                "connect.version": 1,
                "type": "string"
              }
            },
            { "name": "name", "type": "string" },
            { "default": null, "name": "age", "type": [ "null", "int" ] },
            { "default": null, "name": "notes", "type": [ "null", "string" ] }
          ],
          "name": "Value",
          "type": "record"
        }
      ]
    },
    { "default": null, "name": "after", "type": [ "null", "Value" ] },
    {
      "name": "source",
      "type": {
        "connect.name": "io.debezium.connector.postgresql.Source",
        "fields": [
          { "name": "version", "type": "string" },
          { "name": "connector", "type": "string" },
          { "name": "name", "type": "string" },
          { "name": "ts_ms", "type": "long" },
          {
            "default": "false",
            "name": "snapshot",
            "type": [
              {
                "connect.default": "false",
                "connect.name": "io.debezium.data.Enum",
                "connect.parameters": { "allowed": "true,last,false,incremental" },
                "connect.version": 1,
                "type": "string"
              },
              "null"
            ]
          },
          { "name": "db", "type": "string" },
          { "default": null, "name": "sequence", "type": [ "null", "string" ] },
          { "name": "schema", "type": "string" },
          { "name": "table", "type": "string" },
          { "default": null, "name": "txId", "type": [ "null", "long" ] },
          { "default": null, "name": "lsn", "type": [ "null", "long" ] },
          { "default": null, "name": "xmin", "type": [ "null", "long" ] }
        ],
        "name": "Source",
        "namespace": "io.debezium.connector.postgresql",
        "type": "record"
      }
    },
    { "name": "op", "type": "string" },
    { "default": null, "name": "ts_ms", "type": [ "null", "long" ] },
    {
      "default": null,
      "name": "transaction",
      "type": [
        "null",
        {
          "fields": [
            { "name": "id", "type": "string" },
            { "name": "total_order", "type": "long" },
            { "name": "data_collection_order", "type": "long" }
          ],
          "name": "ConnectDefault",
          "namespace": "io.confluent.connect.avro",
          "type": "record"
        }
      ]
    }
  ],
  "name": "Envelope",
  "namespace": "oh_debezium_570cd9cb.public.bulkinsert_table1",
  "type": "record"
}
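A minimal sketch of how this observation can be reproduced against the registry, assuming the Confluent schema registry Java client and the same illustrative registry URL/subject name as above:

import org.apache.avro.Schema;
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaMetadata;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;

public class LatestSchemaCheck {
    public static void main(String[] args) throws Exception {
        // Hypothetical registry URL and subject name, for illustration only.
        SchemaRegistryClient client =
            new CachedSchemaRegistryClient("http://localhost:8081", 100);
        SchemaMetadata latest =
            client.getLatestSchemaMetadata("oh_debezium_570cd9cb.public.bulkinsert_table1-value");

        // Parse the latest registered envelope and inspect the row record's fields.
        Schema envelope = new Schema.Parser().parse(latest.getSchema());
        Schema value = envelope.getField("before").schema().getTypes().get(1); // union [null, Value]

        // Expected after DROP COLUMN: [id, name, age]; observed: notes is still listed.
        value.getFields().forEach(f -> System.out.println(f.name()));
    }
}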
Now, after dropping the column, when we ingest the next batch of data with just 3 columns, we hit the following exception:
java.lang.ArrayIndexOutOfBoundsException: 3
  at org.apache.avro.generic.GenericData$Record.get(GenericData.java:263)
  at org.apache.hudi.org.apache.spark.sql.avro.AvroDeserializer.$anonfun$getRecordWriter$2(AvroDeserializer.scala:384)
  at org.apache.hudi.org.apache.spark.sql.avro.AvroDeserializer.$anonfun$getRecordWriter$2$adapted(AvroDeserializer.scala:380)
  at org.apache.hudi.org.apache.spark.sql.avro.AvroDeserializer.$anonfun$newWriter$18(AvroDeserializer.scala:213)
  at org.apache.hudi.org.apache.spark.sql.avro.AvroDeserializer.$anonfun$newWriter$18$adapted(AvroDeserializer.scala:211)
  at org.apache.hudi.org.apache.spark.sql.avro.AvroDeserializer.$anonfun$getRecordWriter$1(AvroDeserializer.scala:366)
  at org.apache.hudi.org.apache.spark.sql.avro.AvroDeserializer.$anonfun$getRecordWriter$1$adapted(AvroDeserializer.scala:362)
  at org.apache.hudi.org.apache.spark.sql.avro.AvroDeserializer.$anonfun$getRecordWriter$2(AvroDeserializer.scala:384)
  at org.apache.hudi.org.apache.spark.sql.avro.AvroDeserializer.$anonfun$getRecordWriter$2$adapted(AvroDeserializer.scala:380)
  at org.apache.hudi.org.apache.spark.sql.avro.AvroDeserializer.$anonfun$converter$4(AvroDeserializer.scala:88)
  at org.apache.hudi.org.apache.spark.sql.avro.AvroDeserializer.deserialize(AvroDeserializer.scala:106)
  at org.apache.hudi.org.apache.spark.sql.avro.HoodieSpark3_2AvroDeserializer.deserialize(HoodieSpark3_2AvroDeserializer.scala:31)
  at org.apache.hudi.AvroConversionUtils$.$anonfun$createAvroToInternalRowConverter$1(AvroConversionUtils.scala:46)
  at org.apache.hudi.AvroConversionUtils$.$anonfun$createConverterToRow$1(AvroConversionUtils.scala:74)
  at org.apache.hudi.AvroConversionUtils$.$anonfun$createDataFrame$2(AvroConversionUtils.scala:108)
  at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
  at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
  at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
  at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
  at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:759)
  at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
  at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
  at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
  at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
  at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
  at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:759)
  at scala.collection.Iterator.isEmpty(Iterator.scala:387)
  at scala.collection.Iterator.isEmpty$(Iterator.scala:387)
  at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.isEmpty(WholeStageCodegenExec.scala:757)
  at org.apache.hudi.HoodieSparkUtils$.$anonfun$createRdd$2(HoodieSparkUtils.scala:108)
  at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2(RDD.scala:863)
  at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2$adapted(RDD.scala:863)
  at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
  at org.apache.spark.sql.execution.SQLConfInjectingRDD.$anonfun$compute$1(SQLConfInjectingRDD.scala:55)
  at org.apache.spark.sql.internal.SQLConf$.withExistingConf(SQLConf.scala:158)
  at org.apache.spark.sql.execution.SQLConfInjectingRDD.compute(SQLConfInjectingRDD.scala:55)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
  at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
  at org.apache.spark.scheduler.Task.run(Task.scala:131)
  at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
  at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1491)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
  at java.lang.Thread.run(Thread.java:750)

Driver stacktrace:
  at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2454)
  at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2403)
  at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2402)
  at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
  at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
  at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
  at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2402)
  at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1160)
  at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1160)
  at scala.Option.foreach(Option.scala:407)
  at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1160)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2642)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2584)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2573)
  at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
  at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:938)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:2214)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:2235)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:2254)
  at org.apache.spark.rdd.RDD.$anonfun$take$1(RDD.scala:1449)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
  at org.apache.spark.rdd.RDD.withScope(RDD.scala:414)
  at org.apache.spark.rdd.RDD.take(RDD.scala:1422)
  at org.apache.spark.rdd.RDD.$anonfun$isEmpty$1(RDD.scala:1557)
  at scala.runtime.java8.JFunction0$mcZ$sp.apply(JFunction0$mcZ$sp.java:23)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
  at org.apache.spark.rdd.RDD.withScope(RDD.scala:414)
  at org.apache.spark.rdd.RDD.isEmpty(RDD.scala:1557)
  at org.apache.spark.api.java.JavaRDDLike.isEmpty(JavaRDDLike.scala:545)
  at org.apache.spark.api.java.JavaRDDLike.isEmpty$(JavaRDDLike.scala:545)
  at org.apache.spark.api.java.AbstractJavaRDDLike.isEmpty(JavaRDDLike.scala:45)
  at org.apache.hudi.utilities.streamer.StreamSync.fetchFromSource(StreamSync.java:607)
  at org.apache.hudi.utilities.streamer.StreamSync.readFromSource(StreamSync.java:506)
  at org.apache.hudi.utilities.streamer.StreamSync.syncOnce(StreamSync.java:411)
  at org.apache.hudi.utilities.streamer.HoodieStreamer$StreamSyncService.ingestOnce(HoodieStreamer.java:850)
  at org.apache.hudi.common.util.Option.ifPresent(Option.java:97)
  at com.onehouse.hudi.OnehouseDeltaStreamer$MultiTableSyncService.lambda$null$1(OnehouseDeltaStreamer.java:319)
  at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
  at java.lang.Thread.run(Thread.java:750)
Caused by: java.lang.ArrayIndexOutOfBoundsException: 3
  at org.apache.avro.generic.GenericData$Record.get(GenericData.java:263)
  at org.apache.hudi.org.apache.spark.sql.avro.AvroDeserializer.$anonfun$getRecordWriter$2(AvroDeserializer.scala:384)
  at org.apache.hudi.org.apache.spark.sql.avro.AvroDeserializer.$anonfun$getRecordWriter$2$adapted(AvroDeserializer.scala:380)
  at org.apache.hudi.org.apache.spark.sql.avro.AvroDeserializer.$anonfun$newWriter$18(AvroDeserializer.scala:213)
  at org.apache.hudi.org.apache.spark.sql.avro.AvroDeserializer.$anonfun$newWriter$18$adapted(AvroDeserializer.scala:211)
  at org.apache.hudi.org.apache.spark.sql.avro.AvroDeserializer.$anonfun$getRecordWriter$1(AvroDeserializer.scala:366)
  at org.apache.hudi.org.apache.spark.sql.avro.AvroDeserializer.$anonfun$getRecordWriter$1$adapted(AvroDeserializer.scala:362)
  at org.apache.hudi.org.apache.spark.sql.avro.AvroDeserializer.$anonfun$getRecordWriter$2(AvroDeserializer.scala:384)
  at org.apache.hudi.org.apache.spark.sql.avro.AvroDeserializer.$anonfun$getRecordWriter$2$adapted(AvroDeserializer.scala:380)
  at org.apache.hudi.org.apache.spark.sql.avro.AvroDeserializer.$anonfun$converter$4(AvroDeserializer.scala:88)
  at org.apache.hudi.org.apache.spark.sql.avro.AvroDeserializer.deserialize(AvroDeserializer.scala:106)
  at org.apache.hudi.org.apache.spark.sql.avro.HoodieSpark3_2AvroDeserializer.deserialize(HoodieSpark3_2AvroDeserializer.scala:31)
  at org.apache.hudi.AvroConversionUtils$.$anonfun$createAvroToInternalRowConverter$1(AvroConversionUtils.scala:46)
  at org.apache.hudi.AvroConversionUtils$.$anonfun$createConverterToRow$1(AvroConversionUtils.scala:74)
  at org.apache.hudi.AvroConversionUtils$.$anonfun$createDataFrame$2(AvroConversionUtils.scala:108)
  at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
  at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
  at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
  at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
  at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:759)
  at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
  at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
  at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
  at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
  at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
  at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:759)
  at scala.collection.Iterator.isEmpty(Iterator.scala:387)
  at scala.collection.Iterator.isEmpty$(Iterator.scala:387)
  at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.isEmpty(WholeStageCodegenExec.scala:757)
  at org.apache.hudi.HoodieSparkUtils$.$anonfun$createRdd$2(HoodieSparkUtils.scala:108)
  at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2(RDD.scala:863)
  at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2$adapted(RDD.scala:863)
  at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
  at org.apache.spark.sql.execution.SQLConfInjectingRDD.$anonfun$compute$1(SQLConfInjectingRDD.scala:55)
  at org.apache.spark.sql.internal.SQLConf$.withExistingConf(SQLConf.scala:158)
  at org.apache.spark.sql.execution.SQLConfInjectingRDD.compute(SQLConfInjectingRDD.scala:55)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
  at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
  at org.apache.spark.scheduler.Task.run(Task.scala:131)
  at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
  at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1491)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
  ... 3 more
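The index 3 in the exception lines up with the position of the dropped notes field: the decoded record carries the writer's 3-field schema, while the deserializer was built against the registry's latest 4-field schema and reads fields by position. A minimal sketch of that failure mode using plain Avro, under those assumptions (field names and values are illustrative):

import java.util.List;
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.generic.GenericData;

public class DroppedColumnRepro {
    public static void main(String[] args) {
        // Writer schema after DROP COLUMN notes: only 3 fields.
        Schema writerSchema = SchemaBuilder.record("Value").fields()
            .requiredString("id")
            .requiredString("name")
            .optionalInt("age")
            .endRecord();

        // Reader schema from the registry: still carries the 4th field, notes.
        Schema readerSchema = SchemaBuilder.record("Value").fields()
            .requiredString("id")
            .requiredString("name")
            .optionalInt("age")
            .optionalString("notes")
            .endRecord();

        // A record decoded with the 3-field writer schema.
        GenericData.Record record = new GenericData.Record(writerSchema);
        record.put("id", "some-uuid");
        record.put("name", "test");
        record.put("age", 7);

        // Reading fields by position against the 4-field reader schema, the
        // way a deserializer generated from the registry's latest schema would:
        List<Schema.Field> readerFields = readerSchema.getFields();
        for (int i = 0; i < readerFields.size(); i++) {
            // At i == 3, record.get(3) indexes past the 3-slot backing array
            // and throws java.lang.ArrayIndexOutOfBoundsException: 3.
            System.out.println(readerFields.get(i).name() + " = " + record.get(i));
        }
    }
}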