danielfordfc opened a new issue, #8065:
URL: https://github.com/apache/hudi/issues/8065

   Created from this request on my [original Community Slack 
Thread](https://apache-hudi.slack.com/archives/C4D716NPQ/p1677168970257509?thread_ts=1677085842.687439&cid=C4D716NPQ)
 
   
   
   **Describe the problem you faced**
   
   We are using the DeltaStreamer on EMR 6.8.0, sourcing data from Confluent 
Kafka Avro topics and using our Confluent Schema Registry to deserialize the 
messages, which we write to the Glue Data Catalog and query with Athena.
   
   We are running the deltastreamer in `--continuous` mode with the 
`KafkaAvroSchemaDeserializer` and are facing some transient errors on upstream 
schema evolution.
   
   For our testing, the producers are updated in numerous backwards-compatible ways, which in turn updates our Confluent Schema Registry. The DeltaStreamer either handles the evolution gracefully, succeeds in committing, and then updates the Glue Data Catalog accordingly with the new schema, or fails with this:
   
   ```
   org.apache.hudi.exception.HoodieUpsertException: Error upserting bucketType 
UPDATE for partition 
   ...
   Caused by: org.apache.hudi.exception.HoodieUpsertException: Failed to close 
UpdateHandle
   ....
   Caused by: java.io.EOFException <-- this line varies depending on the type of schema evolution.
   ```
   
   When I spoke to @yihua and @nsivabalan in the Hudi office hours, they believed this could potentially be a transient error related to `--continuous` mode.
   
   When you restart the DeltaStreamer, however, **_the evolution ALWAYS succeeds and carries on as normal_**.
   
   **To Reproduce**
   With a duplicate environment to the one mentioned at the beginning, our 
Spark command is:
   
   ```
   "spark-submit",
   "--class", "org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer",
   "--conf", "spark.scheduler.mode=FAIR",
   "--conf", "spark.serializer=org.apache.spark.serializer.KryoSerializer",
   "--conf", "spark.sql.catalogImplementation=hive",
   "--conf", "spark.sql.hive.convertMetastoreParquet=false",
   "--conf", 
"spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension",
   "--conf", 
"spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog",
   "--conf", 
"spark.hadoop.hive.metastore.client.factory.class=com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory",
   "--conf", "spark.streaming.kafka.allowNonConsecutiveOffsets=true",
   # IMPORTANT: hudi-utilities-bundle must be declared immediately before any 
Hudi spark commands
   "/usr/lib/hudi/hudi-utilities-bundle.jar",
   "--source-class", "org.apache.hudi.utilities.sources.{{ source_type }}",
   "--source-ordering-field", "{{ timestamp_field }}",
   "--table-type", "COPY_ON_WRITE",
   "--op", "UPSERT",
   "--enable-sync",
   "--continuous",
   # Hudi write config
   "--target-base-path", f"s3://{bucket}/raw/{{ table }}",
   "--target-table", "{{ table }}",
   "--hoodie-conf", "hoodie.database.name={{ database }}_raw",
   "--hoodie-conf", "hoodie.table.name={{ table }}",
   "--hoodie-conf", "hoodie.datasource.write.recordkey.field={{ primary_key }}",
   "--hoodie-conf", "hoodie.datasource.write.precombine.field={{ 
timestamp_field }}",
   "--hoodie-conf", 
"hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.TimestampBasedKeyGenerator",
   "--hoodie-conf", "hoodie.datasource.write.partitionpath.field={{ 
timestamp_field }}",
   "--hoodie-conf", 
"hoodie.deltastreamer.keygen.timebased.timestamp.type=EPOCHMILLISECONDS",
   "--hoodie-conf", 
"hoodie.deltastreamer.keygen.timebased.output.dateformat=yyyy/MM/dd",
   "--hoodie-conf", 
"hoodie.deltastreamer.source.kafka.value.deserializer.class=org.apache.hudi.utilities.deser.KafkaAvroSchemaDeserializer",
   # AWS Glue Data Catalog config
   "--hoodie-conf", "hoodie.datasource.hive_sync.enable=true",
   "--hoodie-conf", "hoodie.datasource.hive_sync.database={{ database }}_raw",
   "--hoodie-conf", "hoodie.datasource.hive_sync.table={{ table }}",
   "--hoodie-conf", "hoodie.datasource.hive_sync.partition_fields=_event_date",
   "--hoodie-conf", 
f"hoodie.deltastreamer.source.kafka.topic={self.kafka_topic}",
   "--hoodie-conf", "auto.offset.reset=earliest",
   "--hoodie-conf", "sasl.mechanism=PLAIN",
   "--hoodie-conf", "security.protocol=SASL_SSL",
   "--hoodie-conf", f"bootstrap.servers={self.kafka_bootstrap_servers}",
   "--hoodie-conf", 
f'sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule 
required 
   username="{self.kafka_sasl_username}" 
password="{self.kafka_sasl_password}";',
   "--schemaprovider-class", 
"org.apache.hudi.utilities.schema.SchemaRegistryProvider",
   "--hoodie-conf", f"schema.registry.url=https://{self.schema_registry_url}",
   "--hoodie-conf", "basic.auth.credentials.source=USER_INFO",
   "--hoodie-conf", 
f"schema.registry.basic.auth.user.info={self.schema_registry_auth}",
   "--hoodie-conf", 
f"hoodie.deltastreamer.schemaprovider.registry.url=https://{self.schema_registry_auth}@{self.schema_registry_url}//subjects/{self.kafka_topic}-value/versions/latest",
   ```
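   For context, here is a minimal sketch of how an argument list like the one above can be assembled and launched. The class and field names are illustrative assumptions, not the actual job code; only a few representative arguments are included:

   ```python
   import subprocess

   # Hypothetical wrapper class -- the self.* fields interpolated in the
   # command above (kafka_topic, bootstrap servers, credentials, etc.)
   # are assumed to live on an object like this; names are illustrative.
   class DeltaStreamerJob:
       def __init__(self, kafka_topic, table):
           self.kafka_topic = kafka_topic
           self.table = table

       def build_command(self):
           # Only a few representative arguments are shown; the real job
           # passes the full list above. Each argument is its own list
           # element, so the JAAS config string needs no shell quoting.
           return [
               "spark-submit",
               "--class", "org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer",
               "/usr/lib/hudi/hudi-utilities-bundle.jar",
               "--table-type", "COPY_ON_WRITE",
               "--op", "UPSERT",
               "--continuous",
               "--target-table", self.table,
               "--hoodie-conf", f"hoodie.deltastreamer.source.kafka.topic={self.kafka_topic}",
           ]

       def run(self):
           # Blocks until spark-submit exits (in --continuous mode,
           # that normally only happens on failure).
           return subprocess.run(self.build_command()).returncode
   ```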
   
   With this running, update the producer (and, subsequently, the Schema Registry) with a backwards-compatible schema evolution. The DeltaStreamer usually initiates a rollback, requests another commit, and then the EMR Step job dies.
   
   Here is a list of the evolutions we've tested (always defaulted and nullable for compatibility), marked by outcome:
   - never triggered this error πŸ’š
   - triggered this error sometimes, but succeeded gracefully on retry ❗
   - triggered the error and has not yet been seen to succeed, due to lack of further testing πŸ›‘
   
   However, as I said above, when you restart the DeltaStreamer, **_the evolution ALWAYS succeeds and carries on as normal_**.
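   Since a restart reliably recovers, our interim workaround amounts to an external retry loop around the job. A minimal sketch, with an illustrative retry limit and backoff (not our production tooling):

   ```python
   import subprocess
   import time

   def run_with_restarts(cmd, max_restarts=3, backoff_seconds=30):
       """Re-launch the DeltaStreamer if it exits non-zero.

       Because the failure appears transient and a restart always lets
       the schema evolution commit succeed, a bounded retry loop papers
       over the issue until the root cause is found.
       """
       for attempt in range(max_restarts + 1):
           result = subprocess.run(cmd)
           if result.returncode == 0:
               return 0
           if attempt < max_restarts:
               time.sleep(backoff_seconds)
       return result.returncode
   ```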
   
   **Adding...**
   - int πŸ’š 
   - float❗ 
   - array ❗ 
   - Nullable record at root πŸ’š 
       -  Adding int field ❗ 
   - Nullable enum ❗ 
       - Extending symbols πŸ’š  
   - Array of records πŸ’š 
       - Extending nested record with array πŸ›‘ (doesn't this directly contradict the [Hudi documentation](https://hudi.apache.org/docs/schema_evolution/#scenarios)?)
       - Extending nested record with strings ❗ and ints πŸ’š 
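   For reference, all of the additions above had the same shape: a new field declared as a `["null", <type>]` union with a `null` default, which is what keeps the evolution BACKWARD compatible. A stdlib-only sketch (field names are illustrative; a real check would go through the Schema Registry's compatibility API):

   ```python
   import json

   # Base Avro record schema (illustrative field names, not our real topic).
   base_schema = {
       "type": "record",
       "name": "Event",
       "fields": [
           {"name": "id", "type": "string"},
           {"name": "event_ts", "type": "long"},
       ],
   }

   def add_nullable_field(schema, name, avro_type):
       """Return a copy of the schema with a new nullable, defaulted field.

       A ["null", <type>] union with a null default means old readers can
       ignore the field and new readers fill in null for old records --
       the precondition for a BACKWARD-compatible evolution.
       """
       evolved = json.loads(json.dumps(schema))  # cheap deep copy
       evolved["fields"].append(
           {"name": name, "type": ["null", avro_type], "default": None}
       )
       return evolved

   evolved = add_nullable_field(base_schema, "score", "float")
   ```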
   
   **Type Changes...**
   Type changes that should be viable have never triggered this error...
   
   **Expected behavior**
   
   I expected the DeltaStreamer to always handle the evolutions gracefully.
   I also would not have expected some of the successful evolutions to be possible based on the Hudi documentation, specifically those involving array types.
   
   **Answers Required**
   
   What's causing this error? Do you need any other information? Driver logs 
attached.
   
   **Environment Description**
   
   - Hudi version : 0.11.1-amzn-0 (DeltaStreamer on EMR 6.8.0)
   - Spark version : 3.3.0
   - Hive version : 3.1.3
   - Hadoop version : Amazon 3.2.1
   - Storage (HDFS/S3/GCS..) : S3
   - Running on Docker? (yes/no) : No
   
   EMR 6.9.0, which ships Hudi 0.12.1, was not used because we constantly hit `org.apache.hudi.exception.HoodieException: Could not sync using the meta sync class org.apache.hudi.hive.HiveSyncTool`.
   
   **Additional context**
   
   - I have visited the Hudi office hours before writing this issue, where it 
was suggested that I investigate and write up my findings here if the root 
cause is still unknown.
   
   **Stacktraces**
   
[test-schema-evo-trace-safe.txt](https://github.com/apache/hudi/files/10840065/test-schema-evo-trace-safe.txt)
   
   ^^ I can look to recreate the issue at a DEBUG log level if required
   
   

