sgcisco opened a new issue, #11118: URL: https://github.com/apache/hudi/issues/11118
**Describe the problem you faced**

Running a streaming pipeline with Kafka -> Structured Streaming (PySpark) -> Hudi (MOR tables) + AWS Glue + S3, we observed periodically growing latencies in data availability in Hudi. Latency was measured as the difference between the data generation `timestamp` and `_hoodie_commit_time`, and could reach 30 minutes. Periodic manual checks of the latest available data points' `timestamps`, using queries as described in https://hudi.apache.org/docs/0.13.1/querying_data#spark-snap-query, confirmed these delays. When writing to Hudi, the Kafka read-out rate in Spark was also unstable.

To exclude the impact of every component except Hudi, we ran experiments with the same configuration and ingestion settings but without Hudi, writing directly to S3. These did not reveal any delay above 2 minutes (a 1-minute delay is always present due to Structured Streaming micro-batch granularity), and in this case the Kafka read-out rate was stable over time.

**Additional context**

We tried to optimize Hudi file sizing and the MOR layout by applying suggestions from these references: https://github.com/apache/hudi/issues/2151#issuecomment-706400445 and https://cwiki.apache.org/confluence/display/HUDI/FAQ#FAQ-HowdoItoavoidcreatingtonsofsmallfiles. We could reach a target file size of 90-120 MB by lowering `hoodie.copyonwrite.record.size.estimate` from 1024 to 100 and using `inline.compact=false`, `delta.commits=1`, `async.compact=true`, and `hoodie.merge.small.file.group.candidates.limit=20`, but this had no impact on latency.
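The latency measurement described above can be sketched in plain Python. `commit_latency_seconds` is a hypothetical helper, not part of Hudi; it assumes the commit instant string uses Hudi's `yyyyMMddHHmmss[SSS]` instant format and that both the event timestamp and the writer's clock are in UTC:

```python
from datetime import datetime, timezone

def commit_latency_seconds(event_ts: datetime, hoodie_commit_time: str) -> float:
    """Latency between event generation and the Hudi commit that made it visible.

    Assumes the instant uses the yyyyMMddHHmmss[SSS] format (the optional
    millisecond suffix is ignored) and that both clocks are UTC -- an
    assumption, since instant times come from the writer's clock.
    """
    commit_dt = datetime.strptime(hoodie_commit_time[:14], "%Y%m%d%H%M%S")
    commit_dt = commit_dt.replace(tzinfo=timezone.utc)
    return (commit_dt - event_ts).total_seconds()

# A record generated at 12:00:00 UTC that only became visible in the
# 12:30:00 commit shows the ~30 min worst-case latency from this report.
event = datetime(2024, 5, 1, 12, 0, 0, tzinfo=timezone.utc)
print(commit_latency_seconds(event, "20240501123000123"))  # 1800.0
```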
Another compaction trigger strategy, `NUM_OR_TIME`, as suggested in https://github.com/apache/hudi/issues/8975#issuecomment-1593408753, with the parameters below, did not resolve the problem either:

```
"hoodie.copyonwrite.record.size.estimate": "100",
"hoodie.compact.inline.trigger.strategy": "NUM_OR_TIME",
"hoodie.metadata.compact.max.delta.commits": "5",
"hoodie.compact.inline.max.delta.seconds": "60",
```

As a trade-off we settled on the configuration below, which gives us relatively low latencies at the 90th percentile and file sizes of 40-90 MB:

```
"hoodie.merge.small.file.group.candidates.limit": "40",
"hoodie.cleaner.policy": "KEEP_LATEST_FILE_VERSIONS",
```

Still, some records can take up to 30 minutes to become available. This last configuration works relatively well for ingestion rates up to 1.5 MB/s with daily partitioning (`partition_date=yyyy-MM-dd/`), but stops working for rates above 2.5 MB/s, even with more granular partitioning (`partition_date=yyyy-MM-dd-HH/`).

**Expected behavior**

Since we use MOR tables:

- low latency on data availability
- proper file sizing within the configured limits:

```
"hoodie.parquet.small.file.limit": "104857600",
"hoodie.parquet.max.file.size": "125829120",
```

**Environment Description**

* Hudi version : 0.13.1
* Spark version : 3.4.1
* Hive version : 3.1
* Hadoop version : EMR 6.13
* Storage (HDFS/S3/GCS..) : S3
* Running on Docker?
(yes/no) : No

Hudi configuration

```
"hoodie.datasource.hive_sync.auto_create_database": "true",
"hoodie.datasource.hive_sync.enable": "true",
"hoodie.datasource.hive_sync.mode": "hms",
"hoodie.datasource.hive_sync.table": table_name,
"hoodie.datasource.hive_sync.partition_extractor_class": "org.apache.hudi.hive.MultiPartKeysValueExtractor",
"hoodie.datasource.hive_sync.use_jdbc": "false",
"hoodie.datasource.hive_sync.database": _glue_db_name,
"hoodie.datasource.write.hive_style_partitioning": "true",
"hoodie.datasource.write.keygenerator.class": "org.apache.hudi.keygen.ComplexKeyGenerator",
"hoodie.datasource.write.operation": "upsert",
"hoodie.datasource.write.schema.allow.auto.evolution.column.drop": "true",
"hoodie.datasource.write.table.name": table_name,
"hoodie.datasource.write.table.type": "MERGE_ON_READ",
"hoodie.metadata.index.bloom.filter.enable": "true",
"hoodie.metadata.index.column.stats.enable": "true",
"hoodie.table.name": table_name,
"hoodie.parquet.small.file.limit": "104857600",
"hoodie.parquet.max.file.size": "125829120",
"hoodie.merge.small.file.group.candidates.limit": "40",
"hoodie.cleaner.policy": "KEEP_LATEST_FILE_VERSIONS",
```

Spark configuration

```
"spark.sql.extensions": "org.apache.spark.sql.hudi.HoodieSparkSessionExtension",
"spark.serializer": "org.apache.spark.serializer.KryoSerializer",
# Glue support
"spark.hadoop.hive.metastore.client.factory.class": "com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory",
# Spark resources
"spark.driver.cores": "4",
"spark.driver.memory": "4400m",
"spark.driver.memoryOverhead": "800m",
"spark.executor.cores": "4",
"spark.executor.memory": "4400m",
"spark.executor.memoryOverhead": "800m",
"spark.dynamicAllocation.initialExecutors": "4",
"spark.dynamicAllocation.minExecutors": "4",
"spark.dynamicAllocation.maxExecutors": "8"
```
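A rough pure-Python sketch of how the file-sizing settings above interact (an approximation of Hudi's small-file handling for illustration, not its actual partitioner code; `records_assignable` is a hypothetical helper): a file below `hoodie.parquet.small.file.limit` is treated as a small-file candidate, and the number of new records budgeted into it is roughly the space left up to `hoodie.parquet.max.file.size` divided by the estimated record size, which is why lowering `hoodie.copyonwrite.record.size.estimate` from 1024 to 100 changed the resulting file sizes:

```python
SMALL_FILE_LIMIT = 104857600   # hoodie.parquet.small.file.limit (100 MB)
MAX_FILE_SIZE = 125829120      # hoodie.parquet.max.file.size (120 MB)

def records_assignable(current_file_bytes: int, record_size_estimate: int) -> int:
    """Approximate number of new records routed into an existing file group."""
    if current_file_bytes >= SMALL_FILE_LIMIT:
        return 0  # not a small-file candidate, gets no new inserts
    return (MAX_FILE_SIZE - current_file_bytes) // record_size_estimate

# For a 40 MB file: the default 1024-byte estimate budgets ~10x fewer
# records than a 100-byte estimate, so when the real records are small,
# files stay undersized until the estimate is lowered.
print(records_assignable(40 * 1024 * 1024, 1024))  # 81920
print(records_assignable(40 * 1024 * 1024, 100))   # 838860
```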
