sumosha opened a new issue, #12412:
URL: https://github.com/apache/hudi/issues/12412
**Describe the problem you faced**
We could use some guidance on a performance issue we are seeing with a Spark
Structured Streaming job reading from Kinesis and writing to a MOR Hudi table
in EMR 7.2 (Hudi 0.14.1-amzn-1). We have noticed very long-running commits in
Hudi, and as data flows in, the iterator age on the Kinesis stream falls behind
by up to a day or more.
Steps to reproduce the behavior:
Some details about the load, application, and the Hudi table:
- 20-shard Kinesis stream carrying roughly 100-500 MB per minute, with periodic
spikes up to 2 GB, and typically higher load during the work week
- Each microbatch appears to consume around 8GB incoming data
- MOR table with the Simple index. It is a dimension table, currently 100.5 GB
across 886 partitions, with no meaningful time-based attribute to cluster or
partition by.
- Record key is a UUID. All writes use the upsert operation. In a given
batch, it is very likely that close to all partitions will have an update or
insert.
- EMR Cluster version 7.2 with 9 m5a.2xlarge workers (plus 1 m5a.2xlarge
driver). We scaled this up to 20 and saw some write improvement, but still the
same behaviors as I will describe below.
What we have noticed is that deltacommits typically take around 15 minutes;
however, every few commits there is an excessively long deltacommit that takes
around 1.5-3 hours to complete. While such a commit is in flight, there are no
GetRecords calls to the stream, so processing stalls. We compared the commit
metrics between one of the faster commits and a slow commit, and there isn't a
significant difference in the amount of data being written, maybe about 30k
records. I observed some log file writes with as few as 16 records and ~100 KB
in size taking ~4.5 minutes (totalUpsertTime). That seems an excessive amount
of time for a log file (not a parquet base file) with so few records.
Things we tried:
- Turn down the frequency of compaction to every 6 hours - no improvement.
- Reduce the number of records read per shard from Kinesis (in case it was
memory pressure from incoming data) - no improvement
- Turn off the async archive service - this fixed the periodic slow commit (see
notes below), however commits still appear generally slow (15-20 minutes)
- Upgrade to EMR v.7.5.0 (Hudi 0.15) - no improvement
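For clarity, the archive change amounted to flipping the cluster default (a minimal sketch; the option names match our hudi-defaults shown further down):

```python
# Sketch: overriding the cluster default to disable the async archive
# service (hoodie.archive.async is "true" in our hudi-defaults).
archive_overrides = {
    "hoodie.archive.async": "false",     # run archiving inline instead of async
    "hoodie.archive.automatic": "true",  # keep archiving itself enabled
}
```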
The jobs that took the most time in these slow batches:
- Loading latest base files for all partitions (590 tasks at 9.8 minutes)
- Getting small files from partitions (590 tasks at 9.7 minutes)
- Building workload profile (104 tasks at 13 minutes)
- Doing partition and writing data (~4648 tasks at 1.5+ hours, still running
but only halfway through). We believe the task count correlates to the number
of files written, so at 72 available cores we can understand why this is slow;
however, we'd like to improve write speeds here.
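One knob we are considering for the write stage is the upsert shuffle parallelism; a sketch below, with a purely illustrative value we have not validated:

```python
# Illustrative only: sizing upsert parallelism relative to available cores.
# hoodie.upsert.shuffle.parallelism controls the shuffle parallelism of the
# upsert write stage in Hudi 0.x; the 2x multiplier is an assumption, not
# a tested recommendation.
available_cores = 72  # 9 x m5a.2xlarge workers, 8 vCPUs each
write_tuning = {
    "hoodie.upsert.shuffle.parallelism": str(available_cores * 2),
}
```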
Interestingly, the Loading latest base files and Getting small files jobs
typically run in less than a minute, yet during one of these "slow commits"
they take many minutes. During the "fast commits," most of the commit time
appears to be spent in the Building Workload Profile job. I also observe
shuffle spill to disk, so we could use guidance on improving that as well.
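For the shuffle spill, the Spark-side settings we would consider tuning look roughly like this (a sketch with placeholder values, not something we have tested):

```python
# Illustrative Spark settings for reducing shuffle spill; values are
# assumptions for discussion, not tested recommendations.
spark_shuffle_tuning = {
    "spark.sql.shuffle.partitions": "288",   # ~4x the 72 cores as a starting point
    "spark.memory.fraction": "0.7",          # default is 0.6; more room for execution
    "spark.shuffle.spill.compress": "true",  # Spark default; listed for visibility
}
```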
We have conducted some stress testing in a comparable test environment and
were able to recreate the issue even starting on a blank table. We did the same
stress testing earlier in the year on EMR 6.15 (Hudi 0.14.0-amzn) and did not
observe this behavior.
During the slow commits, the following log lines appear; they do not show up
during the faster commits. I noticed the Delete Archived Instants job had run
around the same time, which prompted me to try turning off the async archive
service.
```
24/12/03 02:50:15 WARN PriorityBasedFileSystemView: Got error running preferred function. Likely due to another concurrent writer in progress. Trying secondary
24/12/03 02:50:17 WARN PriorityBasedFileSystemView: Routing request to secondary file-system view
```
Turning off the async archive service alleviated the periodic, extremely slow
commits; however, we still believe commit times are lackluster given the load
and cluster size.
What other configurations should we look at to tune the write speeds? Do we
need to reconsider our table design?
Below are the Hudi configurations in our production environment. Attached
are some Spark UI screenshots as well.
hudi-defaults on the cluster:
```json
{
  "hoodie.archive.async": "true",
  "hoodie.archive.automatic": "true",
  "hoodie.compact.inline.max.delta.seconds": "21600",
  "hoodie.compact.inline.trigger.strategy": "TIME_ELAPSED",
  "hoodie.datasource.write.schema.allow.auto.evolution.column.drop": "true",
  "hoodie.embed.timeline.server.async": "true",
  "hoodie.schema.cache.enable": "true"
}
```
Spark writer configuration:
"hoodie.cleaner.policy.failed.writes": "LAZY",
"hoodie.write.concurrency.mode": "optimistic_concurrency_control",
"hoodie.write.lock.dynamodb.region": "us-east-1",
"hoodie.write.lock.dynamodb.table": "oitroot-op1-datasync-hudi-locks",
"hoodie.write.lock.provider":
"org.apache.hudi.aws.transaction.lock.DynamoDBBasedLockProvider"
"path": hudi_output_path,
"hoodie.table.name": clean_table_name,
"hoodie.datasource.write.storage.type": "MERGE_ON_READ",
"hoodie.datasource.write.operation": "upsert",
"hoodie.datasource.write.recordkey.field": record_key_fields,
"hoodie.datasource.write.precombine.field": precombine_field,
"hoodie.datasource.write.partitionpath.field": partition_fields,
"hoodie.datasource.write.keygenerator.class":
"org.apache.hudi.keygen.ComplexKeyGenerator",
"hoodie.datasource.write.hive_style_partitioning": "true",
"hoodie.datasource.hive_sync.enable": "true",
"hoodie.datasource.hive_sync.database": clean_db_name,
"hoodie.datasource.hive_sync.table": clean_table_name,
"hoodie.datasource.hive_sync.partition_fields": partition_fields,
"hoodie.datasource.hive_sync.partition_extractor_class":
"org.apache.hudi.hive.MultiPartKeysValueExtractor",
"hoodie.datasource.hive_sync.use_jdbc": "false",
"hoodie.datasource.hive_sync.mode": "hms",
"hoodie.datasource.hive_sync.omit_metadata_fields": "true",
"hoodie.datasource.meta_sync.condition.sync": "true"
Thank you for your assistance!
**Expected behavior**
Consistent and faster commit times, ideally under 10 minutes
**Environment Description**
* Hudi version : 0.14.1-amzn-1
* Spark version : 3.5.1
* Hive version : 3.1.3
* Hadoop version : 3.3.6
* Storage (HDFS/S3/GCS..) : S3
* Running on Docker? (yes/no) : yes



