jkdll opened a new issue #3113:
URL: https://github.com/apache/hudi/issues/3113


   **Problem**
   
   I am running DeltaStreamer (spark-submit command below) with the schema provider `SchemaRegistryProvider` and the source class `AvroKafkaSource`. The schema I am reading contains Avro UNION types (sample below). DeltaStreamer appears unable to read UNION types that contain `null`, and fails with the error: `ERROR Client: Application diagnostics message: User class threw exception: org.apache.spark.sql.avro.IncompatibleSchemaException: Unsupported type NULL`.
   ```
   spark-submit \
   --master yarn \
   --deploy-mode cluster \
   --files "/home/workspace/configs/stage/*" \
   --packages org.apache.hudi:hudi-utilities-bundle_2.12:0.7.0,org.apache.spark:spark-avro_2.12:2.4.7 \
   --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer `ls /lib/hudi/hudi-utilities-bundle_2.11-0.7.0-amzn-1.jar` \
   --source-class org.apache.hudi.utilities.sources.AvroKafkaSource \
   --op UPSERT \
   --source-ordering-field timestamp \
   --table-type COPY_ON_WRITE \
   --target-table "$1" \
   --target-base-path "s3a://aws-hudi-data/data/stage/data/$1" \
   --schemaprovider-class org.apache.hudi.utilities.schema.SchemaRegistryProvider \
   --continuous \
   --enable-sync \
   --min-sync-interval-seconds 5 \
   --hoodie-conf "group.id=test" \
   --hoodie-conf "auto.offset.reset=earliest" \
   --hoodie-conf "hoodie.datasource.write.recordkey.field=body.id" \
   --hoodie-conf "hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.SimpleKeyGenerator" \
   --hoodie-conf "hoodie.deltastreamer.source.kafka.topic=$2" \
   --hoodie-conf "hoodie.deltastreamer.schemaprovider.registry.url=https://schema-registry-url.com/subjects/$2-value/versions/latest" \
   --hoodie-conf "hoodie.datasource.write.partitionpath.field=timestamp"
   ```
   
   **The schema contains fields with UNION structs and arrays, such as the following:**
   ```
   {
     "name": "Title",
     "type": [
       "null",
       {
         "type": "record",
         "name": "body",
         "namespace": "Title.additional.Payload",
         "fields": [
           {
             "name": "Name",
             "type": "string"
           },
           {
             "name": "value",
             "type": [
               "null",
               "string"
             ]
           }
         ]
       }
     ],
     "default": null
   }
   ```
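
To make it easier to see which fields are involved, here is a minimal sketch (not a fix, and not part of Hudi or Spark) that walks the schema fragment above and lists every field whose type is a union containing `"null"`. The helper name `find_nullable_unions` is hypothetical, and the sketch assumes only nested records need to be traversed; arrays and maps are not handled.

```python
import json

# The schema fragment from the report above, reproduced as-is.
SCHEMA_FRAGMENT = """
{
  "name": "Title",
  "type": [
    "null",
    {
      "type": "record",
      "name": "body",
      "namespace": "Title.additional.Payload",
      "fields": [
        {"name": "Name", "type": "string"},
        {"name": "value", "type": ["null", "string"]}
      ]
    }
  ],
  "default": null
}
"""


def find_nullable_unions(field, path=""):
    """Return dotted paths of fields whose type is a union containing "null".

    Hypothetical helper for inspection only; it descends into record
    branches but does not handle array or map types.
    """
    found = []
    name = f"{path}.{field['name']}" if path else field["name"]
    ftype = field["type"]
    # In Avro's JSON encoding, a union is written as a JSON list of branches.
    if isinstance(ftype, list) and "null" in ftype:
        found.append(name)
    branches = ftype if isinstance(ftype, list) else [ftype]
    for branch in branches:
        if isinstance(branch, dict) and branch.get("type") == "record":
            for sub_field in branch["fields"]:
                found.extend(find_nullable_unions(sub_field, name))
    return found


nullable_paths = find_nullable_unions(json.loads(SCHEMA_FRAGMENT))
print(nullable_paths)
```

For the fragment above, this reports the outer `Title` field and the nested `value` field, which are the two unions with a `"null"` branch.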
   
   **Expected behavior**
   
   A clear and concise description of what you expected to happen.
   
   **Environment Description**
   
   DeltaStreamer is running on AWS EMR version 5.33. Details of the distribution are below:
   
   * Hudi version : 0.7.0
   
   * Spark version : 2.11.12
   
   * Hive version : 2.37-amzn-4
   
   * Hadoop version : 2.10.1-amzn-1
   
   * Storage (HDFS/S3/GCS..) : S3
   
   * Running on Docker? (yes/no) : no
   
   **Stacktrace**
   
   ```
   21/06/18 23:47:53 ERROR Client: Application diagnostics message: User class threw exception: org.apache.spark.sql.avro.IncompatibleSchemaException: Unsupported type NULL
           at org.apache.spark.sql.avro.SchemaConverters$.toSqlTypeHelper(SchemaConverters.scala:130)
           at org.apache.spark.sql.avro.SchemaConverters$$anonfun$1.apply(SchemaConverters.scala:82)
           at org.apache.spark.sql.avro.SchemaConverters$$anonfun$1.apply(SchemaConverters.scala:81)
           at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
           at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
           at scala.collection.Iterator$class.foreach(Iterator.scala:891)
           at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
           at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
           at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
           at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
           at scala.collection.AbstractTraversable.map(Traversable.scala:104)
           at org.apache.spark.sql.avro.SchemaConverters$.toSqlTypeHelper(SchemaConverters.scala:81)
           at org.apache.spark.sql.avro.SchemaConverters$.toSqlType(SchemaConverters.scala:46)
           at org.apache.hudi.AvroConversionUtils$.convertAvroSchemaToStructType(AvroConversionUtils.scala:56)
           at org.apache.hudi.AvroConversionUtils.convertAvroSchemaToStructType(AvroConversionUtils.scala)
           at org.apache.hudi.utilities.schema.SparkAvroPostProcessor.processSchema(SparkAvroPostProcessor.java:44)
           at org.apache.hudi.utilities.schema.SchemaProviderWithPostProcessor.lambda$getSourceSchema$0(SchemaProviderWithPostProcessor.java:41)
           at org.apache.hudi.common.util.Option.map(Option.java:107)
           at org.apache.hudi.utilities.schema.SchemaProviderWithPostProcessor.getSourceSchema(SchemaProviderWithPostProcessor.java:41)
           at org.apache.hudi.utilities.deltastreamer.DeltaSync.registerAvroSchemas(DeltaSync.java:660)
           at org.apache.hudi.utilities.deltastreamer.DeltaSync.<init>(DeltaSync.java:207)
           at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer$DeltaSyncService.<init>(HoodieDeltaStreamer.java:560)
           at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.<init>(HoodieDeltaStreamer.java:138)
           at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.<init>(HoodieDeltaStreamer.java:102)
           at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.main(HoodieDeltaStreamer.java:470)
           at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
           at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
           at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
           at java.lang.reflect.Method.invoke(Method.java:498)
           at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:688)
   ```
   
   

