jkdll opened a new issue #3113: URL: https://github.com/apache/hudi/issues/3113
**Problem**

I am running the deltastreamer (spark-submit invocation below) with the schema provider `SchemaRegistryProvider` and the source class `AvroKafkaSource`. The schema I am reading contains Avro UNION types (sample below). At startup, the deltastreamer fails to read UNION types that contain nulls, with the error:

`ERROR Client: Application diagnostics message: User class threw exception: org.apache.spark.sql.avro.IncompatibleSchemaException: Unsupported type NULL`

```
spark-submit \
  --master yarn \
  --deploy-mode cluster \
  --files "/home/workspace/configs/stage/*" \
  --packages org.apache.hudi:hudi-utilities-bundle_2.12:0.7.0,org.apache.spark:spark-avro_2.12:2.4.7 \
  --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer `ls /lib/hudi/hudi-utilities-bundle_2.11-0.7.0-amzn-1.jar` \
  --source-class org.apache.hudi.utilities.sources.AvroKafkaSource \
  --op UPSERT \
  --source-ordering-field timestamp \
  --table-type COPY_ON_WRITE \
  --target-table "$1" \
  --target-base-path "s3a://aws-hudi-data/data/stage/data/$1" \
  --schemaprovider-class org.apache.hudi.utilities.schema.SchemaRegistryProvider \
  --continuous \
  --enable-sync \
  --min-sync-interval-seconds 5 \
  --hoodie-conf "group.id=test" \
  --hoodie-conf "auto.offset.reset=earliest" \
  --hoodie-conf "hoodie.datasource.write.recordkey.field=body.id" \
  --hoodie-conf "hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.SimpleKeyGenerator" \
  --hoodie-conf "hoodie.deltastreamer.source.kafka.topic=$2" \
  --hoodie-conf "hoodie.deltastreamer.schemaprovider.registry.url=https://schema-registry-url.com/subjects/$2-value/versions/latest" \
  --hoodie-conf "hoodie.datasource.write.partitionpath.field=timestamp"
```

**The schema contains fields with UNION structs and arrays such as the following:**

```
{
  "name": "Title",
  "type": [
    "null",
    {
      "type": "record",
      "name": "body",
      "namespace": "Title.additional.Payload",
      "fields": [
        { "name": "Name", "type": "string" },
        { "name": "value", "type": [ "null", "string" ] }
      ]
    }
  ],
  "default": null
}
```

**Expected behavior**

The deltastreamer should convert nullable UNION fields (`["null", ...]`) to nullable columns and ingest the records without throwing `IncompatibleSchemaException`.

**Environment Description**

The deltastreamer is running on AWS EMR version 5.33. Details of the distribution are below:

* Hudi version : 0.7.0
* Spark version : 2.11.12
* Hive version : 2.3.7-amzn-4
* Hadoop version : 2.10.1-amzn-1
* Storage (HDFS/S3/GCS..) : S3
* Running on Docker? (yes/no) : no

**Stacktrace**

```
21/06/18 23:47:53 ERROR Client: Application diagnostics message: User class threw exception: org.apache.spark.sql.avro.IncompatibleSchemaException: Unsupported type NULL
	at org.apache.spark.sql.avro.SchemaConverters$.toSqlTypeHelper(SchemaConverters.scala:130)
	at org.apache.spark.sql.avro.SchemaConverters$$anonfun$1.apply(SchemaConverters.scala:82)
	at org.apache.spark.sql.avro.SchemaConverters$$anonfun$1.apply(SchemaConverters.scala:81)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
	at scala.collection.Iterator$class.foreach(Iterator.scala:891)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
	at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
	at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
	at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
	at scala.collection.AbstractTraversable.map(Traversable.scala:104)
	at org.apache.spark.sql.avro.SchemaConverters$.toSqlTypeHelper(SchemaConverters.scala:81)
	at org.apache.spark.sql.avro.SchemaConverters$.toSqlType(SchemaConverters.scala:46)
	at org.apache.hudi.AvroConversionUtils$.convertAvroSchemaToStructType(AvroConversionUtils.scala:56)
	at org.apache.hudi.AvroConversionUtils.convertAvroSchemaToStructType(AvroConversionUtils.scala)
	at org.apache.hudi.utilities.schema.SparkAvroPostProcessor.processSchema(SparkAvroPostProcessor.java:44)
	at org.apache.hudi.utilities.schema.SchemaProviderWithPostProcessor.lambda$getSourceSchema$0(SchemaProviderWithPostProcessor.java:41)
	at org.apache.hudi.common.util.Option.map(Option.java:107)
	at org.apache.hudi.utilities.schema.SchemaProviderWithPostProcessor.getSourceSchema(SchemaProviderWithPostProcessor.java:41)
	at org.apache.hudi.utilities.deltastreamer.DeltaSync.registerAvroSchemas(DeltaSync.java:660)
	at org.apache.hudi.utilities.deltastreamer.DeltaSync.<init>(DeltaSync.java:207)
	at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer$DeltaSyncService.<init>(HoodieDeltaStreamer.java:560)
	at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.<init>(HoodieDeltaStreamer.java:138)
	at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.<init>(HoodieDeltaStreamer.java:102)
	at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.main(HoodieDeltaStreamer.java:470)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:688)
```

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: users@infra.apache.org
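For context on the failure mode: the conventional spark-avro mapping of a `["null", T]` union is a nullable column of type `T`, and "Unsupported type NULL" suggests a bare `"null"` branch is reaching the converter on its own instead of being folded into nullability. Below is a minimal, self-contained Python sketch of that flattening rule, operating on schema fragments like the ones in this issue. All helper names are illustrative; this is not spark-avro's actual API.

```python
# Sketch of how an Avro union is conventionally mapped to a SQL-style type:
# a union that contains "null" becomes its non-null branch, marked nullable.
# Helper names are hypothetical; this mirrors the intent of spark-avro's
# SchemaConverters, not its real implementation.

def to_sql_type(avro_type):
    """Return (sql_type, nullable) for a simplified Avro type description."""
    if isinstance(avro_type, list):  # an Avro union
        branches = [b for b in avro_type if b != "null"]
        nullable = len(branches) < len(avro_type)  # union contained "null"
        if not branches:
            # A union of only "null" leaves no usable SQL type -- the kind
            # of input that would surface as "Unsupported type NULL".
            raise ValueError("Unsupported type NULL")
        if len(branches) == 1:
            inner, _ = to_sql_type(branches[0])
            return inner, nullable
        # Multi-branch unions need a struct/variant encoding; out of scope.
        raise NotImplementedError("complex union")
    if isinstance(avro_type, dict) and avro_type.get("type") == "record":
        fields = {f["name"]: to_sql_type(f["type"]) for f in avro_type["fields"]}
        return {"record": fields}, False
    return avro_type, False  # a primitive such as "string"

# The "value" field from the schema in this issue:
print(to_sql_type(["null", "string"]))  # -> ('string', True)
```

Under this rule the `Title` field above would become a nullable record column, so the crash implies the null branch is being converted directly rather than stripped first.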
