sgcisco opened a new issue, #11118:
URL: https://github.com/apache/hudi/issues/11118

   **Describe the problem you faced**
   
   Running a streaming pipeline with Kafka → Structured Streaming (PySpark) → Hudi (MOR tables) + AWS Glue + S3, we observed periodically growing latencies in data availability in Hudi.
   Latency was measured as the difference between the data generation `timestamp` and the record's `_hoodie_commit_time`, and could reach up to 30 min. Periodic manual checks of the latest available data points' `timestamps`, using snapshot queries as described in https://hudi.apache.org/docs/0.13.1/querying_data#spark-snap-query, confirmed such delays.
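
   The latency we measured can be computed directly from Hudi's instant format (a minimal sketch, assuming `_hoodie_commit_time` values use the `yyyyMMddHHmmss` instant format and are in UTC; any trailing millisecond suffix is ignored):

   ```python
   from datetime import datetime, timezone

   def commit_latency_seconds(event_ts: datetime, commit_time: str) -> float:
       """Seconds between event generation and the Hudi commit that exposed it.

       Assumes commit_time is a yyyyMMddHHmmss Hudi instant in UTC; a trailing
       millisecond suffix, if present, is ignored.
       """
       commit_dt = datetime.strptime(commit_time[:14], "%Y%m%d%H%M%S")
       return (commit_dt.replace(tzinfo=timezone.utc) - event_ts).total_seconds()

   # Example: an event generated at 11:00:00 UTC, committed at 11:30:00 UTC
   print(commit_latency_seconds(
       datetime(2024, 4, 29, 11, 0, 0, tzinfo=timezone.utc), "20240429113000"))
   ```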
   
   
![image](https://github.com/apache/hudi/assets/168409126/5f7e6e1c-565b-47c1-b293-898cf2d8c40b)
   
   
![image](https://github.com/apache/hudi/assets/168409126/9bb379fa-85bc-467d-853f-8dc9651803b3)
   
   When using Spark with Hudi, the Kafka read rate was unstable:
   
   ![Screenshot 2024-04-29 at 11:49:29](https://github.com/apache/hudi/assets/168409126/1f114523-a574-4d39-90e8-a6d674f79aa0)
   
   To rule out impact from any component other than Hudi, we ran experiments with the same configuration and ingestion settings but without Hudi, writing directly to S3. These did not reveal any delays above 2 min (a 1 min delay is always present due to Structured Streaming micro-batch granularity), and in this case the Kafka read rate was stable over time.
   
   **Additional context**
   
   We tried to optimize Hudi file sizing and the MOR layout by applying suggestions from these references:
   https://github.com/apache/hudi/issues/2151#issuecomment-706400445
   https://cwiki.apache.org/confluence/display/HUDI/FAQ#FAQ-HowdoItoavoidcreatingtonsofsmallfiles
   
   We achieved a target file size of 90-120 MB by lowering `hoodie.copyonwrite.record.size.estimate` from 1024 to 100 and setting `Inline.compact=false and delta.commits=1 and async.compact=true and hoodie.merge.small.file.group.candidates.limit=20`, but this had no impact on latency.
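
   Spelled out as a complete writer-options dict (a sketch; the full key names are taken from the Hudi 0.13 configuration reference, the values from the experiment above):

   ```python
   # File-sizing experiment as full writer options (sketch; full key names per
   # the Hudi 0.13 configuration reference, values from the shorthand above).
   sizing_options = {
       "hoodie.copyonwrite.record.size.estimate": "100",     # lowered from 1024
       "hoodie.compact.inline": "false",                     # "Inline.compact=false"
       "hoodie.compact.inline.max.delta.commits": "1",       # "delta.commits=1"
       "hoodie.datasource.compaction.async.enable": "true",  # "async.compact=true"
       "hoodie.merge.small.file.group.candidates.limit": "20",
   }
   ```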
   
   The compaction trigger strategy `NUM_OR_TIME`, as suggested in https://github.com/apache/hudi/issues/8975#issuecomment-1593408753, with the parameters below, did not resolve the problem either:
   ```
   "hoodie.copyonwrite.record.size.estimate": "100",
   "hoodie.compact.inline.trigger.strategy": "NUM_OR_TIME",
   "hoodie.metadata.compact.max.delta.commits": "5",
   "hoodie.compact.inline.max.delta.seconds": "60",
   ``` 
   
   As a trade-off we settled on the configuration below, which gives us relatively low latencies at the 90th percentile and file sizes of 40-90 MB:
   ```
   "hoodie.merge.small.file.group.candidates.limit": "40",
   "hoodie.cleaner.policy": "KEEP_LATEST_FILE_VERSIONS",
   ```
   
   
![10_31_12](https://github.com/apache/hudi/assets/168409126/bf85386b-7f6e-48a9-b855-ff8cb391080d)
   
   
   However, some records could still take up to 30 min.
   
   
![02_42_29](https://github.com/apache/hudi/assets/168409126/9e6442c1-8cfa-4778-abc5-5d1050cb3653)
   
   The last config works relatively well for low ingestion rates of up to 1.5 MB/s with daily partitioning `partition_date=yyyy-MM-dd/`, but stops working at rates above 2.5 MB/s, even with more granular hourly partitioning `partition_date=yyyy-MM-dd-HH/`.
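
   The two partitioning schemes differ only in the hour component of the partition path (illustrative sketch; `partition_path` is a hypothetical helper for clarity, not a Hudi API — with hive-style partitioning the folder layout is `partition_date=<value>/`):

   ```python
   from datetime import datetime

   def partition_path(ts: datetime, hourly: bool = False) -> str:
       # Hypothetical helper showing the daily vs hourly layouts described above.
       fmt = "%Y-%m-%d-%H" if hourly else "%Y-%m-%d"
       return f"partition_date={ts.strftime(fmt)}/"

   print(partition_path(datetime(2024, 4, 29, 11, 49)))               # daily
   print(partition_path(datetime(2024, 4, 29, 11, 49), hourly=True))  # hourly
   ```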
   
   **Expected behavior**
   
   Since we use MOR tables, we expect:
   - low latency on data availability
   - proper file sizing, within the configured limits
   ```
       "hoodie.parquet.small.file.limit" : "104857600",
       "hoodie.parquet.max.file.size" : "125829120",
   ```
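
   For reference, those byte values are exactly 100 MiB and 120 MiB:

   ```python
   # hoodie.parquet.* limits are in bytes; the values above are 100 and 120 MiB.
   MIB = 1024 * 1024
   assert 100 * MIB == 104857600   # hoodie.parquet.small.file.limit
   assert 120 * MIB == 125829120   # hoodie.parquet.max.file.size
   ```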
   
   **Environment Description**
   
   * Hudi version : 0.13.1
   
   * Spark version : 3.4.1
   
   * Hive version : 3.1
   
   * Hadoop version : EMR 6.13
   
   * Storage (HDFS/S3/GCS..) : S3
   
   * Running on Docker? (yes/no) : No
   
   Hudi configuration
   ```
                "hoodie.datasource.hive_sync.auto_create_database": "true",
                "hoodie.datasource.hive_sync.enable": "true",
                "hoodie.datasource.hive_sync.mode": "hms",
                "hoodie.datasource.hive_sync.table": table_name,
                "hoodie.datasource.hive_sync.partition_extractor_class": "org.apache.hudi.hive.MultiPartKeysValueExtractor",
                "hoodie.datasource.hive_sync.use_jdbc": "false",
                "hoodie.datasource.hive_sync.database": _glue_db_name,
                "hoodie.datasource.write.hive_style_partitioning": "true",
                "hoodie.datasource.write.keygenerator.class": "org.apache.hudi.keygen.ComplexKeyGenerator",
                "hoodie.datasource.write.operation": "upsert",
                "hoodie.datasource.write.schema.allow.auto.evolution.column.drop": "true",
                "hoodie.datasource.write.table.name": table_name,
                "hoodie.datasource.write.table.type": "MERGE_ON_READ",
                "hoodie.metadata.index.bloom.filter.enable": "true",
                "hoodie.metadata.index.column.stats.enable": "true",
                "hoodie.table.name": table_name,
                "hoodie.parquet.small.file.limit": "104857600",
                "hoodie.parquet.max.file.size": "125829120",
                "hoodie.merge.small.file.group.candidates.limit": "40",
                "hoodie.cleaner.policy": "KEEP_LATEST_FILE_VERSIONS",
   ```
   
   Spark configuration
   ```
            "spark.sql.extensions": "org.apache.spark.sql.hudi.HoodieSparkSessionExtension",
            "spark.serializer": "org.apache.spark.serializer.KryoSerializer",
            # Glue support
            "spark.hadoop.hive.metastore.client.factory.class": "com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory",
            # Spark resources
            "spark.driver.cores": "4",
            "spark.driver.memory": "4400m",
            "spark.driver.memoryOverhead": "800m",
            "spark.executor.cores": "4",
            "spark.executor.memory": "4400m",
            "spark.executor.memoryOverhead": "800m",
            "spark.dynamicAllocation.initialExecutors": "4",
            "spark.dynamicAllocation.minExecutors": "4",
            "spark.dynamicAllocation.maxExecutors": "8"
   ```
   

