danielfordfc opened a new issue, #7867:
URL: https://github.com/apache/hudi/issues/7867
**Describe the problem you faced**
We are using the DeltaStreamer on EMR 6.9.0, sourcing data from Confluent
Kafka Avro topics and using our Confluent Schema Registry to deserialize the
messages, which we write to the Glue Data Catalog and query with Athena.
For the majority of topics this works well; however, we noticed deserialization errors when topics have Avro `enum` types in the schema. The errors come in two forms, depending on whether we use the default `KafkaAvroDeserializer` or the `KafkaAvroSchemaDeserializer`:
1. **scala.MatchError**
```
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 4 times, most recent failure: Lost task 0.3 in stage 1.0 (TID 4) (ip-10-154-13-123.eu-west-1.compute.internal executor 1): scala.MatchError: processing (of class org.apache.avro.generic.GenericData$EnumSymbol)
	at org.apache.hudi.org.apache.spark.sql.avro.AvroDeserializer.$anonfun$newWriter$13(AvroDeserializer.scala:178)
	at org.apache.hudi.org.apache.spark.sql.avro.AvroDeserializer.$anonfun$newWriter$13$adapted(AvroDeserializer.scala:177)
	at org.apache.hudi.org.apache.spark.sql.avro.AvroDeserializer.$anonfun$getRecordWriter$2(AvroDeserializer.scala:379)
	at org.apache.hudi.org.apache.spark.sql.avro.AvroDeserializer.$anonfun$getRecordWriter$2$adapted(AvroDeserializer.scala:375)
	at org.apache.hudi.org.apache.spark.sql.avro.AvroDeserializer.$anonfun$getRecordWriter$3(AvroDeserializer.scala:389)
	at org.apache.hudi.org.apache.spark.sql.avro.AvroDeserializer.$anonfun$getRecordWriter$3$adapted(AvroDeserializer.scala:385)
	at org.apache.hudi.org.apache.spark.sql.avro.AvroDeserializer.$anonfun$converter$4(AvroDeserializer.scala:87)
	at org.apache.hudi.org.apache.spark.sql.avro.AvroDeserializer.deserialize(AvroDeserializer.scala:105)
	at org.apache.hudi.org.apache.spark.sql.avro.HoodieSpark3_3AvroDeserializer.deserialize(HoodieSpark3_3AvroDeserializer.scala:30)
```
2. **org.apache.avro.AvroTypeException**
```
Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition {topic}-0 at offset 7202. If needed, please seek past the record to continue consumption.
Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id 4144
Caused by: org.apache.avro.AvroTypeException: Found {avro_record_namespace}.{enum_name}, expecting string
```
for an enum field resembling the following (note that sometimes a `default` is added to the field, but never a `default` at the symbol level):
```
{
  "name": "status",
  "type": {
    "type": "enum",
    "name": "status_options",
    "symbols": [
      "processing",
      "completed",
      "error"
    ]
  }
},
```
For instance, with the `KafkaAvroSchemaDeserializer` we received `org.apache.avro.AvroTypeException: Found {avro_record_namespace}.status_options, expecting string`, and without it we received `scala.MatchError: {one_of_the_enum_symbols} (of class org.apache.avro.generic.GenericData$EnumSymbol)`.
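To make the first failure more concrete, here is a minimal, self-contained Scala sketch. This is not Hudi's or Spark's actual code; it only reproduces the same class of mismatch, where a converter that only expects string-like values is handed a `GenericData.EnumSymbol`:
```scala
import org.apache.avro.Schema
import org.apache.avro.generic.GenericData
import org.apache.avro.util.Utf8

object EnumMatchErrorSketch {
  def main(args: Array[String]): Unit = {
    val enumSchema = new Schema.Parser().parse(
      """{"type": "enum", "name": "status_options",
        | "symbols": ["processing", "completed", "error"]}""".stripMargin)

    // The Avro decoder hands back an EnumSymbol, not a java.lang.String.
    val value: AnyRef = new GenericData.EnumSymbol(enumSchema, "processing")

    // A converter written for string-typed targets only matches String/Utf8,
    // so the EnumSymbol falls through and throws:
    //   scala.MatchError: processing (of class org.apache.avro.generic.GenericData$EnumSymbol)
    val asString = value match {
      case s: String => s
      case u: Utf8   => u.toString
    }
    println(asString)
  }
}
```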
**To Reproduce**
**scala.MatchError**
In an environment duplicating the one described at the beginning, our Spark command is:
```
"spark-submit",
"--class", "org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer",
"--conf", "spark.scheduler.mode=FAIR",
"--conf", "spark.serializer=org.apache.spark.serializer.KryoSerializer",
"--conf", "spark.sql.catalogImplementation=hive",
"--conf", "spark.sql.hive.convertMetastoreParquet=false",
"--conf",
"spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension",
"--conf",
"spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog",
"--conf",
"spark.hadoop.hive.metastore.client.factory.class=com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory",
"--conf", "spark.streaming.kafka.allowNonConsecutiveOffsets=true",
# IMPORTANT: hudi-utilities-bundle must be declared immediately before any
Hudi spark commands
"/usr/lib/hudi/hudi-utilities-bundle.jar",
"--source-class", "org.apache.hudi.utilities.sources.{{ source_type }}",
"--source-ordering-field", "{{ timestamp_field }}",
"--table-type", "COPY_ON_WRITE",
"--op", "UPSERT",
"--enable-sync",
"--continuous",
# Hudi write config
"--target-base-path", f"s3://{bucket}/raw/{{ table }}",
"--target-table", "{{ table }}",
"--hoodie-conf", "hoodie.database.name={{ database }}_raw",
"--hoodie-conf", "hoodie.table.name={{ table }}",
"--hoodie-conf", "hoodie.datasource.write.recordkey.field={{ primary_key }}",
"--hoodie-conf", "hoodie.datasource.write.precombine.field={{
timestamp_field }}",
"--hoodie-conf",
"hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.TimestampBasedKeyGenerator",
"--hoodie-conf", "hoodie.datasource.write.partitionpath.field={{
timestamp_field }}",
"--hoodie-conf",
"hoodie.deltastreamer.keygen.timebased.timestamp.type=EPOCHMILLISECONDS",
"--hoodie-conf",
"hoodie.deltastreamer.keygen.timebased.output.dateformat=yyyy/MM/dd",
# Filter invalid records
"--transformer-class",
"org.apache.hudi.utilities.transform.SqlQueryBasedTransformer",
"--hoodie-conf", "hoodie.deltastreamer.transformer.sql=SELECT * FROM <SRC>
WHERE {{ primary_key }} is not null AND {{ timestamp_field }} is not null",
# AWS Glue Data Catalog config
"--hoodie-conf", "hoodie.datasource.hive_sync.enable=true",
"--hoodie-conf", "hoodie.datasource.hive_sync.database={{ database }}_raw",
"--hoodie-conf", "hoodie.datasource.hive_sync.table={{ table }}",
"--hoodie-conf", "hoodie.datasource.hive_sync.partition_fields=_event_date",
"--hoodie-conf",
f"hoodie.deltastreamer.source.kafka.topic={self.kafka_topic}",
"--hoodie-conf", "auto.offset.reset=earliest",
"--hoodie-conf", "sasl.mechanism=PLAIN",
"--hoodie-conf", "security.protocol=SASL_SSL",
"--hoodie-conf", f"bootstrap.servers={self.kafka_bootstrap_servers}",
"--hoodie-conf",
f'sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule
required
username="{self.kafka_sasl_username}"
password="{self.kafka_sasl_password}";',
"--schemaprovider-class",
"org.apache.hudi.utilities.schema.SchemaRegistryProvider",
"--hoodie-conf", f"schema.registry.url=https://{self.schema_registry_url}",
"--hoodie-conf", "basic.auth.credentials.source=USER_INFO",
"--hoodie-conf",
f"schema.registry.basic.auth.user.info={self.schema_registry_auth}",
"--hoodie-conf",
f"hoodie.deltastreamer.schemaprovider.registry.url=https://{self.schema_registry_auth}@{self.schema_registry_url}//subjects/{self.kafka_topic}-value/versions/latest",
```
**Most notably, removing the SQL transformer we are using (shown below) seems to remove the error for many of the topics, in both error cases.**
```
"--transformer-class",
"org.apache.hudi.utilities.transform.SqlQueryBasedTransformer",
"--hoodie-conf", "hoodie.deltastreamer.transformer.sql=SELECT * FROM <SRC>
WHERE {{ primary_key }} is not null AND {{ timestamp_field }} is not null",
```
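I have not confirmed the root cause, but one observation that might be relevant: once a schema passes through Spark SQL types (as happens for the transformer), an Avro `enum` is represented as a plain string type. The following small Scala sketch, assuming the `spark-avro` `SchemaConverters` API and a hypothetical record schema mirroring the `status` field above, shows that mapping:
```scala
import org.apache.avro.Schema
import org.apache.spark.sql.avro.SchemaConverters

object EnumMappingCheck {
  def main(args: Array[String]): Unit = {
    // Hypothetical record schema containing the same "status" enum as above.
    val avroSchema = new Schema.Parser().parse(
      """{
        |  "type": "record",
        |  "name": "example",
        |  "fields": [
        |    {"name": "status",
        |     "type": {"type": "enum", "name": "status_options",
        |              "symbols": ["processing", "completed", "error"]}}
        |  ]
        |}""".stripMargin)

    // Prints something like StructType(StructField(status,StringType,false)):
    // the enum column is just a string once it is a Spark SQL type.
    println(SchemaConverters.toSqlType(avroSchema).dataType)
  }
}
```
That lines up loosely with the `Found {avro_record_namespace}.{enum_name}, expecting string` message, but I am only flagging it as a possible direction, not a confirmed cause.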
We are using this transformer to drop null records before ingestion, to avoid the error below, which appears to be caused by empty data in our selected `hoodie.datasource.write.partitionpath.field`, although we cannot find any evidence of such data in our topics:
```
Caused by: org.apache.hudi.exception.HoodieException: The value of {timestamp_field} can not be null
	at org.apache.hudi.avro.HoodieAvroUtils.getNestedFieldVal(HoodieAvroUtils.java:534)
	at org.apache.hudi.utilities.deltastreamer.DeltaSync.lambda$fetchFromSource$b741bfe4$1(DeltaSync.java:490)
	at org.apache.spark.api.java.JavaPairRDD$.$anonfun$toScalaFunction$1(JavaPairRDD.scala:1070)
```
But given that the transformer appears to be what triggers the errors, is there perhaps a better way of doing this filtering?
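One alternative I have considered, but not yet validated, is doing the null filtering in a small custom transformer instead of `SqlQueryBasedTransformer`. A rough Scala sketch follows; the class name and column names are placeholders of my own, not an existing Hudi utility, and in a real job the field names would come from the properties:
```scala
import org.apache.hudi.common.config.TypedProperties
import org.apache.hudi.utilities.transform.Transformer
import org.apache.spark.api.java.JavaSparkContext
import org.apache.spark.sql.{Dataset, Row, SparkSession}

// Drops rows whose key or precombine/partition field is null before Hudi sees them.
class DropNullKeysTransformer extends Transformer {
  override def apply(jsc: JavaSparkContext,
                     sparkSession: SparkSession,
                     rowDataset: Dataset[Row],
                     properties: TypedProperties): Dataset[Row] = {
    // Placeholder column names; these would normally be read from `properties`.
    rowDataset.filter("primary_key IS NOT NULL AND timestamp_field IS NOT NULL")
  }
}
```
I have not tested whether this avoids the enum problem, since it presumably goes through the same Row conversion path, so treat it only as a sketch.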
One thing to note is that this error only seems to occur on topics that do not start at offset 0, so it may be a red herring, or bad data on our end that we haven't been able to confirm.
**org.apache.avro.AvroTypeException**
The only difference in this test setup is the addition of:
```
"--hoodie-conf", "hoodie.deltastreamer.source.kafka.value.deserializer.class=org.apache.hudi.utilities.deser.KafkaAvroSchemaDeserializer",
```
The error is different, but the fix again appears to be removing the SQL transformer: the topics that worked in the previous test work again, while the ones failing with `The value of {timestamp_field} can not be null` still fail.
**Expected behavior**
I expected the enum data types to be ingested successfully, even when including an SQL transformer that only selects records matching the query, [based on this documentation](https://spark.apache.org/docs/latest/sql-data-sources-avro.html#supported-types-for-avro---spark-sql-conversion).
**Answers Required**
If the above is an incorrect assumption, the cause is known, and this is expected behaviour, then my question becomes: how can I filter out potentially null records that would break the record key / partition path handling I'm using for my `TimestampBasedKeyGenerator` and `UPSERT` behaviour?
&&
Is there an explanation for why I may be receiving this `The value of {timestamp_field} can not be null` error on seemingly OK topics? The source system apparently guarantees that these fields are always populated, so the only thing I can think of is an offset-scanning problem, as this failing topic is the only one of the topics I tested that doesn't start at offset 0.
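For reference, this is roughly how I have been sanity-checking the earliest offsets of the topic, as a standalone Scala sketch run outside the DeltaStreamer job (bootstrap server, topic, and SASL settings are placeholders):
```scala
import java.util.Properties
import scala.collection.JavaConverters._

import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.ByteArrayDeserializer

object BeginningOffsetCheck {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put("bootstrap.servers", "PLACEHOLDER:9092") // real SASL_SSL settings omitted
    props.put("key.deserializer", classOf[ByteArrayDeserializer].getName)
    props.put("value.deserializer", classOf[ByteArrayDeserializer].getName)
    props.put("group.id", "offset-check")

    val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](props)
    try {
      val partitions = consumer.partitionsFor("PLACEHOLDER_TOPIC").asScala
        .map(p => new TopicPartition(p.topic, p.partition))
      // beginningOffsets shows where each partition actually starts,
      // e.g. offset 7202 rather than 0 after retention/compaction.
      consumer.beginningOffsets(partitions.asJava).asScala.foreach {
        case (tp, offset) => println(s"$tp starts at offset $offset")
      }
    } finally {
      consumer.close()
    }
  }
}
```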
**Environment Description**
* Hudi version : Deltastreamer on EMR 6.9.0 running Hudi 0.12.1
* Spark version : 3.3.0
* Hive version : 3.1.3
* Hadoop version : 3.3.3
* Storage (HDFS/S3/GCS..) : S3
* Running on Docker? (yes/no) : No
**Additional context**
I visited the Hudi office hours before writing this issue, where it was suggested that I investigate and write up my findings here if the root cause was still unknown.
If there's any detail missing from these tests, please let me know and I can recreate them and provide more clarity!
**Stacktraces**
- These three traces all come from processing the same topic with the different configs discussed; it is one of the topics that throws the `The value of {timestamp_field} can not be null` error when the transformer is removed.
- Before each test or investigation, if S3 data or tables had been populated, I cancelled running EMR jobs, dropped any created tables, and deleted the underlying S3 data.
**Potentially useful information about the topic:**
- This schema has 5 versions, and although the schema registry only enforces BACKWARD compatibility, I believe the current evolutions also adhere to BACKWARD_TRANSITIVE.
- The enum failing in both the scala.MatchError and the org.apache.avro.AvroTypeException messages, **"status"**, has been unchanged the entire time and is the first enum field in the schema.
- The topic appears to start at offset 7202, which is the offset declared in
the org.apache.avro.AvroTypeException stack trace
Using Default Deserializer - Scala Match Error
[scala_match_error_stacktrace.txt](https://github.com/apache/hudi/files/10663859/scala_match_afmas_st_clean.txt)
Using KafkaAvroSchemaDeserializer - org.apache.avro.AvroTypeException
[avro_type_error_stacktrace.txt](https://github.com/apache/hudi/files/10663860/avro_type_afmas_st_clean.txt)
Removing the SQL Transformer - `The value of {timestamp_field} can not be null`
[timestamp_field_cantbe_null_trace.txt](https://github.com/apache/hudi/files/10663925/timestamp_field_cantbe_null_trace.txt)