danielfordfc opened a new issue, #7867:
URL: https://github.com/apache/hudi/issues/7867

   **Describe the problem you faced**
   
   We are using the DeltaStreamer on EMR 6.9.0, sourcing data from Confluent 
Kafka Avro topics and using our Confluent Schema Registry to deserialize the 
messages, which we write to the Glue Data Catalog and query with Athena.
   
   For the majority of topics this works well. However, we noticed 
deserialisation errors when topics have Avro `enum` types in the schema.
   
   Errors come in two forms, depending on whether we use the default 
`KafkaAvroDeserializer` or the `KafkaAvroSchemaDeserializer`:
   
   1. **Scala.MatchError**
   ```
   Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 4 times, most recent failure: Lost task 0.3 in stage 1.0 (TID 4) (ip-10-154-13-123.eu-west-1.compute.internal executor 1): scala.MatchError: processing (of class org.apache.avro.generic.GenericData$EnumSymbol)
        at org.apache.hudi.org.apache.spark.sql.avro.AvroDeserializer.$anonfun$newWriter$13(AvroDeserializer.scala:178)
        at org.apache.hudi.org.apache.spark.sql.avro.AvroDeserializer.$anonfun$newWriter$13$adapted(AvroDeserializer.scala:177)
        at org.apache.hudi.org.apache.spark.sql.avro.AvroDeserializer.$anonfun$getRecordWriter$2(AvroDeserializer.scala:379)
        at org.apache.hudi.org.apache.spark.sql.avro.AvroDeserializer.$anonfun$getRecordWriter$2$adapted(AvroDeserializer.scala:375)
        at org.apache.hudi.org.apache.spark.sql.avro.AvroDeserializer.$anonfun$getRecordWriter$3(AvroDeserializer.scala:389)
        at org.apache.hudi.org.apache.spark.sql.avro.AvroDeserializer.$anonfun$getRecordWriter$3$adapted(AvroDeserializer.scala:385)
        at org.apache.hudi.org.apache.spark.sql.avro.AvroDeserializer.$anonfun$converter$4(AvroDeserializer.scala:87)
        at org.apache.hudi.org.apache.spark.sql.avro.AvroDeserializer.deserialize(AvroDeserializer.scala:105)
        at org.apache.hudi.org.apache.spark.sql.avro.HoodieSpark3_3AvroDeserializer.deserialize(HoodieSpark3_3AvroDeserializer.scala:30)
   ```
   
   2. **org.apache.avro.AvroTypeException**
   ```
   Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition {topic}-0 at offset 7202. If needed, please seek past the record to continue consumption.
   Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id 4144
   Caused by: org.apache.avro.AvroTypeException: Found {avro_record_namespace}.{enum_name}, expecting string
   ```
   
   Both occur for an enum field resembling the following (sometimes a 
`default:` is added to the field, but never a `default:` at the symbol level):
   ```
   {
     "name": "status",
     "type": {
       "type": "enum",
       "name": "status_options",
       "symbols": [
         "processing",
         "completed",
         "error"
       ]
     }
   },
   ```
   
   For instance, with the `KafkaAvroSchemaDeserializer` we received 
`org.apache.avro.AvroTypeException: Found 
{avro_record_namespace}.status_options, expecting string`, and without it we 
receive `scala.MatchError: {one_of_the_enum_symbols} (of class 
org.apache.avro.generic.GenericData$EnumSymbol)`.
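
Since only topics with `enum` types fail, it can help to list the enum fields in each topic's value schema before running the job. The following is a minimal sketch using only the Python standard library, not part of Hudi or the Schema Registry client. The function name `find_enum_fields` and the record name `example_record` are hypothetical, and the sketch only detects enums defined inline (a field that refers to a previously defined enum by name is not resolved).

```python
import json


def find_enum_fields(schema):
    """Return dotted paths of fields whose Avro type is, or contains, an inline enum."""
    found = []

    def visit(node, where):
        if isinstance(node, list):
            # Union, e.g. ["null", {"type": "enum", ...}]
            for branch in node:
                visit(branch, where)
        elif isinstance(node, dict):
            kind = node.get("type")
            if kind == "enum":
                found.append(where)
            elif kind == "record":
                for field in node.get("fields", []):
                    child = f"{where}.{field['name']}" if where else field["name"]
                    visit(field["type"], child)
            elif kind == "array":
                visit(node.get("items"), where)
            elif kind == "map":
                visit(node.get("values"), where)
            elif isinstance(kind, (dict, list)):
                # Wrapped type, e.g. {"type": {"type": "enum", ...}}
                visit(kind, where)
        # Plain strings ("string", "long", named references) define no enum here.

    visit(schema, "")
    return found


# Hypothetical record wrapping the "status" field from this issue.
example_schema = json.loads("""
{
  "type": "record",
  "name": "example_record",
  "fields": [
    {"name": "id", "type": "string"},
    {"name": "status", "type": {"type": "enum", "name": "status_options",
                                "symbols": ["processing", "completed", "error"]}}
  ]
}
""")
enum_paths = find_enum_fields(example_schema)
print(enum_paths)
```

Running this over the latest schema of each topic would give a quick list of the fields likely to hit the errors described here.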
   
   **To Reproduce**
   
   **Scala.MatchError**
   
   In an environment identical to the one described at the beginning, our 
Spark command is:
   
   ```
   "spark-submit",
   "--class", "org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer",
   "--conf", "spark.scheduler.mode=FAIR",
   "--conf", "spark.serializer=org.apache.spark.serializer.KryoSerializer",
   "--conf", "spark.sql.catalogImplementation=hive",
   "--conf", "spark.sql.hive.convertMetastoreParquet=false",
   "--conf", "spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension",
   "--conf", "spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog",
   "--conf", "spark.hadoop.hive.metastore.client.factory.class=com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory",
   "--conf", "spark.streaming.kafka.allowNonConsecutiveOffsets=true",
   # IMPORTANT: hudi-utilities-bundle must be declared immediately before any Hudi spark commands
   "/usr/lib/hudi/hudi-utilities-bundle.jar",
   "--source-class", "org.apache.hudi.utilities.sources.{{ source_type }}",
   "--source-ordering-field", "{{ timestamp_field }}",
   "--table-type", "COPY_ON_WRITE",
   "--op", "UPSERT",
   "--enable-sync",
   "--continuous",
   # Hudi write config
   "--target-base-path", f"s3://{bucket}/raw/{{ table }}",
   "--target-table", "{{ table }}",
   "--hoodie-conf", "hoodie.database.name={{ database }}_raw",
   "--hoodie-conf", "hoodie.table.name={{ table }}",
   "--hoodie-conf", "hoodie.datasource.write.recordkey.field={{ primary_key }}",
   "--hoodie-conf", "hoodie.datasource.write.precombine.field={{ timestamp_field }}",
   "--hoodie-conf", "hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.TimestampBasedKeyGenerator",
   "--hoodie-conf", "hoodie.datasource.write.partitionpath.field={{ timestamp_field }}",
   "--hoodie-conf", "hoodie.deltastreamer.keygen.timebased.timestamp.type=EPOCHMILLISECONDS",
   "--hoodie-conf", "hoodie.deltastreamer.keygen.timebased.output.dateformat=yyyy/MM/dd",
   # Filter invalid records
   "--transformer-class", "org.apache.hudi.utilities.transform.SqlQueryBasedTransformer",
   "--hoodie-conf", "hoodie.deltastreamer.transformer.sql=SELECT * FROM <SRC> WHERE {{ primary_key }} is not null AND {{ timestamp_field }} is not null",
   # AWS Glue Data Catalog config
   "--hoodie-conf", "hoodie.datasource.hive_sync.enable=true",
   "--hoodie-conf", "hoodie.datasource.hive_sync.database={{ database }}_raw",
   "--hoodie-conf", "hoodie.datasource.hive_sync.table={{ table }}",
   "--hoodie-conf", "hoodie.datasource.hive_sync.partition_fields=_event_date",
   "--hoodie-conf", f"hoodie.deltastreamer.source.kafka.topic={self.kafka_topic}",
   "--hoodie-conf", "auto.offset.reset=earliest",
   "--hoodie-conf", "sasl.mechanism=PLAIN",
   "--hoodie-conf", "security.protocol=SASL_SSL",
   "--hoodie-conf", f"bootstrap.servers={self.kafka_bootstrap_servers}",
   "--hoodie-conf", f'sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="{self.kafka_sasl_username}" password="{self.kafka_sasl_password}";',
   "--schemaprovider-class", "org.apache.hudi.utilities.schema.SchemaRegistryProvider",
   "--hoodie-conf", f"schema.registry.url=https://{self.schema_registry_url}",
   "--hoodie-conf", "basic.auth.credentials.source=USER_INFO",
   "--hoodie-conf", f"schema.registry.basic.auth.user.info={self.schema_registry_auth}",
   "--hoodie-conf", f"hoodie.deltastreamer.schemaprovider.registry.url=https://{self.schema_registry_auth}@{self.schema_registry_url}//subjects/{self.kafka_topic}-value/versions/latest",
   ```
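
The repeated `--hoodie-conf` pairs in the command above could also be generated from a dictionary, which keeps each key and value on one line and makes it easier to toggle a single setting (such as the transformer) between test runs. This is only a sketch of one way to build the argument list; the helper name `hoodie_conf_args` is hypothetical and is not how our job is currently written.

```python
def hoodie_conf_args(conf):
    """Expand a {key: value} mapping into repeated --hoodie-conf key=value pairs."""
    args = []
    for key, value in conf.items():
        args += ["--hoodie-conf", f"{key}={value}"]
    return args


# A few of the Kafka settings from the command above.
kafka_conf = {
    "auto.offset.reset": "earliest",
    "sasl.mechanism": "PLAIN",
    "security.protocol": "SASL_SSL",
}
conf_args = hoodie_conf_args(kafka_conf)
print(conf_args)
```

The resulting list can be appended to the rest of the `spark-submit` arguments unchanged.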
   
   **Most notably, removing the SQL Transformer we are using below seems to 
remove the error for many of the topics for both error cases.**
   ```
   "--transformer-class", "org.apache.hudi.utilities.transform.SqlQueryBasedTransformer",
   "--hoodie-conf", "hoodie.deltastreamer.transformer.sql=SELECT * FROM <SRC> WHERE {{ primary_key }} is not null AND {{ timestamp_field }} is not null",
   ```
   
   We use this transformer to try to stop records with null values from being 
ingested, to avoid the error below, which appears to be caused by empty data in 
our selected `hoodie.datasource.write.partitionpath.field`, although we cannot 
find any evidence of this in our topics:
   
   ```
   Caused by: org.apache.hudi.exception.HoodieException: The value of {timestamp_field} can not be null
        at org.apache.hudi.avro.HoodieAvroUtils.getNestedFieldVal(HoodieAvroUtils.java:534)
        at org.apache.hudi.utilities.deltastreamer.DeltaSync.lambda$fetchFromSource$b741bfe4$1(DeltaSync.java:490)
        at org.apache.spark.api.java.JavaPairRDD$.$anonfun$toScalaFunction$1(JavaPairRDD.scala:1070)
   ```
   But given that the transformer appears to be causing the enum errors, there 
may be a better way of doing this? 
   One thing to note is that this null error only seems to occur on topics 
that do not start at an offset of 0, so it may be a red herring, or bad 
data on our end which we don't seem to be able to confirm.
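
One way to confirm or rule out bad data would be to scan already-decoded messages for a missing or null timestamp field and report their offsets. The sketch below assumes the messages have already been consumed and decoded into Python dictionaries by some other means (that part is not shown). The function name `find_null_field_offsets`, the field name `updated_at`, and the sample records are all made up for illustration.

```python
def find_null_field_offsets(records, field):
    """Return offsets of records where `field` is missing or None.

    `records` is an iterable of (offset, value) pairs, where value is a dict
    (or None for a message with no value). `field` may be dotted for nested
    values, e.g. "meta.updated_at".
    """
    bad = []
    for offset, record in records:
        value = record
        for part in field.split("."):
            value = value.get(part) if isinstance(value, dict) else None
            if value is None:
                break
        if value is None:
            bad.append(offset)
    return bad


# Made-up sample data; only the starting offset 7202 comes from this issue.
sample = [
    (7202, {"id": "a", "updated_at": 1675000000000}),
    (7203, {"id": "b", "updated_at": None}),
    (7204, {"id": "c"}),
]
null_offsets = find_null_field_offsets(sample, "updated_at")
print(null_offsets)
```

An empty result over the whole topic would suggest the null error does not come from the payload itself.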
   
   
   **org.apache.avro.AvroTypeException**
   
   The only difference in this test setup is the addition of:
   
   ```
   "--hoodie-conf", "hoodie.deltastreamer.source.kafka.value.deserializer.class=org.apache.hudi.utilities.deser.KafkaAvroSchemaDeserializer"
   ```
   
   The error is different, but the fix again appears to be removing the SQL 
transformer: the working topics from the previous test work again, but the 
failing ones still fail with `The value of {timestamp_field} can not be null`.
   
   
   **Expected behavior**
   
   I expected the enum data types to be able to be ingested, even when 
including an SQL transformer to only select records that fit the query 
description, [based on this 
documentation](https://spark.apache.org/docs/latest/sql-data-sources-avro.html#supported-types-for-avro---spark-sql-conversion)
   
   **Answers Required**
   
   If the above is an incorrect assumption, and the cause is known and this is 
expected behaviour, then my question becomes: "How can I filter out potentially 
null records that will break the primary key / partition path field handling 
I'm using for my `TimestampBasedKeyGenerator` and `UPSERT` behaviour?"
   
   &&
   
   Is there an answer to why I may be receiving this `The value of 
{timestamp_field} can not be null` error on seemingly OK topics? The source 
system apparently guarantees that these fields are always populated, so the 
only thing I can think of is an offset scanning problem, as this failing topic 
is the only one of the topics I tested that didn't start at offset 0.
   
   **Environment Description**
   
   * Hudi version : Deltastreamer on EMR 6.9.0 running Hudi 0.12.1
   * Spark version : 3.3.0
   * Hive version : 3.1.3 
   * Hadoop version : 3.3.3
   * Storage (HDFS/S3/GCS..) : S3
   * Running on Docker? (yes/no) : No
   
   **Additional context**
   
   I have visited the Hudi office hours before writing this issue, where it was 
suggested that I investigate and write up my findings here if the root cause is 
still unknown.
   
   If there's any detail missing from these tests please let me know and I can 
recreate and provide more clarity!
   
   **Stacktraces**
   
   - These 3 traces all come from processing the same topic with the different 
configs discussed. It is a topic that throws the `The value of 
{timestamp_field} can not be null` error when the transformer is removed.
   - Before each test or investigation, if s3 data or tables have been 
populated, I cancelled running EMR jobs, dropped any created tables and deleted 
underlying S3 data.
   
   **Potentially useful information about the topic:**
   - This schema has 5 versions and although the schema registry only enforces 
`BACKWARD` compatibility, I believe the evolutions so far also adhere to 
`BACKWARD_TRANSITIVE`.
   - The enum field failing in both the Scala MatchError and the 
org.apache.avro.AvroTypeException, **“status”**, has been unchanged the 
entire time, and is the first enum field in the schema
   - The topic appears to start at offset 7202, which is the offset declared in 
the org.apache.avro.AvroTypeException stack trace
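
The claim that the enum has not changed across schema versions can be checked mechanically once the versions have been fetched from the registry (fetching is not shown). This is a rough sketch under that assumption. The helper names `enum_symbols` and `enum_unchanged` and the record name `example_record` are hypothetical, and the two versions below are shortened stand-ins, not our real schemas.

```python
def enum_symbols(schema, enum_name):
    """Return the symbols of the first inline enum named `enum_name`, or None."""
    if isinstance(schema, list):
        for branch in schema:
            hit = enum_symbols(branch, enum_name)
            if hit is not None:
                return hit
    elif isinstance(schema, dict):
        if schema.get("type") == "enum" and schema.get("name") == enum_name:
            return schema.get("symbols")
        # Descend into nested types, record fields, array items and map values.
        for key in ("type", "fields", "items", "values"):
            child = schema.get(key)
            if isinstance(child, (dict, list)):
                hit = enum_symbols(child, enum_name)
                if hit is not None:
                    return hit
    return None


def enum_unchanged(versions, enum_name):
    """True if every schema version defines `enum_name` with identical symbols."""
    symbols = [enum_symbols(version, enum_name) for version in versions]
    return all(s is not None and s == symbols[0] for s in symbols)


status_type = {"type": "enum", "name": "status_options",
               "symbols": ["processing", "completed", "error"]}
version_1 = {"type": "record", "name": "example_record",
             "fields": [{"name": "status", "type": status_type}]}
version_2 = {"type": "record", "name": "example_record",
             "fields": [{"name": "status", "type": status_type},
                        {"name": "note", "type": ["null", "string"], "default": None}]}
status_is_stable = enum_unchanged([version_1, version_2], "status_options")
print(status_is_stable)
```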
   
   
   Using Default Deserializer - Scala Match Error
   
[scala_match_error_stacktrace.txt](https://github.com/apache/hudi/files/10663859/scala_match_afmas_st_clean.txt)
   Using KafkaAvroSchemaDeserializer -  org.apache.avro.AvroTypeException 
   
[avro_type_error_stacktrace.txt](https://github.com/apache/hudi/files/10663860/avro_type_afmas_st_clean.txt)
   Removing the SQL Transformer  - `The value of {timestamp_field} can not be 
null`
   
[timestamp_field_cantbe_null_trace.txt](https://github.com/apache/hudi/files/10663925/timestamp_field_cantbe_null_trace.txt)
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
