danielfordfc opened a new issue, #8065: URL: https://github.com/apache/hudi/issues/8065
Created from this request on my [original Community Slack Thread](https://apache-hudi.slack.com/archives/C4D716NPQ/p1677168970257509?thread_ts=1677085842.687439&cid=C4D716NPQ).

**Describe the problem you faced**

We are using the DeltaStreamer on EMR 6.8.0, sourcing data from Confluent Kafka Avro topics and using our Confluent Schema Registry to deserialize the messages, which we write to the Glue Data Catalog and query with Athena.

We are running the DeltaStreamer in `--continuous` mode with the `KafkaAvroSchemaDeserializer` and are facing transient errors on upstream schema evolution. For our testing, the producers are updated in a number of different backwards-compatible ways, which in turn updates our Confluent Schema Registry. The DeltaStreamer either handles the evolution gracefully, succeeds in committing, and then updates the Glue Data Catalog with the new schema, or it fails with:

```
org.apache.hudi.exception.HoodieUpsertException: Error upserting bucketType UPDATE for partition ...
Caused by: org.apache.hudi.exception.HoodieUpsertException: Failed to close UpdateHandle
...
Caused by: java.io.EOFException  <-- this line changes depending on the type of schema evolution
```

Speaking to @yihua and @nsivabalan in the Hudi office hours, they believed this could potentially be a transient error due to the `--continuous` mode. When you restart the DeltaStreamer, however, **_the evolution ALWAYS succeeds and carries on as normal_**.

**To Reproduce**

With a duplicate environment to the one mentioned at the beginning, our Spark command is:

```
"spark-submit",
"--class", "org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer",
"--conf", "spark.scheduler.mode=FAIR",
"--conf", "spark.serializer=org.apache.spark.serializer.KryoSerializer",
"--conf", "spark.sql.catalogImplementation=hive",
"--conf", "spark.sql.hive.convertMetastoreParquet=false",
"--conf", "spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension",
"--conf", "spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog",
"--conf", "spark.hadoop.hive.metastore.client.factory.class=com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory",
"--conf", "spark.streaming.kafka.allowNonConsecutiveOffsets=true",
# IMPORTANT: hudi-utilities-bundle must be declared immediately before any Hudi spark commands
"/usr/lib/hudi/hudi-utilities-bundle.jar",
"--source-class", "org.apache.hudi.utilities.sources.{{ source_type }}",
"--source-ordering-field", "{{ timestamp_field }}",
"--table-type", "COPY_ON_WRITE",
"--op", "UPSERT",
"--enable-sync",
"--continuous",
# Hudi write config
"--target-base-path", f"s3://{bucket}/raw/{{ table }}",
"--target-table", "{{ table }}",
"--hoodie-conf", "hoodie.database.name={{ database }}_raw",
"--hoodie-conf", "hoodie.table.name={{ table }}",
"--hoodie-conf", "hoodie.datasource.write.recordkey.field={{ primary_key }}",
"--hoodie-conf", "hoodie.datasource.write.precombine.field={{ timestamp_field }}",
"--hoodie-conf", "hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.TimestampBasedKeyGenerator",
"--hoodie-conf", "hoodie.datasource.write.partitionpath.field={{ timestamp_field }}",
"--hoodie-conf", "hoodie.deltastreamer.keygen.timebased.timestamp.type=EPOCHMILLISECONDS",
"--hoodie-conf", "hoodie.deltastreamer.keygen.timebased.output.dateformat=yyyy/MM/dd",
"--hoodie-conf", "hoodie.deltastreamer.source.kafka.value.deserializer.class=org.apache.hudi.utilities.deser.KafkaAvroSchemaDeserializer",
# AWS Glue Data Catalog config
"--hoodie-conf", "hoodie.datasource.hive_sync.enable=true",
"--hoodie-conf", "hoodie.datasource.hive_sync.database={{ database }}_raw",
"--hoodie-conf", "hoodie.datasource.hive_sync.table={{ table }}",
"--hoodie-conf", "hoodie.datasource.hive_sync.partition_fields=_event_date",
"--hoodie-conf", f"hoodie.deltastreamer.source.kafka.topic={self.kafka_topic}",
"--hoodie-conf", "auto.offset.reset=earliest",
"--hoodie-conf", "sasl.mechanism=PLAIN",
"--hoodie-conf", "security.protocol=SASL_SSL",
"--hoodie-conf", f"bootstrap.servers={self.kafka_bootstrap_servers}",
"--hoodie-conf", f'sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="{self.kafka_sasl_username}" password="{self.kafka_sasl_password}";',
"--schemaprovider-class", "org.apache.hudi.utilities.schema.SchemaRegistryProvider",
"--hoodie-conf", f"schema.registry.url=https://{self.schema_registry_url}",
"--hoodie-conf", "basic.auth.credentials.source=USER_INFO",
"--hoodie-conf", f"schema.registry.basic.auth.user.info={self.schema_registry_auth}",
"--hoodie-conf", f"hoodie.deltastreamer.schemaprovider.registry.url=https://{self.schema_registry_auth}@{self.schema_registry_url}//subjects/{self.kafka_topic}-value/versions/latest",
```

With this running, update the producer (and subsequently the Schema Registry) with a backwards-compatible schema evolution. The DeltaStreamer usually initiates a rollback, requests another commit, and then the EMR Step job dies.
Here is a list of the evolutions we've done (always defaulted/nullable for compatibility), marked by outcome:

- π — never triggered this error
- β — triggered this error sometimes, but succeeds on retry and has worked gracefully
- π — triggered the error, and hasn't been seen to succeed gracefully, due to lack of testing

However, as I said above, when you restart the DeltaStreamer, **_the evolution ALWAYS succeeds and carries on as normal_**.

**Adding...**

- int π
- float β
- array β
- Nullable record at root π
- Adding int field β
- Nullable enum β
- Extending symbols π
- Array of records π
- Extending nested record with array π (doesn't this directly contradict the [Hudi documentation](https://hudi.apache.org/docs/schema_evolution/#scenarios)?)
- Extending nested record with strings β and ints π

**Type Changes...**

Type changes that should be viable have never triggered this error.

**Expected behavior**

I expected the DeltaStreamer to always handle the evolutions gracefully. I also expected some of the successful evolutions I've done not to be possible based on the Hudi documentation, specifically around working with array types.

**Answers Required**

What's causing this error? Do you need any other information? Driver logs attached.

**Environment Description**

* Hudi version : 0.11.1-amzn-0 (DeltaStreamer on EMR 6.8.0)
* Spark version : 3.3.0
* Hive version : 3.1.3
* Hadoop version : Amazon 3.2.1
* Storage (HDFS/S3/GCS..) : S3
* Running on Docker? (yes/no) : No

EMR 6.9.0, which utilizes Hudi 0.12.1, wasn't used because it constantly fails with `org.apache.hudi.exception.HoodieException: Could not sync using the meta sync class org.apache.hudi.hive.HiveSyncTool`.

**Additional context**

- I have visited the Hudi office hours before writing this issue, where it was suggested that I investigate and write up my findings here if the root cause is still unknown.
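To rule out the registry side when reproducing, it can help to confirm exactly which schema version the `.../versions/latest` endpoint serves at the moment the error occurs. Here is a hedged sketch of building that URL and parsing the Confluent Schema Registry response shape; the registry host, topic, and field names are placeholders, and the sample payload is inlined rather than fetched over the network:

```python
import json

def latest_schema_url(registry_host: str, topic: str) -> str:
    """Build the Confluent SR 'latest version' URL for a topic's value subject."""
    return f"https://{registry_host}/subjects/{topic}-value/versions/latest"

def parse_latest(response_body: str) -> tuple[int, dict]:
    """Extract (version, parsed Avro schema) from the SR JSON response.

    The SR returns the Avro schema as a JSON *string* inside the response
    document, so it needs a second json.loads pass.
    """
    doc = json.loads(response_body)
    return doc["version"], json.loads(doc["schema"])

# Sample of what the endpoint returns (shape per the Confluent SR REST API).
sample = json.dumps({
    "subject": "my-topic-value",
    "version": 7,
    "id": 123,
    "schema": json.dumps({
        "type": "record", "name": "Event",
        "fields": [
            {"name": "id", "type": "long"},
            # A backwards-compatible addition: nullable with a default.
            {"name": "score", "type": ["null", "float"], "default": None},
        ],
    }),
})

version, schema = parse_latest(sample)
print(version, [f["name"] for f in schema["fields"]])
# -> 7 ['id', 'score']
```

Comparing this version/field list against the latest commit's schema in `.hoodie` right after a failure would show whether the `SchemaRegistryProvider` target schema and the per-record writer schema diverged mid-batch.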
**Stacktraces**

[test-schema-evo-trace-safe.txt](https://github.com/apache/hudi/files/10840065/test-schema-evo-trace-safe.txt)

^^ I can look to recreate the issue at a DEBUG log level if required.
