sydneyhoran opened a new issue, #8966: URL: https://github.com/apache/hudi/issues/8966
**Describe the problem you faced**

Our production Deltastreamer jobs are hitting an `ArrayIndexOutOfBoundsException` when parsing incoming Avro messages. We believe this happens because Debezium registered a new schema version in the registry after a change in the source database (a column addition), while some messages still on the topic were written with the old schema (without the new column). Once this occurs, the job can no longer make progress without manual intervention.

For `hoodie.deltastreamer.schemaprovider.registry.url` we are passing `https://{schema_user}:{schema_pw}@{schema_registry}.{region}.gcp.confluent.cloud/subjects/{kafka_topic}-value/versions/latest`.

The schema change was the addition of a single column, which is supported according to the Hudi documentation, so we wonder whether something in our configuration/properties prevents it from being handled. Is there another `registry.url` we can provide that does not always point to `/latest` but instead picks up the actual schema of each message?

We are ingesting from a Confluent Cloud Kafka topic with the Confluent Schema Registry, using `--source-class org.apache.hudi.utilities.sources.debezium.PostgresDebeziumSource` and `--payload-class org.apache.hudi.common.model.debezium.PostgresDebeziumAvroPayload`. We do have `hoodie.datasource.write.reconcile.schema=true`, but we are unsure whether that config is helping or hurting.

We believe the messages ingested in the failing batch conform to the old schema, but the job tried to read them against the new schema (`/latest`), which they do not adhere to. We tried setting `source-limit 1` to ensure all messages in a batch had the same format, but we suspect the job was still pointing at the `latest` schema.

The temporary solution for these tables was to:

1. Hard-code the schema registry URL to use `versions/1` instead of `versions/latest`
2. Let the job ingest records until it was past the timestamp of the known schema evolution
3. Cancel the job and restart it using `versions/latest` again
4. New messages are then picked up with the new schema and written successfully

However, this method can result in data loss for the new column, because during step 2 we were not ingesting the new column until we set the schema back to `latest`.

Any assistance would be appreciated!

**To Reproduce**

Steps to reproduce the behavior:

1. Run Deltastreamer on a `PostgresDebeziumSource` with Kafka topics and the Confluent Cloud schema registry, pointing to schema `/latest`
2. Have some consumer lag on the topic so that messages are always being sent/ingested
3. Make a change to the source DB that adds a new column
4. Observe that the schema version in the registry has been incremented
5. Keep watching the Deltastreamer job and observe an `ArrayIndexOutOfBoundsException` when it tries to read the old messages against the new schema

**Expected behavior**

Deltastreamer should handle schema evolution gracefully: each message should be read with the schema it was actually written with, instead of failing with an out-of-bounds exception from decoding old records against a newer schema version.
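To illustrate what we mean by the "actual schema of the message" above: with the standard Confluent wire format, every Avro payload is prefixed with a magic byte `0x0` and a 4-byte big-endian schema ID that identifies the exact schema the producer used. Below is a minimal, hypothetical sketch (the class and method names are ours, not part of Hudi or the Confluent client) of resolving the writer schema by that embedded ID through the registry's standard `GET /schemas/ids/{id}` endpoint, instead of always fetching `/versions/latest`:

```java
// Hypothetical sketch only -- class and method names are ours, not Hudi's.
// Resolves the writer schema of a Confluent-encoded Avro message from the
// schema ID embedded in its 5-byte wire-format header, instead of trusting
// subjects/{topic}-value/versions/latest.
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.ByteBuffer;
import java.util.Base64;

public class WriterSchemaResolver {
  private final HttpClient http = HttpClient.newHttpClient();
  private final String registryBaseUrl; // e.g. https://xxxx.{region}.gcp.confluent.cloud
  private final String basicAuth;       // Confluent Cloud basic auth (USER_INFO)

  public WriterSchemaResolver(String registryBaseUrl, String user, String key) {
    this.registryBaseUrl = registryBaseUrl;
    this.basicAuth = Base64.getEncoder().encodeToString((user + ":" + key).getBytes());
  }

  /** Extracts the schema ID from the 5-byte Confluent wire-format header. */
  public static int schemaIdOf(byte[] kafkaValue) {
    ByteBuffer buf = ByteBuffer.wrap(kafkaValue);
    if (buf.get() != 0x0) { // first byte must be the magic byte
      throw new IllegalArgumentException("Not Confluent wire format");
    }
    return buf.getInt();    // next 4 bytes: big-endian schema ID
  }

  /** Fetches the schema registered under that ID: GET /schemas/ids/{id}. */
  public String fetchSchemaById(int schemaId) throws Exception {
    HttpRequest req = HttpRequest.newBuilder(
            URI.create(registryBaseUrl + "/schemas/ids/" + schemaId))
        .header("Authorization", "Basic " + basicAuth)
        .GET().build();
    HttpResponse<String> resp = http.send(req, HttpResponse.BodyHandlers.ofString());
    return resp.body();     // JSON body: {"schema": "<escaped Avro schema>"}
  }
}
```

Whether something like this can be wired into Deltastreamer's schema provider, so that each batch is decoded with its writer schema, is essentially what we are asking.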
**Environment Description**

* Hudi version : latest
* Spark version : 3.1
* Hive version : N/A
* Hadoop version : N/A
* Storage (HDFS/S3/GCS..) : S3
* Running on Docker? (yes/no) : no

**Additional context**

Hoodie configs

```
--target-base-path s3a://{{ bucket }}/{{ table_path }}
--target-table {{ table_name }}
--continuous
--props gs://path/to/tablename.properties
--min-sync-interval-seconds 15
--source-ordering-field updated_at
--source-limit 5000
--table-type COPY_ON_WRITE
--op UPSERT
--source-class org.apache.hudi.utilities.sources.debezium.PostgresDebeziumSource
--payload-class org.apache.hudi.common.model.debezium.PostgresDebeziumAvroPayload
```

tablename.properties

```
hoodie.deltastreamer.schemaprovider.registry.url={{ schema_url }}.confluent.cloud/subjects/{{ topic }}-value/versions/latest
hoodie.deltastreamer.source.kafka.topic=some.topic
hoodie.datasource.write.recordkey.field=id
hoodie.datasource.write.partitionpath.field=inserted_at
hoodie.datasource.write.precombine.field=updated_at
schema.registry.url={{ schema_url }}
schema.registry.basic.auth.user.info={{ schema_user }}:{{ schema_key }}
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username='{{ kafka_user }}' password='{{ kafka_key }}';
bootstrap.servers={{ bootstrap_server }}
hoodie.embed.timeline.server=false
hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.TimestampBasedKeyGenerator
hoodie.deltastreamer.keygen.timebased.timestamp.type=EPOCHMILLISECONDS
hoodie.deltastreamer.keygen.timebased.output.dateformat=yyyy/MM/dd
group.id=hudi-deltastreamer
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
basic.auth.credentials.source=USER_INFO
heartbeat.interval.ms=5000
session.timeout.ms=120000
request.timeout.ms=900000
retry.backoff.ms=500
hoodie.datasource.write.schema.allow.auto.evolution.column.drop=true
max.rounds.without.new.data.to.shutdown=5
hoodie.write.concurrency.mode=optimistic_concurrency_control
hoodie.write.lock.provider=org.apache.hudi.client.transaction.lock.InProcessLockProvider
hoodie.cleaner.policy.failed.writes=LAZY
hoodie.client.heartbeat.interval_in_ms=120000
hoodie.client.heartbeat.tolerable.misses=10
hoodie.datasource.write.reconcile.schema=true
hoodie.write.lock.client.wait_time_ms_between_retry=1000
hoodie.write.lock.max_wait_time_ms_between_retry=1000
hoodie.write.lock.wait_time_ms_between_retry=500
hoodie.write.lock.wait_time_ms=5000
hoodie.write.lock.client.num_retries=10
hoodie.metadata.enable=false
```

**Stacktrace**

```
23/06/13 18:11:08 INFO org.apache.hudi.utilities.sources.debezium.DebeziumSource: About to read 5000 from Kafka for topic :production-casino_ca_on.public.jackpots
23/06/13 18:11:08 WARN org.apache.spark.streaming.kafka010.KafkaUtils: overriding enable.auto.commit to false for executor
23/06/13 18:11:08 WARN org.apache.spark.streaming.kafka010.KafkaUtils: overriding auto.offset.reset to none for executor
23/06/13 18:11:08 WARN org.apache.spark.streaming.kafka010.KafkaUtils: overriding executor group.id to spark-executor-hudi-deltastreamer
23/06/13 18:11:08 WARN org.apache.spark.streaming.kafka010.KafkaUtils: overriding receive.buffer.bytes to 65536 see KAFKA-3135
23/06/13 18:11:08 INFO org.apache.hudi.utilities.sources.debezium.DebeziumSource: Date fields: []
23/06/13 18:11:08 WARN org.apache.spark.scheduler.TaskSetManager: Lost task 0.0 in stage 39.0 (TID 46) (10.253.240.16 executor 1): java.lang.ArrayIndexOutOfBoundsException: 16
    at org.apache.avro.generic.GenericData$Record.get(GenericData.java:261)
    at org.apache.hudi.org.apache.spark.sql.avro.AvroDeserializer.$anonfun$getRecordWriter$2(AvroDeserializer.scala:372)
    at org.apache.hudi.org.apache.spark.sql.avro.AvroDeserializer.$anonfun$getRecordWriter$2$adapted(AvroDeserializer.scala:368)
    at org.apache.hudi.org.apache.spark.sql.avro.AvroDeserializer.$anonfun$newWriter$18(AvroDeserializer.scala:196)
    at org.apache.hudi.org.apache.spark.sql.avro.AvroDeserializer.$anonfun$newWriter$18$adapted(AvroDeserializer.scala:194)
    at org.apache.hudi.org.apache.spark.sql.avro.AvroDeserializer.$anonfun$getRecordWriter$1(AvroDeserializer.scala:351)
    at org.apache.hudi.org.apache.spark.sql.avro.AvroDeserializer.$anonfun$getRecordWriter$1$adapted(AvroDeserializer.scala:347)
    at org.apache.hudi.org.apache.spark.sql.avro.AvroDeserializer.$anonfun$getRecordWriter$2(AvroDeserializer.scala:372)
    at org.apache.hudi.org.apache.spark.sql.avro.AvroDeserializer.$anonfun$getRecordWriter$2$adapted(AvroDeserializer.scala:368)
    at org.apache.hudi.org.apache.spark.sql.avro.AvroDeserializer.$anonfun$converter$4(AvroDeserializer.scala:84)
    at org.apache.hudi.org.apache.spark.sql.avro.AvroDeserializer.deserialize(AvroDeserializer.scala:98)
    at org.apache.hudi.org.apache.spark.sql.avro.HoodieSpark3_1AvroDeserializer.deserialize(HoodieSpark3_1AvroDeserializer.scala:35)
    at org.apache.hudi.AvroConversionUtils$.$anonfun$createAvroToInternalRowConverter$1(AvroConversionUtils.scala:45)
    at org.apache.hudi.AvroConversionUtils$.$anonfun$createConverterToRow$1(AvroConversionUtils.scala:73)
    at org.apache.hudi.AvroConversionUtils$.$anonfun$createDataFrame$2(AvroConversionUtils.scala:107)
    at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
    at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:755)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.agg_doAggregateWithoutKey_0$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:755)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
    at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:134)
    at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
    at org.apache.spark.scheduler.Task.run(Task.scala:131)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:505)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:508)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:750)
23/06/13 18:11:10 ERROR org.apache.spark.scheduler.TaskSetManager: Task 0 in stage 39.0 failed 4 times; aborting job
23/06/13 18:11:10 INFO org.apache.hudi.utilities.deltastreamer.DeltaSync: Shutting down embedded timeline server
23/06/13 18:11:10 ERROR org.apache.hudi.utilities.deltastreamer.HoodieMultiTableDeltaStreamer: error while running MultiTableDeltaStreamer for table: public.jackpots
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 39.0 failed 4 times, most recent failure: Lost task 0.3 in stage 39.0 (TID 52) (10.253.240.16 executor 1): java.lang.ArrayIndexOutOfBoundsException: 16
    at org.apache.avro.generic.GenericData$Record.get(GenericData.java:261)
    at org.apache.hudi.org.apache.spark.sql.avro.AvroDeserializer.$anonfun$getRecordWriter$2(AvroDeserializer.scala:372)
    at org.apache.hudi.org.apache.spark.sql.avro.AvroDeserializer.$anonfun$getRecordWriter$2$adapted(AvroDeserializer.scala:368)
    at org.apache.hudi.org.apache.spark.sql.avro.AvroDeserializer.$anonfun$newWriter$18(AvroDeserializer.scala:196)
    at org.apache.hudi.org.apache.spark.sql.avro.AvroDeserializer.$anonfun$newWriter$18$adapted(AvroDeserializer.scala:194)
    at org.apache.hudi.org.apache.spark.sql.avro.AvroDeserializer.$anonfun$getRecordWriter$1(AvroDeserializer.scala:351)
    at org.apache.hudi.org.apache.spark.sql.avro.AvroDeserializer.$anonfun$getRecordWriter$1$adapted(AvroDeserializer.scala:347)
    at org.apache.hudi.org.apache.spark.sql.avro.AvroDeserializer.$anonfun$getRecordWriter$2(AvroDeserializer.scala:372)
    at org.apache.hudi.org.apache.spark.sql.avro.AvroDeserializer.$anonfun$getRecordWriter$2$adapted(AvroDeserializer.scala:368)
    at org.apache.hudi.org.apache.spark.sql.avro.AvroDeserializer.$anonfun$converter$4(AvroDeserializer.scala:84)
    at org.apache.hudi.org.apache.spark.sql.avro.AvroDeserializer.deserialize(AvroDeserializer.scala:98)
    at org.apache.hudi.org.apache.spark.sql.avro.HoodieSpark3_1AvroDeserializer.deserialize(HoodieSpark3_1AvroDeserializer.scala:35)
    at org.apache.hudi.AvroConversionUtils$.$anonfun$createAvroToInternalRowConverter$1(AvroConversionUtils.scala:45)
    at org.apache.hudi.AvroConversionUtils$.$anonfun$createConverterToRow$1(AvroConversionUtils.scala:73)
    at org.apache.hudi.AvroConversionUtils$.$anonfun$createDataFrame$2(AvroConversionUtils.scala:107)
    at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
    at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:755)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.agg_doAggregateWithoutKey_0$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:755)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
    at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:134)
    at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
    at org.apache.spark.scheduler.Task.run(Task.scala:131)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:505)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:508)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:750)
Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2304)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2253)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2252)
    at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
    at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2252)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1124)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1124)
    at scala.Option.foreach(Option.scala:407)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1124)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2491)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2433)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2422)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:902)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2204)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2225)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2244)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2269)
    at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1030)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:414)
    at org.apache.spark.rdd.RDD.collect(RDD.scala:1029)
    at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:390)
    at org.apache.spark.sql.Dataset.$anonfun$count$1(Dataset.scala:3019)
    at org.apache.spark.sql.Dataset.$anonfun$count$1$adapted(Dataset.scala:3018)
    at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3700)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
    at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3698)
    at org.apache.spark.sql.Dataset.count(Dataset.scala:3018)
    at org.apache.hudi.utilities.sources.debezium.DebeziumSource.fetchNextBatch(DebeziumSource.java:125)
    at org.apache.hudi.utilities.sources.RowSource.fetchNewData(RowSource.java:43)
    at org.apache.hudi.utilities.sources.Source.fetchNext(Source.java:76)
    at org.apache.hudi.utilities.deltastreamer.SourceFormatAdapter.fetchNewDataInAvroFormat(SourceFormatAdapter.java:176)
    at org.apache.hudi.utilities.deltastreamer.DeltaSync.fetchFromSource(DeltaSync.java:583)
    at org.apache.hudi.utilities.deltastreamer.DeltaSync.readFromSource(DeltaSync.java:491)
    at org.apache.hudi.utilities.deltastreamer.DeltaSync.syncOnce(DeltaSync.java:399)
    at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer$DeltaSyncService.ingestOnce(HoodieDeltaStreamer.java:801)
    at org.apache.hudi.utilities.ingestion.HoodieIngestionService.startIngestion(HoodieIngestionService.java:71)
    at org.apache.hudi.common.util.Option.ifPresent(Option.java:97)
    at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.sync(HoodieDeltaStreamer.java:212)
    at org.apache.hudi.utilities.deltastreamer.HoodieMultiTableDeltaStreamer.sync(HoodieMultiTableDeltaStreamer.java:440)
    at org.apache.hudi.utilities.deltastreamer.HoodieMultiTableDeltaStreamer.main(HoodieMultiTableDeltaStreamer.java:258)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
    at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:951)
    at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:180)
    at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:203)
    at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:90)
    at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1039)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1048)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.ArrayIndexOutOfBoundsException: 16
    at org.apache.avro.generic.GenericData$Record.get(GenericData.java:261)
    at org.apache.hudi.org.apache.spark.sql.avro.AvroDeserializer.$anonfun$getRecordWriter$2(AvroDeserializer.scala:372)
    at org.apache.hudi.org.apache.spark.sql.avro.AvroDeserializer.$anonfun$getRecordWriter$2$adapted(AvroDeserializer.scala:368)
    at org.apache.hudi.org.apache.spark.sql.avro.AvroDeserializer.$anonfun$newWriter$18(AvroDeserializer.scala:196)
    at org.apache.hudi.org.apache.spark.sql.avro.AvroDeserializer.$anonfun$newWriter$18$adapted(AvroDeserializer.scala:194)
    at org.apache.hudi.org.apache.spark.sql.avro.AvroDeserializer.$anonfun$getRecordWriter$1(AvroDeserializer.scala:351)
    at org.apache.hudi.org.apache.spark.sql.avro.AvroDeserializer.$anonfun$getRecordWriter$1$adapted(AvroDeserializer.scala:347)
    at org.apache.hudi.org.apache.spark.sql.avro.AvroDeserializer.$anonfun$getRecordWriter$2(AvroDeserializer.scala:372)
    at org.apache.hudi.org.apache.spark.sql.avro.AvroDeserializer.$anonfun$getRecordWriter$2$adapted(AvroDeserializer.scala:368)
    at org.apache.hudi.org.apache.spark.sql.avro.AvroDeserializer.$anonfun$converter$4(AvroDeserializer.scala:84)
    at org.apache.hudi.org.apache.spark.sql.avro.AvroDeserializer.deserialize(AvroDeserializer.scala:98)
    at org.apache.hudi.org.apache.spark.sql.avro.HoodieSpark3_1AvroDeserializer.deserialize(HoodieSpark3_1AvroDeserializer.scala:35)
    at org.apache.hudi.AvroConversionUtils$.$anonfun$createAvroToInternalRowConverter$1(AvroConversionUtils.scala:45)
    at org.apache.hudi.AvroConversionUtils$.$anonfun$createConverterToRow$1(AvroConversionUtils.scala:73)
    at org.apache.hudi.AvroConversionUtils$.$anonfun$createDataFrame$2(AvroConversionUtils.scala:107)
    at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
    at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:755)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.agg_doAggregateWithoutKey_0$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:755)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
    at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:134)
    at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
    at org.apache.spark.scheduler.Task.run(Task.scala:131)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:505)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:508)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:750)
```

Thanks in advance for any help!
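To make the suspected mismatch concrete, here is a minimal, self-contained Avro sketch (the schemas are illustrative, not our real Debezium schemas). Decoding old bytes with the old writer schema and resolving to the new reader schema works, with Avro filling the added column from its default; treating the new schema as the writer schema for old bytes is the kind of writer/reader confusion that we believe surfaces downstream as an out-of-bounds field access like the one in the stacktrace above:

```java
// Hedged, self-contained sketch (illustrative schemas): Avro handles an added
// column only when the bytes are decoded with the schema they were written
// with (writer schema) and resolved to the newer schema (reader schema).
import java.io.ByteArrayOutputStream;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.EncoderFactory;

public class SchemaResolutionDemo {
  public static void main(String[] args) throws Exception {
    Schema oldSchema = new Schema.Parser().parse(
        "{\"type\":\"record\",\"name\":\"Row\",\"fields\":["
      + "{\"name\":\"id\",\"type\":\"long\"}]}");
    // New schema = old schema plus one nullable column with a default.
    Schema newSchema = new Schema.Parser().parse(
        "{\"type\":\"record\",\"name\":\"Row\",\"fields\":["
      + "{\"name\":\"id\",\"type\":\"long\"},"
      + "{\"name\":\"extra\",\"type\":[\"null\",\"string\"],\"default\":null}]}");

    // Serialize a record with the OLD schema -- what the lagging Kafka
    // messages in our failing batch look like.
    GenericRecord oldRecord = new GenericData.Record(oldSchema);
    oldRecord.put("id", 42L);
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    BinaryEncoder enc = EncoderFactory.get().binaryEncoder(out, null);
    new GenericDatumWriter<GenericRecord>(oldSchema).write(oldRecord, enc);
    enc.flush();

    // Correct handling: writer = old, reader = new. Avro's schema resolution
    // fills the added column from its default value.
    GenericDatumReader<GenericRecord> reader =
        new GenericDatumReader<>(oldSchema, newSchema);
    BinaryDecoder dec = DecoderFactory.get().binaryDecoder(out.toByteArray(), null);
    System.out.println(reader.read(null, dec)); // {"id": 42, "extra": null}

    // Decoding the same bytes as if they had been WRITTEN with newSchema has
    // no such safety net -- the record ends up with fewer fields than the
    // schema used to convert it, which matches the failure we are seeing.
  }
}
```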
