sydneyhoran opened a new issue, #8966:
URL: https://github.com/apache/hudi/issues/8966

   **Describe the problem you faced**
   
   Our production Deltastreamer jobs are hitting an `ArrayIndexOutOfBoundsException` when parsing incoming Avro messages. We believe this happens when Debezium registers a new schema version in the registry (after a column was added in the source database) while some messages still on the topic were written with the old schema, without the new column. Once this occurs, the job cannot make progress without manual intervention.
   
   For `hoodie.deltastreamer.schemaprovider.registry.url` we are passing 
`https://{schema_user}:{schema_pw}@{schema_registry}.{region}.gcp.confluent.cloud/subjects/{kafka_topic}-value/versions/latest`.
   
   The schema change was the addition of a single column, which is a supported evolution according to the Hudi documentation, so we are wondering whether something in our configuration/properties prevents it from working. Is there another `registry.url` we can provide that doesn't always point to `/latest`, but instead resolves the actual schema each message was written with?
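   
   For what it's worth, resolving the schema per message should be possible in principle: Confluent-framed Avro values carry the writer-schema id in a 5-byte prefix (magic byte `0x0` plus a 4-byte schema id), so the writer schema can be looked up per record rather than assumed from `/latest`. Below is a minimal sketch of that lookup; the `WriterSchemaResolver` helper itself is hypothetical, while the client classes and methods come from Confluent's `kafka-schema-registry-client`:
   
   ```
   import java.nio.ByteBuffer;
   import org.apache.avro.Schema;
   import io.confluent.kafka.schemaregistry.ParsedSchema;
   import io.confluent.kafka.schemaregistry.avro.AvroSchema;
   import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
   import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
   
   // Hypothetical helper: resolve the schema a Kafka value was actually
   // written with, using the schema id embedded in the Confluent wire format.
   public class WriterSchemaResolver {
     private final SchemaRegistryClient client;
   
     public WriterSchemaResolver(String registryUrl) {
       // 100 = schema cache capacity; registry auth configs omitted for brevity
       this.client = new CachedSchemaRegistryClient(registryUrl, 100);
     }
   
     public Schema resolve(byte[] kafkaValue) throws Exception {
       ByteBuffer buf = ByteBuffer.wrap(kafkaValue);
       if (buf.get() != 0x0) {                    // byte 0: magic byte
         throw new IllegalArgumentException("not Confluent-framed Avro");
       }
       int schemaId = buf.getInt();               // bytes 1-4: writer schema id
       ParsedSchema parsed = client.getSchemaById(schemaId);
       return ((AvroSchema) parsed).rawSchema();  // schema the record was written with
     }
   }
   ```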
   
   We're ingesting from a Confluent Cloud Kafka topic with the Confluent Schema Registry, using `--source-class org.apache.hudi.utilities.sources.debezium.PostgresDebeziumSource` and `--payload-class org.apache.hudi.common.model.debezium.PostgresDebeziumAvroPayload`.
   
   We do have `hoodie.datasource.write.reconcile.schema=true`, but we are wondering whether that config is helping or hurting here.
   
   We believe the messages being ingested in this batch were written with the old schema, but the job tried to deserialize them against the new schema (`/latest`), which the records do not conform to.
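   
   That theory matches the stack trace: `GenericData$Record.get` fails at index 16, which looks like a record materialized with a 16-field (old) schema being accessed at field position 16 by a converter built from the 17-field (new) schema. Avro itself can bridge two versions when given both the writer and the reader schema; here is a minimal, self-contained sketch (the two inline schemas are hypothetical stand-ins for our subject's old and new versions):
   
   ```
   import java.io.ByteArrayOutputStream;
   import org.apache.avro.Schema;
   import org.apache.avro.generic.GenericData;
   import org.apache.avro.generic.GenericDatumReader;
   import org.apache.avro.generic.GenericDatumWriter;
   import org.apache.avro.generic.GenericRecord;
   import org.apache.avro.io.BinaryDecoder;
   import org.apache.avro.io.BinaryEncoder;
   import org.apache.avro.io.DecoderFactory;
   import org.apache.avro.io.EncoderFactory;
   
   public class SchemaResolutionDemo {
     public static void main(String[] args) throws Exception {
       // Hypothetical stand-ins for the old and new subject versions.
       Schema oldSchema = new Schema.Parser().parse(
           "{\"type\":\"record\",\"name\":\"t\",\"fields\":["
         + "{\"name\":\"id\",\"type\":\"long\"}]}");
       Schema newSchema = new Schema.Parser().parse(
           "{\"type\":\"record\",\"name\":\"t\",\"fields\":["
         + "{\"name\":\"id\",\"type\":\"long\"},"
         + "{\"name\":\"added\",\"type\":[\"null\",\"string\"],\"default\":null}]}");
   
       // Encode a record with the OLD schema, like the lagging Kafka messages.
       GenericRecord rec = new GenericData.Record(oldSchema);
       rec.put("id", 1L);
       ByteArrayOutputStream out = new ByteArrayOutputStream();
       BinaryEncoder enc = EncoderFactory.get().binaryEncoder(out, null);
       new GenericDatumWriter<GenericRecord>(oldSchema).write(rec, enc);
       enc.flush();
   
       // Decoding with (writer = old, reader = new) succeeds: Avro schema
       // resolution fills the added column with its default value. Treating
       // the new schema as the writer schema is what misaligns field indexes.
       BinaryDecoder dec = DecoderFactory.get().binaryDecoder(out.toByteArray(), null);
       GenericRecord resolved =
           new GenericDatumReader<GenericRecord>(oldSchema, newSchema).read(null, dec);
       System.out.println(resolved); // {"id": 1, "added": null}
     }
   }
   ```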
   
   We tried setting `--source-limit 1` to ensure the messages in a batch were all in the same format, but we suspect the job was still resolving the `latest` schema.
   
   The temporary workaround for these tables was to:
   
   1. Hard-code the schema registry URL to use `versions/1` instead of `versions/latest`
   2. Let the job ingest records for a while, until it was past the timestamp of the known schema evolution
   3. Cancel the job and restart it using `versions/latest` again
   4. Confirm that new messages are picked up with the new schema and written successfully
   
   However, this workaround could result in data loss for the new column: during step 2 we were not ingesting its values, because the provider stayed pinned to the old schema until we set it back to `latest`.
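   
   For reference, step 1 of the workaround amounts to a one-line change in the table properties (placeholders mirror our templated config):
   
   ```
   # Pin the schema provider to the known pre-evolution version instead of /latest
   hoodie.deltastreamer.schemaprovider.registry.url={{ schema_url }}.confluent.cloud/subjects/{{ topic }}-value/versions/1
   ```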
   
   Any assistance would be appreciated!
   
   **To Reproduce**
   
   Steps to reproduce the behavior:
   
   1. Run Deltastreamer with a `PostgresDebeziumSource` reading a Kafka topic, with the Confluent Cloud schema registry URL pointing to `/latest`
   2. Have some consumer lag on the topic so there are always messages waiting to be ingested
   3. Make a change to the source DB that adds a new column
   4. Observe that a new schema version has been registered in the registry
   5. Continue watching the Deltastreamer job and observe an `ArrayIndexOutOfBoundsException` when it reads the old messages against the new schema
   
   **Expected behavior**
   
   Deltastreamer should handle schema evolution gracefully: each message should be deserialized with the schema it was written with, instead of failing with an out-of-bounds exception because old records are read against a newer schema version.
   
   **Environment Description**
   
   * Hudi version : latest
   
   * Spark version : 3.1
   
   * Hive version : N/A
   
   * Hadoop version : N/A
   
   * Storage (HDFS/S3/GCS..) : S3
   
   * Running on Docker? (yes/no) : no
   
   
   **Additional context**
   
   Hoodie configs
   ```
   --target-base-path s3a://{{ bucket }}/{{ table_path }}
   --target-table {{ table_name }}
   --continuous
   --props gs://path/to/tablename.properties
   --min-sync-interval-seconds 15
   --source-ordering-field updated_at
   --source-limit 5000
   --table-type COPY_ON_WRITE
   --op UPSERT
   --source-class 
org.apache.hudi.utilities.sources.debezium.PostgresDebeziumSource
   --payload-class 
org.apache.hudi.common.model.debezium.PostgresDebeziumAvroPayload
   ```
   
   tablename.properties
   ```
   hoodie.deltastreamer.schemaprovider.registry.url={{ schema_url 
}}.confluent.cloud/subjects/{{ topic }}-value/versions/latest
   hoodie.deltastreamer.source.kafka.topic=some.topic
   hoodie.datasource.write.recordkey.field=id
   hoodie.datasource.write.partitionpath.field=inserted_at
   hoodie.datasource.write.precombine.field=updated_at
   schema.registry.url={{ schema_url }}
   schema.registry.basic.auth.user.info={{ schema_user }}:{{ schema_key }}
   sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule 
required username='{{ kafka_user }}' password='{{ kafka_key }}';
   bootstrap.servers={{ bootstrap_server }}
   hoodie.embed.timeline.server=false
   
hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.TimestampBasedKeyGenerator
   hoodie.deltastreamer.keygen.timebased.timestamp.type=EPOCHMILLISECONDS
   hoodie.deltastreamer.keygen.timebased.output.dateformat=yyyy/MM/dd
   group.id=hudi-deltastreamer
   security.protocol=SASL_SSL
   sasl.mechanism=PLAIN
   basic.auth.credentials.source=USER_INFO
   heartbeat.interval.ms=5000
   session.timeout.ms=120000
   request.timeout.ms=900000
   retry.backoff.ms=500
   hoodie.datasource.write.schema.allow.auto.evolution.column.drop=true
   max.rounds.without.new.data.to.shutdown=5
   hoodie.write.concurrency.mode=optimistic_concurrency_control
   
hoodie.write.lock.provider=org.apache.hudi.client.transaction.lock.InProcessLockProvider
   hoodie.cleaner.policy.failed.writes=LAZY
   hoodie.client.heartbeat.interval_in_ms=120000
   hoodie.client.heartbeat.tolerable.misses=10
   hoodie.datasource.write.reconcile.schema=true
   hoodie.write.lock.client.wait_time_ms_between_retry=1000
   hoodie.write.lock.max_wait_time_ms_between_retry=1000
   hoodie.write.lock.wait_time_ms_between_retry=500
   hoodie.write.lock.wait_time_ms=5000
   hoodie.write.lock.client.num_retries=10
   hoodie.metadata.enable=false
   ```
   
   **Stacktrace**
   
   ```
   23/06/13 18:11:08 INFO 
org.apache.hudi.utilities.sources.debezium.DebeziumSource: About to read 5000 
from Kafka for topic :production-casino_ca_on.public.jackpots
   23/06/13 18:11:08 WARN org.apache.spark.streaming.kafka010.KafkaUtils: 
overriding enable.auto.commit to false for executor
   23/06/13 18:11:08 WARN org.apache.spark.streaming.kafka010.KafkaUtils: 
overriding auto.offset.reset to none for executor
   23/06/13 18:11:08 WARN org.apache.spark.streaming.kafka010.KafkaUtils: 
overriding executor group.id to spark-executor-hudi-deltastreamer
   23/06/13 18:11:08 WARN org.apache.spark.streaming.kafka010.KafkaUtils: 
overriding receive.buffer.bytes to 65536 see KAFKA-3135
   23/06/13 18:11:08 INFO 
org.apache.hudi.utilities.sources.debezium.DebeziumSource: Date fields: []
   23/06/13 18:11:08 WARN org.apache.spark.scheduler.TaskSetManager: Lost task 
0.0 in stage 39.0 (TID 46) (10.253.240.16 executor 1): 
java.lang.ArrayIndexOutOfBoundsException: 16
        at org.apache.avro.generic.GenericData$Record.get(GenericData.java:261)
        at 
org.apache.hudi.org.apache.spark.sql.avro.AvroDeserializer.$anonfun$getRecordWriter$2(AvroDeserializer.scala:372)
        at 
org.apache.hudi.org.apache.spark.sql.avro.AvroDeserializer.$anonfun$getRecordWriter$2$adapted(AvroDeserializer.scala:368)
        at 
org.apache.hudi.org.apache.spark.sql.avro.AvroDeserializer.$anonfun$newWriter$18(AvroDeserializer.scala:196)
        at 
org.apache.hudi.org.apache.spark.sql.avro.AvroDeserializer.$anonfun$newWriter$18$adapted(AvroDeserializer.scala:194)
        at 
org.apache.hudi.org.apache.spark.sql.avro.AvroDeserializer.$anonfun$getRecordWriter$1(AvroDeserializer.scala:351)
        at 
org.apache.hudi.org.apache.spark.sql.avro.AvroDeserializer.$anonfun$getRecordWriter$1$adapted(AvroDeserializer.scala:347)
        at 
org.apache.hudi.org.apache.spark.sql.avro.AvroDeserializer.$anonfun$getRecordWriter$2(AvroDeserializer.scala:372)
        at 
org.apache.hudi.org.apache.spark.sql.avro.AvroDeserializer.$anonfun$getRecordWriter$2$adapted(AvroDeserializer.scala:368)
        at 
org.apache.hudi.org.apache.spark.sql.avro.AvroDeserializer.$anonfun$converter$4(AvroDeserializer.scala:84)
        at 
org.apache.hudi.org.apache.spark.sql.avro.AvroDeserializer.deserialize(AvroDeserializer.scala:98)
        at 
org.apache.hudi.org.apache.spark.sql.avro.HoodieSpark3_1AvroDeserializer.deserialize(HoodieSpark3_1AvroDeserializer.scala:35)
        at 
org.apache.hudi.AvroConversionUtils$.$anonfun$createAvroToInternalRowConverter$1(AvroConversionUtils.scala:45)
        at 
org.apache.hudi.AvroConversionUtils$.$anonfun$createConverterToRow$1(AvroConversionUtils.scala:73)
        at 
org.apache.hudi.AvroConversionUtils$.$anonfun$createDataFrame$2(AvroConversionUtils.scala:107)
        at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
        at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
        at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown
 Source)
        at 
org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
        at 
org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:755)
        at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
        at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
        at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
        at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.agg_doAggregateWithoutKey_0$(Unknown
 Source)
        at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown
 Source)
        at 
org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
        at 
org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:755)
        at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
        at 
org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:134)
        at 
org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
        at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
        at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
        at org.apache.spark.scheduler.Task.run(Task.scala:131)
        at 
org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:505)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:508)
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:750)
   
   23/06/13 18:11:10 ERROR org.apache.spark.scheduler.TaskSetManager: Task 0 in 
stage 39.0 failed 4 times; aborting job
   23/06/13 18:11:10 INFO org.apache.hudi.utilities.deltastreamer.DeltaSync: 
Shutting down embedded timeline server
   23/06/13 18:11:10 ERROR 
org.apache.hudi.utilities.deltastreamer.HoodieMultiTableDeltaStreamer: error 
while running MultiTableDeltaStreamer for table: public.jackpots
   org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in 
stage 39.0 failed 4 times, most recent failure: Lost task 0.3 in stage 39.0 
(TID 52) (10.253.240.16 executor 1): java.lang.ArrayIndexOutOfBoundsException: 
16
        at org.apache.avro.generic.GenericData$Record.get(GenericData.java:261)
        at 
org.apache.hudi.org.apache.spark.sql.avro.AvroDeserializer.$anonfun$getRecordWriter$2(AvroDeserializer.scala:372)
        at 
org.apache.hudi.org.apache.spark.sql.avro.AvroDeserializer.$anonfun$getRecordWriter$2$adapted(AvroDeserializer.scala:368)
        at 
org.apache.hudi.org.apache.spark.sql.avro.AvroDeserializer.$anonfun$newWriter$18(AvroDeserializer.scala:196)
        at 
org.apache.hudi.org.apache.spark.sql.avro.AvroDeserializer.$anonfun$newWriter$18$adapted(AvroDeserializer.scala:194)
        at 
org.apache.hudi.org.apache.spark.sql.avro.AvroDeserializer.$anonfun$getRecordWriter$1(AvroDeserializer.scala:351)
        at 
org.apache.hudi.org.apache.spark.sql.avro.AvroDeserializer.$anonfun$getRecordWriter$1$adapted(AvroDeserializer.scala:347)
        at 
org.apache.hudi.org.apache.spark.sql.avro.AvroDeserializer.$anonfun$getRecordWriter$2(AvroDeserializer.scala:372)
        at 
org.apache.hudi.org.apache.spark.sql.avro.AvroDeserializer.$anonfun$getRecordWriter$2$adapted(AvroDeserializer.scala:368)
        at 
org.apache.hudi.org.apache.spark.sql.avro.AvroDeserializer.$anonfun$converter$4(AvroDeserializer.scala:84)
        at 
org.apache.hudi.org.apache.spark.sql.avro.AvroDeserializer.deserialize(AvroDeserializer.scala:98)
        at 
org.apache.hudi.org.apache.spark.sql.avro.HoodieSpark3_1AvroDeserializer.deserialize(HoodieSpark3_1AvroDeserializer.scala:35)
        at 
org.apache.hudi.AvroConversionUtils$.$anonfun$createAvroToInternalRowConverter$1(AvroConversionUtils.scala:45)
        at 
org.apache.hudi.AvroConversionUtils$.$anonfun$createConverterToRow$1(AvroConversionUtils.scala:73)
        at 
org.apache.hudi.AvroConversionUtils$.$anonfun$createDataFrame$2(AvroConversionUtils.scala:107)
        at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
        at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
        at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown
 Source)
        at 
org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
        at 
org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:755)
        at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
        at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
        at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
        at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.agg_doAggregateWithoutKey_0$(Unknown
 Source)
        at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown
 Source)
        at 
org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
        at 
org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:755)
        at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
        at 
org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:134)
        at 
org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
        at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
        at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
        at org.apache.spark.scheduler.Task.run(Task.scala:131)
        at 
org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:505)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:508)
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:750)
   
   Driver stacktrace:
        at 
org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2304)
        at 
org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2253)
        at 
org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2252)
        at 
scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
        at 
scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
        at 
org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2252)
        at 
org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1124)
        at 
org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1124)
        at scala.Option.foreach(Option.scala:407)
        at 
org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1124)
        at 
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2491)
        at 
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2433)
        at 
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2422)
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
        at 
org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:902)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2204)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2225)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2244)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2269)
        at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1030)
        at 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
        at org.apache.spark.rdd.RDD.withScope(RDD.scala:414)
        at org.apache.spark.rdd.RDD.collect(RDD.scala:1029)
        at 
org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:390)
        at org.apache.spark.sql.Dataset.$anonfun$count$1(Dataset.scala:3019)
        at 
org.apache.spark.sql.Dataset.$anonfun$count$1$adapted(Dataset.scala:3018)
        at 
org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3700)
        at 
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
        at 
org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
        at 
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
        at 
org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
        at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3698)
        at org.apache.spark.sql.Dataset.count(Dataset.scala:3018)
        at 
org.apache.hudi.utilities.sources.debezium.DebeziumSource.fetchNextBatch(DebeziumSource.java:125)
        at 
org.apache.hudi.utilities.sources.RowSource.fetchNewData(RowSource.java:43)
        at org.apache.hudi.utilities.sources.Source.fetchNext(Source.java:76)
        at 
org.apache.hudi.utilities.deltastreamer.SourceFormatAdapter.fetchNewDataInAvroFormat(SourceFormatAdapter.java:176)
        at 
org.apache.hudi.utilities.deltastreamer.DeltaSync.fetchFromSource(DeltaSync.java:583)
        at 
org.apache.hudi.utilities.deltastreamer.DeltaSync.readFromSource(DeltaSync.java:491)
        at 
org.apache.hudi.utilities.deltastreamer.DeltaSync.syncOnce(DeltaSync.java:399)
        at 
org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer$DeltaSyncService.ingestOnce(HoodieDeltaStreamer.java:801)
        at 
org.apache.hudi.utilities.ingestion.HoodieIngestionService.startIngestion(HoodieIngestionService.java:71)
        at org.apache.hudi.common.util.Option.ifPresent(Option.java:97)
        at 
org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.sync(HoodieDeltaStreamer.java:212)
        at 
org.apache.hudi.utilities.deltastreamer.HoodieMultiTableDeltaStreamer.sync(HoodieMultiTableDeltaStreamer.java:440)
        at 
org.apache.hudi.utilities.deltastreamer.HoodieMultiTableDeltaStreamer.main(HoodieMultiTableDeltaStreamer.java:258)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at 
org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
        at 
org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:951)
        at 
org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:180)
        at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:203)
        at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:90)
        at 
org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1039)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1048)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
   Caused by: java.lang.ArrayIndexOutOfBoundsException: 16
        at org.apache.avro.generic.GenericData$Record.get(GenericData.java:261)
        at 
org.apache.hudi.org.apache.spark.sql.avro.AvroDeserializer.$anonfun$getRecordWriter$2(AvroDeserializer.scala:372)
        at 
org.apache.hudi.org.apache.spark.sql.avro.AvroDeserializer.$anonfun$getRecordWriter$2$adapted(AvroDeserializer.scala:368)
        at 
org.apache.hudi.org.apache.spark.sql.avro.AvroDeserializer.$anonfun$newWriter$18(AvroDeserializer.scala:196)
        at 
org.apache.hudi.org.apache.spark.sql.avro.AvroDeserializer.$anonfun$newWriter$18$adapted(AvroDeserializer.scala:194)
        at 
org.apache.hudi.org.apache.spark.sql.avro.AvroDeserializer.$anonfun$getRecordWriter$1(AvroDeserializer.scala:351)
        at 
org.apache.hudi.org.apache.spark.sql.avro.AvroDeserializer.$anonfun$getRecordWriter$1$adapted(AvroDeserializer.scala:347)
        at 
org.apache.hudi.org.apache.spark.sql.avro.AvroDeserializer.$anonfun$getRecordWriter$2(AvroDeserializer.scala:372)
        at 
org.apache.hudi.org.apache.spark.sql.avro.AvroDeserializer.$anonfun$getRecordWriter$2$adapted(AvroDeserializer.scala:368)
        at 
org.apache.hudi.org.apache.spark.sql.avro.AvroDeserializer.$anonfun$converter$4(AvroDeserializer.scala:84)
        at 
org.apache.hudi.org.apache.spark.sql.avro.AvroDeserializer.deserialize(AvroDeserializer.scala:98)
        at 
org.apache.hudi.org.apache.spark.sql.avro.HoodieSpark3_1AvroDeserializer.deserialize(HoodieSpark3_1AvroDeserializer.scala:35)
        at 
org.apache.hudi.AvroConversionUtils$.$anonfun$createAvroToInternalRowConverter$1(AvroConversionUtils.scala:45)
        at 
org.apache.hudi.AvroConversionUtils$.$anonfun$createConverterToRow$1(AvroConversionUtils.scala:73)
        at 
org.apache.hudi.AvroConversionUtils$.$anonfun$createDataFrame$2(AvroConversionUtils.scala:107)
        at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
        at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
        at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown
 Source)
        at 
org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
        at 
org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:755)
        at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
        at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
        at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
        at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.agg_doAggregateWithoutKey_0$(Unknown
 Source)
        at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown
 Source)
        at 
org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
        at 
org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:755)
        at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
        at 
org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:134)
        at 
org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
        at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
        at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
        at org.apache.spark.scheduler.Task.run(Task.scala:131)
        at 
org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:505)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:508)
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:750)
   ```
   
   Thanks in advance for any help!
