jkdll commented on issue #3113: URL: https://github.com/apache/hudi/issues/3113#issuecomment-871620908
@n3nash For the following error:

```
ERROR Client: Application diagnostics message: User class threw exception: org.apache.spark.sql.avro.IncompatibleSchemaException: Unsupported type NULL
```

I dug a bit deeper and found that the Spark Avro version we were running, 2.4.8, does not support null attributes, as shown here: https://github.com/apache/spark/blob/branch-2.4/external/avro/src/main/scala/org/apache/spark/sql/avro/SchemaConverters.scala

However, newer versions of the library, specifically 3.x, do: https://github.com/apache/spark/blob/master/external/avro/src/main/scala/org/apache/spark/sql/avro/SchemaConverters.scala

We therefore upgraded our EMR cluster to run Spark 3, still with Hudi 0.7.0; after this, the error disappeared.

I then ran a spark-submit command using the SQL transformer, with a simple query:

`SELECT CAST(body.Id as STRING) as id, CAST(COALESCE(body.Payload.creation.time,'') as STRING) as body_Payload_creation_time FROM <SRC>;`

However, we were met with another error:

```
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 4 times, most recent failure: Lost task 0.3 in stage 1.0 (TID 4, ip-10-102-8-124.eu-central-1.compute.internal, executor 1): org.apache.spark.sql.avro.IncompatibleSchemaException: Cannot convert Avro schema to catalyst type because schema at path messageKey is not compatible (avroType = NullType, sqlType = NULL).
Source Avro Schema: ...
Target Catalyst type: ...
    at org.apache.hudi.AvroConversionHelper$.createConverter$1(AvroConversionHelper.scala:265)
    at org.apache.hudi.AvroConversionHelper$.createConverter$1(AvroConversionHelper.scala:146)
    at org.apache.hudi.AvroConversionHelper$.createConverterToRow(AvroConversionHelper.scala:273)
    at org.apache.hudi.AvroConversionUtils$.$anonfun$createDataFrame$1(AvroConversionUtils.scala:42)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2(RDD.scala:837)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2$adapted(RDD.scala:837)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
    at org.apache.spark.sql.execution.SQLExecutionRDD.$anonfun$compute$1(SQLExecutionRDD.scala:52)
    at org.apache.spark.sql.internal.SQLConf$.withExistingConf(SQLConf.scala:100)
    at org.apache.spark.sql.execution.SQLExecutionRDD.compute(SQLExecutionRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:127)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:444)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:447)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2175)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2124)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2123)
    at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
    at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2123)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:990)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:990)
    at scala.Option.foreach(Option.scala:407)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:990)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2355)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2304)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2293)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:792)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2093)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2114)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2133)
    at org.apache.spark.rdd.RDD.$anonfun$take$1(RDD.scala:1423)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:388)
    at org.apache.spark.rdd.RDD.take(RDD.scala:1396)
    at org.apache.spark.rdd.RDD.$anonfun$isEmpty$1(RDD.scala:1531)
    at scala.runtime.java8.JFunction0$mcZ$sp.apply(JFunction0$mcZ$sp.java:23)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:388)
    at org.apache.spark.rdd.RDD.isEmpty(RDD.scala:1531)
    at org.apache.spark.api.java.JavaRDDLike.isEmpty(JavaRDDLike.scala:544)
    at org.apache.spark.api.java.JavaRDDLike.isEmpty$(JavaRDDLike.scala:544)
    at org.apache.spark.api.java.AbstractJavaRDDLike.isEmpty(JavaRDDLike.scala:45)
    at org.apache.hudi.utilities.deltastreamer.DeltaSync.readFromSource(DeltaSync.java:380)
    at org.apache.hudi.utilities.deltastreamer.DeltaSync.syncOnce(DeltaSync.java:255)
    at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer$DeltaSyncService.lambda$startService$0(HoodieDeltaStreamer.java:587)
    ... 4 more
Caused by: org.apache.spark.sql.avro.IncompatibleSchemaException: Cannot convert Avro schema to catalyst type because schema at path routingKey is not compatible (avroType = NullType, sqlType = NULL).
    at org.apache.hudi.AvroConversionHelper$.createConverter$1(AvroConversionHelper.scala:265)
    at org.apache.hudi.AvroConversionHelper$.createConverter$1(AvroConversionHelper.scala:146)
    at org.apache.hudi.AvroConversionHelper$.createConverterToRow(AvroConversionHelper.scala:273)
    at org.apache.hudi.AvroConversionUtils$.$anonfun$createDataFrame$1(AvroConversionUtils.scala:42)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2(RDD.scala:837)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2$adapted(RDD.scala:837)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
    at org.apache.spark.sql.execution.SQLExecutionRDD.$anonfun$compute$1(SQLExecutionRDD.scala:52)
    at org.apache.spark.sql.internal.SQLConf$.withExistingConf(SQLConf.scala:100)
    at org.apache.spark.sql.execution.SQLExecutionRDD.compute(SQLExecutionRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:127)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:444)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:447)
    ... 3 more
```

Spark Version: version 3.0.0-amzn-0
Scala version: 2.12.10
Java Version: OpenJDK 1.8.0_252
Hudi Version: 0.7.0
Spark-Avro Version: org.apache.spark:spark-avro_2.12:3.0.0

Meanwhile, without the SQL transform, errors are still the same as "Problem 1" in the original ticket.
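For anyone trying to narrow this down: both messages (`Unsupported type NULL` and `avroType = NullType, sqlType = NULL`) suggest that some fields in the source Avro schema are declared with a bare `null` type rather than a union such as `["null", "string"]`. The sketch below is only a rough way to list such fields from a schema's JSON. The helper name `find_null_typed_fields` and the example schema are hypothetical; the real source schema is elided in the trace above, so the field layout here is an assumption, with only the names `messageKey` and `routingKey` taken from the error messages.

```python
import json


def is_bare_null(avro_type):
    """True if the type is exactly Avro "null" (not a union with other types)."""
    if avro_type == "null" or avro_type == ["null"]:
        return True
    return isinstance(avro_type, dict) and avro_type.get("type") == "null"


def find_null_typed_fields(schema, path=""):
    """Return dotted paths of record fields whose Avro type is a bare null."""
    hits = []
    if isinstance(schema, list):
        # A union: inspect every branch for nested records.
        for branch in schema:
            hits.extend(find_null_typed_fields(branch, path))
    elif isinstance(schema, dict):
        kind = schema.get("type")
        if kind == "record":
            for field in schema.get("fields", []):
                name = field["name"]
                field_path = f"{path}.{name}" if path else name
                if is_bare_null(field["type"]):
                    hits.append(field_path)
                else:
                    hits.extend(find_null_typed_fields(field["type"], field_path))
        elif kind == "array":
            hits.extend(find_null_typed_fields(schema.get("items"), path))
        elif kind == "map":
            hits.extend(find_null_typed_fields(schema.get("values"), path))
        elif isinstance(kind, (dict, list)):
            # Wrapped type such as {"type": {...}}.
            hits.extend(find_null_typed_fields(kind, path))
    # Primitive type names (plain strings) contain no nested fields.
    return hits


# Hypothetical schema for illustration only; not the schema from this issue.
EXAMPLE_SCHEMA = json.loads("""
{
  "type": "record", "name": "Event",
  "fields": [
    {"name": "messageKey", "type": "null"},
    {"name": "routingKey", "type": "null"},
    {"name": "body", "type": ["null", {
      "type": "record", "name": "Body",
      "fields": [
        {"name": "Id", "type": ["null", "string"]},
        {"name": "unused", "type": "null"}
      ]
    }]}
  ]
}
""")

print(find_null_typed_fields(EXAMPLE_SCHEMA))
```

If the real schema does turn out to contain fields like these, that would at least be consistent with the conversion failing at `messageKey` and `routingKey` even though the SQL query never selects them.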
