sumosha opened a new issue, #12412:
URL: https://github.com/apache/hudi/issues/12412

   
   **Describe the problem you faced**
   
   We could use some guidance on a performance issue we are seeing with a Spark Structured Streaming job reading from Kinesis and writing to a MOR Hudi table on EMR 7.2 (Hudi 0.14.1-amzn-1). We are seeing very long-running commits in Hudi, and the iterator age on the Kinesis stream falls behind by up to a day or more as data flows in.
   
   Steps to reproduce the behavior:
   
   Some details about the load, application, and the Hudi table:
   - 20-shard Kinesis stream carrying around 100-500 MB per minute, with periodic spikes up to 2 GB, typically with increased load during the work week
   - Each microbatch appears to consume around 8 GB of incoming data
   - MOR table with the Simple index, currently 100.5 GB across 886 partitions. It is a dimension table with no meaningful time-based attribute to cluster or partition by.
   - Record key is a UUID. All writes use the upsert operation. In a given batch, it is very likely that nearly all partitions will receive an update or insert.
   - EMR cluster version 7.2 with 9 m5a.2xlarge workers (plus 1 m5a.2xlarge driver). We scaled this up to 20 workers and saw some write improvement, but still the same behaviors described below.
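
   For context, the microbatch write path looks roughly like the following. This is a minimal sketch with placeholder names (`write_microbatch`, the table name, fields, and output path are illustrative); the full writer configuration is listed at the end of this issue:

   ```python
   # Sketch of the foreachBatch upsert path (placeholder names throughout;
   # our real job builds the options from the writer configuration below).
   hudi_options = {
       "hoodie.table.name": "dimension_table",                 # placeholder
       "hoodie.datasource.write.table.type": "MERGE_ON_READ",
       "hoodie.datasource.write.operation": "upsert",
       "hoodie.datasource.write.recordkey.field": "uuid_key",  # record key is a UUID
       "hoodie.datasource.write.precombine.field": "updated_at",
       "hoodie.index.type": "SIMPLE",                          # Simple index, per table setup
   }

   def write_microbatch(batch_df, batch_id):
       """foreachBatch handler: upsert each Kinesis microbatch into the MOR table."""
       (batch_df.write
           .format("hudi")
           .options(**hudi_options)
           .mode("append")
           .save("s3://bucket/path/to/table"))                 # placeholder path
   ```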
   
   What we have noticed is that deltacommits typically take around 15 minutes; however, every few commits there is an excessively long deltacommit that takes around 1.5-3 hours to complete. While this commit is completing, no GetRecords calls are made to the stream to continue processing. We compared the commit metrics between one of the faster commits and a slow commit, and there isn't a significant difference in the amount of data being written, maybe about 30k records. I observed log file writes as small as 16 records and ~100 KB in size taking ~4.5 minutes (totalUpsertTime), which seems an excessive amount of time for a log file (not Parquet) with so few records.
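
   To spot the periodic slow commit, we looked for completed deltacommits in the timeline whose gap from the previous instant was far above normal. A rough sketch of that check (pure Python over instant timestamps; the filenames, demo values, and threshold are illustrative):

   ```python
   from datetime import datetime

   def parse_instant(ts: str) -> datetime:
       """Parse a Hudi instant time like '20241203025015123' (yyyyMMddHHmmssSSS)."""
       return datetime.strptime(ts, "%Y%m%d%H%M%S%f")

   def slow_gaps(instants, threshold_minutes=30):
       """Return (prev, cur) pairs of instant times whose gap exceeds the threshold.

       `instants` would be the timestamp prefixes of completed
       '<instant>.deltacommit' files from the .hoodie timeline folder.
       """
       times = sorted(parse_instant(t) for t in instants)
       gaps = []
       for prev, cur in zip(times, times[1:]):
           if (cur - prev).total_seconds() > threshold_minutes * 60:
               gaps.append((prev, cur))
       return gaps

   # Example: three commits ~15 minutes apart, then a ~3 hour gap.
   demo = ["20241203000000000", "20241203001500000",
           "20241203003000000", "20241203033000000"]
   print(slow_gaps(demo))  # flags the single gap between 00:30 and 03:30
   ```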
   
   Things we tried:
   - Turned compaction frequency down to every 6 hours - no improvement.
   - Reduced the number of records read per shard from Kinesis (in case it was memory pressure from incoming data) - no improvement.
   - Turned off the async archive service - this fixed the periodic slow commit (see notes below), however commits still appear very slow in general (15-20 minutes).
   - Upgraded to EMR 7.5.0 (Hudi 0.15) - no improvement.
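
   For reference, the Hudi option values behind those attempts (the Kinesis per-shard read limit is set on the source connector, not in Hudi, so it is omitted here):

   ```python
   # Hudi options corresponding to the attempts above (values are what we tried).
   attempts = {
       # Compaction triggered on elapsed time, every 6 hours.
       "hoodie.compact.inline.trigger.strategy": "TIME_ELAPSED",
       "hoodie.compact.inline.max.delta.seconds": "21600",
       # Disable the async archive service (this removed the periodic 1.5-3h commit).
       "hoodie.archive.async": "false",
   }
   ```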
   
   The jobs that took the most time in these slow batches:
   - Loading latest base files for all partitions (590 tasks at 9.8 minutes)
   - Getting small files from partitions (590 tasks at 9.7 minutes)
   - Building workload profile (104 tasks at 13 minutes)
   - Doing partition and writing data (~4648 tasks at 1.5+ hours, still running but only halfway through). We believe this correlates with the number of files written, so at 72 available cores we can understand this will be slow; however, we'd like to improve write speeds here.
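
   Back-of-the-envelope arithmetic for why that stage runs so long: ~4648 write tasks over 72 cores means roughly 65 task waves, so even modest per-file write times compound:

   ```python
   tasks = 4648                 # write tasks in "Doing partition and writing data"
   cores = 72                   # 9 m5a.2xlarge workers x 8 vCPUs
   waves = -(-tasks // cores)   # ceiling division: number of task waves
   print(waves)                 # 65 waves

   # If a wave averages even ~2 minutes per log-file write, the stage alone
   # takes over two hours, consistent with the 1.5+ hour observation.
   minutes_per_wave = 2
   print(waves * minutes_per_wave / 60)  # ~2.2 hours
   ```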
   
   What is interesting is that the "Loading latest base files" and "Getting small files" jobs typically run in less than a minute, yet during a "slow commit" these jobs take many minutes. During the "fast commits," most of the commit time appears to be spent in the "Building Workload Profile" job. I also observe shuffle spill to disk, so we could use guidance on improving that as well.
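
   For the shuffle spill in "Building Workload Profile," these are the knobs we understand to be relevant. The values are illustrative starting points we are considering, not tested recommendations:

   ```python
   # Settings that influence shuffle behavior in the upsert/workload-profile path.
   # Values are guesses sized to this cluster (72 cores), not tuned recommendations.
   spark_tuning = {
       # More shuffle partitions -> smaller per-task shuffle blocks, less spill.
       "spark.sql.shuffle.partitions": "288",        # e.g. ~4x total cores
       # Give unified memory more headroom for execution.
       "spark.memory.fraction": "0.8",
       # Hudi-side parallelism for the upsert shuffle.
       "hoodie.upsert.shuffle.parallelism": "288",
   }
   ```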
   
   We have conducted some stress testing in a comparable test environment and were able to recreate the issue even when starting from a blank table. We did the same stress testing earlier in the year, using EMR 6.15 (Hudi 0.14.0-amzn), and did not observe this behavior.
   
   During the slow commits, the log messages below appear; during the faster commits, they do not. I noticed that the Delete Archived Instants jobs had run around the same time, which prompted me to try turning off the async archive service.
   24/12/03 02:50:15 WARN PriorityBasedFileSystemView: Got error running 
preferred function. Likely due to another concurrent writer in progress. Trying 
secondary
   24/12/03 02:50:17 WARN PriorityBasedFileSystemView: Routing request to 
secondary file-system view
   
   Turning off the async archive service alleviated the periodic, extremely slow commits; however, we still find commit times lackluster given the load and cluster size.
   
   What other configurations should we look at to tune the write speeds? Do we 
need to reconsider our table design?
   
   Below are the Hudi configurations in our production environment. Attached 
are some Spark UI screenshots as well.
   
   hudi-defaults on the cluster:
   {
         "hoodie.archive.async": "true",
         "hoodie.archive.automatic": "true",
         "hoodie.compact.inline.max.delta.seconds": "21600",
         "hoodie.compact.inline.trigger.strategy": "TIME_ELAPSED",
         "hoodie.datasource.write.schema.allow.auto.evolution.column.drop": 
"true",
         "hoodie.embed.timeline.server.async": "true",
         "hoodie.schema.cache.enable": "true"
   }
   Spark writer configuration:
   {
         "hoodie.cleaner.policy.failed.writes": "LAZY",
         "hoodie.write.concurrency.mode": "optimistic_concurrency_control",
         "hoodie.write.lock.dynamodb.region": "us-east-1",
         "hoodie.write.lock.dynamodb.table": "oitroot-op1-datasync-hudi-locks",
         "hoodie.write.lock.provider": "org.apache.hudi.aws.transaction.lock.DynamoDBBasedLockProvider",
         "path": hudi_output_path,
         "hoodie.table.name": clean_table_name,
         "hoodie.datasource.write.storage.type": "MERGE_ON_READ",
         "hoodie.datasource.write.operation": "upsert",
         "hoodie.datasource.write.recordkey.field": record_key_fields,
         "hoodie.datasource.write.precombine.field": precombine_field,
         "hoodie.datasource.write.partitionpath.field": partition_fields,
         "hoodie.datasource.write.keygenerator.class": "org.apache.hudi.keygen.ComplexKeyGenerator",
         "hoodie.datasource.write.hive_style_partitioning": "true",
         "hoodie.datasource.hive_sync.enable": "true",
         "hoodie.datasource.hive_sync.database": clean_db_name,
         "hoodie.datasource.hive_sync.table": clean_table_name,
         "hoodie.datasource.hive_sync.partition_fields": partition_fields,
         "hoodie.datasource.hive_sync.partition_extractor_class": "org.apache.hudi.hive.MultiPartKeysValueExtractor",
         "hoodie.datasource.hive_sync.use_jdbc": "false",
         "hoodie.datasource.hive_sync.mode": "hms",
         "hoodie.datasource.hive_sync.omit_metadata_fields": "true",
         "hoodie.datasource.meta_sync.condition.sync": "true"
   }
   
   Thank you for your assistance!
   
   **Expected behavior**
   
   Consistent and faster commit times, ideally under 10 minutes
   
   **Environment Description**
   
   * Hudi version : 0.14.1-amzn-1
   
   * Spark version : 3.5.1
   
   * Hive version : 3.1.3
   
   * Hadoop version : 3.3.6
   
   * Storage (HDFS/S3/GCS..) : S3
   
   * Running on Docker? (yes/no) : yes
   
   
![slow_commit_sparkui](https://github.com/user-attachments/assets/799ebcbe-7210-4c67-8fa0-47b2c554286d)
   
![building_workload_profile_spill_sparkui](https://github.com/user-attachments/assets/0e8494a0-9fc6-4fbf-9fb6-6cb1e261a626)
   
![faster_commit_sparkui](https://github.com/user-attachments/assets/e524861e-e94b-4b75-bc36-ec88aceaec7d)
   
![building_workload_profile_executor_metrics_sparkui](https://github.com/user-attachments/assets/a4a5fa1b-98b6-49c7-a52a-f471f0bad59a)
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
